Délégation de tâches avec ZeroMQ
La délégation de tâches en asynchrone est un moyen efficace d'alléger la charge que subissent nos systèmes. En effet, de nombreux cas d'utilisation ne nécessitent pas d'être exécutés de façon synchrone lorsqu'un utilisateur effectue une action ou qu'un événement extérieur intervient.
Par exemple, lorsqu'il n'est pas nécessaire de restituer la dernière version des données et que le traitement avant restitution est coûteux en ressources, il est possible de renvoyer des données préalablement mises en cache et de déporter en asynchrone une tâche de rafraîchissement de ce cache.
Un autre exemple concerne les systèmes qui mêlent des requêtes fortement consommatrices en ressources (CPU, mémoire, ...) et des requêtes peu consommatrices pour lesquelles on va vouloir garantir une latence faible même lorsque des requêtes consommatrices sont en cours de traitement. Pour cela, déléguer les traitements coûteux à des workers via une file de messages peut aider à garantir des temps de réponse faibles pour les autres requêtes. Cette séparation est d'autant plus importante sur des systèmes qui s'appuient sur des technologies monothreadées telles que Ruby ou NodeJS (ce dernier a néanmoins l'avantage de ne pas consommer de ressources/process lorsqu'il effectue des I/O).
Pour résoudre cette problématique de délégation de tâches, on utilise habituellement une solution comme ActiveMQ ou RabbitMQ. Seulement, lorsqu'il est nécessaire :
- d'être tolérant à la panne
- d'être scalable que ce soit en ajoutant des publishers (qui soumettent les tâches asynchrones) ou des workers (qui consomment et exécutent ces tâches)
- de pouvoir intégrer des systèmes hétérogènes s'exécutant sur différentes plateformes (Java, Ruby, NodeJS, C, ...)
aucune de ces deux solutions ne permet de satisfaire l'ensemble de ces critères. Cet article présente une solution possible à base de ZeroMQ, écrite en NodeJS et utilisée pour traiter les deux exemples précédents.
ZeroMQ : Super sockets
ZeroMQ est une bibliothèque réseau écrite en C++ et qui offre des bindings dans de nombreux langages. Elle procure une couche d'abstraction au-dessus des sockets TCP classiques pour ajouter quelques fonctionnalités :
- une communication par message. Contrairement à TCP ou UDP, un message est reçu dans son intégralité ou n'est pas reçu du tout, jamais partiellement. En effet, lorsqu'on utilise des sockets TCP, pour réceptionner des données, il est nécessaire de déterminer la taille des messages ou de rechercher un délimiteur. ZeroMQ le fait pour nous, il nous renvoie le message dans sa globalité. Les messages ZeroMQ sont composés de frames qui permettent de segmenter les informations passées.
- la gestion des reconnexions lors d'une coupure.
- la gestion d'une file de messages au niveau de la socket. Ainsi, lors de la perte d'une connexion, aucun message ne sera perdu, ZeroMQ se chargera de transmettre les messages en file d'attente lorsque la connexion aura été ré-établie. ZeroMQ offre ainsi la possibilité de créer un système de messagerie sans broker en connectant les éléments du système en peer-to-peer. La notion de broker pourra toutefois être réintroduite suivant les besoins.
- l'abstraction du protocole de transport des messages:
- TCP pour une communication sur le réseau,
- PGM pour une communication en multicast sur le réseau,
- IPC pour une communication inter-process au sein d'un même hôte,
- INPROC, pour une communication au sein d'un même processus (entre plusieurs threads par exemple).
- le support de différents patterns de communication suivant l'association des types de sockets ZeroMQ :
PUB-SUB
: l'association des socketsPUB
etSUB
permet de distribuer des messages d'un publisher (socketPUB
) vers N consommateurs (socketSUB
) qui reçoivent tous les mêmes messages,REQ-REP
: le pattern classique de requête/réponse (1 publisher, 1 consommateur),PUSH-PULL
: la socket de typePUSH
distribue les messages en round-robin dans la file de messages des sockets de typePULL
connectées : 1 publisher et N consommateurs qui se répartissent la consommation des messages.
Pour utiliser ces patterns de communication, une socket ZeroMQ est capable de se connecter à plusieurs autres sockets ZeroMQ. Le comportement lors de l'envoi d'un message sur une socket ZeroMQ connectée à plusieurs sockets dépendra du type de cette socket.
Délégation de tâches scalable et tolérante à la panne
Bien que ZeroMQ fonctionne sans broker, il peut être nécessaire de réintroduire dans l'architecture un élément de ce type afin de limiter le nombre de connexions réseau. Par exemple, dans une architecture de délégation de tâches, avec 100 publishers et 100 workers, sans broker, il faudra établir 10 000 connexions (100 publishers * 100 workers) qui risquent d'engorger le réseau. Avec 2 brokers (pour supporter la panne de l'un d'entre eux), il faudra 400 connexions (100 publishers * 2 brokers + 100 workers * 2 brokers).
ZeroMQ propose 2 autres types de sockets qui permettent notamment le routage de messages :
ROUTER
: cette socket ajoute aux messages entrant l'identifiant de la socket ZeroMQ source dans une nouvelle frame qui devient la première frame du message. Ensuite, elle utilise cet identifiant des messages sortants (en supprimant la première frame du message) pour sélectionner la socket destination du message (si elle est connectée bien sûr). En modifiant cette frame, nous sommes capables de personnaliser le routage des messages sortants via cette socket. L'identifiant d'une socket peut être spécifié à la création sinon il sera généré aléatoirement par ZeroMQ.DEALER
: cette socket permet de recevoir et envoyer des messages (contrairement à la socketPUSH
qui peut seulement en envoyer). L'envoi de messages se faisant en round-robin aux sockets connectées.
Avec ces différents types de sockets nous pouvons les assembler de la façon suivante pour obtenir notre système de délégation de tâches asynchrones avec les caractéristiques suivantes :
- chaque publisher peut publier une tâche sur n'importe quel broker,
- chaque worker peut récupérer une tâche et publier le résultat sur n'importe quel broker,
- les brokers sont capables de router le résultat d'une tâche vers le publisher qui l'a publiée.
NB : Nous utilisons ici 3 brokers, pour éviter que les workers envoient systématiquement le même type de message au même broker (ce qui aurait été le cas avec 2 brokers car le worker envoie alternativement un message READY
et un message RESULT
).
Cette architecture permet de supporter la perte de n'importe quel composant mais aussi d'ajouter des publishers, des brokers ou des workers suivant la charge de tâches à traiter.
En utilisant une socket de type
DEALER
du côté des publishers, celle-ci étant connectée à tous les brokers, les messages sont envoyés en round-robin (par ZeroMQ) vers les brokers (étapes 1, 2 et 3 à partir des publishers). Si un broker venait à disparaitre, la socketDEALER
continuerait à dialoguer avec les brokers restants et lorsque le broker perdu serait de nouveau accessible, ZeroMQ se chargera de se reconnecter sur ce broker qui réintègrera alors le cycle de round-robin d'envoi de messages. Les messages envoyés ne contiennent qu'une seule frame contenant les données de la tâche à exécuter :Code du Publisher :
var broker = zeromq.createSocket('dealer'); ["tcp://broker1:5555", "tcp://broker2:5555", "tcp://broker3:5555"].forEach(function(address) { broker.connect(address); }); ... broker.send("task data");
Les messages provenant des publishers sont reçus sur une socket
ROUTER
sur un des brokers. Celle-ci ajoute une frame contenant l'identifiant de la socketDEALER
du publisher qui a envoyé le message. Ceci permettra, si le message est envoyé via une socketROUTER
connectée à ce publisher, de router le message vers le bon publisher. Le broker a la charge de gérer une file de messages en mémoire, dans un fichier ou dans une base de données selon les besoins (garantie de ne pas perdre de message, performances, ...). Les messages traités par le broker contiennent donc 2 frames :Code du broker :
var publishers = zeromq.createSocket('router'); publishers.on('message', function() { // Process messages from publishers var publisher_id = arguments[0], task_data = arguments[1]; // Store in a queue ... }); publishers.bindSync("tcp://*:5555");
Du côté workers, ces derniers se connectent à tous les brokers via une socket
DEALER
. Ils envoient un message de typeREADY
(en utilisant une frame) pour récupérer une tâche disponible sur les brokers (étapes 1 et 4 à partir des workers). Le type du message est géré dans le code applicatif et non pas par ZeroMQ. Il permet au code applicatif du broker de différencier les messages en provenance des workers :READY
pour préciser que le worker est prêt à traiter une tâche,RESULT
pour préciser que le message contient le résultat d'une tâche qui devra être transmis au publisher ayant publié la tâche.
En utilisant une socket
DEALER
, les messages seront donc alternativement envoyés sur l'un ou l'autre des brokers et la socket gérera la perte d'un broker. Le message envoyé par un worker pour récupérer une tâche ressemble donc à :Le code du worker :
var broker = zeromq.createSocket('dealer'); broker.on('message', function() {...}); ["tcp://broker1:5555", "tcp://broker2:5555", "tcp://broker3:5555"].forEach(function(address) { broker.connect(address); }); broker.send(READY);
De la même manière que du côté publisher, un message
READY
se verra ajouter une frame contenant l'identifiant du worker qui l'a envoyé par la socketROUTER
du broker :Avec un message contenant une tâche à exécuter (provenant d'un publisher) et d'un message
READY
(provenant d'un worker disponible), le broker peut constituer un message contenant la tâche en plaçant l'identifiant d'un worker disponible dans la première frame afin que le message soit routé vers ce worker par la socketROUTER
(qui supprimera au passage cette première frame):Le code du broker devient alors :
var workers = zeromq.createSocket('router'); workers.on('message', function() { // Process messages from workers var worker_id = arguments[0], message_type = arguments[1]; // READY if (message_type == READY) { // dequeue tasks var task, publisher_id = ...; workers.send(worker_id, publisher_id, task); } }); workers.bindSync("tcp://*:5555");
Le worker (celui qui a envoyé un message
READY
) reçoit alors le message (étape 2 à partir du broker1) suivant contenant la tâche à exécuter :Le handler de réception d'un message sur le worker :
broker.on('message', function() { // Process task messages from broker var publisher_id = arguments[0], task_data = arguments[1]; // process task ... });
Cette architecture permet aux workers de renvoyer une réponse au publisher si l'on veut par exemple que le publisher réponde de façon synchrone sans pour autant accaparer le CPU et bloquer les requêtes suivantes (cas de NodeJS ici). Le cheminement inverse s'effectue alors de la façon suivante :
Le worker envoie un message contenant le résultat via sa socket
DEALER
vers les brokers (toujours en round-robin et suivant la disponibilité de ces derniers) en utilisant une frame pour spécifier un type de messageRESULT
(étape 3 à partir des workers) :Code du handler de traitement des messages par un worker :
broker.on('message', function() { // Process task messages from broker var publisher_id = arguments[0], task_data = arguments[1]; // process task ... // Send a result to the publisher broker.send(RESULT, publisher_id, result); // Send READY message to get a new task broker.send(READY); });
Un des brokers reçoit le message via sa socket
ROUTER
qui ajoute une frame avec l'identifiant du worker. Dans ce cas-ci, l'identifiant nous sera inutile.L'identifiant du publisher à l'origine de la tâche étant toujours présent dans le message, il est possible de lui envoyer le message suivant sur la socket
ROUTER
sur laquelle sont connectés les publishers. La socket routera alors le message vers le bon publisher :Le handler des messages en provenance des workers devient alors :
workers.on('message', function() { // Process messages from workers var worker_id = arguments[0], message_type = arguments[1], // READY or RESULT publisher_id = arguments[2], result = arguments[3]; if (message_type == READY) { // dequeue tasks var task, publisher_id = ...; workers.send(worker_id, publisher_id, task); } else if (message_type == RESULT) { publishers.send(publisher_id, result); } });
le publisher reçoit alors le message contenant le résultat sur sa socket
DEALER
(étape 4 à partir du broker2) :Le handler de messages d'un publisher :
broker.on('message', function() { // Process messages from broker var result = arguments[0]; ... });
Conclusion
Comme nous avons pu le constater, ZeroMQ permet d'élaborer une solution adaptée à ses besoins en combinant les différents types de sockets disponibles (d'autres exemples sont accessibles dans le guide ZeroMQ). Cette souplesse vient tout de même au prix d'une certaine complexité puisqu'il faut élaborer son propre protocole (en utilisant les frames des messages ZeroMQ) et traiter quelques sujets qui n'ont pas été abordés dans cet article comme la gestion des timeouts. Par exemple, lorsqu'un broker reçoit avec succès un message READY
mais ne peut répondre au worker en raison d'une coupure réseau ou d'un crash du broker, dans ce cas là, sans timeout, le worker pourrait attendre indéfiniment que le broker lui renvoie une tâche à exécuter. ZeroMQ offre aussi de très bons résultats concernant les performances, que ce soit la latence ou le débit de transmission des messages. Ceci en fait une bonne solution pour les systèmes devant traiter de gros volumes de messages (en nombre ou en taille). Il est cependant moins adapté lorsque l'on ne peut pas se permettre de perdre des messages : dans l'exemple de cet article, la file de messages est n'est pas persisté sur le broker et ZeroMQ ne gère pas de transaction; un message envoyé aux brokers n'est donc pas garantir d'atteindre un worker ou un publisher si un broker crash.