Java 8 est réactif !

le 03/06/2014 par Philippe Prados
Tags: Software Engineering

Programmation réactive

Parmi les nombreuses évolutions que nous propose Java8, l’une d’entre-elles attire particulièrement notre attention. Il s’agit de la présence de la classe [CompletableFuture<>](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html). Mine de rien, cette classe va bouleverser les applications Java. De nouvelles architectures seront proposées, de nouveaux frameworks vont apparaître pour remplacer les anciens, etc. C’est une classe majeure de Java 8.

La classe Future<> propose de déclencher un traitement en tâche de fond et de récupérer le résultat plus tard, via la méthode get(). Le développeur possède alors une référence vers un résultat futur. Il peut l'interroger pour savoir si le résultat est disponible, ou bien le demander immédiatement car il en a besoin. La méthode get() bloque alors le processus actuel jusqu'à la fin du traitement asynchrone.

C’est justement la grande différence avec la classe CompletableFuture<>. Normalement, seul le thread associé au future peut alimenter le résultat pour débloquer les invocations à get(). Avec la nouvelle classe, il est possible d’alimenter le résultat d'un futur depuis n'importe quel thread. Il est alors possible de réagir à un événement asynchrone pour débloquer un CompletableFuture. Il est également possible d’enregistrer des call-back ou des closures qui seront déclenchées lorsqu’un résultat ou une exception sera disponible.

Pour comprendre comment cela s’organise et s’exécute, imaginons un enchaînement de traitements et de transformations faisant intervenir des API bloquantes. Tout d’abord, proposons un CompletableFuture<Reader> avec le flux d’une page Web. L'objectif est de demander le chargement d'une page Web et d'avoir accès au Reader lorsque la négociation avec le serveur est terminé. Pendant ce temps, le programme peut faire autre chose comme invoquer une base de donnée ou faire des calculs pour produire une page web.

CompletableFuture<> getAsyncURL(URL url) {
   final CompletableFuture<Reader> result = new CompletableFuture<Reader>();
   // Démarre un soft-thread avec un traitement alimentant un CompletableFuture
   Executors.callable(() -> {
      // Executé dans un autre thread
      URLConnection con;
      try {
         con = url.openConnection();
         // Complète le future avec un Reader
         result.complete(new InputStreamReader(con.getInputStream(),
            getEncoding(con)));
      } catch (Exception e) {
         // Signal l'erreur dans le future
         result.completeExceptionally(e);
      }
   });
   return result;
}

Dès la disponibilité du Reader, récupérons le flux en chaîne de caractères. Cela s'effectue également en tâche de fond. Le future est complet dès que la page HTML est intégralement disponible.

CompletableFuture<String> page = getURLAsync(url).thenApply((in) -> {
   try {
      // Transforme le Reader en String
      return IOUtils.toString(in);
   } catch (Exception e) {
      return "";
   }
});
// ... execution d'autres traitements 
// pendant le chargement de la page ...
// puis lecture bloquante de la page
System.out.println(page.get());

Comme la méthode get() est bloquante, la fonction println() n'est pas exécutée tous de suite.

Notez qu'il n’est pas possible de propager l’exception dans thenApply, car la closure s’exécute dans un thread différent du thread principal. C’est une des difficultés avec les différentes méthodes de la classe CompletableFuture<>.

Tant que le pool de thread n'est pas saturé, nous pouvons charger des pages Web en mémoire en tâche de fond. Mais dès sa saturation, il faut attendre le chargement des pages précédentes avant d'en demander de nouvelles.

Maintenant, rendons cela réactif. Nous utilisons l’API AsyncHTTPClient. Nous devons encore utiliser une inner classe avec deux méthodes onCompleted() et onThrowable(). En effet, le framework n'est pas encore compatible avec Java 8. Dans la nouvelle version de la méthode getURLAsync(), nous mappons juste ces méthodes vers leurs équivalents de CompletableFuture<>.

CompletableFuture<Reader> getURLAsync(URL url) throws IOException {
   CompletableFuture<Reader> rc = new CompletableFuture<Reader>();
   AsyncCompletionHandler handler = new AsyncCompletionHandler<Response>() {
      @Override
      public Response onCompleted(Response response) throws Exception {
         rc.complete(new InputStreamReader(response.getResponseBodyAsStream(),
            getEncoding(response.getContentType())));
         return response;
      }

      @Override
      public void onThrowable(Throwable t){
         rc.completeExceptionally(t);
      }
   };
   asyncHttpClient.prepareGet(url.toString()).execute(handler);
   return rc;
}

Et voilà. Notre application est maintenant non bloquante lors de la lecture de la page HTML. Nous n'avons plus besoin de l'instance Executors. L'application peut gérer une multitude de connexions sans avoir à ajouter de soft-threads. En effet, le framework utilise les API asynchrone de Java pour récupérer simultanément toutes les pages demandées.

CompletableFuture<String> page = getURLAsync(url).thenApply((in) -> {
   try {
      return IOUtils.toString(in);
   } catch (Exception e) {
      return "";
   }
});
// ... execution d'autres traitements 
// pendant le chargement de la page ...
// Lecture bloquante de la page
System.out.println(page.get());

Pour garder le coté asynchrone, il est préférable de ne plus utiliser get() mais d'utiliser les call-backs du CompletableFuture<>. C'est le moment de réveiller une servlet asynchrone des spécifications Servlet 3.0.

Cette classe est la porte ouverte aux architectures réactives que nous avons décrites dans d’autres articles. D’autres frameworks proposaient des classes équivalentes. Guava propose [ListenableFuture<>](http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/ListenableFuture.html), Async http client propose [FutureCallback<>](https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/concurrent/FutureCallback.html?is-external=true), etc.

La révolution vient de l’intégration de cette classe dans la norme. Ainsi, tous les frameworks vont pouvoir s’appuyer sur cette classe. Il devient possible de composer des traitements complexes asynchrones, avec des données venant de différentes sources (un web service, une base de données, un calcul GPU, etc.).

Nous prédisons qu’une alternative à JDBC sera proposée, s’appuyant sur cette classe. Le dernier maillon pour généraliser les approches réactives dans Java sera alors franchi. Pour les bases de données NoSQL, c’est déjà possible.

CompletableFuture<> propose tout un tas de méthodes pour enchaîner des traitements dès qu’une donnée est disponible. Par défaut, un pool de hard-thread est alors utilisé pour distribuer au mieux les jobs suivant les capacités du ou des processeurs.

Pour synthétiser les différentes méthodes proposées par cette classe et leurs usages, nous vous proposons un tableau à la fin de l'article. À gauche, vous trouverez les données d’entrées, au milieu le nom de la méthode et le paramètre principal à valoriser avec son type. Si une closure est nécessaire, les types de ses paramètres ainsi que le type de retour est indiqué. e représente une exception. Enfin, la dernière colonne indique le résultat de sortie. La plupart des méthodes acceptent d’être suffixées par Async pour exécuter le traitement dans une autre tâche, et un Executors pour utiliser un pool différent de celui par défaut.

Le plus grand problème de la classe CompletableFuture<> est la taille de son nom. Elle a vocation à être utilisée massivement. Un nom plus court aurait été le bienvenu.

EntréeMéthodeParamètre princ.Sortie
Écriture
Valorise un futur avec une valeur.
CF<T>,tcompleteTCF<T>
Valorise un futur avec une exception.
CF<T>,ecompleteExceptionalyeCF<T>
Valorise en tâche de fond via la création d’un CF.
CF.supplyAsync()->TCF<T>
Lecture
Bloque le thread jusqu’à obtention de la valeur ou d’une exception. Attention, bloquant !
CF<T>getT
Bloque le thread jusqu’à obtention de la valeur, d’une exception ou l’expiration d’un délai. Attention, bloquant !
CF<T>getTimeOutdelaisT
Retourne la valeur disponible ou une valeur par défaut.
CF<T>getNowdefaultT
Se synchronise sur l’un d’eux et retourne sa valeur en Object.
CF<T> CF<U> anyOfCF<Object>
Se synchronise sur plusieurs CF. Il faut les consulter individuellement ensuite.
CF<T> CF<U> …allOfCF<Void>
Transformation
Transforme en valeur depuis une valeur ou une exception
CF<T>whenComplete(T,e) -> CF<T>CF<T>
Transforme un résultat en un autre.
CF<T>thenApply(T) -> UCF<U>
Transforme une exception en valeur.
CF<T>exceptionally(e)->TCF<T>
Applique une transformation avec l’un ou l’autre.
CF<T> CF<T>applyEither(T)-> UCF<U>
Combine deux résultats pour en faire un troisième.
CF<T> CF<U>thenCombine(T,U) -> VCF<V>
Transforme une donnée via un traitement retournant un CF.
CF<T>thenComposeT -> CF<U>CF<U>
Terminaison
Exécute après une valorisation ou une exception.
CF<T>handle(T,e) -> {…}
Exécute dès la valeur disponible.
CF<T>thenAccept(T)-> {…}
Exécute après l’un et l’autre.
CF<T> CF<U>thenAcceptBoth(T,U) -> {…}
Exécute après l’un ou l’autre.
CF<T> CF<T>acceptEither(T) -> {…}

Philippe PRADOS et l'équipe "Réactive"