Utiliser Hadoop pour le calcul de la Value At Risk Partie 3

le 04/11/2010 par Marc Bojoly
Tags: Software Engineering

Dans le premier article de cette série, j'ai introduit pourquoi le framework Hadoop pouvait être utilisé pour calculer la VAR et analyser les valeurs intermédiaires. Dans le second article j'ai décrit une première implémentation. Un inconvénient de cette précédente implémentation est qu'elle ne tire pas pleinement partie du pattern reduce. Je réalise le travail manuellement. Je vais désormais utiliser pleinement la fonctionnalité reduce.

Seconde implémentation : extraction de la VAR par le reducer

Si je code simplement une classe reduce avec la signature public void reduce(DrawKey key, Iterator values, OutputCollector output, Reporter reporter), cette méthode sera appelée une fois par clé scenario Id;price value une fois par tirage, car chaque tirage génère un prix différent. Les appels successifs à la méthode reduce() sont totalement indépendants (aucune variable partagée). Ainsi, comme il n'est pas possible de savoir si 1% des valeurs a déjà été traité, on ne peut pas savoir quand extraire le percentile et en déduire la VAR de cette façon. Cependant, Hadoop fournit une façon de trier les clés selon un certain ordre et de les grouper selon un autre critère. Cette fonctionnalité va m'aider à trouver une solution. L'exemple fournit par Hadoop utilise des clés contenant deux entiers et montre comment grouper ces clés par le premier entier et les grouper selon le second. Je vous propose d'utiliser cela pour extraire le percentile. J'ai créé un nouveau programme dont je ne vais pas vous détailler l'intégralité du code mais simplement un résumé. Le programme VarSimpleOptionReduce définit deux jobs :

  • un job avec la précédente classe mapper et pas de classe reduce (ou un reducer idempotent). Cela permet de calculer et de générer les prix des calls et les stocker
  • un job sans tâche map et le IntermediateResultSortReducer que je vais vous décrire ci-dessous. Hadoop va tirer la donnée et le reducer var extraire la VAR.

La configuration correspondante est la suivante:

/**
 * Configuration adapted for a binary compressed result
 * @param args
 * @return
 */
private JobConf buildGenerateMapperConfBinaryResult(String[] args) {
	JobConf jobConf = new JobConf();
	//identical...
	jobConf.setMapperClass(IntermediateResultMapperBinary.class);
	jobConf.setNumReduceTasks(0);
	//...
	return jobConf;
}

/**
 * Configuration adapted for a binary compressed result
 * @param args
 * @return
 */
private JobConf buildGenerateReducerConfBinaryResult(String[] args) {
	JobConf jobConf = new JobConf();
	//...
	jobConf.setNumMapTasks(0);
	jobConf.setReducerClass(IntermediateResultSortReducer.class);
	jobConf.setPartitionerClass(FirstPartitionner.class);
        //Inputs of the reduce tasks are sorted by DrawKey
	jobConf.setOutputKeyComparatorClass(DrawKey.Comparator.class);
        //All keys equals according to the ScenarioKey.Comparator will be send to the same reduce class
	jobConf.setOutputValueGroupingComparator(ScenarioKey.Comparator.class);
        jobConf.setNumReduceTasks(1);//Use 2 if you have two scenarios
	//...
}

La sortie de la map est répartie entre différentes partitions. La classe partitionner s'exécute sur la sortie de la tâche map avant le tri. Ensuite chaque partition est traitée par une tâche reduce différente (c'est-à-dire par un nouvel appel à une fonction reduce()). Toutes les clés pour le même scénario doivent être triées ensemble et non dans deux tâches séparées. Ainsi, l'optimisation de configuration consiste à avoir une partition et une tâche reduce par scénario.

jobConf.setPartitionerClass(FirstPartitionner.class);

Ainsi le FirstPartitionner doit prendre en compte uniquement la clé de scénario (scenario key : scenario id et percentile size).

@SuppressWarnings("deprecation")
public class FirstPartitionner implements Partitioner<drawkey , CustomResultWritable> {
	@Override
	public int getPartition(DrawKey key, CustomResultWritable value, int numPartitions) {
		return Math.abs((((ScenarioKey)key).hashCode()) * 127)%numPartitions; 
	}
	@Override
	public void configure(JobConf job) {}
}

DrawKey.Comparator.class et ScenarioKey.Comparator.class comparent des instances de DrawKey et ScenarioKey. Je vous donnerai plus de détails à ce propos dans un prochain paragraphe. De cette façon, la méthode reduce() prendra ce type d'entrée (la syntaxe avec les parenthèses est purement illustrative) :

1;10;0.513775910851316	( "252	84.31301373924966	120.0	0.05	0.2	0.513775910851316", "1;10;0.513775910851316	252	103.39569385168355	120.0	0.05	0.2	4.181165705822988", "1;10;0.513775910851316	252	123.11293496630553	120.0	0.05	0.2	14.414516512987014")

Ainsi la méthode reduce() sera appelée une fois pour chaque clé de scénario 1;10 avec 3 prix différents triés de façon croissante. Un effet de bord de la signature du reducer est qu'il sera appellé avec une clé de tirage DrawKey : 1;10;0.513775910851316. La valeur du prix du call correspondant 0.513775910851316 est simplement le prix le plus faible et doit tout simplement être ignoré car il n'a pas de sens d'un point de vue métier. Les valeurs réelles des prix calculés après chaque tirage 0.513775910851316, 4.181165705822988, 14.414516512987014 sont fournies dans les valeurs. C'est pourquoi la valeur du prix du call doit être dupliquée à la fois dans la clé (pour le besoin de tri) et dans la valeur (pour collecter les valeurs dans la fonction reduce()).

J'ai utilisé un troisième et dernier raffinement pour ma clé, en stockant également la taille du percentile à l'intérieur. Cela permet à chaque appel à la fonction reduce() d'obtenir cette valeur d'une façon simple et centralisée. La forme finale de ma clé est donc scenario id;percentile size;call price. Nous allons pouvoir regarder l'implémentation du reducer :

@SuppressWarnings("deprecation")
public final class IntermediateResultSortReducer extends MapReduceBase implements Reducer<drawkey , CustomResultWritable, DrawKey, CustomResultWritable> {
	/**
	 * Prerequisite: Key are grouped by ScenarioKey and sorted by DrawKey
	 * So we have for each ScenarioKey, values sorted by Price 
	 */
	@Override
	public void reduce(DrawKey key, Iterator<CustomResultWritable> values,
			OutputCollector<drawkey , CustomResultWritable> output, Reporter reporter)
			throws IOException {
		//Collect only the s
		CustomResultWritable previousResult = new CustomResultWritable();
		CustomResultWritable currentResult = new CustomResultWritable();
		int countDown = key.getPercentileSize().get();
		while(countDown >= 0 && values.hasNext()) {
			if(countDown == 0) {
				output.collect(key, values.next());
				reporter.setStatus("Reduce ended");
				return;//Exit
			}
			else {
				currentResult = values.next();
				System.out.println(currentResult);
				if(currentResult.getPrice() < previousResult.getPrice()) {
					reporter.setStatus("Data are not sorted");
					System.err.println(String.format("Previous price: %s, current price: %d", previousResult.getPrice(), currentResult.getPrice()));
				}
				
			}
			countDown--;
			if(!values.hasNext()) { 
				String err = String.format("Reducer: Key %s has only %d values with an expected percentile of %d", key.toString(), key.getPercentileSize().get()-countDown, key.getPercentileSize().get());
				System.err.println(err);
				reporter.setStatus(err);
			}
		}
	}
}

La fonction reduce() lit la taille du percentile - que je noterai p - et collecte les p valeurs les plus faibles. Le fichier de sortie contient la VAR 0.191 et les paramètres correspondants.

1;10000;4.1013116251405945E-10	252	78.12793687367298	120.0	0.05	0.2	0.19109501386036332

Ainsi, comme pour la première implémentation, j'ai tiré bénéfice de la fonctionnalité de tri d'Hadoop. Mais en plus :

  • J'utilise un partitionner ce qui me permet de créer une partition par scénario. Cela permet à Hadoop de traiter chaque fichier d'entrée de la phase reduce sur un noeud, chaque partition pouvant être placée dans un fichier différent
  • J'utilise un SecondarySortPartitionner mer permettant de récupérer toutes les valeurs d'un scénario dans la même tâche reduce. De cette façon, Hadoop peut paralléliser la phase reduce pour chaque scénario en instanciant plusieurs tâches reduce

Cela conclut la seconde implémentation du calcul de VAR sur Hadoop et cet article de cette série. J'ai utilisé d'autres optimisations dans mon code que je vais vous expliquer dans le prochain article. Enfin dans les derniers articles de la série, je vous donnerai quelques mesures de performances et je conclurai sur l'intérêt d'utiliser Hadoop pour le calcul de la VAR.