Multitâche sans thread 4/5 – Pipeline/composition


Programmation réactive

Le modèle réactif propose de n’utiliser que des hard-threads (exploitant les cœurs  des processeurs) à la place des soft-threads (simulant des traitements parallèles). Nous avons regardé comment les langages de développement évoluent pour proposer différents modèles permettant de proposer des traitements concurrents, sans pour autant utiliser des soft-threads.

Nous continuons notre exploration avec l’approche que l’on peut appeler pipeline ou composition.

Cette approche est une exploitation astucieuse du modèle de programmation fonctionnel. Avant de commencer, un rappel sur ce modèle de programmation qui se banalise.

La majorité des langages utilisés pour le moment sont des langages impératifs. C’est-à-dire que les instructions décrivent des actions à exécuter immédiatement, entraînant des effets de bord sur les valeurs des variables. Les traitements suivants exploitent alors les nouvelles valeurs pour effectuer de nouvelles transformations et ainsi de suite. L’exécution d’un programme est une succession de modifications de variables.

Ce modèle est assez éloigné du modèle mathématique. En effet, l’égalité en mathématiques est toujours vraie, à tout moment. Dans une démonstration mathématique, toutes les lignes sont vraies simultanément, indépendamment de leur ordre d’apparition. Pour que cela soit toujours vrai, il ne doit jamais y avoir d’effet de bord. Toutes les variables doivent être immuables.

On retrouve cette notion dans la différence entre l’opérateur = et l’opérateur ==. (Le langage Pascal utilise := et =).

Les langages fonctionnels proposent d’utiliser des fonctions mathématiques pour transformer les données. Comme il n’y a pas d’effet de bord, les mêmes entrées donneront toujours le même résultat. Garder le résultat dans une constante, appliquer le traitement en paresseux ou invoquer la fonction à nouveau ne change rien au programme, si ce n’est ses performances.

Concernant les conteneurs, des fonctions de première classe permettent d’appliquer des transformations sur l’ensemble des données pour produire de nouveaux conteneurs et ainsi de suite. Il est alors facile d’enchaîner les transformations pour filtrer, transformer ou synthétiser les données d’un conteneur.

La fonction map() permet d’appliquer un traitement à chaque membre d’un conteneur pour alimenter un nouveau conteneur. flatMap() est un dérivé permettant de produire une map d’un seul niveau à partir d’un arbre de map. Une map de map devient une map plate.

Cela étant posé, nous pouvons introduire l’approche de gestion des hard-threads avec ce modèle. L’idée est d’appliquer les mêmes fonctions de première classe, mais sur des Future. Ainsi, les transformations s’effectuent en parallèle sur des cœurs différents, au fur et à mesure que des données sont disponibles.

Voici un exemple en Scala

1
2
3
4
5
6
7
8
9
10
// Scala
val rateQuote = future {
  conn.getCurrentValue(USD)
}
val purchase: Future[Int] =
  rateQuote map {
    quote => conn.buy(amount, quote)
  } recover {
    case QuoteChangedException() => 0
  }

Un équivalent en Javascript

1
2
3
4
5
6
7
8
9
rateQuote().then(function (quotes) {
  return Q.all(quotes.map(function (quote) {
      return buy(amount, quote);
    }))
    }).then(function(purchases) {
      // all purchases completed
    }, function (error) {
    // got an error
  })

Et un autre en Java8 (qui est réactif)

1
2
3
4
5
6
7
8
9
10
// Java8
CompletableFuture f1 = //...
CompletableFuture f2 =
  f1.thenApply(Integer::parseInt)
    .thenApply(r -> r * r * Math.PI);
CompletableFuture docFuture = //...
CompletableFuture f =
  docFuture.thenApply(this::calculateRelevance);
CompletableFuture relevanceFuture =
  docFuture.thenCompose(this::calculateRelevance);

La méthode thenApply() est équivalente à la méthode map() en Scala et thenCompose() est l’équivalente de la méthode flatMap() de Scala. Des variantes permettent de lancer plusieurs Future et de les attendre tous ou seulement l’un d’entre eux.

Chaque invocation d’une méthode de première classe est réellement déclenchée lorsque le Future est résolu.

La distribution des traitements n’est pas sous le contrôle du développeur. Un pool de hard-threads est utilisé pour exploiter au mieux le parallélisme disponible par la plate-forme, sans pour autant utiliser des soft-threads.

Cette approche permet d’exploiter des traitements parallèles sans effet de bord. Bien entendu, il ne faut pas qu’une transformation soit bloquante, sinon le hard-thread associé est bloqué et les autres traitements ne peuvent être exécutés. Nous publions un outil permettant de détecter ces situations.

Si par malheur, un traitement bloquant doit avoir lieu lors d’une transformation, il est possible de le signaler au pool de hard-threads (ForkJoinThreadPool). En Scala, cela s’effectue simplement.

1
2
3
blocking {
  blockingCall()
}

Cela signale au pool de thread que l’un d’entre eux entre dans une zone pouvant être bloquante. Le pool accepte alors de créer un soft-thread complémentaire le temps de sortir de ce bloc. Le soft-thread en excès sera détruit dès que possible.

En Java classique, il faut utiliser l’interface ManagedBlocker.

La proposition Promise de Javascript permettra de faire la même chose en s’appuyant sur des call-backs.

1
2
3
4
5
6
7
8
9
10
11
12
// javascript
Parse.User.logIn("user", "pass")
  .then(function(user) {
    return query.find()
  })
  .then(function(results) {
    return results[0].save(
      { key: value })
    })
  .then(function(result) {
    // the object was saved.
  });

Ce modèle de développement, à l’aide d’un pool unique de hard-threads, partagé par tout le programme, est une approche de développement qui se généralise. Scala le propose depuis un moment, Java8 ou .NET le proposent également.

Cette approche peut être complétée par des transformations appliquées sur les containers. Avec un modèle de donnée immuable, il est possible de filtrer, de synthétiser ou de transformer un container par morceau, en parallèle. C’est l’objectif des nouvelles API de Java8 comme parallelStream().

1
2
3
4
5
6
double average = roster
    .parallelStream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

Pour cela, il faut que les traitements appliqués ne soit pas impératifs (exécuté immédiatement), mais paresseux (juste déclaré). Par exemple, dans l’exemple précédant, filter() ou mapToInt() décrivent des traitements à appliquer en parallèle sur le stream, après l’avoir découpé en tranche et distribué sur différents threads du pool de threads. Chaque thread effectue une partie de la transformation pour permettre à average() de synthétiser les résultats partiels.

Le développeur ne s’occupe plus de la gestion des threads. Il décrit les transformations à appliquer. C’est le framework qui se charge de distribuer au mieux les traitements. Cette distribution est différente suivant les capacités des nœuds, suivant le nombre de cœurs disponibles.

Cette approche ne fonctionne qu’avec des données immuables, garantissant qu’une donnée ne sera pas modifiée par plusieurs threads. Cela explique la vague de l’approche fonctionnelle dans les différents langages de développements : permettre une utilisation simple du multitâche.

Ce modèle peut être étendu pour être appliqué sur plusieurs nœuds d’un cluster. C’est l’approche que propose le framework Apache Spark. Mis à part quelques initialisations, les traitements sont décrits par une suite de transformation sur les conteneurs. Puis les descriptions sont distribuées sur les nœuds du cluster pour être exécuté en parallèle sur de gros volume de donnée. Tous cela s’effectue essentiellement en mémoire.

1
2
3
4
5
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

L’approche « pipeline » est un enchaînement de modifications asynchrones orientées « programmation fonctionnelle ».

Le tableau suivant résume les avantages et inconvénients des quatre premières approches.

Générateur Continuation Coroutine Pipeline
Usages Dans une boucle Pour continuer après un traitement Mise en pause d’un traitement Enchaînement en pipe
Limitations Retourne un Itérateur Local à une fonction Pile / Exception
Boucles
JVM Pile /
Gestion des Exceptions
Points forts Généré Généré Garde la pile d’appel
Compatible exceptions
Élégance
Multi-nœud

Dans le prochain et dernier article de la série, nous verrons comment les langages de développement évoluent plus en profondeur pour permettre une écriture linéaire des traitements, alors que chaque bloc de code peut être exécuté avec un cœur différent.

Philippe PRADOS et l’équipe « Réactive »