Sunday, January 29, 2012

Row Count : HBase Aggregation example

With the coprocessors HBase 0.92 introduces a new way to process data directly on a region server. As a user this is definitively a very exciting feature : now you can easily define your own distributed data services.

This post is not intended to help you how to define them (i highly recommend you to watch this presentation if you want to do so) but to quickly presents the new aggregation service shipped with HBase 0.92 that is built upon the endpoint coprocessor framework.

1. Enable AggregationClient coprocessor

You have two choices :

You can enable aggregation coprocessor on all your tables by adding the following lines to hbase-site.xml :
 <property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property>
or ...you can enable coprocessor only on a table throught the HBase shell :

1. disable the table
hbase> disable 'mytable'

2. add the coprocessor
hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'

3. re-enable the table

hbase> enable 'mytable'


2. Increase RPC timeout (opt)
In some cases the following exception could be raised :
java.net.SocketTimeoutException: Call to node2/55.37.68.154:60020 failed on socket timeout exception
On large table or if you are expecting slow I/O this might be necessary:
 <property>
   <name>hbase.rpc.timeout</name>
   <value>300000</value>
 </property>
n.b. : you can also specify this property directly in the client source code, see next section
3. ''RowCount' Snippet
The following snippet uses the "row count" feature of the aggregation service.

public class MyAggregationClient {

    private static final byte[] TABLE_NAME = Bytes.toBytes("mytable");
    private static final byte[] CF = Bytes.toBytes("d");

    public static void main(String[] args) throws Throwable {

        Configuration customConf = new Configuration();
        customConf.setStrings("hbase.zookeeper.quorum",
                "node0,node1,node2");

        // Increase RPC timeout, in case of a slow computation
        customConf.setLong("hbase.rpc.timeout", 600000);
        // Default is 1, set to a higher value for faster scanner.next(..)
        customConf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(customConf);
        AggregationClient aggregationClient = new AggregationClient(
                configuration);
        Scan scan = new Scan();
        scan.addFamily(CF);
        long rowCount = aggregationClient.rowCount(TABLE_NAME, null, scan);
        System.out.println("row count is " + rowCount);

    }
}

 
4. What else ?

The full list of functions provided by the aggregation client can be found here : http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.html

See also https://issues.apache.org/jira/browse/HBASE-5123 for discussion about forthcoming functions.

Note that this version is designed to work on data whose type is Long. If you plan to use other data types you should provide your own column interpreter. See https://issues.apache.org/jira/browse/HBASE-5122

Credits : thanks to Gary Helmling for his contribution on the HBase mailing list

Tuesday, January 24, 2012

Sortie de HBase 0.92

Après 4 "release candidate" et plus d'un an de développement la version 0.92.0 de HBase succède à la branche 0.90.x.Malgré le faible écart entre les deux numéros de version le travail accompli pour aboutir à cette version a été très important.

Les nouveautés majeures :

Implémentation des co-processeurs

Les coprocesseurs permettent de réaliser des traitements directement au sein des JVM des serveurs qui gèrent les données.
Deux types de coprocesseurs sont disponibles :
  • Les “observers” qui permettent le déclenchement d’actions sur des évènements affectant les données ou les tables. Les observers peuvent être chainés, executés par ordre de priorité et poser un veto sur une opération.
  • Les “endpoints” quant à eux permettent d’étendre les services RPC par des procédures personnalisées. Livrés avec cette version on trouvera ainsi des services d'agrégation permettant, entre autres, de déterminer les valeurs minimales ou maximales des données ou encore de réaliser des opérations arithmétiques de base.

HFile V2

HFile désigne le format des fichiers utilisé par HBase pour stocker les données sur le système de fichier. Cette contribution est le fruit du travail des ingénieurs de Facebook et permet d’améliorer l’utilisation de la mémoire faite pour les écrire et y accéder. Il permet d’accélérer l’ouverture des fichiers et donc de diminuer le temps d’indisponibilité des données qui y sont stockées lors du démarrage du cluster ou suite à la défaillance d’un noeud. (cette optimisation sera d'autant plus notable si vous utilisez les Bloom Filters)
A noter qu'aucune migration n'est nécessaire, HBase 0.92 sait lire les fichier V1 et les convertira en V2 à la première "compaction".

Distributed log splitting

Lors d’un redémarrage du cluster ou en cas de défaillance d’un nœud la prise en charge des données en instance d’intégration est distribuée sur l’ensemble des nœuds du cluster encore disponibles, la coordination étant assurée par Zookeeper. Cette action dans les versions précédentes était principalement effectuée par un seul serveur.

Intégration de la sécurité

Cette version apporte des mécanismes de sécurité basés sur ceux offert par Hadoop (authentification via Kerberos, isolation des données sur HDFS....) On trouve donc des fonctionnalités telles que :
  • Des appels RPC clients sécurisés
  • La mise oeuvre de listes de contrôle d’accès (ACL) sur les tables et les colonnes
  • Une connexion sécurisé au quorum ZooKeeper
Correctifs et optimisations diverses

Parmi les améliorations et les nombreux correctifs on notera aussi l’activation par défaut du mécanisme de gestion de la mémoire MemStore-LAB, ce dernier permet de réduire la fragmentation mémoire de la JVM et donc de diminuer le travail du garbage collector.
L’IHM permet quant à elle de visualiser en plus des informations habituelles (statistiques des noeuds, distribution des données...) les opérations en cours sur le cluster (split, compaction....)



Voir la liste des changements