Sunday, January 29, 2012

Row Count : HBase Aggregation example

With coprocessors, HBase 0.92 introduces a new way to process data directly on a region server. For users this is definitely a very exciting feature: you can now easily define your own distributed data services.

This post is not intended to show you how to define them (I highly recommend watching this presentation if you want to do so). Instead, it quickly presents the new aggregation service shipped with HBase 0.92, which is built on the endpoint coprocessor framework.

1. Enable the aggregation coprocessor

You have two choices:

You can enable the aggregation coprocessor (AggregateImplementation) on all your tables by adding the following lines to hbase-site.xml on the region servers (they must be restarted to pick up the change):
 <property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property>
Or you can enable the coprocessor on a single table through the HBase shell:

1. disable the table
hbase> disable 'mytable'

2. add the coprocessor
hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'

3. re-enable the table
hbase> enable 'mytable'
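The value given to 'coprocessor' in the alter command above is a pipe-separated spec. Assuming the layout jar path | class name | priority | arguments, the string '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||' leaves everything but the class name empty. The small Java sketch below (a hypothetical CoprocessorSpec helper, not an HBase class) splits such a value to show which field is which; check the HBase documentation for your version before relying on the exact layout.

```java
// Hypothetical helper, not an HBase class: splits the table-attribute value
// used by the shell 'alter' command into its pipe-separated fields
// (assumed layout: jarPath|className|priority|arguments).
final class CoprocessorSpec {
    final String jarPath;   // empty: the class is expected on the region server classpath
    final String className; // coprocessor implementation class (required)
    final String priority;  // empty: the default priority is used
    final String arguments; // empty: no arguments are passed

    private CoprocessorSpec(String jarPath, String className,
                            String priority, String arguments) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
        this.arguments = arguments;
    }

    static CoprocessorSpec parse(String spec) {
        // limit -1 keeps trailing empty fields, e.g. the "||" at the end
        String[] fields = spec.split("\\|", -1);
        if (fields.length < 2 || fields[1].trim().isEmpty()) {
            throw new IllegalArgumentException("missing class name in: " + spec);
        }
        return new CoprocessorSpec(
                fields[0].trim(),
                fields[1].trim(),
                fields.length > 2 ? fields[2].trim() : "",
                fields.length > 3 ? fields[3].trim() : "");
    }

    public static void main(String[] args) {
        CoprocessorSpec spec = parse(
                "|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||");
        System.out.println("jar path : '" + spec.jarPath + "'");
        System.out.println("class    : " + spec.className);
        System.out.println("priority : '" + spec.priority + "'");
    }
}
```

Splitting with a limit of -1 matters here: without it, Java drops the trailing empty strings and the priority and arguments fields would silently disappear.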


2. Increase the RPC timeout (optional)
In some cases the following exception may be raised:
java.net.SocketTimeoutException: Call to node2/55.37.68.154:60020 failed on socket timeout exception
On large tables, or if you expect slow I/O, you may need to raise the timeout (the value is in milliseconds, so 300000 is 5 minutes):
 <property>
   <name>hbase.rpc.timeout</name>
   <value>300000</value>
 </property>
N.B.: you can also set this property directly in the client code (see the next section).
3. 'RowCount' Snippet
The following snippet uses the "row count" feature of the aggregation service.

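import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.util.Bytes;

public class MyAggregationClient {

    private static final byte[] TABLE_NAME = Bytes.toBytes("mytable");
    // Column family to scan: only rows with cells in this family are counted
    private static final byte[] CF = Bytes.toBytes("d");

    public static void main(String[] args) throws Throwable {

        Configuration customConf = new Configuration();
        // ZooKeeper quorum of the target cluster
        customConf.set("hbase.zookeeper.quorum", "node0,node1,node2");

        // Increase RPC timeout (in ms), in case of a slow computation
        customConf.setLong("hbase.rpc.timeout", 600000);
        // Default is 1 in 0.92; a higher value speeds up client-side scanner.next(..)
        customConf.setLong("hbase.client.scanner.caching", 1000);
        // Load hbase-default.xml / hbase-site.xml, then apply the overrides above
        Configuration configuration = HBaseConfiguration.create(customConf);
        AggregationClient aggregationClient = new AggregationClient(configuration);

        Scan scan = new Scan();
        scan.addFamily(CF);
        // No ColumnInterpreter (null): counting rows does not decode cell values
        long rowCount = aggregationClient.rowCount(TABLE_NAME, null, scan);
        System.out.println("row count is " + rowCount);
    }
}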
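As I understand it, rowCount does not scan the table from the client: each region covered by the scan counts its own rows through the endpoint coprocessor, and the client simply adds up the partial results. The plain-Java sketch below simulates only that merge pattern, with an in-memory map standing in for regions and hypothetical names (RowCountSimulation, countRowsInRegion); it does not call HBase.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java simulation, no HBase involved: every "region" counts its own rows
// and a shared accumulator merges the partial counts, like a client callback.
final class RowCountSimulation {

    // Hypothetical stand-in for the server-side count done by each region.
    static long countRowsInRegion(List<String> rowKeys) {
        return rowKeys.size();
    }

    static long rowCount(Map<String, List<String>> regions) {
        AtomicLong total = new AtomicLong(); // updated concurrently by the workers
        regions.entrySet().parallelStream().forEach(region -> {
            long partial = countRowsInRegion(region.getValue()); // per-region work
            total.addAndGet(partial);                            // merge step
        });
        return total.get();
    }

    public static void main(String[] args) {
        Map<String, List<String>> regions = new LinkedHashMap<>();
        regions.put("region-a", Arrays.asList("row1", "row2", "row3"));
        regions.put("region-b", Arrays.asList("row4", "row5"));
        System.out.println("row count is " + rowCount(regions)); // row count is 5
    }
}
```

Because the partial results arrive from several regions in parallel, the accumulator has to be thread-safe; an AtomicLong is the simplest choice for a sum.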

 
4. What else?

The full list of functions provided by the aggregation client can be found here: http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.html

See also https://issues.apache.org/jira/browse/HBASE-5123 for a discussion of upcoming functions.

Note that this version is designed to work on values stored as longs (via the LongColumnInterpreter shipped with 0.92). If you plan to use other data types, you need to provide your own column interpreter. See https://issues.apache.org/jira/browse/HBASE-5122
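To make the Long restriction concrete: values written with HBase's Bytes.toBytes(long) are 8-byte big-endian, and as far as I know the shipped Long interpreter only decodes values of exactly that length, ignoring the rest. The sketch below mimics that decoding in plain Java with a hypothetical LongValueSum class (it is not the ColumnInterpreter interface); an interpreter for another type would essentially replace decode().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not HBase's ColumnInterpreter API): decodes cell values
// stored as 8-byte big-endian longs and sums them, skipping anything else.
final class LongValueSum {

    // Returns null for values that are not exactly 8 bytes long.
    static Long decode(byte[] value) {
        if (value == null || value.length != Long.BYTES) {
            return null;
        }
        return ByteBuffer.wrap(value).getLong(); // ByteBuffer is big-endian by default
    }

    // Same 8-byte big-endian layout as HBase's Bytes.toBytes(long).
    static byte[] encode(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    static long sum(List<byte[]> values) {
        long total = 0;
        for (byte[] value : values) {
            Long v = decode(value);
            if (v != null) {
                total += v;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<byte[]> cells = Arrays.asList(
                encode(40L), encode(2L),
                "not-a-long".getBytes(StandardCharsets.UTF_8)); // 10 bytes: skipped
        System.out.println(sum(cells)); // 42
    }
}
```

This is also why storing numbers as strings (e.g. "42") does not work with the default interpreter: the bytes have the wrong length and encoding.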

Credits: thanks to Gary Helmling for his contributions on the HBase mailing list.
