Using Hadoop for Value At Risk calculation Part 2

03/11/2010 by Marc Bojoly
Tags: Software Engineering

In the first part of this series, I introduced why the Hadoop framework could be useful to compute the VAR and analyze intermediate values. In this second part I will give you a first concrete implementation of the VAR calculation with Hadoop.

Hadoop map implementation

The implementation is an adaptation of the code described in my previous article. The classes used to compute the VAR are the same: OptionPricer, Parameters and ParametersValueGenerator. The following classes were added for Hadoop purposes: InputParser (parsing of the input file), ScenarioKey and DrawKey, and IntermediateResultMapper. VarSimpleOptionDirectSearch.VarSimpleOptionDriver was added for configuration purposes and VarSimpleOptionDirectSearch serves as the launcher.

IntermediateResultMapper is an implementation of the map requirements as described in the first article and is very close to the GridGain implementation:

class IntermediateResultMapper extends MapReduceBase implements
		Mapper<LongWritable, Text, DrawKey, Text> {
	
	private static Logger log = LoggerFactory.getLogger(IntermediateResultMapper.class);
	
	private enum HadoopCounters {
		SKIPPED_LINE,
		COMPUTING_ERROR
	}

	@Override
	public void map(final LongWritable key, final Text value,
			OutputCollector<DrawKey, Text> output,
			Reporter reporter) throws IOException {
		String line = value.toString();
		InputStruct is = InputParser.parse(line);
		if (is != null) {
			Parameters params = new Parameters(
					is.t, is.s0, is.k, is.r, is.sigma);
			ParametersValueGenerator valueGenerator = new ParametersValueGenerator(is.historicalVolatility, 0, 0);
			OptionPricer optionPricer = new OptionPricer(params);
			valueGenerator.initializeReference(params);
			valueGenerator.intializeRandomGenerator();
			try {
				int percentileSize = (int)(is.drawsNb*(1-is.varPrecision));
				DrawKey dk = new DrawKey(is.id, percentileSize, 0);
				for(int i = 0 ; i < is.jobDrawsNb ; i++) {
					final Result result = computePriceScenario(optionPricer, valueGenerator);
					dk.setValue(result.getPrice());//Optimization: set by reference to avoid object creation
					output.collect(dk, new Text(result.toString()));
				}
			} catch (Exception e) {
				reporter.incrCounter(HadoopCounters.COMPUTING_ERROR, 1);
				log.error("Error computing var", e);
			}
		} else {
			reporter.incrCounter(HadoopCounters.SKIPPED_LINE, 1);
		}
	}
	
	/**
	 * Compute the price of the call based on a scenario
	 * generated by a draw
	 */
	private Result computePriceScenario(final OptionPricer optionPricer,
		final ParametersValueGenerator valueGenerator) throws MathException {
		valueGenerator.setNewGeneratedParameters(optionPricer.getParameters());
		final double price = optionPricer.computeCall();
		return new Result(optionPricer.getParameters(),price);
	}
}
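
The mapper relies on InputParser to turn one semicolon-separated input line into an InputStruct holding the pricing parameters and the numbers of draws. The original parser is not reproduced here; the following is only a minimal sketch of what it could look like, and the field order is an assumption deduced from the sample input line shown just below and from the way the mapper uses InputStruct.

//Hypothetical sketch of the input parsing, not the original code.
//The assumed field order is id;t;s0;k;r;sigma;historicalVolatility;drawsNb;varPrecision;jobDrawsNb.
class InputStruct {
	long id;
	int t;
	double s0, k, r, sigma, historicalVolatility;
	int drawsNb;
	double varPrecision;
	int jobDrawsNb;
}

class InputParser {
	static InputStruct parse(String line) {
		if (line == null || line.trim().isEmpty()) {
			return null; //Skipped lines are counted by the mapper
		}
		String[] fields = line.trim().split(";");
		InputStruct is = new InputStruct();
		is.id = Long.parseLong(fields[0]);
		is.t = Integer.parseInt(fields[1]);
		is.s0 = Double.parseDouble(fields[2]);
		is.k = Double.parseDouble(fields[3]);
		is.r = Double.parseDouble(fields[4]);
		is.sigma = Double.parseDouble(fields[5]);
		is.historicalVolatility = Double.parseDouble(fields[6]);
		is.drawsNb = Integer.parseInt(fields[7]);
		is.varPrecision = Double.parseDouble(fields[8]);
		is.jobDrawsNb = Integer.parseInt(fields[9]);
		return is;
	}
}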

One key point is the signature of the map() method: public void map(final LongWritable key, final Text value, OutputCollector<DrawKey, Text> output, Reporter reporter). The input file is read as (LongWritable, Text) pairs. With a simple text file, the key (LongWritable) is the byte offset of the beginning of the line and the value is the content of the line. For example:

0     1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
45   1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
...

The second line begins at character 45. This byte offset easily provides a unique key; it is ignored in my implementation. The map produces (DrawKey, Text) pairs as described below.

1;10;0.513775910851316	252	84.31301373924966	120.0	0.05	0.2	0.513775910851316
...

The key contains the scenario id, the percentile size and the price value. The values contain the price and the parameters. I will explain a bit later why this choice was important for a further optimization; a sketch of such a composite key is given just after the list below. Thus, all my draws are computed and generated as in the GridGain implementation but stored directly in the file system. This allows two things:

  • To compute the VAR;
  • To process these intermediate results to extract useful information.

I will describe these two points.
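
Before moving on, here is a rough idea of what such a composite key can look like. This is a hypothetical sketch, not the project's DrawKey class: it simply shows a WritableComparable whose comparison order is scenario id, then percentile size, then price, so that Hadoop can serialize it and sort the draws of a scenario by price.

//Hypothetical sketch of a DrawKey-like composite key, not the original class.
//It implements WritableComparable so that Hadoop can serialize it and sort by it:
//first by scenario id, then by percentile size, then by price.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;

public class DrawKeySketch implements WritableComparable<DrawKeySketch> {
	private final LongWritable id = new LongWritable();
	private final IntWritable percentileSize = new IntWritable();
	private final DoubleWritable value = new DoubleWritable();

	public void setValue(double price) { value.set(price); } //Set by reference, as in the mapper
	public LongWritable getId() { return id; }
	public IntWritable getPercentileSize() { return percentileSize; }

	@Override
	public void write(DataOutput out) throws IOException {
		id.write(out);
		percentileSize.write(out);
		value.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		id.readFields(in);
		percentileSize.readFields(in);
		value.readFields(in);
	}

	@Override
	public int compareTo(DrawKeySketch other) {
		int cmp = id.compareTo(other.id);
		if (cmp != 0) return cmp;
		cmp = percentileSize.compareTo(other.percentileSize);
		if (cmp != 0) return cmp;
		return value.compareTo(other.value);
	}

	@Override
	public String toString() {
		return id + ";" + percentileSize + ";" + value;
	}
}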

Reduce implementation

The output of the map tasks could be considered like the job results in the GridGain implementation (see my second article). I could have used exactly the same algorithm with an in-memory sort. However, it would not have taken advantage of the Hadoop map/reduce implementation.

The Hadoop map/reduce implementation is based on sorting keys. Map tasks produce files of (key, value) pairs:

key1   valueA  
key2   valueB
key1   valueC
...

These files are sorted by key. Then all the lines with the same key are combined in a pair (key, list of values) form.

key1 (valueA, valueC)
key2 (valueB)

Then reduce tasks are fed with such values. The signature of the corresponding reduce function is reduce(DrawKey key, Iterator values, OutputCollector output, Reporter reporter). So sorting by key is at the heart of the Hadoop map/reduce implementation because it allows transforming the map phase output into the reduce phase input. I will not immediately give you "the" reduce implementation because several choices can be made. Before that, I will discuss the determination of the key, which is a fundamental point in the definition of the Hadoop implementation.
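
To make that signature concrete, here is a bare skeleton of a reduce implementation written against the Hadoop 0.20 old style API. It is only an illustration, not one of the implementations discussed below, and it assumes Text values as emitted by IntermediateResultMapper.

//Skeleton of a reduce implementation with the Hadoop 0.20 old style API, for illustration only.
//The values iterator contains all the values collected for one key, in this case
//all the (price, parameters) lines sharing the same DrawKey.
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

class VarReducerSkeleton extends MapReduceBase implements
		Reducer<DrawKey, Text, DrawKey, Text> {

	@Override
	public void reduce(DrawKey key, Iterator<Text> values,
			OutputCollector<DrawKey, Text> output, Reporter reporter)
			throws IOException {
		while (values.hasNext()) {
			//Identity-like behavior: simply re-emit each value with its key
			output.collect(key, values.next());
		}
	}
}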

Determination of the key

To perform the VAR calculation, I need to have the list of all computed prices, sort them and extract the percentile of lowest values: for a 99% VAR, the 1% lowest values. Finally I have to identify the highest value of that percentile (see my first article for more details). Processing of a map/reduce job by Hadoop can be applied simultaneously to several sets of data. In order to provide one result for each set, Hadoop processing follows these steps:

  • Apply the map() function to all the values;
  • Sort the data by key. Keys identify the sets of data, so grouping prepares the splitting of the sets;
  • Send all the values corresponding to a given key to a reduce() function.

For my implementation, I will take advantage of the sort provided by the Hadoop process. Indeed, by using a simple TreeSet, as in the GridGain implementation, I am limited by the Java heap size. By contrast, Hadoop provides a finely tuned sort algorithm that mixes in-memory and on-disk sorting. To give a clue of Hadoop's performance on sorting large datasets, I refer to the Terabyte Sort: a benchmark consisting of sorting 1 TB of data, which Hadoop has won.
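
For comparison, here is a minimal sketch, assumed rather than taken from the GridGain code, of that purely in-memory approach: every price has to be held and sorted in the heap before the percentile can be extracted, which is exactly the limit I want to avoid.

//Minimal sketch of the in-memory alternative, for comparison only (not the GridGain code).
//All the prices must be held and sorted in the Java heap, which limits the number of draws.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class InMemoryVarSketch {
	static double computeVar(List<Double> prices, double varPrecision) {
		List<Double> sorted = new ArrayList<Double>(prices); //Every draw is kept in memory
		Collections.sort(sorted);                            //In-memory sort
		int percentileSize = (int) (sorted.size() * (1 - varPrecision));
		//Boundary value of the lowest percentile, with the same indexing
		//convention as the driver's cpt == percentileSize test shown later
		return sorted.get(percentileSize);
	}
}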

First reduce implementation: Identity reducer and VAR extraction by the main program

So, in order to use Hadoop's sorting capability, I use a DrawKey containing scenario id;percentile size;call price value, as shown in the sample output above. That way, the input of the reduce task contains, for each successive scenario, the sorted prices of the job. At the end of the Hadoop job, I read the sorted results file through the distributed file system API. I iterate through the 1% lowest values and identify the VAR. This is my first implementation. It is configured in Hadoop through the following code:

@SuppressWarnings("deprecation")
public class VarSimpleOptionDirectSearch {

	static class VarSimpleOptionDriver extends Configured implements Tool {
		private static Log log = LogFactory.getLog(VarSimpleOptionDriver.class);

		@Override
		public int run(String[] args) throws Exception {
			if (args.length != 2) {
				log.error(String.format(
						"Usage %s [generic options] <input /> <output>\n",
						getClass().getSimpleName()));
				ToolRunner.printGenericCommandUsage(System.err);
				return -1;
			}

			// Generate the prices for each draw and sort them
			JobConf generateMapConf = buildGenerateMapperConf(args);

			double startTime = System.nanoTime();
			JobClient.runJob(generateMapConf);

			Path outputPath = new Path(args[1]);
			SequenceFile.Reader[] readers = org.apache.hadoop.mapred.SequenceFileOutputFormat
					.getReaders(generateMapConf, outputPath);
			long id;
			int percentileSize;
			for (SequenceFile.Reader reader : readers) {
				try {
					DrawKey key = new DrawKey();
					Writable value = new CustomResultWritable();
					reader.next(key, value);
					do {
						id = key.getId().get();
						percentileSize = key.getPercentileSize().get();
						int cpt = 0; // Count number of draws
						log.info(key.toString() + " cpt:" + cpt);
						do { // For each value
							if (cpt == percentileSize) {
								log.info("VAR: cpt" + cpt + "\t"
										+ key.toString() + "\t"
										+ value.toString());
							} else {
								//Continue silently
							}
							cpt++;
						} while (reader.next(key, value)
								&& id == key.getId().get()
								&& percentileSize == key
										.getPercentileSize().get());
					} while(reader.next(key, value)); // End for each (id, percentileSize)
				} finally {
					reader.close();
				}
			}
			double computeTime = System.nanoTime() - startTime;
			log.info("ComputeTime " + computeTime/1000000 + " (ms.)");

			return 0;
		}

		private JobConf buildGenerateMapperConf(String[] args) {
			JobConf jobConf = new JobConf();
			//org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders()
			//tries to read the _logs directory as part of the SequenceFile
			//which leads to an error
			//Putting the history in another folder bypasses the problem
			jobConf.set("hadoop.job.history.user.location", "job_history");
			jobConf.setJarByClass(VarSimpleOptionDirectSearch.class);

			FileInputFormat.addInputPath(jobConf, new Path(args[0]));
			FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

			// Compute intensive task: each mapper will receive a line
			jobConf.setInputFormat(NLineInputFormat.class);
			jobConf.setMapperClass(IntermediateResultMapperBinary.class);
			jobConf.setReducerClass(IdentityReducer.class);
			jobConf.setNumReduceTasks(1);

			// Set no limit to the number of task per JVM in order to take
			// advantage of HotSpot runtime optimizations after long runs
			jobConf.setNumTasksToExecutePerJvm(-1);
			jobConf.setOutputKeyClass(DrawKey.class);
			jobConf.setOutputValueClass(CustomResultWritable.class);
			jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
			return jobConf;
		}
	}// End VarSimpleOptionDriver class

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new VarSimpleOptionDriver(), args);
		System.exit(exitCode);
	}

}// End VarSimpleOptionDirectSearch class

Please note that I use Hadoop 0.20.2 with the old style API. Hadoop is launched by the main method. A Configured-based class is provided to help launch the job. The run() method configures the job, launches it and iterates over the results in order to get the VAR value. I will now give some explanations about the job configuration.

jobConf.setMapperClass(IntermediateResultMapperBinary.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(DrawKey.class);
jobConf.setOutputValueClass(CustomResultWritable.class);

Map and reduce tasks are configured that way. I use an IdentityReducer that just copies the map output after sorting and thus avoids any real reduce work. The output key and value classes have to be defined too: the JVM should be informed that the emitted values are not the default LongWritable and Text ones, because, due to generic type erasure, the type of the map output cannot be inferred at runtime without such configuration. jobConf.setInputFormat(NLineInputFormat.class); configures how to split the input file between tasks. I chose NLineInputFormat in order to launch one map task for each line. By configuring the tasktracker to launch as many tasks as there are available cores on the node, I execute one map task per core, which suits these compute-intensive map tasks. As you might have noticed, the input line 1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250 contains both:

  • 1000: the total number of draws
  • 250: the number of draws for that task

That way, I can define input files so that the draws of one scenario are split between several map tasks. In the following example, one scenario of 1,000 draws is divided into 4 tasks of 250 draws each.

1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250
1;252;120.0;120.0;0.05;0.2;0.15;1000;0.99;250

Each map task generates the number of draws provided in its input line, and the total number of draws (here 1000) is used to define the size of the 1% percentile (here 0.01*1000=10). jobConf.setNumReduceTasks(1); forces all results to be sorted together in one single file and not in several files sorted separately. It is required in order to be able to identify the percentile. As for the GridGain implementation, it implies that for one job the reduce phase is not distributed. jobConf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class); configures a binary output format in order to reduce the output size.
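
To make the splitting and the percentile arithmetic concrete, here is a hypothetical helper, not part of the original project, that writes such an input file for one scenario of 1,000 draws divided into 4 map tasks of 250 draws each; the field order and the file name are assumptions.

//Hypothetical helper, not part of the original project: it writes the 4 input lines
//shown above (field order assumed: id;t;s0;k;r;sigma;historicalVolatility;drawsNb;varPrecision;jobDrawsNb)
//and recomputes the percentile size exactly as the mapper does.
import java.io.FileWriter;
import java.io.IOException;

class InputFileSketch {
	public static void main(String[] args) throws IOException {
		int totalDraws = 1000;
		int tasks = 4;
		int drawsPerTask = totalDraws / tasks; //250 draws per map task
		double varPrecision = 0.99;
		FileWriter writer = new FileWriter("var-input.txt"); //Assumed file name
		try {
			for (int i = 0; i < tasks; i++) {
				writer.write("1;252;120.0;120.0;0.05;0.2;0.15;"
						+ totalDraws + ";" + varPrecision + ";" + drawsPerTask + "\n");
			}
		} finally {
			writer.close();
		}
		int percentileSize = (int) (totalDraws * (1 - varPrecision)); //0.01 * 1000 = 10
		System.out.println("Percentile size: " + percentileSize);
	}
}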

That concludes the most important configuration properties of the job and the first implementation. This first implementation can still be improved, and that will be the subject of the next part of this series.