Parallélisation, distribution partie 3 : comment tirer parti des processeurs multi-coeurs à travers l'API de concurrence de Java 7 ?

le 24/12/2008 par Marc Bojoly
Tags: Software Engineering

Cet article sera à la fois le troisième de la série sur la parallélisation et la distribution et l'un des articles sur les nouvelles fonctionnalités sur la concurrence introduites par Java 7. Nous allons présenter les limitations des solutions actuelles puis la nouvelle API définie par la jsr166y. La version de prévisualisation 1.7.0-ea-b37 ne contenant pas l'API, je me suis basé sur le framework fj, porté par le responsable de la JSR Doug Lea. Les dernières informations de devoxx confirment par contre que cette fonctionnalité sera bien incluse dans Java 7.

Concurrence : Les threads dans java

Comme le précisait le premier article de la série, la parallélisation devient un enjeu majeur pour mieux tirer parti des architectures multi-coeurs. Mais la notion de thread, partie intégrante du langage java, reste bas niveau et complexe à mettre en oeuvre. Il faut gérer la concurrence d'accès aux données via des locks, comme le permet par exemple le mot clé synchronized.

Ces threads ont d'abord été principalement utilisés dans les clients lourds Swing pour ne pas bloquer l'affichage lors d'appels à des traitements longs. On parlera dans ce cas plus d'asynchronisme que de parallélisation. Cela permet à l'écran de ne pas avoir à attendre la réponse d'une requête en base de données avant de mettre à jour l'affichage d'un bouton. La recherche du parallélisme c'est à dire l'exécution de plusieurs tâches en parallèle afin de les traiter plus rapidement est utilisée principalement côté serveur. Par exemple un serveur web affectera un thread à chaque requête http permettant au serveur de servir plusieurs utilisateurs en parallèle.

Dans les deux cas, il s'agit de développements pointus, dont les bugs peuvent être difficiles à résoudre. Certains outils sont apparus pour répondre à ces difficultés.

Parallélisation à forte granularité avec Java 5

La version 5 de java a introduit le package java.util.concurrency qui sera complété avec Java 7. Ce dernier fournit un framework qui facilite l'utilisation de threads en fournissant des composants de plus haut niveau qui encapsulent les besoins courants. Ce framework java.util.concurrency contient des outils facilitant la gestion de la concurrence comme les classes atomiques (AtomicInteger). Les files et doubles files concurrentes (BlockingQueue et BlockingDequeue par exemple) poursuivent également le même objectif. Ces conteneurs offrent des accès concurrents à des données. Ils constituent des outils très bien optimisés pour les cas courants d'utilisation.

Ce package a également introduit les pools d'exécution via l'interface Executor qui facilite la gestion de l'ensemble des threads de l'application. Prenons un exemple très simple qui montre comment paralléliser plusieurs requêtes :

Executor pool = Executors.newFixedThreadPool(2);
for(int i=0;i<100;i++) {
  pool.execute(new Runnable() {
  public void run() {/*Intensive computing*/}
  });
}//End for

Les instances à exécuter s'accumuleront dans une queue et seront traitées par un thread dès que celui-ci aura terminé son travail courant. Le nombre de threads doit là encore être choisi avec soin en fonction de l'architecture de la machine cible. Le code est donc dépendant de la topologie. Ces outils sont très efficaces pour gérer un certain nombre de tâches en parallèle et constituent aujourd'hui les briques de base de la parallélisation. Par exemple, le serveur Tomcat gère plusieurs requêtes en parallèle en affectant un thread à chacune. Cette fonction est implémentée par un Executor dont l'implémentation standard StandardThreadExecutor utilise en interne ces API de Java.

Ces mécanismes sont particulièrement utiles pour traiter des tâches indépendantes. Ils conviennent pour les serveurs dans lesquels la requête sert de tâche unitaire. Chaque requête HTTP est indépendante - les informations qu'elle contient suffisent à la traiter - ce qui autorise à la voir comme une tâche. Mais comme le précisait le premier article, la loi de Moore est aujourd'hui réalisée, non pas en augmentant la fréquence des processeurs mais le nombre de coeurs. Ce nombre est donc amené à augmenter. Il va rapidement devenir plus important que le nombre de tâches indépendantes à traiter en parallèle. Cela est déjà courant sur les PC où les bi-coeurs sont très faiblement utilisés, mais cela va rapidement devenir le cas sur les serveurs, des machines à 8 coeurs sont courantes en entrée de gamme aujourd'hui. Sur des applications métier, il ne peut y avoir que 60 utilisateurs en ligne et donc quelques requêtes par seconde : les 8 coeurs seront potentiellement sous-utilisés même si chaque requête est coûteuse et mériterait d'être exécutée plus rapidement.

Réaliser certains calculs en parallèle pourrait permettre d'offrir un temps de réponse plus faible aux utilisateurs. Pour mieux tirer parti de la puissance de calcul il faut pouvoir paralléliser des tâches de granularité plus fine que la requête. Il faut mettre en oeuvre d'autres mécanismes, et les nouveaux outils comme ceux introduits dans Java 7 permettent de le réaliser plus facilement.

L'introduction du Fork/Join avec Java 7

Java 7 propose d'introduire un certain nombre de compléments au niveau de la parallélisation : de nouveaux conteneurs (LinkedTransferQueue, ConcurrentReferenceHashMap), de nouveaux outils pour gérer la concurrence d'accès (Phaser etnd Fence) et le concept de fork/join. Ce concept repose sur l'association d'une file d'attente - un pool - de tâches à chaque thread. Le framework gère :

  • un pool de plusieurs threads
  • l'affectation des tâches entre ces threads
  • la synchronisation des accès aux résultats des tâches

Cela permet d'utiliser des traitements à plus faible granularité car le coût de la synchronisation entre tâche est réduit. Diverses optimisations permettent par ailleurs de réaffecter les tâches à d'autres threads qui sont inactifs. Nous allons maintenant en détailler le fonctionnement.

Définir des tâches à plus faible granularité n'est pas immédiat, ni parfois même possible avec tous les algorithmes. Les algorithmes dits "divide and conquere" sont ceux qui s'y prêtent le mieux. Le principe général est le suivant

Si le problème est important
Le décomposer en deux problèmes plus petits
Réappliquer la méthode sur chaque petit problème
Déduire la solution globale à partir des deux solutions
Sinon
Résoudre le problème directement

Les problèmes récursifs répondent parfaitement à cette définition. Certains problèmes portant sur le traitement d'un grand ensemble de données peuvent également utiliser cette technique en appliquant le problème récursivement sur des sous-ensembles des données. Je reprendrai un exemple couramment utilisé en illustration : la recherche du maximum dans un tableau de grande taille. Cela donne :

Pour une taille > LIMITE

Découper le tableau en deux sous-tableaux égaux

Rechercher le maximum dans chacun d'eux

Renvoyer le maximum des maximums

Sinon

Rechercher le maximum en parcourant les éléments du tableau (recherche linéaire)

Ce type de problème peut parfaitement être résolu de manière concurrente avec les outils actuels. Mais de la même façon que l'API java.util.concurrent a apporté une simplicité d'utilisation dans la gestion de la concurrence, le framework fork-join facilite l'implémentation de tels algorithmes.

Une limitation des implémentations d'Executors est qu'une fois une tâche affectée à un thread, elle le sera jusqu'à son achèvement. Même si celle-ci est bloquée et doit attendre, le thread restera lui aussi bloqué. Pour les serveurs d'application où l'attente est liée aux entrées/sorties réseaux, il suffit de définir un nombre de threads supérieur au nombre de coeurs. Les threads bloqués laissent leur place à d'autres traitements de requêtes sur les processeurs car la parallélisation des entrées/sorties est réalisée directement par le système d'exploitation. Mais dans le cas de notre algorithme de recherche chaque thread sera en attente de tous ses fils, qui eux aussi ont besoin de solliciter le processeur. Augmenter le nombre de threads conduirait uniquement à augmenter le nombre d'interruptions et de permutations de thread actif ce qui serait contre-productif.

Par ailleurs, cette décomposition récursive va rapidement conduire à un nombre très important de threads : potentiellement pour un tableau de taille n et une recherche linéaire pour les tableaux de taille inférieure à p, 2(n-p) threads seraient requis. Or créer un thread est consommateur en ressources.

La solution apportée par le fork/join est de séparer totalement la notion de tâche de la notion de thread. De cette façon une tâche peut être mise en attente et le thread correspondant peut effecter d'autres tâches pendant ce temps. Cela permet également de laisser au framework la possibilité d'affecter une nouvelle tâche au même thread ou à un thread différent. Le coût de synchronisation entre deux tâches traitées par un même thread est très faible. Le coût global de synchronisation entre tâches est réduit. Il est désormais possible de créer un grand nombre de tâches de faible granularité afin de faciliter la parallélisation sans être pénaliser par des coûts de synchronisation prohibitifs.

Concrètement prenons le tri d'un tableau de 8 éléments avec une limite pour la décomposition récursive à 2 éléments : l'implémentation sera la suivante

public class FindMaxWithFJ extends RecursiveAction {

 private final int threshold;
 private final FindMaximumProblem problem;
 public int result;
 private final static int ARRAY_SIZE = 50000000;

 public FindMaxWithFJ(final FindMaximumProblem problem, final int threshold) {
   this.problem = problem;
   this.threshold = threshold;
 }

 @Override
 protected void compute() {
            //Implémentation de l'algorithme
 }

 public static void main(String[] args) throws InterruptedException {
   //Instanciation du framework
 }
}

La classe MaxWithFJ est un POJO : sa création est indépendante de la création d'un thread. Sa méthode principale compute implémente l'algorithme que nous avons écrit.

@Override
protected void compute() {

 if(problem.size > threshold) {
   int middle = problem.size / 2;
   FindMaxWithFJ left = new FindMaxWithFJ(problem.subproblem(0, middle), threshold);
   FindMaxWithFJ right = new FindMaxWithFJ(
            problem.subproblem(middle + 1, problem.size), 
            threshold);
   forkJoin(left, right);
   result = Math.max(left.result, right.result);
  }

  else {

   result = problem.solveSequentially();
  }
}

La méthode main() instancie une implémentation de ForkJoinExecutor qui contiendra une réserve de ForkJoinWorkerThreads. Ceux-ci sont des threads possédant une file de tâches à exécuter.

public static void main(String[] args) throws InterruptedException {
 Thread.currentThread().sleep(10000);
 int[] numbers = new int[ARRAY_SIZE];
 Random random = new Random();
 for(int i=0; i < ARRAY_SIZE; i++) {
   numbers[i] = random.nextInt(ARRAY_SIZE);
 }
 System.out.println(numbers);
 System.out.println("begin");

 final int threshold = 5000000;

 final int nThreads = Runtime.getRuntime().availableProcessors();
 ForkJoinExecutor fjPool =  new ForkJoinPool(nThreads);
 long startTime = System.currentTimeMillis();
 for(long i = 0 ; i < 1000; i++) {
   final FindMaximumProblem problem = new FindMaximumProblem(
   numbers, 0, ARRAY_SIZE);
   FindMaxWithFJ mfj = new FindMaxWithFJ(problem, threshold);
   fjPool.invoke(mfj);
   int result = mfj.result;
   System.out.println(result);
 }
long endTime = System.currentTimeMillis();
 System.out.println("Elapsed (ms.) " + (endTime-startTime));
}

Le schéma ci-dessous illustre deux threads avec les pools de tâches et servira de base à l'étude du fonctionnement :

Illustration fork/join 2 threads 5 tâches

La première tâche de recherche sur un tableau à 8 éléments, que je note M[0..7] va être affectée à un thread qui va l'exécuter. La méthode compute() va créer 2 tâches M[0..3] et M [4..7]. Le framework FJ réinsère alors les tâches dans la file. L'appel de la méthode forkjoin() permet l'exécution de deux tâches en parallèle (fork) mais attend la fin de chacune d'elles (join) pour rendre la main au traitement principal. Cela est indispensable pour que la tâche principale calcule le maximum des deux résultats.

Dans ce cas le ForkJoinWorkerThread va prélever en tête de la pile la tâche la plus récente. Dans notre cas l'exécution de M[0..3]. Cela va insérer en tête de la pile M[0..1] et M[2..3]. Le thread va continuer à prélever d'autres tâches tant que les deux tâches à joindre (M[0..3] et M[4..7]) ne sont pas terminées. Le thread exécutera M[0..1] linéairement. M[0..3] attend toujours M[2..3]. Le thread dépile la tâche suivante qui est M[2..3] et l'exécute. M[0..3] peut alors s'achever.

Le choix de la granularité, ici la limite à un tableau à 2 éléments pour arrêter la décomposition, reste un sujet d'optimisation. Cependant, un chiffre raisonnable - quelques centaines d'éléments pour un calcul réel - donne des résultats satisfaisants dans la majorité des cas.

Le framework permet une autre optimisation à travers le vol de tâche. Je dispose d'un processeur bi-coeur et j'ai donc créé deux threads. Le second thread n'ayant pas de tâche dans sa pile il va chercher à prélever une tâche sur le premier thread au niveau de l'arrière de la queue (la tâche la plus ancienne). Dans notre cas il va prélever la tâche M[4..7]. Ce mode de fonctionnement a deux avantages :

  • la probabilité d'avoir une concurrence d'accès entre deux threads est limitée au cas où la queue possède une seule tâche.
  • dans un algorithme de décomposition les tâches les plus anciennes sont à plus forte granuralité que les tâches récentes. En prélevant la tâche M[4..7] le second thread va créer deux tâches M[4..5] et M[6..7] à traiter ce qui parallélisera plus de traitement.

Notons enfin que la classe FindMaxWithFJ a encapsulé toutes les données dont elle avait besoin. Les seules données externes sont extraites d'autres tâches. Le besoin en synchronisation est totalement déporté sur le framework et encapsulé dans l'appel à la méthode forkjoin().

Les lecteurs du précédent article sur les grilles de calcul auront trouvé certains points communs entre le pattern fork/join et le pattern map/reduce. Il s'agit de deux patterns très proches dont la trame sous-jacente est de réaliser en parallèle un même traitement sur plusieurs données. Dans les deux cas, la parallélisation est donc efficace sur des algorithmes qui manipulent un grand nombre de données. Plus que dans le nom utilisé pour le pattern, la principale différence réside dans le fait que FJ ne réalise une parallélisation qu'à l'intérieur d'une même JVM tandis qu'une grille parallélise sur plusieurs machines. La parallélisation sur une grille et la parallélisation à l'intérieur d'un ordinateur diffèrent fondamentalement dans le temps d'accès à la mémoire partagée. Dans le cas d'une grille, toute tâche envoyée sur un noeud, donc une machine distante, nécessite de faire transiter les données par le réseau. Dans le cas d'une grille, les accès aux données devront être optimisés en partitionnant au mieux la donnée envoyée à chaque noeud. Dans le cas du framework FJ, les tâches peuvent partager des données situées dans la mémoire vive, voire une mémoire L3 du processeur multi-coeur. Comme dans notre exemple on cherchera à utiliser au maximum les références si l'algorithme n'a pas besoin de modifier les données sources - ici le tableau à trier - pour ne pas recopier les données. L'accès aux données sera dans les deux cas un point important mais devra être géré différemment.

Quels bénéfices en attendre ?

Ces avancées facilitent donc grandement l'écriture de code parallélisable mais avec des concepts encore assez techniques. En l'état, un tel code doit obligatoirement être encapsulé pour séparer ces responsabilités du code métier et pour pouvoir tester indépendament l'algorithme métier et la logique de parallélisation. Java 7 proposera également cette encapsulation pour un des besoins les plus courants à travers la classe ParallelArray. Celle-ci offre pour des besoins de traitement une encapsulation de l'algorithme de décomposition vu plus haut. Il s'agit d'un tableau de données qu'il est possible de trier, dont on peut extraire le maximum et sélectionner certaines parties en fonction de critères.

Réaliser la parallélisation vue plus haut reviendra donc pour le développeur métier à écrire :

Integer[] ints = //Construct the array
ForkJoinExecutor fje = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
ParallelArray pa = ParallelArray.create(ints.length, Integer.class, fje);
pa.addAll(ints);
int result = pa.max();

On notera que la syntaxe devient presque déclarative, à la manière du SQL on décrit le résultat attendu : la recherche du max, le framework détermine le traitement à réaliser en fonction du nombre de threads disponibles pour y parvenir.

En ce sens ce framework apporte donc une solution pour tirer parti de la puissance processeur disponible avec une complexification maîtrisée du code. Tous les traitements de données en mémoire pourront tirer parti de cette classe. Il pourrait par exemple devenir plus intéressant de trier les données d'un tableau dans la couche métier plutôt que de requêter de nouveau la base.

Notons pour finir que le framework fork/join ne se limite pas au type d'algorithme présenté : il propose outre la classe RecursiveAction, d'autres tâches pour différents types d'algorithmes. La classe LinkedAsyncAction pourra ainsi être utilisée pour parcourir un arbre de façon parallélisée.

Quel avenir pour la parallélisation?

Java a introduit dans le langage la notion de thread qui a servi pour l'asynchronisme. J2EE a, côté serveur, permis d'encapsuler ces threads, en gérant des pools d'EJB traitant des requêtes de façon parallèle. Le framework fork-join permet, au sein d'une seule JVM, d'exécuteur désormais plusieurs tâches de granularité fine en parallèle en tirant parti des architectures multi-processeurs et multi-coeurs.

Il y a donc une vulgarisation des environnements d'exécution parallèle. Cela permettra de réaliser des applications performantes - par exemple de type batch - sans forcément nécessiter une stack applicative complète si la distribution entre machines n'est pas indispensable. C'est un pas important pour développer l'utilisation de la parallélisation à tous les niveaux, ce qui est indispensable pour ne pas sous-exploiter la puissance disponible des architectures matérielles actuelles et futures.

Références

http://cs.oswego.edu/pipermail/concurrency-interest/2008-December/005671.html

http://gee.cs.oswego.edu/dl/classes/index.html?EDU/oswego/cs/dl/util/concurrent/FJTaskRunner.html

http://gee.cs.oswego.edu/dl/concurrency-interest/

http://www.ibm.com/developerworks/java/library/j-jtp11137.html

http://www.ibm.com/developerworks/java/library/j-jtp03048.html

http://www.devoxx.com/display/JV08/Home