Utiliser Hadoop pour le calcul de la Value At Risk Partie 2

le 03/11/2010 par Marc Bojoly
Tags: Software Engineering

Dans le premier article de la série, j'ai introduit pourquoi le framework Hadoop pouvait être utile pour calculer la VAR et analyser les résultats intermédiaires. Dans ce second article je vais décrire une première implémentation du calcul de la VAR avec Hadoop.

Implémentation de la fonction map avec Hadoop

Cette implémentation est une adaptation du code que j'ai décrite dans mon précédant article. Les classes utilisées pour calculer la VAR sont les mêmes : OptionPricer, Parameters et ParametersValueGenerator. Les classes suivantes ont été ajoutées pour les besoins de Hadoop : InputParser (analyse (parsing) du fichier d'entrée), ScenarioKey et DrawKey, IntermediateResultMapper, VarSimpleOptionDirectSearch.VarSimpleOptionDriver a été introduite dans un but de configuration et VarSimpleOptionDirectSearch comme lanceur.

IntermediateResultMapper est l'implémentation de la fonction map comme décrit dans le tout premier article. Elle est très proche de l'implémentation pour GridGain:

class IntermediateResultMapper extends MapReduceBase implements
		Mapper<longwritable , Text, DrawKey, Text> {
	
	private static Logger log = LoggerFactory.getLogger(IntermediateResultMapper.class);
	
	private enum HadoopCounters {
		SKIPPED_LINE,
		COMPUTING_ERROR
	}

	@Override
	public void map(final LongWritable key, final Text value,
			OutputCollector<drawkey , Text> output,
			Reporter reporter) throws IOException {
		String line = value.toString();
		InputStruct is = InputParser.parse(line);
		if (is != null) {
			Parameters params = new Parameters(
					is.t, is.s0, is.k, is.r, is.sigma);
			ParametersValueGenerator valueGenerator = new ParametersValueGenerator(is.historicalVolatility, 0, 0);
			OptionPricer optionPricer = new OptionPricer(params);
			valueGenerator.initializeReference(params);
			valueGenerator.intializeRandomGenerator();
			try {
				int percentileSize = (int)(is.drawsNb*(1-is.varPrecision));
				DrawKey dk = new DrawKey(is.id, percentileSize, 0);
				for(int i = 0 ; i < is.jobDrawsNb ; i++) {
					final Result result = computePriceScenario(optionPricer, valueGenerator);
					dk.setValue(result.getPrice());//Optimization: set by reference to avoid object creation
					output.collect(dk, new Text(result.toString()));
				}
			} catch (Exception e) {
				reporter.incrCounter(HadoopCounters.COMPUTING_ERROR, 1);
				log.error("Error computing var", e);
			}
		} else {
			reporter.incrCounter(HadoopCounters.SKIPPED_LINE, 1);
		}
	}
	
	/**
	 * Compute the price of the call based on a scenario
	 * generated by a draw
	 */
	private Result computePriceScenario(final OptionPricer optionPricer,
		final ParametersValueGenerator valueGenerator) throws MathException {
		valueGenerator.setNewGeneratedParameters(optionPricer.getParameters());
		final double price = optionPricer.computeCall();
		return new Result(optionPricer.getParameters(),price);
	}
}

Un point notable est la signature de la méthode : public void map(final LongWritable key, final Text value, OutputCollector output, Reporter reporter). Le fichier d'entrée est lu comme une paire (LongWritable, Text). Avec un fichier standard, la clé (LongWritable) est l'offset du début de la ligne et la valeur correspond au contenu de la ligne. Par exemple:

0     1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
45   1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
...

La seconde ligne débute au caractère 45. Cet offset fournit simplement une clé unique. Cette clé est tout simplement ignorée dans cette implémentation. La fonction map produit des paires (DrawKey, Text) comme illustré ci-dessous.

1;10;0.513775910851316	252	84.31301373924966	120.0	0.05	0.2	0.513775910851316
...

La clé contient l'identifiant du scénario, le nombre total de tirages et le prix. Les valeurs contiennent le prix et les paramètres. Je vais expliquer un peu plus loin en quoi ce choix était important pour les optimisations à venir. Ainsi, tous mes tirages sont calculés et générés comme dans l'implémentation GridGain mais stockés directement dans un système de fichiers. Cela permet deux choses:

  • De calculer la VAR
  • D'analyser ensuite les résultats intermédiaires pour en extraire des informations utiles

Je décrirai ces deux points.

Implémentation de la fonction Reduce

La sortie de la tâche map peut être considérée comme les job results de GridGain (cf. mon second article). J'aurais pu utiliser exactement le même algorithme avec un tri en mémoire. Cependant, je n'aurai pas tiré parti de l'implémentation map/reduce fournie par Hadoop.

L'implémentation map/reduce d'Hadoop est basée sur le tri des clés. Les tâches Map produisent des fichiers.

key1   valueA  
key2   valueB
key1   valueC
...

Ces fichiers sont triés par clé. Ensuite, toutes les lignes avec la même clé sont combinées sous forme de paires (key, list of values).

key1 (valueA, valueC) key2 (valueB)

Ensuite, les tâches reduce sont alimentées avec de telles valeurs. La signature de la fonction reduce est reduce(DrawKey k, Iterator values, OutputCollector, Reporter reporter). Ainsi, le tri par clé est le coeur de l'implémentation map/reduce de Hadoop car cela permet de transformer les sorties de la phase map en entrées de la phase reduce. Je ne vais pas vous proposer immédiatement "l'implémentation" de reduce car plusieurs choix sont possibles. Avant cela, je vais discuter le choix de la clé qui est un point fondamental pour l'implémentation d'Hadoop.

Choix de la clé

Pour effectuer le calcul de la VAR, j'ai besoin d'avoir la liste de tous les prix calculés, de les trier et d'extraire le percentile : les 10% de valeurs les moins élevées. Enfin, je dois identifier la plus haute valeur du percentile (voir mon premier article pour plus de détails). Le traitement des travaux de map/reduce par Hadoop peut être réalisé simultanément sur plusieurs jeux de données. De façon à fournir un résultat unique pour chaque ensemble, Hadoop fonctionne selon les étapes suivantes:

  • Appliquer la fonction map() à toutes les valeurs
  • Trier les données par clé. Les clés identifient les ensembles de données, et de cette façon, grouper prépare à l'éclatement du jeu de données sur plusieurs reducers
  • Envoyer toutes les valeurs qui correspondent à une clé à une seule fonction reduce()

Pour mon implémentation, je vais tirer parti du tri fourni par le mécanisme d'Hadoop. En effet, en utilisant un simple Treeset, comme dans l'implémentation avec GridGain, je suis limité par la taille de la Heap Java. Au contraire, Hadoop fournit un algorithme de tri optimisé réparti entre le disque et la mémoire. Pour vous donner un aperçu des performances d'Hadoop dans ce domaine, je ferai référence au Terabyte sort : un benchmark qui consiste à trier 1 TB de données et qu'Hadoop a gagné.

Première implémentation du reduce: un reduce idempotent (Identity Reducer) et une extraction de la VAR par la fonction main

Ainsi, de façon à utiliser la capacité de tri de Hadoop, j'utilise la clé DrawKey contenant scenario id;call price . De cette façon, l'entrée de la tâche reduce contient pour chaque scénario successif les prix correspondants. A la fin du job Hadoop, je lis le fichier contenant les résultats triés à travers l'API du système de fichiers distribué. Je parcours les 1% de valeurs les plus faibles et identifie ainsi la VAR. Cela constitue ma première implémentation. Elle est définie dans Hadoop grâce au code suivant :

@SuppressWarnings("deprecation")
public class VarSimpleOptionDirectSearch {

	static class VarSimpleOptionDriver extends Configured implements Tool {
		private static Log log = LogFactory.getLog(VarSimpleOptionDriver.class);

		@Override
		public int run(String[] args) throws Exception {
			if (args.length != 2) {
				log.error(String.format(
						"Usage %s [generic options] <input /> <output>\n",
						getClass().getSimpleName()));
				ToolRunner.printGenericCommandUsage(System.err);
				return -1;
			}

			// Generate the prices for each draw and sort them
			JobConf generateMapConf = buildGenerateMapperConf(args);

			double startTime = System.nanoTime();
			JobClient.runJob(generateMapConf);

			Path outputPath = new Path(args[1]);
			SequenceFile.Reader[] readers = org.apache.hadoop.mapred.SequenceFileOutputFormat
					.getReaders(generateMapConf, outputPath);
			long id;
			int percentileSize;
			for (SequenceFile.Reader reader : readers) {
				try {
					DrawKey key = new DrawKey();
					Writable value = new CustomResultWritable();
					reader.next(key, value);
					do {
						id = key.getId().get();
						percentileSize = key.getPercentileSize().get();
						int cpt = 0; // Count number of draws
						log.info(key.toString() + " cpt:" + cpt);
						do { // For each value
							if (cpt == percentileSize) {
								log.info("VAR: cpt" + cpt + "\t"
										+ key.toString() + "\t"
										+ value.toString());
							} else {
								//Continue silently
							}
							cpt++;
						} while (reader.next(key, value)
								&& id == key.getId().get()
								&& percentileSize == key
										.getPercentileSize().get());
					} while(reader.next(key, value)); // End for each (id, percentileSize)
				} finally {
					reader.close();
				}
			}
			double computeTime = System.nanoTime() - startTime;
			log.info("ComputeTime " + computeTime/1000000 + " (ms.)");

			return 0;
		}

		private JobConf buildGenerateMapperConf(String[] args) {
			JobConf jobConf = new JobConf();
			//org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders()
			//tries to read the _logs directory as part of the SequenceFile
			//which leads to an error
			//Putting the history in another folder bypasses the problem
			jobConf.set("hadoop.job.history.user.location", "job_history");
			jobConf.setJarByClass(VarSimpleOptionDirectSearch.class);

			FileInputFormat.addInputPath(jobConf, new Path(args[0]));
			FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

			// Compute intensive task: each mapper will receive a line
			jobConf.setInputFormat(NLineInputFormat.class);
			jobConf.setMapperClass(IntermediateResultMapperBinary.class);
			jobConf.setReducerClass(IdentityReducer.class);
			jobConf.setNumReduceTasks(1);

			// Set no limit to the number of task per JVM in order to take
			// advantage of HotSpot runtime optimizations after long runs
			jobConf.setNumTasksToExecutePerJvm(-1);
			jobConf.setOutputKeyClass(DrawKey.class);
			jobConf.setOutputValueClass(CustomResultWritable.class);
			jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
			return jobConf;
		}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new VarSimpleOptionDriver(), args);
		System.exit(exitCode);
	}

}// End VarSimpleOption class

Notez que j'ai utilisé Hadoop 0.20.2 mais avec l'ancienne API. Hadoop est lancé par la méthode main. Une classe mère configured est fourni pour aider à lancer le job. La méthode run() le lance puis parcourt les résultats de façon à extraire la VAR. Je vais maintenant donner quelques explications sur la configuration.

jobConf.setMapperClass(IntermediateResultMapperBinary.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(DrawKey.class);
jobConf.setOutputValueClass(CustomResultWritable.class);

Les tâches map et reduce sont configurées de cette façon. J'utilise un reducer idempotent IdentityReducer qui copie simplement la sortie de la map après avoir trié les valeurs et ainsi évite la phase de reduce. Les classes de la clé et de la valeur de sortie doivent également être définies. La JVM doit être informée que les valeurs reçues ne sont pas les LongWritable et Text par défaut. Du fait du type erasure (effacement des générics au runtime), le type de la sortie de la map ne peut pas être déterminé au runtime sans une telle configuration. jobConf.setInputFormat(NLineInputFormat.class); configure comment découper le fichier d'entrée en différentes tâches. J'ai choisi d'utiliser la classe NLineInputFormat de façon à lancer une tâche map par ligne. En configurant le processus hébergeant les tâches (le tasktracker) pour lancer autant de tâches que de coeurs disponibles sur le noeud, j'optimise les tâches map intenses en calcul en en exécutant simultanément une sur chaque core. Comme vous l'avez peut être noté, le format de la ligne en entrée 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250 contient à la fois :

  • 1000: le nombre total de tirages
  • 250: le nombre de tirages à réaliser par cette tâche

De cette façon, je peux définir les fichiers d'entrée de telle façon qu'un tirage soit réparti entre plusieurs tâches. Dans l'exemple suivant un tirage est divisé en 4 tâches.

1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250

Chaque tâche map peut générer le nombre correspondant de tirages comme indiqué dans la ligne d'entrée. Puis le nombre total de tirages (ici 1000) est utilisé pour définir la taille du percentile à 1% (ici 0,01*1000=10). jobConf.setNumReduceTasks(1); force tous les résultats à être triés dans un fichier unique et non dans plusieurs fichiers triés séparément. Cela est nécessaire de façon à pouvoir identifier le percentile. Comme pour notre implémentation GridGain, cela implique que pour un job la tâche de reduce n'est pas distribuée. jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class); configure enfin un format de sortie binaire afin de minimiser la taille.

Cela conclut les points importants de la configuration pour le job et la première implémentation. Cette-dernière est améliorable et cela sera le sujet du prochain article de cette série.