Thursday, November 21, 2013

Weave devient Apache Twill et entre en incubation



Ceux qui s'intéressent à YARN, le nouveau gestionnaire de ressources d'Hadoop, savent que son potentiel est énorme pour ceux qui font du BigData : spécifiez les ressources nécessaires à votre programme distribué (CPU, mémoires) et YARN se charge de trouver les nœuds de votre cluster possédant les ressources disponibles pour l’exécuter. Le tout bien entendu sur les pétaoctets hébergés par le système de fichier distribué d'Hadoop : HDFS.
Emporté par la dynamique Hadoop YARN est en train de devenir le socle de nombreux projets de traitement de gros volumes données : on retrouve le traditionnel Map/Reduce mais aussi Storm porté par Yahoo ! ou Stinger d'Hortonworks pour faire du SQL à (très) grande échelle

Cependant écrire un programme qui exploite les capacités de YARN n'est pas une sinécure, on se retrouve vite à copier / coller l'exemple du DistributedShell, à refaire les mêmes choses et à retomber dans les mêmes problématiques ... bref inutile d'être un génie pour comprendre qu'il y avait matière à factoriser.

Il y avait Kitten de chez Cloudera, voici celle de Continuuity, Weave, jusqu'ici hébergée chez Github qui vient d'être acceptée par la fondation Apache pour incubation. Au passage il change de nom et devient Twill histoire de ne pas le confondre avec Apache Wave (plateforme de travail collaborative, pas grand chose à voir avec le BigData)
L'idée derrière Weave Twill est donc bien de simplifier le développement d'applications distribuées qui reposent sur YARN en offrant par exemple une interface WeaveRunnable tel que le fait le JDK avec l'interface Runnable sauf qu'il ne s'agit plus de fonctionner au sein d'un pool de thread mais de passer à l'échelle supérieure : le pool de serveurs (physique).
Weave tente aussi de simplifier la gestion de l'application comme par exemple la gestion des logs qui est toujours délicate lorsque l'on a affaire à des applications qui s’exécutent sur plusieurs machines.

Si vous avez commencé à jouer avec YARN jetez un coup d’œil à Weave.

Les alternatives potentielles :
Kitten
Spring YARN

Friday, December 28, 2012

Zookeeper, Netflix Curator and ACLs

If you have one or more Zookeeper "multi-tenant" clusters you may want to protect znodes against unwanted modifications.
Here is a very simple and short introduction to the ACL and custom authentication features.
This post is not intended to give you best practices about security and Zookeeper, the only goal is to give you a complete example of a custom authentication handler.

Complete source code with JUnit test is available here :
https://github.com/barkbay/zookeeper-acl-sample/

Use case

Let say that your Zookeeper cluster is used by several users. In order to restrict user actions you have decided that each user must prefix all paths with the first letter of his name.
  • User foo is only allowed to create, read, delete and update znodes under the /f znode.
  • User bar is only allowed to create, read, delete and update znodes under the /b znode.

Get client authentication data on the server side

Zookeeper client authentication can be easily customized , all you have to do is to create a class that extends org.apache.zookeeper.server.auth.AuthenticationProvider :
Once it is done you must tell Zookeeper server to use it, this can be done by setting the Java system property zookeeper.authProvider.x (where x is an integer)
You can do this in a startup script but we want to use a JUnit test :

Let Zookeeper client automatically set ACLs with Curator

Curator helps you to automatically set right ACLs on znodes. This can be done providing an ACLProvider implementation :
Register you ACL provider using the Curator client builder :
Add client authentication data using the previous builder : Complete JUnit test with a Zookeeper testing server using the Curator framework is available here

Tuesday, September 18, 2012

Zookeeper watchers and events, a (very) short summary

Here is a summary of what events are expected if you set ChildrenWatcher, DataWatcher or ExistsWatcher on a znode :

WARNING : none means that no event is triggered, do no confuse with EventType.None

Do not forget this basic rule : Watches are one time trigger, once a watch has been triggered you must register it once again if you want to be notified. Moreover due to network latency you can miss one or several changes that happen just before the watch is re-registered.

An expired zookeeper session also means that all your watches are gone, you have to create them once again.

More informations here.

Hope it helps...

Wednesday, June 20, 2012

Orientée colonnes ?

Les bases NoSQL sont arrivées avec leur cortège de nouveautés et pour certaines d'entre elles une notion héritée de BigTable : celle de base de donnée orientée colonne.

Cependant faire le lien entre l'article de Wikipedia et comprendre ce que permet réellement un base de donnée comme HBase n'est pas une chose évidente. En effet le simple fait de définir cette notion ne suffit pas toujours a bien comprendre quels sont les principes de conception du monde SQL qui peuvent être oubliés et ceux qui doivent être appris.

Colonne or not colonne ?

Prenons un modèle très simple de donnée et essayons de le transposer dans un modèle "orienté colonne":
Comme on peut le voir on est passé d'un modèle à 2 dimensions (ligne x colonne) vers un modèle où une valeur est accédée au travers de 2 coordonnées qui sont ici (ligne, colonne) Cette notion de coordonnées est  importante (c'est pour ça que je la met en gras 2 fois de suite) si l'on veut concevoir une gestion des données avec une base orientées colonnes. Plus vite vous intégrerez le fait qu'une colonne n'est qu'un élément d'une coordonnée et que vous pouvez y mettre ce que bon vous semble (même de la donnée !) mieux vous vous porterez. Notez que la clé primaire dans la colonne Id est devenue ce que l'on appelle une row dans l'API de HBase. Cette appellation dans l'API est un peu déroutante on parlera plus souvent de row key.

Avantages :
  • L'ajout de données se fait sur une seule dimension ce qui est techniquement plus simple et plus rapide : les données sont concaténées les unes à la suite des autres, HBase supportent ainsi des débits en écritures bien plus élevés que ses consœurs avec des temps de latence très réduits.
  • On devient à la mode en étant "scalable" (utilisez votre meilleur accent anglais pour faire bien) :  comme le développement des données ne se fait que sur une seule dimension leurs partitionnements est plus simple à réaliser et on peut les distribuer sur plusieurs serveurs. (dans le cas de HBase on va appeler ça des Regions)
  • Ajouter une colonne devient trivial car il s'agit au final seulement d'ajouter un nouveau tuple, on peut même se permettre le luxe (ou l'horreur, c'est selon) de n'ajouter qu'une seule colonne à une ligne en particulier : là où certaines bases de données vont réserver de la place pour toutes les autres lignes cela ne coutera rien d'autre que la valeur et ses coordonnées avec HBase.
Inconvénient :
  • La notion de "coordonnées" suppose que ces dernières permettent de retrouver de manière univoque la donnée, comme on peut le voir dans l'exemple la colonne appelée Id est une clé primaire, il s'agit d'une pratique fréquente dans la conception des schéma de base. C'est aussi le cas dans une base orientée colonne mais ici elle revêt un rôle de tout premier ordre puisque que la clé primaire est obligatoire et qu'elle sera le moyen le plus efficace de retrouver la donnée. Autrement dit : il y a une seul clé, seul champ véritablement indexé, les recherches sur les autres colonnes seront moins efficaces. (attention aux bases NoSQL qui vous promettent des indexes secondaires, lisez bien les petites lignes du contrat)
  • Si ce type de base de données est très efficace en écriture elle l'est moins en lecture, non seulement on se retrouve avec une seule clé mais on voit aussi assez vite que si l'on veut être performant et pouvoir distribuer les données il va falloir mettre un peu d'ordre en les triant et user d'outil tels que les filtres de M. Bloom afin d'optimiser les recherches.
  • Une mise à jour n'écrase pas directement l'ancienne valeur, il faut donc prévoir à un moment ou un autre un processus appelé "compaction" pour réellement supprimer les données. Noter que les suppressions de données se font exactement comme les mises à jour, on place juste ce que l'on appelle une pierre tombale ("tombstone") à la place de la donnée proprement dite.

Comment est-ce qu'HBase implémente ce principe ?

En réalité HBase ne va pas se compliquer la vie avec les notions de "ligne" et de "colonne" telles qu'on peut l'entendre dans le monde relationnel. Si vous avez bien compris le point précédent une donnée devient accessible grâce à des coordonnées. HBase stocke systématiquement une donnée avec l'ensemble de ses coordonnées. Avec ce modèle HBase n'est plus limité à un espace à 2 dimensions et elle en profite pour ajouter une notion de famille de colonne et un horodatage qui va permettre de gérer plusieurs versions d'une donnée :
La position d'une donnée dans HBase peut donc être décrite par les coordonnées :
(row, column family, column qualifier, timestamp)

L'objet qui stocke à la fois les coordonnées et la donnée est org.apache.hadoop.hbase.KeyValue. (cependant comme vous pourrez le voir l'ensemble est sérialisé dans un tableau de byte ce n'est donc pas très palpitant à regarder). Les milliards de KeyValue qui représentent vos données sont stockés dans un type de fichier propre à HBase appelé HFile (dont la version 2 est une contribution de Facebook). Ces fichiers sont eux même stockées par le système de fichier sous-jacent (en règle générale il s'agit de HDFS qui assure la réplication des données).

Est-ce que cela veut dire qu'une partie des coordonnées est dupliquée pour chaque nouvelle donnée insérée ? Oui, il s'agit là d'un des défaut de cette implémentation, certains éléments comme la ligne ou le nom de la famille de colonne sont dupliquées. Si la clé de la ligne fait 20 bits (c'est un exemple) et qu'elle possède 11 "colonnes" alors elle sera dupliquée d'autant (dans cet exemple on peut grossièrement quantifier cette "surcharge" à 200 bits). Moralité : essayez de contenir la longueur des lignes et des noms de famille de colonne. Vous pouvez activer le Data Block Encoding arrivé avec HBase 0.94 qui pallie ce "problème" et activer une compression comme Snappy ou LZO.

Autre remarque très importante : les KeyValue sont toujours stockées dans l'ordre au sein d'un HFile, il est donc très performant de lire plusieurs KeyValue contiguës. Le format HFile dispose de nombreux artifices (regroupement en bloc, indexation, filtre de Bloom, compression...) afin d'optimiser les accès en lecture mais comme ce n'est pas le sujet de ce post je ne m'étendrai pas dessus.

Comment exploiter ce principe lors de la conception d'un schéma ?

Un exemple valant mieux qu'un long discours je vais prendre celui de l’excellent OpenTSDB.

Qu'est ce qu'OpenTSDB ?

OpenTSDB est un logiciel qui permet de tracer des métriques au fil du temps :
  • Le type de métrique en lui même importe peu si l'on est capable de lui attribuer une valeur numérique (e.g. : la mémoire disponible sur une machine sera exprimée sous la forme d'un entier) et un nom (e.g. free.memory)
  • Vous pouvez associer une ou plusieurs étiquettes (tag) à un métrique (e.g. le nom d’hôte, sa localisation géographique...).
  • La précision peut descendre à la seconde.
 L'originalité d'OpenTSDB est d'utiliser HBase pour conserver les métriques collectés, le volume de données qui peut être ingurgité par le système est donc très important, pour paraphraser le site officiel : "collect many thousands of metrics from thousands of hosts and applications".

Le schema de OpenTSDB

  1. Ligne (row) : Elles sont la concaténation du métrique, de la date arrondie à l'heure et du (des) tag(s). Dit autrement une ligne va contenir toutes les données collectées dans un intervalle d'une heure pour un métrique et un ou plusieurs tags donnés. Notez que la date n'est placée qu'en deuxième position afin d'éviter qu'à un instance t toutes les requêtes en écritures arrivent sur la même Region
  2. La nom de la famille de colonne (column family) : il est fixé à 't' , plus le nom de la famille de colonne est petit mieux c'est.
  3. Les noms de colonnes  (qualifiers)  : le nom de chaque colonne est le décalage en secondes entre la date arrondie et la date effective de capture du métrique
Sur un exemple simple voici de manière un peu simplifié comment OpenTSDB stocke les données dans HBase :


L'originalité de ce schéma est d'utiliser le nom de la colonne pour y stocker de l'information, il s'agit là d'une manière très élégante d'utiliser les capacités d'une base de données orientée colonnes. Encore une fois il est intéressant de raisonner en terme de coordonnées plutôt qu'en terme de colonnes.
Vous trouverez une description plus complète et précise du schéma ici : http://opentsdb.net/schema.html

Par où continuer ?

Je n'ai volontairement pas abordé certains points comme celui des familles de colonnes, de la distribution des lignes sur les nœuds qui compose un cluster HBase, etc.... Si vous souhaitez aller plus loin avec Hbase il y a le livre "HBase : The Definitive Guide" de Lars George. Lars a aussi dirigé une conférence web dédiée à cette problématique, elle est disponible sur Youtube : HBase Schema Design - Things you need to know
Enfin en ligne le HBase reference Guide est aussi un bon point de départ.

A noter SalesForce qui a réalisé une très bonne présentation sur le sujet de la conception de schéma lors de la HBaseCon 2012 : la vidéo est ici et la présentation est là : 

Sunday, June 17, 2012

HBase 0.96 + Eclipse + Maven

Since HBase-4336 (and HBase 0.96) the HBase source code has been split into multiple maven modules.
The post is no more related to a specific operating system, you can follow these steps on Linux or Windows.

0. Requirements

1. Checkout sources

Use your favorite Subversion client to checkout the HBase source code :

$ svn checkout http://svn.apache.org/repos/asf/hbase/trunk hbase

 (check http://hbase.apache.org/source-repository.html for more details)



2. Install M2Eclipse plugin

  • Select the menu : Help / "Install New Software"
  • In the 'Work with' field type : http://download.eclipse.org/technology/m2e/releases (press Enter)
  • Select m2e - Maven Integration for Eclipse

3. Import HBase source code

File - Import... - Maven / Existing Maven Projects and select the directory where sources have been checked out at step 1 :


Some java sources need to be generated, right click on  the hbase project, Run As and select "Maven generate-sources" :

4. Create Run configuration

 Create a new run configuration, name it 'HBase (start)', slect the hbase-server project and set org.apache.hadoop.hbase.master.HMaster as the main class :


In the Arguments  tab add the program arguments start :

Give it a try, click on the Run button :


You can also try the HBase web interface http://localhost:60010 :


5. Create HBase Shell Run configuration

Create a new Run configuration, set the Name to Shell, and select org.jruby.Main as the main class :
 In the Arguments tab :
  1. Add the path to the bin/hirb.rb file as the program argument
  2. Set the Java variable hbase.ruby.sources to the path src/main/ruby path (e.g. -Dhbase.ruby.sources=D:\HBASE\hbase-trunk\hbase-server\src\main\ruby)

Sunday, January 29, 2012

Row Count : HBase Aggregation example

With the coprocessors HBase 0.92 introduces a new way to process data directly on a region server. As a user this is definitively a very exciting feature : now you can easily define your own distributed data services.

This post is not intended to help you how to define them (i highly recommend you to watch this presentation if you want to do so) but to quickly presents the new aggregation service shipped with HBase 0.92 that is built upon the endpoint coprocessor framework.

1. Enable AggregationClient coprocessor

You have two choices :

You can enable aggregation coprocessor on all your tables by adding the following lines to hbase-site.xml :
 <property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property>
or ...you can enable coprocessor only on a table throught the HBase shell :

1. disable the table
hbase> disable 'mytable'

2. add the coprocessor
hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'

3. re-enable the table

hbase> enable 'mytable'


2. Increase RPC timeout (opt)
In some cases the following exception could be raised :
java.net.SocketTimeoutException: Call to node2/55.37.68.154:60020 failed on socket timeout exception
On large table or if you are expecting slow I/O this might be necessary:
 <property>
   <name>hbase.rpc.timeout</name>
   <value>300000</value>
 </property>
n.b. : you can also specify this property directly in the client source code, see next section
3. ''RowCount' Snippet
The following snippet uses the "row count" feature of the aggregation service.

public class MyAggregationClient {

    private static final byte[] TABLE_NAME = Bytes.toBytes("mytable");
    private static final byte[] CF = Bytes.toBytes("d");

    public static void main(String[] args) throws Throwable {

        Configuration customConf = new Configuration();
        customConf.setStrings("hbase.zookeeper.quorum",
                "node0,node1,node2");

        // Increase RPC timeout, in case of a slow computation
        customConf.setLong("hbase.rpc.timeout", 600000);
        // Default is 1, set to a higher value for faster scanner.next(..)
        customConf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(customConf);
        AggregationClient aggregationClient = new AggregationClient(
                configuration);
        Scan scan = new Scan();
        scan.addFamily(CF);
        long rowCount = aggregationClient.rowCount(TABLE_NAME, null, scan);
        System.out.println("row count is " + rowCount);

    }
}

 
4. What else ?

The full list of functions provided by the aggregation client can be found here : http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.html

See also https://issues.apache.org/jira/browse/HBASE-5123 for discussion about forthcoming functions.

Note that this version is designed to work on data whose type is Long. If you plan to use other data types you should provide your own column interpreter. See https://issues.apache.org/jira/browse/HBASE-5122

Credits : thanks to Gary Helmling for his contribution on the HBase mailing list

Tuesday, January 24, 2012

Sortie de HBase 0.92

Après 4 "release candidate" et plus d'un an de développement la version 0.92.0 de HBase succède à la branche 0.90.x.Malgré le faible écart entre les deux numéros de version le travail accompli pour aboutir à cette version a été très important.

Les nouveautés majeures :

Implémentation des co-processeurs

Les coprocesseurs permettent de réaliser des traitements directement au sein des JVM des serveurs qui gèrent les données.
Deux types de coprocesseurs sont disponibles :
  • Les “observers” qui permettent le déclenchement d’actions sur des évènements affectant les données ou les tables. Les observers peuvent être chainés, executés par ordre de priorité et poser un veto sur une opération.
  • Les “endpoints” quant à eux permettent d’étendre les services RPC par des procédures personnalisées. Livrés avec cette version on trouvera ainsi des services d'agrégation permettant, entre autres, de déterminer les valeurs minimales ou maximales des données ou encore de réaliser des opérations arithmétiques de base.

HFile V2

HFile désigne le format des fichiers utilisé par HBase pour stocker les données sur le système de fichier. Cette contribution est le fruit du travail des ingénieurs de Facebook et permet d’améliorer l’utilisation de la mémoire faite pour les écrire et y accéder. Il permet d’accélérer l’ouverture des fichiers et donc de diminuer le temps d’indisponibilité des données qui y sont stockées lors du démarrage du cluster ou suite à la défaillance d’un noeud. (cette optimisation sera d'autant plus notable si vous utilisez les Bloom Filters)
A noter qu'aucune migration n'est nécessaire, HBase 0.92 sait lire les fichier V1 et les convertira en V2 à la première "compaction".

Distributed log splitting

Lors d’un redémarrage du cluster ou en cas de défaillance d’un nœud la prise en charge des données en instance d’intégration est distribuée sur l’ensemble des nœuds du cluster encore disponibles, la coordination étant assurée par Zookeeper. Cette action dans les versions précédentes était principalement effectuée par un seul serveur.

Intégration de la sécurité

Cette version apporte des mécanismes de sécurité basés sur ceux offert par Hadoop (authentification via Kerberos, isolation des données sur HDFS....) On trouve donc des fonctionnalités telles que :
  • Des appels RPC clients sécurisés
  • La mise oeuvre de listes de contrôle d’accès (ACL) sur les tables et les colonnes
  • Une connexion sécurisé au quorum ZooKeeper
Correctifs et optimisations diverses

Parmi les améliorations et les nombreux correctifs on notera aussi l’activation par défaut du mécanisme de gestion de la mémoire MemStore-LAB, ce dernier permet de réduire la fragmentation mémoire de la JVM et donc de diminuer le travail du garbage collector.
L’IHM permet quant à elle de visualiser en plus des informations habituelles (statistiques des noeuds, distribution des données...) les opérations en cours sur le cluster (split, compaction....)



Voir la liste des changements