Complex Event Processing avec Esper

le 28/10/2009 par Nicolas Salmon
Tags: Software Engineering

Dans un précédent article, Karim Ben Othman nous introduisait le concept de Complex Event Processing (CEP).

Afin de rendre cette notion plus concrète, je vais au travers de cet article, vous présenter un framework de CEP open source : Esper.

Je vous invite à découvrir pas à pas, comment nous pourrions implémenter, avec Esper, le cas d'utilisation suivant : la détection de bagages égarés à l’aéroport.

UseCase RFID Aeroport

Le schéma ci-dessus illustre le système mis en place.

Prenons le scénario suivant : En partant du postulat que chaque bagage, une fois enregistré, porte une puce RFID que des antennes sont capables de localiser. Les antennes transmettent les messages à une application « centrale » avec les bagages qu’elles ont identifiés dans leur périmètre. Pour détecter les problèmes nous considérons que le temps maximum d’un « voyage » sur le tapis est de n secondes, et, qu’au delà de ce délai, le bagage doit être considéré comme manquant. La méthode consiste donc à vérifier qu’un même bagage est passé par les 4 antennes dans l’intervalle de temps attendu et donc, qu’il existe 4 évènements détectés pour un même bagage dans les n dernières secondes.

Esper : un framework open source dédié au CEP

Esper est un Framework open source dédié au CEP, disponible pour Java et depuis peu pour .Net également.  Dans le cadre de cet article, je prendrais pour exemple l’implémentation Java, mais les concepts abordés sont facilement transposables pour l’univers .Net. Esper se présente sous la forme d’une librairie Java (Jar), qui peut donc facilement s’insérer dans une application Java standalone, un serveur d’application ou un conteneur web de type Tomcat. EsperTech, la société derrière Esper fournit autour de ce Framework 2 autres solutions :

- Esper Enterprise Edition : ajoute un client lourd pour faire du monitoring sur une application intégrant Esper (suivi des évènements en temps réél, reporting, ...), accès JMX

- Esper HA : pour les besoins en haute disponibilité, tolérance aux pannes  : gère le clustering avec un cache distribué, prévention de la saturation mémoire grâce à un mécanisme de débordement vers disque ou base de données.

Rentrons maintenant dans le vif du sujet, et découvrons comment fonctionne Esper.

Implémentation du use case

L’implémentation des règles de traitement des évènements se fait via un langage de requêtage, proche du SQL, appelé EPL (Event Processing Language), qui permet de requêter le contenu des évènements. En effet, plutôt que de requêter sur des données historiques comme dans une base de données classique, avec Esper, les données sur lesquelles nous allons effectuer nos requêtes est un ensemble d’évènements se produisant sur une période de temps donnée.

Requêtage sur flux d’évènements en utilisant une fenêtre de temps

Définition des évènements

Un évènement est un message provenant d'un autre système. Par exemple, un flux d'information financière viendrait nous indiquer que le prix de l'action IBM est passé à 120,60$.

Avec Esper, on dispose de plusieurs moyens pour définir des évènements : XML, POJO, … Pour pouvoir se connecter au monde extérieur est récupérer ces évènements, Esper fournit une interface de type "Adapter" qui permet de faire le pont avec diverses sources d'information. On dispose par exemple d'une implémentation d'adapter pour JMS.

Dans notre exemple, les évènements fournis par les antennes RFID contiennent un couple « identifiant du bagage » et « identifiant de l’antenne ». En choisissant l’implémentation en pur Java, un événement peut être défini de la sorte :

public class LuggageEvent {

private String id;
private String antennaId;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAntennaId() {
return antennaId;
}
public void setAntennaId(String antennaId) {
this.antennaId = antennaId;
}
}

Déclaration de ma requête

A vrai dire, c’est ici que nous rentrons vraiment dans le cœur du sujet et que se trouve toute l’intelligence de notre application. Il s’agit d’écrire la requête qui va permettre de détecter le sous-ensemble d’évènements nous intéressant. Reprenons notre exemple et voyons voir comment nous pourrions l’exprimer à travers le langage de requêtage EPL. Voici les règles que je souhaite fixer dans mon système :

  • un bagage non perdu a été détecté par les 4 antennes dans un intervalle de temps de 5 minutes

  • un bagage perdu a été détecté par 1, 2 ou 3 antennes seulement dans l’intervalle de temps de 5 minutes

Nous avons vu plus haut, que le requêtage sur flux d'évènements peut se faire avec une fenêtre de temps donnée, celle ci sera donc de 5 minutes dans notre exemple.

SchemaTimeWindow

Concept de time window illustré pour 3 bagages

Venons-en à la création de notre requête EPL. Même si au final celle-ci sera relativement concise, une démarche rigoureuse et une bonne compréhension des concepts sont nécessaires.  Je vais donc détailler la démarche pas à pas. Pour sélectionner l’ensemble de mes évènements de type "LuagageEvent" définis en Java :

select * from LuggageEvent ;

Facile jusqu’ici quand on connaît le SQL. Je souhaite maintenant requêter sur les évènements se produisant sur les 5 dernières minutes. Je vais utiliser ici le concept de « time window », fenêtre de temps, en mode « time batch ». Le mode « time batch » me permet d’accumuler les évènements sur une période de 5 min et de requêter ensuite sur ces évènements. Une nouvelle fenêtre de 5 min, indépendante de la précédente,  est créée à l’issue de ces 5 min.

select * from LuggageEvent.win:time_batch(5 min)

J’arrive désormais à sélectionner tous les évènements se produisant sur les 5 dernières minutes, mais je voudrais maintenant avoir des paquets d’évènements pour un même bagage, afin de pouvoir compter le nombre d’évènements pour un même bagage. Pour cela le langage EPL introduit la notion de « group by », qui permet de grouper les évènements par identifiant de bagage dans notre cas. Rien de nouveau donc par rapport au SQL, ce qui est une bonne chose, et facilite la prise en main du langage. Nous pourrions coder la requête ainsi :

select * from LuggageEvent.win:time_batch(5 min) group by id

Le problème avec cette solution est que le comptage des évènements se fait à partir du premier bagage détecté (sur la fenêtre de temps de 5 minutes). Ce n’est pas ce que l’on souhaite, on voudrait avoir une fenêtre de temps dédiée à chaque bagage. Nous allons donc ajouter à notre requête une instruction qui déclenchera la création d’une fenêtre de temps par bagage : c'est l'opérateur "groupby" de la "standard view set", qui va créer des sous-vues, propres à chaque bagage.

select * from LuggageEvent.std:groupby(id).win:time_batch(5 min)

Ok, maintenant je vais pouvoir compter le nombre d’événements par bagage sur l’intervalle individuel de 5 min, correct ? Presque … EPL fournit une instruction « count » permettant de compter le nombre de résultats remontés par la requête :

select * from LuggageEvent.std:groupby(id).win:time_batch(5 min) having  (count(*)!=4  and count(*)!=0)

Oui, … mais non.  Cette requête ne fonctionnera pas car l’opérateur « count » est une fonction de type « agregate », qui nécessite de travailler sur des groupes d’évènements (en utilisant « group by »). On peut ensuite poser les conditions en utilisant l'opérateur « having ». L’instruction « group by » présentée plus haut est donc indispensable pour pouvoir faire un count sur un identifiant de bagage.

select * from LuggageEvent.std:groupby(id).win:time_batch(5 min)
group by id having (count(*)!=4  and count(*)!=0)

Une dernière étape pourrait être de restreindre le select, uniquement sur l’identifiant du bagage, car c’est le seul attribut qui m’intéresse pour le traitement post requête. Voici donc pour la partie intelligente. La requête est plutôt succincte, le langage étant expressif et complet.

Remarque : la requête ne permet pas d’assurer que l’on soit passé par toutes les antennes, mais seulement que le bagage ait été détecté par 4 antennes.

Utilisation du moteur de CEP dans notre application

Il est temps maintenant d’instancier les objets nécessaires à l’utilisation d’Esper dans le code de notre application :

//configure engine
Configuration config = new Configuration();

// Declaration of the service provider
EPServiceProvider epService =
EPServiceProviderManager.getProvider("myCepEngine",config);

On déclare ici un objet Configuration qui permet de configurer le moteur de CEP. Le moteur en lui même est un objet de type EPServiceProvider.

On enregistre ensuite la requête que nous venons de créer, de la manière suivante :

String lostLuggaeStmt =
"select id from LuggageEvent.std:groupby(id).win:time_batch(5 min)"+
" group by id having (count(*)!=4  and count(*)!=0)";
EPStatement statement =
epService.getEPAdministrator().createEPL(lostLuggaeStmt);

En ayant déclaré ma requête comme ci-dessus, un événement sera déclenché dans le système à chaque fois que cette requête remontera des résultats.

Observateur sur requête : comment réaliser une action si une de nos règles se confirme

Dans le cas où la requête génère un événement (i.e. un bagage a été perdu) une action doit pouvoir être effectuée. Pour cela je peux déclarer en tant qu’observateur la classe Java de mon choix.

// Attach a listener to lostLuggageStmt
statement.addListener(new LuggageListener());

Mon observateur (Listener), sera appelé lorsque la requête retournera des résultats. Voici un exemple d’implémentation :

public class LuggageListener implements UpdateListener {

public void update(EventBean[] newEvents, EventBean[] oldEvents) {
System.out.println("    !!!!!!!!WARNING!!!! "+newEvents[0].get("id")+" has been lost");
}

}

Mise en place des tests : simulation d'envoi d'évènements

Afin de pouvoir valider le fonctionnement du système mis en place, Esper nous fournit des classes utilitaires. Nous allons simuler la génération d’évènements dans notre système, en déclarant chacun de ces évènements dans un fichier csv comme suit :

id,antennaId,timestamp
Luggage1,A1,1
Luggage1,A2,1000
Luggage1,A3,2000
...

La colonne id fait référence à l’identifiant du bagage, antennaId à l’identifiant de l’antenne, et timestamp est le temps en ms sur lequel l’événement doit être généré. Voici un extrait de log obtenu :

event fired : id=Luggage5 antennaId = A1
!!!!!!!!WARNING!!!! Luggage4 has been lost
event fired : id=Luggage3 antennaId = A4
event fired : id=Luggage5 antennaId = A2
event fired : id=Luggage5 antennaId = A3
event fired : id=Luggage4 antennaId = A4
!!!!!!!!WARNING!!!! Luggage5 has been lost
!!!!!!!!WARNING!!!! Luggage4 has been lost

On remarque que le bagage 4 est perdu 2 fois. En effet, le premier signalement de bagage 4 perdu a été envoyé car seulement 3 événements liés à ce bagage ont été détectés pendant la période de 5 min. Cependant, un autre événement lié à ce bagage 4 arrive seul et trop tard. D’où le dernier message.

Le mot de la fin …

Prendre des décisions sur les évènements du présent et non plus sur des informations du passé est une  des promesses du Complex Event Processing, et me semble être une technologie qui intéressera assez vite bon nombre d’entreprises. Avec 2 ans d’existence, Esper est la réponse open source face à diverses solutions existantes depuis un peu plus longtemps. En dépit de sa relative jeunesse Esper m’a paru mature, et au vu de la puissance et l’expressivité de son langage de requêtage EPL semble pouvoir répondre à de nombreux cas d’utilisation. Cependant, il est indéniable qu’une bonne maîtrise des concepts apportés par Esper est nécessaire afin de pouvoir produire une solution simple et élégante.

Note : le code source de l'exemple utilisé dans cet article est disponible sur notre repository Subversion.