Délégation de tâches avec ZeroMQ

le 30/01/2012 par David Rousselie
Tags: Software Engineering

La délégation de tâches en asynchrone est un moyen efficace d'alléger la charge que subissent nos systèmes. En effet, de nombreux cas d'utilisation ne nécessitent pas d'être exécutés de façon synchrone lorsqu'un utilisateur effectue une action ou qu'un événement extérieur intervient.

Par exemple, lorsqu'il n'est pas nécessaire de restituer la dernière version des données et que le traitement avant restitution est coûteux en ressources, il est possible de renvoyer des données préalablement mises en cache et de déporter en asynchrone une tâche de rafraîchissement de ce cache.

Un autre exemple concerne les systèmes qui mêlent des requêtes fortement consommatrices en ressources (CPU, mémoire, ...) et des requêtes peu consommatrices pour lesquelles on va vouloir garantir une latence faible même lorsque des requêtes consommatrices sont en cours de traitement. Pour cela, déléguer les traitements coûteux à des workers via une file de messages peut aider à garantir des temps de réponse faibles pour les autres requêtes. Cette séparation est d'autant plus importante sur des systèmes qui s'appuient sur des technologies monothreadées telles que Ruby ou NodeJS (ce dernier a néanmoins l'avantage de ne pas consommer de ressources/process lorsqu'il effectue des I/O).

Pour résoudre cette problématique de délégation de tâches, on utilise habituellement une solution comme ActiveMQ ou RabbitMQ. Seulement, lorsqu'il est nécessaire :

  • d'être tolérant à la panne
  • d'être scalable que ce soit en ajoutant des publishers (qui soumettent les tâches asynchrones) ou des workers (qui consomment et exécutent ces tâches)
  • de pouvoir intégrer des systèmes hétérogènes s'exécutant sur différentes plateformes (Java, Ruby, NodeJS, C, ...)

aucune de ces deux solutions ne permet de satisfaire l'ensemble de ces critères. Cet article présente une solution possible à base de ZeroMQ, écrite en NodeJS et utilisée pour traiter les deux exemples précédents.

ZeroMQ : Super sockets

ZeroMQ est une bibliothèque réseau écrite en C++ et qui offre des bindings dans de nombreux langages. Elle procure une couche d'abstraction au-dessus des sockets TCP classiques pour ajouter quelques fonctionnalités :

  • une communication par message. Contrairement à TCP ou UDP, un message est reçu dans son intégralité ou n'est pas reçu du tout, jamais partiellement. En effet, lorsqu'on utilise des sockets TCP, pour réceptionner des données, il est nécessaire de déterminer la taille des messages ou de rechercher un délimiteur. ZeroMQ le fait pour nous, il nous renvoie le message dans sa globalité. Les messages ZeroMQ sont composés de frames qui permettent de segmenter les informations passées.
  • la gestion des reconnexions lors d'une coupure.
  • la gestion d'une file de messages au niveau de la socket. Ainsi, lors de la perte d'une connexion, aucun message ne sera perdu, ZeroMQ se chargera de transmettre les messages en file d'attente lorsque la connexion aura été ré-établie. ZeroMQ offre ainsi la possibilité de créer un système de messagerie sans broker en connectant les éléments du système en peer-to-peer. La notion de broker pourra toutefois être réintroduite suivant les besoins.
  • l'abstraction du protocole de transport des messages:
    • TCP pour une communication sur le réseau,
    • PGM pour une communication en multicast sur le réseau,
    • IPC pour une communication inter-process au sein d'un même hôte,
    • INPROC, pour une communication au sein d'un même processus (entre plusieurs threads par exemple).
  • le support de différents patterns de communication suivant l'association des types de sockets ZeroMQ :
    • PUB-SUB : l'association des sockets PUB et SUB permet de distribuer des messages d'un publisher (socket PUB) vers N consommateurs (socket SUB) qui reçoivent tous les mêmes messages,
    • REQ-REP : le pattern classique de requête/réponse (1 publisher, 1 consommateur),
    • PUSH-PULL : la socket de type PUSH distribue les messages en round-robin dans la file de messages des sockets de type PULL connectées : 1 publisher et N consommateurs qui se répartissent la consommation des messages.

Pour utiliser ces patterns de communication, une socket ZeroMQ est capable de se connecter à plusieurs autres sockets ZeroMQ. Le comportement lors de l'envoi d'un message sur une socket ZeroMQ connectée à plusieurs sockets dépendra du type de cette socket.

Délégation de tâches scalable et tolérante à la panne

Bien que ZeroMQ fonctionne sans broker, il peut être nécessaire de réintroduire dans l'architecture un élément de ce type afin de limiter le nombre de connexions réseau. Par exemple, dans une architecture de délégation de tâches, avec 100 publishers et 100 workers, sans broker, il faudra établir 10 000 connexions (100 publishers * 100 workers) qui risquent d'engorger le réseau. Avec 2 brokers (pour supporter la panne de l'un d'entre eux), il faudra 400 connexions (100 publishers * 2 brokers + 100 workers * 2 brokers).

ZeroMQ propose 2 autres types de sockets qui permettent notamment le routage de messages :

  • ROUTER : cette socket ajoute aux messages entrant l'identifiant de la socket ZeroMQ source dans une nouvelle frame qui devient la première frame du message. Ensuite, elle utilise cet identifiant des messages sortants (en supprimant la première frame du message) pour sélectionner la socket destination du message (si elle est connectée bien sûr). En modifiant cette frame, nous sommes capables de personnaliser le routage des messages sortants via cette socket. L'identifiant d'une socket peut être spécifié à la création sinon il sera généré aléatoirement par ZeroMQ.
  • DEALER : cette socket permet de recevoir et envoyer des messages (contrairement à la socket PUSH qui peut seulement en envoyer). L'envoi de messages se faisant en round-robin aux sockets connectées.

Avec ces différents types de sockets nous pouvons les assembler de la façon suivante pour obtenir notre système de délégation de tâches asynchrones avec les caractéristiques suivantes :

  • chaque publisher peut publier une tâche sur n'importe quel broker,
  • chaque worker peut récupérer une tâche et publier le résultat sur n'importe quel broker,
  • les brokers sont capables de router le résultat d'une tâche vers le publisher qui l'a publiée.

Distribution de tâches avec ZeroMQ

NB : Nous utilisons ici 3 brokers, pour éviter que les workers envoient systématiquement le même type de message au même broker (ce qui aurait été le cas avec 2 brokers car le worker envoie alternativement un message READY et un message RESULT).

Cette architecture permet de supporter la perte de n'importe quel composant mais aussi d'ajouter des publishers, des brokers ou des workers suivant la charge de tâches à traiter.

  • Publication des tâches

    En utilisant une socket de type DEALER du côté des publishers, celle-ci étant connectée à tous les brokers, les messages sont envoyés en round-robin (par ZeroMQ) vers les brokers (étapes 1, 2 et 3 à partir des publishers). Si un broker venait à disparaitre, la socket DEALER continuerait à dialoguer avec les brokers restants et lorsque le broker perdu serait de nouveau accessible, ZeroMQ se chargera de se reconnecter sur ce broker qui réintègrera alors le cycle de round-robin d'envoi de messages. Les messages envoyés ne contiennent qu'une seule frame contenant les données de la tâche à exécuter :

    Format du message envoyé par un publisher

    Code du Publisher :

    var broker = zeromq.createSocket('dealer');
    ["tcp://broker1:5555",
     "tcp://broker2:5555",
     "tcp://broker3:5555"].forEach(function(address) {
      broker.connect(address);
    });
    ...
    broker.send("task data");
    
  • Les messages provenant des publishers sont reçus sur une socket ROUTER sur un des brokers. Celle-ci ajoute une frame contenant l'identifiant de la socket DEALER du publisher qui a envoyé le message. Ceci permettra, si le message est envoyé via une socket ROUTER connectée à ce publisher, de router le message vers le bon publisher. Le broker a la charge de gérer une file de messages en mémoire, dans un fichier ou dans une base de données selon les besoins (garantie de ne pas perdre de message, performances, ...). Les messages traités par le broker contiennent donc 2 frames :

    Format du message traité par le broker

    Code du broker :

    var publishers = zeromq.createSocket('router');
    publishers.on('message', function() {
      // Process messages from publishers
      var publisher_id = arguments[0],
          task_data = arguments[1];
    
      // Store in a queue
      ...
    });
    publishers.bindSync("tcp://*:5555");
    
  • Notification d'un worker prêt à traiter une tâche

    Du côté workers, ces derniers se connectent à tous les brokers via une socket DEALER. Ils envoient un message de type READY (en utilisant une frame) pour récupérer une tâche disponible sur les brokers (étapes 1 et 4 à partir des workers). Le type du message est géré dans le code applicatif et non pas par ZeroMQ. Il permet au code applicatif du broker de différencier les messages en provenance des workers :

    • READY pour préciser que le worker est prêt à traiter une tâche,
    • RESULT pour préciser que le message contient le résultat d'une tâche qui devra être transmis au publisher ayant publié la tâche.

    En utilisant une socket DEALER, les messages seront donc alternativement envoyés sur l'un ou l'autre des brokers et la socket gérera la perte d'un broker. Le message envoyé par un worker pour récupérer une tâche ressemble donc à :

    Format du message READY

    Le code du worker :

    var broker = zeromq.createSocket('dealer');
    broker.on('message', function() {...});
    ["tcp://broker1:5555",
     "tcp://broker2:5555",
     "tcp://broker3:5555"].forEach(function(address) {
      broker.connect(address);
    });
    broker.send(READY);
    
  • De la même manière que du côté publisher, un message READY se verra ajouter une frame contenant l'identifiant du worker qui l'a envoyé par la socket ROUTER du broker :

    Format du message READY sur le broker

  • Avec un message contenant une tâche à exécuter (provenant d'un publisher) et d'un message READY (provenant d'un worker disponible), le broker peut constituer un message contenant la tâche en plaçant l'identifiant d'un worker disponible dans la première frame afin que le message soit routé vers ce worker par la socket ROUTER (qui supprimera au passage cette première frame):

    Format du message contenant la tâche et envoyé par le broker au worker

    Le code du broker devient alors :

    var workers = zeromq.createSocket('router');
    workers.on('message', function() {
      // Process messages from workers
      var worker_id = arguments[0],
          message_type = arguments[1]; // READY
    
      if (message_type == READY) {
        // dequeue tasks
        var task, publisher_id = ...;
        workers.send(worker_id, publisher_id, task);
      }
    });
    workers.bindSync("tcp://*:5555");
    
  • Récupération d'une tâche par un worker prêt

    Le worker (celui qui a envoyé un message READY) reçoit alors le message (étape 2 à partir du broker1) suivant contenant la tâche à exécuter :

    Format du message reçu par un worker

    Le handler de réception d'un message sur le worker :

    broker.on('message', function() {
      // Process task messages from broker
      var publisher_id = arguments[0],
          task_data = arguments[1];
      // process task
      ...
    });
    

Cette architecture permet aux workers de renvoyer une réponse au publisher si l'on veut par exemple que le publisher réponde de façon synchrone sans pour autant accaparer le CPU et bloquer les requêtes suivantes (cas de NodeJS ici). Le cheminement inverse s'effectue alors de la façon suivante :

  • Envoi du résultat d'une tâche par un worker

    Le worker envoie un message contenant le résultat via sa socket DEALER vers les brokers (toujours en round-robin et suivant la disponibilité de ces derniers) en utilisant une frame pour spécifier un type de message RESULT (étape 3 à partir des workers) :

    Format d'un message résultat envoyé par un worker

    Code du handler de traitement des messages par un worker :

    broker.on('message', function() {
      // Process task messages from broker
      var publisher_id = arguments[0],
          task_data = arguments[1];
      // process task
      ...
    
      // Send a result to the publisher
      broker.send(RESULT, publisher_id, result);
      // Send READY message to get a new task
      broker.send(READY);
    });
    
  • Un des brokers reçoit le message via sa socket ROUTER qui ajoute une frame avec l'identifiant du worker. Dans ce cas-ci, l'identifiant nous sera inutile.

    Format du message de résultat reçu par le broker

    L'identifiant du publisher à l'origine de la tâche étant toujours présent dans le message, il est possible de lui envoyer le message suivant sur la socket ROUTER sur laquelle sont connectés les publishers. La socket routera alors le message vers le bon publisher :

    Format du message résultat envoyé par le broker au publisher

    Le handler des messages en provenance des workers devient alors :

    workers.on('message', function() {
      // Process messages from workers
      var worker_id = arguments[0],
          message_type = arguments[1], // READY or RESULT
          publisher_id = arguments[2],
          result = arguments[3];
    
      if (message_type == READY) {
        // dequeue tasks
        var task, publisher_id = ...;
        workers.send(worker_id, publisher_id, task);
      } else if (message_type == RESULT) {
        publishers.send(publisher_id, result);
      }
    });
    
  • Transmission du résultat d'une tâche au publisher

    le publisher reçoit alors le message contenant le résultat sur sa socket DEALER (étape 4 à partir du broker2) :

    Format du message de résultat reçu par un publisher

    Le handler de messages d'un publisher :

    broker.on('message', function() {
      // Process messages from broker
      var result = arguments[0];
      ...
    });
    

Conclusion

Comme nous avons pu le constater, ZeroMQ permet d'élaborer une solution adaptée à ses besoins en combinant les différents types de sockets disponibles (d'autres exemples sont accessibles dans le guide ZeroMQ). Cette souplesse vient tout de même au prix d'une certaine complexité puisqu'il faut élaborer son propre protocole (en utilisant les frames des messages ZeroMQ) et traiter quelques sujets qui n'ont pas été abordés dans cet article comme la gestion des timeouts. Par exemple, lorsqu'un broker reçoit avec succès un message READY mais ne peut répondre au worker en raison d'une coupure réseau ou d'un crash du broker, dans ce cas là, sans timeout, le worker pourrait attendre indéfiniment que le broker lui renvoie une tâche à exécuter. ZeroMQ offre aussi de très bons résultats concernant les performances, que ce soit la latence ou le débit de transmission des messages. Ceci en fait une bonne solution pour les systèmes devant traiter de gros volumes de messages (en nombre ou en taille). Il est cependant moins adapté lorsque l'on ne peut pas se permettre de perdre des messages : dans l'exemple de cet article, la file de messages est n'est pas persisté sur le broker et ZeroMQ ne gère pas de transaction; un message envoyé aux brokers n'est donc pas garantir d'atteindre un worker ou un publisher si un broker crash.