Spark summit 2015

spar summit

Tous les slides des conférences seront ici (tous ne sont pas disponibles).

Des nombreuses conférences auxquelles nous avons assisté ressortent plusieurs lignes de forces dans l’écosystème Spark. Avec le Bigdata, gagner un cycle, c’est gagner des heures dans les traitements. Les données doublant tous les 2 ans, il faut retarder le jour où nous ne seront plus en capacité de les traiter. Pour cela, toute optimisation est bonne à prendre.

Dataframe et DataSets

Après avoir consolidé les fondations du framework, il est temps de monter le niveau conceptuel. Dorénavant, le développement se fera bien moins en Scala qu’en Spark SQL.

De nombreux formats sont pris en compte  par ce composant (Avro, CSV, Elastic Search, dBase, HBase, Cassandra, Apache Redshift, memsql, JSON, JDBC, Parquet, Hive, MySQL, PostgreSQL, HDFS, S3, H2, text, ORC).

Le Dataframe prend du galon et devient le format principal de la stack, même s’il sera toujours possible d’avoir des RDD ou des DStream de tous types d’objets.

Pourquoi cela ? Car le moteur Catalyst d’optimisation des Dataframe dans Spark SQL est le centre de toutes les optimisations. Il est très ouvert.

Comment Catalyst optimise ?

Il commence par analyser la requête SQL pour la transformer en arbre syntaxique. L’arbre peut être enrichi de code Scala, utilisant l’API de Spark SQL. C’est l’intégralité de l’arbre de la requête qui sera optimisé. L’utilisation du parseur SQL est strictement équivalente à l’écriture à la main du code.

Puis, différentes phases d’optimisations sont appliquées sur la syntaxe, pour la simplifier au maximum et réécrire la requête plus efficacement. Par exemple, des expressions régulières triviales peuvent être remplacée par des contains(), startWith(), etc.. Ce processus continu tant qu’il y a encore des optimisations appliquées sur l’arbre. Ensuite, l’arbre est transformé en un plan d’exécution logique.

A partir de cette étape, différents plugins permettent d’informer le moteur que les bases de données source de la requête sont plus ou moins capables d’effectuer une partie des traitements comme des jointures ou des filtres. Ainsi, l’arbre est modifié pour descendre autant que possible les traitements complexes vers les bases de données. L’objectif est de remonter le minimum de données dans Spark.

Enfin, plusieurs plans d’exécutions physiques sont produits. Un calcul d’estimation de métrique est alors effectué sur chaque plan, afin de sélectionner le plus efficace, théoriquement.

Le modèle actuel ne permet pas d’indiquer que les bases de données sous-jacentes sont capables d’effectuer des calculs localement plus efficaces (moyenne, min, max, etc). Des propositions d’extensions permettent d’améliorer le moteur pour lui permettre d’être encore plus proche des spécificités des bases de données sous-jacentes.

Enfin, le plan en main, le moteur génère du code Scala pour le distribuer sur les workers Spark. Le code produit est de plus en plus optimisé (Projet Tungsten). Il utilise des structures de données avec tous les champs les uns à coté des autres, pour les placer hors heap. Cela présente plusieurs avantages : les lignes de cache L1 des processeurs possèdent l’intégralité des enregistrements. Il n’est pas nécessaire de parcourir toute la mémoire pour consulter les chaînes de caractères par exemple. De plus, le code produit peut exploiter les offsets des différents enregistrements afin d’éviter de devoir ramener dans le mémoire de la JVM, les enregistrements présents hors heap.

Certains proposent des extensions pour exécuter des traitements directement dans les GPU, après une distribution des différents champs des enregistrements en colonne. Cela permet au GPU d’être plus efficace.

Dataframes typés

Comme les Dataframes ne sont pas typés, il existe maintenant les DataSets. Une transformation immédiate permet de générer des case class (ds.as[Person]). Le mariage entre Spark SQL et le code Scala est devenu trivial.

Spark Streaming n’est pas en reste avec cette évolution. Il est prévu des ponts pour utiliser des DataFrames avec des streams, pour faire du SQL en micro-batch.

L’intégration pour GraphX est également prévue dans un slide, même si aucune conférence n’en fait mention.

On voit bien que le moteur est le centre de toutes les attentions. Le code produit sera probablement toujours plus efficace qu’un code « à la main ».

Demain, ou après demain, des technologies de DRAM comme 3D XPoint (DRAM Latency, SSD capacity, …) permettront peut-être de se passer de la sérialisation, en utilisant un disque SSD directement mappé en mémoire.

Évolution de Scala pour Spark

Spark cherche à sérialiser les closures pour les envoyer vers les Workers. Il y a de nombreux attributs inutiles qui risquent de faire partie de la sérialisation (via le pointeur this de la méthode déclarant la closure). Dans Spark core, une grande partie du code cherche à optimiser les closures afin de couper les références vers des objets non utilisés par les méthodes des closures. Ainsi, la sérialisation des closures est plus petite. Mais ces traitements consomment de la mémoire, et n’est pas toujours efficace. Des erreurs subtiles de sérialisation peuvent alors advenir lors de l’utilisation des Checkpoints. Comment trouver la cause d’une sérialisation de closure impossible ?

Pour détecter ces erreurs subtiles lors de la sérialisation des closures, une macro du langage Scala permettra de vérifier l’isolation d’une closure en vue de sa sérialisation (SIP 21). Avec ce nouveau composant clé, le développeur est obligé de déclarer des val intermédiaires pour les variables à embarquer dans la sérialisation. Sinon, le compilateur indique une erreur. Cela ressemble un peu aux variables final de Java, utilisées dans les inner classes.

Le langage Scala évolue également avec Spark. Des méthodes des conteneurs Spark seront présent dans les conteneurs Scala (cache() par exemple).

Les évolutions de la JVM dans Java8 seront également exploités par les prochaines versions de Scala. Par exemple la génération dynamique des classes des Closures que propose Java8, sera utilisé par Scala, afin de diminuer drastiquement le nombre de classes générées. Des travaux sont également en cours pour utiliser les implémentations par défaut des méthodes de Java8 pour certaines méthodes des traits Scala. Plusieurs avantage : ne pas devoir tous recompiler les classes d’implémentations des traits lors de l’évolution de méthodes ; réduire la taille des classes.

Comme le code Spark doit être envoyé vers les Workers, il est important d’optimiser la taille des Jars. Le projet Dotty Linker permet d’analyser un Jar pour l’optimiser en profondeur (suppression du code mort, transformation en value-class, remplacement des var en val, etc).

Toutes ces évolutions, et bien d’autres à venir (SSD storage, faster in-memory cache, better code generation), permettront d’optimiser les traitements, la consommation mémoire, en réduisant l’impact des traitements n’ayant pas de valeur (sérialisation, GC, etc.) Nous pensons qu’il ne faudra pas plus d’un an avoir de voir apparaître des améliorations notables.

Langages utilisés

Scala reste majoritaire (71%), devant Python (58%) et Java (31%).

L’intégration du langage R, privilégié par les statisticiens, est un autre signe de cette évolution du framework. R étant mono-thread et travaillant tout en mémoire, il ne peut être utilisé simplement dans une architecture Spark. Pour marier ces deux approches, la librairie R se charge de convertir les commandes R en Spark SQL pour les faire exécuter par un Cluster Spark. Ainsi, un utilisateur de R bénéficie d’APIs très proches de ce qu’il utilise habituellement, mais dont l’exécution est déportée.

La même approche sera proposée pour différents langages. Il est maintenant conseillé d’utiliser Python en front-end de Spark SQL. En effet, l’intégration de Python consistant à lancer un processus python dans chaque Worker, avec sérialisation Python/Scala pour chaque job, n’est plus recommandé.

Le moteur Spark SQL est alors vu comme une sorte de base de filtrage/transformation/agrégation présente dans un cluster.

Sécurité

Cloudera propose de sécuriser Spark via l’intégration de Kerberos, HDFS et YARN. C’est très bien avec la stack Hadoop (Hive, HBase, etc) mais pour les autres ? Il est nécessaire d’utiliser des tokens de délégation (Spark 1.4, restricted to HDFS), pour éviter que Kerberos pense avoir à faire une attaque de Déni de service, lorsque tous les workers demandent un token.

Différentes propositions permettent de chiffrer les flux ou les données (Voir SPARK-5682, SPARK-2750, SPARK-6017, SPARK-5682, SPARK-5342).

RecordService permettra d’unifier la sécurité sur Hadoop, en façade des stockages (HDFS, HBase, etc.). Il sera alors possible de vérifier les privilèges accordés à chaque colonne (via Apache Sentry).

YARN ou Mesos ?

Je souhaitais avoir des éléments d’appréciation entre YARN et Mesos. Ce n’est pas évident.

YARN est utilisé par 40% des utilisateurs, contre 11% pour Mesos. Cela ne fait pas un argument technique.

Typesafe propose la stack technique SMACK (Spark, Mesos, Akka, Cassandra, Kafka) qui est utilisé sur des projets à très gros débits.

Nous n’avons pas eu de réponse satisfaisante pour sélectionner un composant où un autre.

Interface Web

Pour rendre accessible ces technologies aux data scientists, nous avons vu de nombreux NoteBooks différents (Apache Zepelin par exemple). Il s’agit de pages Web regroupant des sources de code dans différents langages de programmation (Scala, SQL, shell, etc). Un bouton permet de l’exécuter dans le cluster. Le résultat est alors analysé et présenté sous différentes formes (texte, tableau, graphique, etc.) Certains Notebooks permettent une connexion immédiate à un cluster Spark, d’autres sont capables d’afficher les flux au fur et à mesure de leur exécution, etc.

Pour packager des outils faciles à vendre, les vendeurs de solutions proposent généralement des clickodromes. Des interfaces “graphiques”, des boites à relier, des icônes, y-a-qu’à-cliquer, etc. C’est plus facile à vendre, mais inutile et contre productif. En effet, rien de plus efficace qu’une syntaxe pour exprimer un traitement.

Les Notebooks y répondent parfaitement. (Et quelle efficacité pour les démos !)

Déverminage

La nouvelle console de Spark 1.5 permet d’avoir une vue précise des différentes étapes, d’identifier les Shuffle, etc.

Quelques conseils :

  • Éviter les produits cartésiens ;
  • Utiliser un parallélisme supérieur ou égal au nombre de slots.

Back-pressure

Le back-pressure est une approche permettant de signaler à un producteur, que le consommateur n’est plus en capacité de gérer le flux de messages. Cela permet de demander au producteur de réduire son débit.

Typesafe a ajouté une technologie de back-pressure à Spark 1.5. Ainsi, même après une pause importante, les micro-batchs ne doivent pas traiter des milliards de messages pour rattraper le retard. L’approche utilisée va ajuster le nombre de messages à traiter par rapport au temps constaté dans les micro-batchs précédant. Ainsi, la gestion de la pression est lissé dans le temps, en évitant de crasher la plate-forme.

Quelques conférences ont insisté sur le besoin d’activer le back pressure. Il faut l’ajuster après avoir simulé un crash de machine (maxRate). En effet, il y a un risque d’emballement lorsque les capacités diminuent soudainement.

Bases de données

FiloDB est une base de donnée, qui est implémenté en front-end à Cassandra et utilisant Spark. C’est une base de donnée orientée colonne très rapide pour de l’analytique temps réel.

Apache Drill est une exposition de type “base de données SQL” répartie, s’appuyant sur tous types de sources de données. Seule les lectures sont possibles. Le composant expose des drivers JDBC pour, par exemple, requêter directement dans les logs d’une application. La conférence ne nous a pas convaincus de son utilité, lorsqu’on a Spark SQL. C’est plutôt un concurrent de Spark SQL qu’un composant complémentaire. Quid des performances comparée ? Nous n’avons probablement pas saisie tous les avantages de la solution.

Tachyon, une base en RAM avec backup sur disque, a été présentée par son auteur. Il manque une vue architecture de déploiement pour bien comprendre comment la déployer. C’est une alternative à Hadoop pour la gestion de CheckPoint de Spark Streaming.

Retour d’expériences

Un projet devant gérer 2+ TB par jour, traçant des millions de devices, avec 1 million d’écritures par seconde a été présenté (détection d’attaque informatique). Il mélange Akka stream pour l’asynchronisme et Spark pour l’analyse. Les formats binaires Avro et Protobuf sont indispensables pour éviter l’analyse textuelle à tous les niveaux et bénéficier d’un typage fort. Pour simplifier une Lambda architecture, le projet utilise un join entre le stream et les données statiques. Le résultat est sauvé en base de données, en cache chaud. Ainsi, les douleurs de duplication de code, de réconciliation des résultats des requêtes des architectures Lambda n’existent plus.

stream.transform { event => events.join(staticData) }.saveToCassandra()

Une application de référence (KillrWeather) permet de tester cette approche.

Pour conclure

Spark est un écosystème qui est en évolution rapide. De nombreuses entreprises travaillent à son amélioration, au niveau performance, usage, sécurité, intégration avec d’autres technologies. Gageons que le Spark que nous utilisons aujourd’hui n’a rien avoir avec celui que nous utiliserons dans deux ans.

C’est pourquoi chez OCTO, pour se préparer dès à présent, nous préconisons de se rapprocher autant que possible des Dataframes, afin de pouvoir bénéficier des évolutions et des optimisations à venir de la stack.