Délégation de tâches avec ZeroMQ

La délégation de tâches en asynchrone est un moyen efficace d’alléger la charge que subissent nos systèmes. En effet, de nombreux cas d’utilisation ne nécessitent pas d’être exécutés de façon synchrone lorsqu’un utilisateur effectue une action ou qu’un événement extérieur intervient.

Par exemple, lorsqu’il n’est pas nécessaire de restituer la dernière version des données et que le traitement avant restitution est coûteux en ressources, il est possible de renvoyer des données préalablement mises en cache et de déporter en asynchrone une tâche de rafraîchissement de ce cache.

Un autre exemple concerne les systèmes qui mêlent des requêtes fortement consommatrices en ressources (CPU, mémoire, …) et des requêtes peu consommatrices pour lesquelles on va vouloir garantir une latence faible même lorsque des requêtes consommatrices sont en cours de traitement. Pour cela, déléguer les traitements coûteux à des workers via une file de messages peut aider à garantir des temps de réponse faibles pour les autres requêtes. Cette séparation est d’autant plus importante sur des systèmes qui s’appuient sur des technologies monothreadées telles que Ruby ou NodeJS (ce dernier a néanmoins l’avantage de ne pas consommer de ressources/process lorsqu’il effectue des I/O).

Pour résoudre cette problématique de délégation de tâches, on utilise habituellement une solution comme ActiveMQ ou RabbitMQ. Seulement, lorsqu’il est nécessaire :

  • d’être tolérant à la panne
  • d’être scalable que ce soit en ajoutant des publishers (qui soumettent les tâches asynchrones) ou des workers (qui consomment et exécutent ces tâches)
  • de pouvoir intégrer des systèmes hétérogènes s’exécutant sur différentes plateformes (Java, Ruby, NodeJS, C, …)

aucune de ces deux solutions ne permet de satisfaire l’ensemble de ces critères. Cet article présente une solution possible à base de ZeroMQ, écrite en NodeJS et utilisée pour traiter les deux exemples précédents.

ZeroMQ : Super sockets

ZeroMQ est une bibliothèque réseau écrite en C++ et qui offre des bindings dans de nombreux langages. Elle procure une couche d’abstraction au-dessus des sockets TCP classiques pour ajouter quelques fonctionnalités :

  • une communication par message. Contrairement à TCP ou UDP, un message est reçu dans son intégralité ou n’est pas reçu du tout, jamais partiellement. En effet, lorsqu’on utilise des sockets TCP, pour réceptionner des données, il est nécessaire de déterminer la taille des messages ou de rechercher un délimiteur. ZeroMQ le fait pour nous, il nous renvoie le message dans sa globalité. Les messages ZeroMQ sont composés de frames qui permettent de segmenter les informations passées.
  • la gestion des reconnexions lors d’une coupure.
  • la gestion d’une file de messages au niveau de la socket. Ainsi, lors de la perte d’une connexion, aucun message ne sera perdu, ZeroMQ se chargera de transmettre les messages en file d’attente lorsque la connexion aura été ré-établie. ZeroMQ offre ainsi la possibilité de créer un système de messagerie sans broker en connectant les éléments du système en peer-to-peer. La notion de broker pourra toutefois être réintroduite suivant les besoins.
  • l’abstraction du protocole de transport des messages:
    • TCP pour une communication sur le réseau,
    • PGM pour une communication en multicast sur le réseau,
    • IPC pour une communication inter-process au sein d’un même hôte,
    • INPROC, pour une communication au sein d’un même processus (entre plusieurs threads par exemple).
  • le support de différents patterns de communication suivant l’association des types de sockets ZeroMQ :
    • PUB-SUB : l’association des sockets PUB et SUB permet de distribuer des messages d’un publisher (socket PUB) vers N consommateurs (socket SUB) qui reçoivent tous les mêmes messages,
    • REQ-REP : le pattern classique de requête/réponse (1 publisher, 1 consommateur),
    • PUSH-PULL : la socket de type PUSH distribue les messages en round-robin dans la file de messages des sockets de type PULL connectées : 1 publisher et N consommateurs qui se répartissent la consommation des messages.

Pour utiliser ces patterns de communication, une socket ZeroMQ est capable de se connecter à plusieurs autres sockets ZeroMQ. Le comportement lors de l’envoi d’un message sur une socket ZeroMQ connectée à plusieurs sockets dépendra du type de cette socket.

Délégation de tâches scalable et tolérante à la panne

Bien que ZeroMQ fonctionne sans broker, il peut être nécessaire de réintroduire dans l’architecture un élément de ce type afin de limiter le nombre de connexions réseau. Par exemple, dans une architecture de délégation de tâches, avec 100 publishers et 100 workers, sans broker, il faudra établir 10 000 connexions (100 publishers * 100 workers) qui risquent d’engorger le réseau. Avec 2 brokers (pour supporter la panne de l’un d’entre eux), il faudra 400 connexions (100 publishers * 2 brokers + 100 workers * 2 brokers).

ZeroMQ propose 2 autres types de sockets qui permettent notamment le routage de messages :

  • ROUTER : cette socket ajoute aux messages entrant l’identifiant de la socket ZeroMQ source dans une nouvelle frame qui devient la première frame du message. Ensuite, elle utilise cet identifiant des messages sortants (en supprimant la première frame du message) pour sélectionner la socket destination du message (si elle est connectée bien sûr). En modifiant cette frame, nous sommes capables de personnaliser le routage des messages sortants via cette socket. L’identifiant d’une socket peut être spécifié à la création sinon il sera généré aléatoirement par ZeroMQ.
  • DEALER : cette socket permet de recevoir et envoyer des messages (contrairement à la socket PUSH qui peut seulement en envoyer). L’envoi de messages se faisant en round-robin aux sockets connectées.

Avec ces différents types de sockets nous pouvons les assembler de la façon suivante pour obtenir notre système de délégation de tâches asynchrones avec les caractéristiques suivantes :

  • chaque publisher peut publier une tâche sur n’importe quel broker,
  • chaque worker peut récupérer une tâche et publier le résultat sur n’importe quel broker,
  • les brokers sont capables de router le résultat d’une tâche vers le publisher qui l’a publiée.


Distribution de tâches avec ZeroMQ

NB : Nous utilisons ici 3 brokers, pour éviter que les workers envoient systématiquement le même type de message au même broker (ce qui aurait été le cas avec 2 brokers car le worker envoie alternativement un message READY et un message RESULT).

Cette architecture permet de supporter la perte de n’importe quel composant mais aussi d’ajouter des publishers, des brokers ou des workers suivant la charge de tâches à traiter.

  • Publication des tâches
    En utilisant une socket de type DEALER du côté des publishers, celle-ci étant connectée à tous les brokers, les messages sont envoyés en round-robin (par ZeroMQ) vers les brokers (étapes 1, 2 et 3 à partir des publishers). Si un broker venait à disparaitre, la socket DEALER continuerait à dialoguer avec les brokers restants et lorsque le broker perdu serait de nouveau accessible, ZeroMQ se chargera de se reconnecter sur ce broker qui réintègrera alors le cycle de round-robin d’envoi de messages.
    Les messages envoyés ne contiennent qu’une seule frame contenant les données de la tâche à exécuter :
    Format du message envoyé par un publisher
    Code du Publisher :

    var broker = zeromq.createSocket('dealer');
    ["tcp://broker1:5555",
     "tcp://broker2:5555",
     "tcp://broker3:5555"].forEach(function(address) {
      broker.connect(address);
    });
    ...
    broker.send("task data");
  • Les messages provenant des publishers sont reçus sur une socket ROUTER sur un des brokers. Celle-ci ajoute une frame contenant l’identifiant de la socket DEALER du publisher qui a envoyé le message. Ceci permettra, si le message est envoyé via une socket ROUTER connectée à ce publisher, de router le message vers le bon publisher.
    Le broker a la charge de gérer une file de messages en mémoire, dans un fichier ou dans une base de données selon les besoins (garantie de ne pas perdre de message, performances, …).
    Les messages traités par le broker contiennent donc 2 frames :
    Format du message traité par le broker
    Code du broker :

    var publishers = zeromq.createSocket('router');
    publishers.on('message', function() {
      // Process messages from publishers
      var publisher_id = arguments[0],
          task_data = arguments[1];
    
      // Store in a queue
      ...
    });
    publishers.bindSync("tcp://*:5555");
  • Notification d'un worker prêt à traiter une tâche
    Du côté workers, ces derniers se connectent à tous les brokers via une socket DEALER. Ils envoient un message de type READY (en utilisant une frame) pour récupérer une tâche disponible sur les brokers (étapes 1 et 4 à partir des workers). Le type du message est géré dans le code applicatif et non pas par ZeroMQ. Il permet au code applicatif du broker de différencier les messages en provenance des workers :
    • READY pour préciser que le worker est prêt à traiter une tâche,
    • RESULT pour préciser que le message contient le résultat d’une tâche qui devra être transmis au publisher ayant publié la tâche.

    En utilisant une socket DEALER, les messages seront donc alternativement envoyés sur l’un ou l’autre des brokers et la socket gérera la perte d’un broker.
    Le message envoyé par un worker pour récupérer une tâche ressemble donc à :

    Format du message READY
    Le code du worker :

    var broker = zeromq.createSocket('dealer');
    broker.on('message', function() {...});
    ["tcp://broker1:5555",
     "tcp://broker2:5555",
     "tcp://broker3:5555"].forEach(function(address) {
      broker.connect(address);
    });
    broker.send(READY);
  • De la même manière que du côté publisher, un message READY se verra ajouter une frame contenant l’identifiant du worker qui l’a envoyé par la socket ROUTER du broker :
    Format du message READY sur le broker
  • Avec un message contenant une tâche à exécuter (provenant d’un publisher) et d’un message READY (provenant d’un worker disponible), le broker peut constituer un message contenant la tâche en plaçant l’identifiant d’un worker disponible dans la première frame afin que le message soit routé vers ce worker par la socket ROUTER (qui supprimera au passage cette première frame):
    Format du message contenant la tâche et envoyé par le broker au worker
    Le code du broker devient alors :
    var workers = zeromq.createSocket('router');
    workers.on('message', function() {
      // Process messages from workers
      var worker_id = arguments[0],
          message_type = arguments[1]; // READY
    
      if (message_type == READY) {
        // dequeue tasks
        var task, publisher_id = ...;
        workers.send(worker_id, publisher_id, task);
      }
    });
    workers.bindSync("tcp://*:5555");
  • Récupération d'une tâche par un worker prêt
    Le worker (celui qui a envoyé un message READY) reçoit alors le message (étape 2 à partir du broker1) suivant contenant la tâche à exécuter :
    Format du message reçu par un worker
    Le handler de réception d’un message sur le worker :
    broker.on('message', function() {
      // Process task messages from broker
      var publisher_id = arguments[0],
          task_data = arguments[1];
      // process task
      ...
    });

Cette architecture permet aux workers de renvoyer une réponse au publisher si l’on veut par exemple que le publisher réponde de façon synchrone sans pour autant accaparer le CPU et bloquer les requêtes suivantes (cas de NodeJS ici). Le cheminement inverse s’effectue alors de la façon suivante :

  • Envoi du résultat d'une tâche par un worker
    Le worker envoie un message contenant le résultat via sa socket DEALER vers les brokers (toujours en round-robin et suivant la disponibilité de ces derniers) en utilisant une frame pour spécifier un type de message RESULT (étape 3 à partir des workers) :
    Format d'un message résultat envoyé par un worker
    Code du handler de traitement des messages par un worker :
    broker.on('message', function() {
      // Process task messages from broker
      var publisher_id = arguments[0],
          task_data = arguments[1];
      // process task
      ...
    
      // Send a result to the publisher
      broker.send(RESULT, publisher_id, result);
      // Send READY message to get a new task
      broker.send(READY);
    });
  • Un des brokers reçoit le message via sa socket ROUTER qui ajoute une frame avec l’identifiant du worker. Dans ce cas-ci, l’identifiant nous sera inutile.
    Format du message de résultat reçu par le broker
    L’identifiant du publisher à l’origine de la tâche étant toujours présent dans le message, il est possible de lui envoyer le message suivant sur la socket ROUTER sur laquelle sont connectés les publishers. La socket routera alors le message vers le bon publisher :

    Format du message résultat envoyé par le broker au publisher
    Le handler des messages en provenance des workers devient alors :

    workers.on('message', function() {
      // Process messages from workers
      var worker_id = arguments[0],
          message_type = arguments[1], // READY or RESULT
          publisher_id = arguments[2],
          result = arguments[3];
    
      if (message_type == READY) {
        // dequeue tasks
        var task, publisher_id = ...;
        workers.send(worker_id, publisher_id, task);
      } else if (message_type == RESULT) {
        publishers.send(publisher_id, result);
      }
    });
  • Transmission du résultat d'une tâche au publisher
    le publisher reçoit alors le message contenant le résultat sur sa socket DEALER (étape 4 à partir du broker2) :
    Format du message de résultat reçu par un publisher
    Le handler de messages d’un publisher :
    broker.on('message', function() {
      // Process messages from broker
      var result = arguments[0];
      ...
    });

Conclusion

Comme nous avons pu le constater, ZeroMQ permet d’élaborer une solution adaptée à ses besoins en combinant les différents types de sockets disponibles (d’autres exemples sont accessibles dans le guide ZeroMQ). Cette souplesse vient tout de même au prix d’une certaine complexité puisqu’il faut élaborer son propre protocole (en utilisant les frames des messages ZeroMQ) et traiter quelques sujets qui n’ont pas été abordés dans cet article comme la gestion des timeouts. Par exemple, lorsqu’un broker reçoit avec succès un message READY mais ne peut répondre au worker en raison d’une coupure réseau ou d’un crash du broker, dans ce cas là, sans timeout, le worker pourrait attendre indéfiniment que le broker lui renvoie une tâche à exécuter.
ZeroMQ offre aussi de très bons résultats concernant les performances, que ce soit la latence ou le débit de transmission des messages. Ceci en fait une bonne solution pour les systèmes devant traiter de gros volumes de messages (en nombre ou en taille). Il est cependant moins adapté lorsque l’on ne peut pas se permettre de perdre des messages : dans l’exemple de cet article, la file de messages est n’est pas persisté sur le broker et ZeroMQ ne gère pas de transaction; un message envoyé aux brokers n’est donc pas garantir d’atteindre un worker ou un publisher si un broker crash.

10 commentaires sur “Délégation de tâches avec ZeroMQ”

  • bonjour, pouvez-vous détailler l'affirmation "aucune de ces deux solutions ne permet de satisfaire l’ensemble de ces critères" en ce qui concerne RabbitMQ, car pour moi, cette solution satisfait tous les critères mentionnés (tolérant à la panne, scalable, pouvoir intégrer des systèmes hétérogènes). Merci
  • Je suis également preneur des arguments quant à cette solution, qui me semble également remplir ces critères.
  • Bonjour, Je me suis également fait la même remarque.. J'utilise peu RabbitMQ, mais il me semble qu'il réunit les critères : - Tolérant à la panne. - Scalability horizontale. - Intégration multi-langage. RabbitMQ peut-être utilisé en cluster actif/passif : http://www.rabbitmq.com/ha.html De plus il est possible d'écrire chaque message sur le disque dur est ainsi de ne pas utiliser que la mémoire RAM. Cela permet de répondre à des problématiques simple de coupure de courant. Pour l'évolution horizontale, RabbitMQ peut-être placer en cluster.. Donc cela réponds au besoin. Et pour le multi-langage, voici les différents portages des clients AMQP : http://www.rabbitmq.com/devtools.html Il me semble que ZeroMQ dispose de portage plus nombreux que RabbitMQ. Mais pour l'exemple, un portage JS & Ruby existe. Armetiz.
  • Bonjour, Vous avez tout à fait raison, les critères que je cite sont effectivement satisfaits par RabbitMQ. J'ajouterai donc une précision sur le contexte dans lequel la solution de l'article a été mise en place : nous avons privilégié une faible latence (nécessaire dans certain de nos cas d'usage) plutôt que la garantie de ne pas perdre de message. Hors, la tolérance à la panne de RabbitMQ est assuré par réplication de la file de message sur le broker qui offre une meilleure garantie de ne pas perdre de message au prix d'une latence plus importante. Un autre point me fait m'interroger sur la capacité de RabbitMQ à scaler une file de message : la réplication d'une file de message se fait en master/slaves, ce que je comprends par le fait que la file master distribue les messages à l'ensemble des workers (même si ces derniers peuvent se connecter sur les différents noeuds du cluster RabbitMQ). Mais peut-être ai-je mal compris le fonctionnement des queues répliquées de RabbitMQ ? Merci pour vos retours, David
  • Sur un gros site en production depuis un moment, RabbitMQ supporte plus de 10 millions de messages par jour sans frémir (CPU stable et faible, consommation mémoire ridiculement petite). Nous avons choisi RabbitMQ car nous voulions avoir la garantie de ne perdre *aucun* message. Avant de se demander si le mécanisme de réplication des queues de RabbitMQ peut l'empêcher de scaler, il faut faire des essais et voir quelle limite on atteint. Je n'ai pas réussi à la mettre en défaut ou à genoux... alors son mode de réplication ne m’importe peu, puisqu'il fonctionne "out of the box". Je suis extrêmement satisfait de RabbitMQ.
  • 10 millions de messages par jour fait un peu plus de 100 messages par seconde. Dans notre contexte, nous espérons (cela dépendra du succès de notre plateforme :) ) un trafic un peu plus important. En effet, s'agissant d'une plateforme multi-tenant, nous devons prévoir un débit de messages plus important. En fait, si le besoin de perdre aucun message avait été nécessaire, je pense aussi que RabbitMQ me parait une solution prometteuse.
  • Oui, 100/s c'est une moyenne. Bien évidement, le trafic n'est pas en permanence à 100/s : on a de gros pics. Ce qu'on constate : - les machines rabbit restent tranquilles (impossible de voir qu'un gros burst est en cours) - on ne perd aucun message.
  • Connaissant assez bien RabbitMQ, il me semble également qu'il répond à tous ces critères. Il est possible que ZeroMQ soit plus "performant" mais sans test précis, je ne m'avancerai pas. Pour avoir rapidement évalué les 2 d'un point de vue fonctionnalités, il y a quelques avantages vraiment intéressants à RabbitMQ : - possibilité de persister les messages sur le disque - gestion de l'acknowledge (automatique, manuel ou multiple, "nack") - interface de gestion : le fait d'avoir un broker central permet d'avoir une interface de gestion centralisée (plugin management de RabbitMQ) pour voir ce qui se passe sur le broker, purger des files, changer les bindings,... Très utile en dev et peut également bien dépanner par la suite. Sauf erreur de ma part, tout ça n'est pas dans ZeroMQ. Après, c'est aussi une question de goût et on doit surement pouvoir ré-implémenter ça avec ZeroMQ, mais j'avoue qu'avoir tout ça "out-of-the-box" m'avait particulièrement séduit.
  • Moi ce que j'aime bien dans RabbitMQ, c'est que le logo est un lapin. J'ai d'ailleurs pu le représenter en LEGO (http://cl.ly/260A0m0P3h07331w343q) et c'est un aspect qui me parait difficile à implémenter dans 0MQ.
  • @Matthieu Bonjour, Pour moi, les fonctionnalités d'administration que vous citez sont possibles avec Rabbit du fait de l'architecture à base de broker. Je suppose qu'avec ZeroMQ, qui est décentralisé, la topologie est "inscrite" dans le comportement des acteurs, plutôt que configurée dans un broker.
    1. Laisser un commentaire

      Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *


      Ce formulaire est protégé par Google Recaptcha