GemFire et traitement distribué

le 07/10/2011 par Nicolas Colomer
Tags: Software Engineering

Les enjeux autour de la donnée sont en train de changer par rapport à ce que l’on connait depuis les SGBDR : volume de plus en plus important, nombre d’utilisateur croissant, accès concurrents et transactionnels intensifs à la fois en lecture et en écriture, haute disponibilité à des niveaux coûts acceptables, coût de licence. Dans un tel contexte, les bases de données traditionnelles peuvent  montrer leurs limites.

Parmi les différentes réponses à ces enjeux que l’ont a vu émerger ces dernières années, deux d’entre elles se sont révélées particulièrement efficaces :

  • L’utilisation d’architectures distribuées permet de répartir non seulement la charge mais aussi les données sur plusieurs fournisseurs de données. Les éditeurs de SGBD proposent d’ors et déjà des versions clusterisées de leurs solutions (Oracle RAC, MySQL Cluster, PGCluster, SQL Server cluster) et de nouvelles solutions « scalable-ready » apparaissent, portées par la mouvance NoSQL (Cassandra, HBase, MongoDB, CouchDB).
  • L’utilisation de cache, pour chaque fournisseur de données, permet d’accélérer drastiquement les performances d’accès aux données. Certaines solutions appelées « memcache » ont été spécifiquement conçues pour proposer de tels systèmes. On peut citer par exemple Redis ou encore Memcached. Ces caches restent (encore bien souvent) limités à des architectures orientées « lecture ».

Les data grids ont challengé les SGBDR en réunissant les deux précédentes réponses : du partitionnement et du stockage mémoire (suivant les configurations choisies). Les acteurs sont multiples : Oracle, Gigaspaces mais également GemStone (VMWare)

1. GemFire, kesako ?

GemFire est la solution de cache distribué éditée par GemStone Systems, récemment acquis par VMWare. Encore appelé « VMware vFabric GemFire », GemFire est un composant de l’écosystème vFabric de VMware.

Cette solution propose, en plus de son framework, des mécanismes avancés de partitionnement (« sharding »), de persistance et de réplication. Ecrit en Java, GemFire peut fonctionner sur une JVM 32 ou 64 bits, c’est à dire sur n’importe quel OS compatible. Les clients peuvent être écrits à la fois en C++, C# ou Java grâce aux API fournies.

GemFire est un cache orienté-objet qui offre les services d’une base de donnée traditionnelle : OQL (une extension orientée objet du langage SQL), transactions (implémente l’interface JTA), cohérence des données (selon le paramétrage choisi), persistance sur disque (via les mécanismes de write behind ou write through), ou encore tolérance aux pannes.

En plus de ces fonctionnalités classiques, on retrouve également des mécanismes plus spécifiques comme le « continuous querying » permettant d’être notifié lorsque le résultat d’une requête a changé, ou le « function execution », permettant de distribuer un traitement au sein du clustrer GemFire. Ce sujet sera l’objet de la deuxième partie de cet article

1.1. Architecture de Gemfire

Un système distribué GemFire se compose de membres, des processus Java connectés les uns aux autres au travers d’un réseau. Chaque membre embarque un cache, fonctionnant soit en mode stockage (« data host ») soit en mode proxy (« data accessor »). Dans le premier cas, des données peuvent être écrite localement puis propagées ; dans l’autre cas, le cache agira comme une simple passerelle vers les données  distantes contenues dans la grille de données.

On distingue ainsi deux types de membre :

  • Le client, généralement une application embarquant un cache GemFire et connecté au système via une API.
  • Le  serveur (ou « cacheserver »),  un membre spécialisé dans le stockage de données qui s’exécute de façon autonome dans une JVM dédiée.

Ces membres se découvrent entre eux de deux façons :

  • Par l'intermédiaire de Locators. Ces sont des membres spécialisés qui maintiennent à jour une liste des membres composant l’architecture distribuée.
  • Par multicast. Chaque nouveau membre manifeste sa présence en émettant sur une IP et un port préalablement défini.

Dans le schéma précédent, on observe que le système distribué peut adopter plusieurs topologies :

  • Peer-to-peer : les données sont partagées entre plusieurs clients (liens bleus).
  • Client/serveur : les données sont partagées au sein d’un cluster dédié de serveurs sur lequel peuvent se connecter les clients (liens verts et gris).
  • Multi-site : plusieurs systèmes distribués sont reliés entre eux  via un ou plusieurs membres déclarés en tant que « gateway hubs » (non représenté).

Pour synchroniser les différents nœuds, GemFire propose une API de propagation par delta permettant de réduire les échanges sur le réseau, surtout dans le cas de régions fortement répliquées.

1.2. La sémantique

Dans GemFire, le concept principal est celui de région. Une région, encore appelé « data set », est un groupe logique dans lequel sont stockées les données.

Chaque région possède sa propre configuration (modèle de données, type de partitionnement, présence de réplication, nature de la persistance, gestion des événements). Cette configuration devra être déclarée à l’identique dans chacun des membres utilisant la région. Elle se fait via un descripteur XML ou programmatiquement, au travers de l’API GemFire.

Toute région implémente la bien-connue interface java.util.Map : la manipuler ne devrait pas poser de problème ! Les données y sont donc stockées sous forme de clef/valeur. Clefs et valeurs prennent toutes deux la forme d’un objet Java, toutefois, la clef sera le plus souvent une simple chaine de caractères.

On dénombre 4 types de régions :

  • Locale, la région n’est accessible que par le membre qui l’a déclaré et ne sera donc partagé avec aucun autre membre.
  • Distribuée, chaque membre stock localement les données qu’il créé, mais les partages avec les autres membres du système.
  • Répliquée, chacun des membres du système détient une copie complète de toutes les données de la région.
  • Partitionnée, les données sont réparties au travers des différents membres de la région, soit équitablement, soit selon des règles de partitionnement préalablement définies. Il est de plus possible de répliquer ces données (« redundant copies ») sur un ou plusieurs membres, afin de garantir leur haute disponibilité. Ainsi chaque membre est responsable d’une partition et héberge n réplicas d’autres partitions.

GemFire offre 3 modes de distribution (ou « scope ») des données permettant d’assurer différents niveaux de cohérence :

  • Distributed-no-ack : aucune contrainte n’est imposée lors de la propagation d’opérations. Ce niveau offre les meilleures performances, mais est sujet aux problèmes de concurrence.
  • Distributed-ack : les opérations distribuées sont validées par un système d’acquittement. Ce niveau est moins exposé aux problèmes de concurrence
  • Global : les entrées d’une région sont automatiquement verrouillées lorsqu’elles sont modifiées. C’est le seul scope permettant d’assurer une cohérence forte des données, mais c’est aussi le moins performant.

2. Distribuer un traitement

Le requêtage des données d’une région se fait via OQL. Ce langage répond à des besoins standard en autorisant l’utilisation de clauses SQL bien-connues telles que SELECT, FROM ou WHERE. Il est également possible de réaliser des jointures entre régions et des indexes.

Toutefois, les requêtes OQL sont soumises à certaines limitations qui dépendent principalement du type de région sur lequel on les exécute. Ces limitations concernent entre autre la cohérence des résultats dans le cas de régions répliquées ou distribuées. En effet, pour ces deux types de région, les requêtes sont effectuées sur les données contenues dans le cache local, qui peut, dans certains cas (race conditions, selon la stratégie de propagation), ne pas être cohérentes à un instant donné.

Lorsque l’on aborde la question du traitement au sein d’une architecture distribuée, on entend souvent parler de MapReduce. Le concept est simple : au lieu de rapatrier les données sur un nœud pour traitement, c’est la fonction réalisant ce traitement qui est envoyé sur chacun des nœuds, c’est à dire au plus proche de la donnée. Ceci offre l’avantage de paralléliser l’exécution de la fonction sur différents nœuds tout en diminuant les échanges au niveau du réseau.

Le « function execution service » fourni par GemFire, bien qu’assez éloigné de MapReduce, s’inspire de ce mécanisme. En voici les détails.

2.1. Mécanismes mis en jeu

Toute fonction distribuée doit être préalablement déclarée et déployée sur chaque membre d’une région partitionnée lors de son déploiement (descripteur + classpath). Cela implique que ces fonctions aient été préalablement codées en Java, compilées puis éventuellement packagées.

Lorsqu’un client déclenche l’exécution d’une fonction distribuée (action non bloquante pour le client), le serveur (data host) auquel il est connecté va notifier tous les autres serveurs du cluster, contenant localement une partie des données à traiter, d’engager l’exécution de ladite fonction.

Dans le cas ou la fonction retourne des résultats, les différents serveurs vont les soumettre à un collecteur de résultat contenu dans le serveur directement connecté au client. Ce collecteur accumule les résultats jusqu’à ce que chaque serveur ait notifié l’envoie du dernier résultat.

Le client peut alors les récupérer :

  • de manière bloquante : attente de la totalité des résultats, possibilité de spécifier un timeout.
  • de manière non bloquante : le résultat récupéré par le client est alors potentiellement incomplet.

La documentation ne spécifie aucun mécanisme de callback permettant de notifier de manière asynchrone la disponibilité d’un résultat complet.

Ces différentes étapes se résument dans le schéma suivant :

Remarque : le client peut restreindre l’exécution à un, une partie ou tous les membres d’une région. Il peut également spécifier directement les clefs qui seront soumise au traitement. Ces possibilités sont très utiles, par exemple lorsqu’a été défini une politique de partitionnement spécifique.

2.2. Contexte de l’exemple

Afin d’expérimenter le traitement distribué, nous nous sommes placé dans le contexte d’un service web. Cet exemple entre dans la continuité d’un POC réalisé en interne chez Octo. Nous avons souhaité calculer à la demande des informations statistiques sur la vitesse de véhicules au travers de données brutes remontées par ces derniers.

L’architecture visée se compose des éléments GemFire suivants :

  • Deux CacheServer et un Locator, déployés sur une machine virtuelle
  • Une application web JEE, déclenchant l’exécution de la fonction distribuée à la demande d’un utilisateur

2.3. Réalisation de la fonction distribuée

L’exécution de notre fonction distribuée va nécessiter 4 classes :

  • SpeedAverageFunction, représente la fonction à exécuter sur chacun des nœuds du cluster. Cette fonction récupère dans la partition locale toutes les vitesses instantanées pour un véhicule donné. Ces données sont transmises au SpeedAverageCollector via l’objet ResultSender.
  • SpeedAverageCollector, doit collecter les résultats retournés par les différents nœuds du cluster (action 4 dans le schéma précédent).
  • SpeedAverageResult, permet de stocker les résultats prétraités. C’est l’objet qui sera renvoyé par le collecteur au client.
  • GemFireManager, récupère et fournit une instance de la région. Elle permet de déclencher l’exécution de notre fonction distribuée, en spécifiant en argument l’identifiant du véhicule dont la vitesse moyenne doit être calculé.

Détaillons tour à tour chacune de ces 4 classes.

2.3.1. SpeedAverageFunction

Cette classe contient le traitement a effectuer par chacun des data hosts auxquels elle sera soumise. Toute fonction distribuée doit implémenter les interfaces Function et Declarable, cette dernière interface permettant d’être déclarée au sein d’un descripteur XML (voir la partie déploiement). Ci-après, le code commenté de la classe SpeedAverageFunction.

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.execute.ResultSender;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.TypeMismatchException;
import com.octo.rtf.ws.model.VehicleMessage;

public class SpeedAverageFunction implements Declarable, Function {

	@Override
	public void init(Properties props) {
		// Distributed function name extracted from XML descriptor
		functionName = props.getProperty("name");
	}

	@Override
	public void execute(FunctionContext fc) {
		RegionFunctionContext context = (RegionFunctionContext) fc;
		// Get local region data
		Region localRegion = PartitionRegionHelper.getLocalDataForContext(context);
		// Get the result collector
		ResultSender resultSender = context.getResultSender();
		// Retrieve arguments
		String[] args = (String[]) context.getArguments();

		try {
			// Execute the query
			SelectResults queryResults = localRegion.query(formQuery(args));
			// Extract the results
			Map results = queryRouteAverage(queryResults);
			for (String key : results.keySet()) {
				Integer[] data = results.get(key);
				// Send each result to the collector
				resultSender.sendResult(new Object[] {key, data[0], data[1]});
			}
			resultSender.lastResult(null);
		} catch (
	}

	@Override
	public boolean hasResult() {
		// Does the function return results ?
		return true;
	}

	@Override
	public boolean isHA() {
		// Is the high availability option set ?
		return false;
	}

	@Override
	public boolean optimizeForWrite() {
		// Does the function write results on region ?
		return false;
	}
}

2.3.2. SpeedAverageCollector

Cette classe implémente l’interface ResultCollector<T extends Serializable,S extends Serializable>. T correspond à l’objet en entrée, c’est à dire retourné par les data hosts lorsqu’ils récupèrent des données (dans le cas de notre SpeedAverageCollector, un tableau d’objet), et S correspond à l’objet en sortie, c’est à dire retourné au client (ici, un objet SpeedAverageResult).

Ci-après, le code commenté de la classe SpeedAverageCollector.

import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;

public class SpeedAverageCollector implements ResultCollector  {

	SpeedAverageResult sar = new SpeedAverageResult();

	public SpeedAverageCollector() {}

	@Override
	public void addResult(DistributedMember memberID,
			Object[] resultOfSingleExecution) {
		// Executed when a member return a result
		if (resultOfSingleExecution == null) return;
		String type = (String) resultOfSingleExecution[0];
		int value = (Integer) resultOfSingleExecution[1];
		int weight = (Integer) resultOfSingleExecution[2];
		sar.addResult(type, value, weight);
	}

	@Override
	public void clearResults() {
		// When the function is reexucted
		sar = new SpeedAverageResult();
	}

	@Override
	public void endResults() {
		// When a member sends its last result
	}

	@Override
	public SpeedAverageResult getResult() throws FunctionException {
		// Return the result (blocks until the last result is sent)
		return sar;
	}

	@Override
	public SpeedAverageResult getResult(long timeout, TimeUnit unit)
			throws FunctionException, InterruptedException {
		// Returns the result (blocks until timeout)
		return getResult();
	}
}

2.3.3. SpeedAverageResult

Cette classe va contenir l’ensemble des résultats et leurs accesseurs. Elle doit implémenter l’interface Serializable afin de pouvoir transférée aux data hosts via le réseau. Ci-après, le code commenté de la classe SpeedAverageResult.

public class SpeedAverageResult implements Serializable {

	private Map results;

	public SpeedAverageResult() {
		results = new HashMap();
	}

	public void addResult(String type, int value, int weight) {
		if (!results.containsKey(type)) {
			results.put(type, new Integer[] {value, weight});
		} else {
			Integer[] data = results.get(type);
			data[0] += value;
			data[1] += weight;
		}
	}

	public Double getAverage() {
		int sum = 0, num = 0;
		for (String type : results.keySet()) {
			Integer[] data = results.get(type);
			sum += data[0];
			num += data[1];
		}
		if (num == 0) return new Double(0);
		else return (double) (sum / num);
	}

	public Map getAverageByHighway() {
		Map result = new HashMap();
		for (String type : results.keySet()) {
			result.put(type, getAverageForHighway(type));
		}
		return result;
	}

	public Double getAverageForHighway(String type) {
		Integer[] data = results.get(type);
		return (double) (data[0] / data[1]);
	}
}

2.3.4. GemFireManager

Cette classe va permettre de se connecter en tant que client au système distribué GemFire. Elle va récupérer et fournir une instance de la région qui permettra de stocker les messages de nos véhicules connectés. La méthode getSpeedAverageFromVin() permettra de déclencher l’exécution de la fonction distribuée sur notre région. Ci-après, le code de la classe GemFireManager

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.octo.rtf.ws.model.VehicleMessage;

public class GemFireManager {

	private static final String LOCATOR_IP = "192.168.56.101";
	private static final int LOCATOR_PORT = 55221;
	private static final String REGION_NAME = "VehicleMessageRegion";
	private static final String FUNCTION_SPEEDAVERAGE = "SpeedAverage";

	private static ClientCache cache;
	private static Region region;

	public GemFireManager() {
		// Create a local client cache
		cache = new ClientCacheFactory()
			.addPoolLocator(LOCATOR_IP, LOCATOR_PORT)
			.set("log-level", "error").create();
		// Get the region
		region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
			.create(REGION_NAME);
	}

	public SpeedAverageResult getSpeedAverageFromVin(String vin) {
		Execution execution = FunctionService
			.onRegion(region)
			.withCollector(new SpeedAverageCollector())
			.withArgs(new String[] {vin});
		SpeedAverageCollector collector = (SpeedAverageCollector) execution
			.execute(FUNCTION_SPEEDAVERAGE, true, false, false);
		return collector.getResult();
	}
}

2.4. Configuration et déploiement des composants

2.4.1. Définissons la région

La région que nous allons créer ici devra stocker l’ensemble des messages remontés par les véhicules. Ces messages contiennent des données brutes, parmi lesquels un timestamp, la vitesse instantanée et le type de route emprunté.

Pour définir cette région, utilisons un descripteur XML :

<?xml version=<em>"1.0"</em> encoding=<em>"UTF-8"</em>?>
<!DOCTYPE cache PUBLIC
"-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
"http://www.gemstone.com/dtd/cache6_5.dtd">
<cache>
	<region name=<em>"VehicleMessageRegion"</em> refid=<em>"PARTITION"</em>>
		<region-attributes>
			<value-constraint>com.octo.rtf.ws.model.VehicleMessage</value-constraint>
		</region-attributes>
	</region>
	<function-service>
		<function>
			<class-name>com.octo.rtf.ws.gemfire.SpeedAverageFunction</class-name>
			<parameter name="name"><string>SpeedAverage</string></parameter>
		</function>
	</function-service>
</cache>

Le descripteur est plutôt explicite : nous définissons ici un cache contenant une région partitionnée nommée VehicleMessageRegion, dont la valeur sera un objet VehicleMessage. La clef sera une simple chaine de caractère (non spécifiée donc nature par défaut). Enfin nous spécifions une fonction exécutable au travers de son identifiant « SpeedAverage ».

2.4.2. Dépendances Java

Locator et CacheServer sont des processus Java indépendants. Afin d’être lancés, déclarons quelques variables d’environnement permettant de régler définitivement le problème des dépendances.

# VARIABLES GEMFIRE
GF_JAVA=$JAVA_HOME/bin/java; export GF_JAVA
PATH=$PATH:$JAVA_HOME/bin:$GEMFIRE/bin; export PATH
GEMFIRE=/srv/GemFire6514
CLASSPATH=$GEMFIRE/lib/gemfire.jar:$GEMFIRE/lib/antlr.jar:$GEMFIRE/lib/gfSecurityImpl.jar:$CLASSPATH; export CLASSPATH

2.4.3. Initialisation du Locator

Afin de déployer notre système distribué GemFire, choisissons la méthode du locator (par opposition à la méthode multicast). Le locator est un processus à l’écoute d’un port, réalisant la coordination entre les différents membres du système distribué GemFire. Le Locator peut être respectivement lancé et arrêté au travers des commandes suivantes.

gemfire locator-stop -port=55221 -address=192.168.56.101
gemfire locator-start -port=55221 -address=192.168.56.101

2.4.4. Lancement des cacheserver

Pour fonctionner, un cacheserver a besoin d’un répertoire de travail dans lequel il persistera les données (si l’option est spécifiée) ainsi que les différents logs.

Démarrons deux cacheservers dans deux répertoires distincts.

mkdir ./server1 ./server2
cacheserver start locators=192.168.56.101[55221] mcast-port=0 cache-xml-file=.. /server.xml -server-bind-address=192.168.56.101 -server-port=0 -dir=./server1
cacheserver start locators=192.168.56.101[55221] mcast-port=0 cache-xml-file=../server.xml -server-bind-address=192.168.56.101 -server-port=0 -dir=./server2

Pour stopper ces deux cacheserver, il suffit d’utiliser les commandes suivantes

cacheserver stop -dir=./server1
cacheserver stop -dir=./server2

Le système est maintenant prêt à opérer. Pour voir un exemple du résultat, n’hésitez pas à consulter le screencast disponible dans la deuxième partie de l’article concernant le push web.

3. Conclusion

Nous avons pu constater au travers de cet article que :

  • Les efforts à fournir pour déployer une telle architecture sont loin d’être insurmontables.
  • L’exécution de fonctions distribuées est particulièrement simple à mettre en œuvre.

Les fonctions distribuées ne forment cependant qu’une partie de ce que propose GemFire. Le framework offre également d’autres outils, comme la propagation par deltas pour réduire la charge sur le réseau ou le mécanisme de « Continuous Querying » qui permet d’être notifié sur changement du résultat d’une requête ou de l’état d’une région…