Using Hadoop for Value At Risk calculation Part 3

04/11/2010 by Marc Bojoly
Tags: Software Engineering

In the first part of this series, I explained why the Hadoop framework could be useful to compute the VAR and analyze intermediate values. In the second part I described a first implementation. One drawback of that implementation is that it does not take advantage of the reduce pattern: I did the reduction by hand. I will now make full use of Hadoop's reduce feature.

Second implementation: extraction of the VAR by the reducer

If I simply code a reduce class with the signature public void reduce(DrawKey key, Iterator<CustomResultWritable> values, OutputCollector<DrawKey, CustomResultWritable> output, Reporter reporter), this method will be called once per key scenario id;price value, that is once per draw, because each draw generates a different price. The successive calls to the reduce() method are totally independent (no shared variable), so it is not possible to know whether 1% of the values have been processed, and thus to extract the percentile and the VAR in this way. However, Hadoop provides a way to sort keys one way and group them another way, and this feature will help us solve the problem. The example given uses keys containing two integers and shows how to group by the first integer and sort by the second one. I propose to use it to extract the percentile. I have created a new program for which I won't give you all the code but just a synopsis. This VarSimpleOptionReduce program defines two jobs:

  • a job with the previous mapper class and no reduce class (or an identity mapper). It computes the call prices and stores them;
  • a job with no map task and the IntermediateResultSortReducer that I describe below. Hadoop will sort the data and this reducer will extract the VAR.
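
The idea of sorting keys one way and grouping them another way can be tried out without a cluster. What follows is a minimal plain-Java sketch under stated assumptions, not the code of VarSimpleOptionReduce: the SecondarySortSketch class, its Draw type and the groupSorted helper are hypothetical names, and a simple list stands in for the data Hadoop would shuffle. Draws are sorted by scenario id and then by price, and a new group starts each time the scenario id changes, so each group receives its prices in ascending order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SecondarySortSketch {

    /** Hypothetical composite key: a scenario id plus the price of one draw. */
    static final class Draw {
        final int scenarioId;
        final double price;

        Draw(int scenarioId, double price) {
            this.scenarioId = scenarioId;
            this.price = price;
        }
    }

    /** Full ordering, playing the role of the sort comparator: scenario id, then price. */
    static final Comparator<Draw> SORT_ORDER =
            Comparator.<Draw>comparingInt(d -> d.scenarioId).thenComparingDouble(d -> d.price);

    /**
     * Sorts the draws with SORT_ORDER, then starts a new group whenever the
     * scenario id changes (the role of the grouping comparator).
     */
    static Map<Integer, List<Double>> groupSorted(List<Draw> draws) {
        List<Draw> sorted = new ArrayList<>(draws);
        sorted.sort(SORT_ORDER);

        Map<Integer, List<Double>> groups = new LinkedHashMap<>();
        List<Double> currentGroup = null;
        int currentScenario = 0;
        for (Draw draw : sorted) {
            if (currentGroup == null || draw.scenarioId != currentScenario) {
                currentScenario = draw.scenarioId;
                currentGroup = new ArrayList<>();
                groups.put(currentScenario, currentGroup);
            }
            currentGroup.add(draw.price);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Draw> draws = Arrays.asList(
                new Draw(1, 14.41), new Draw(2, 3.0), new Draw(1, 0.51),
                new Draw(1, 4.18), new Draw(2, 1.5));
        // Prints {1=[0.51, 4.18, 14.41], 2=[1.5, 3.0]}
        System.out.println(groupSorted(draws));
    }
}
```

Each list in the result corresponds to what a single reduce() call would iterate over once keys are grouped by scenario.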

The corresponding configuration is the following:

/**
 * Configuration adapted for a binary compressed result
 * @param args
 * @return
 */
private JobConf buildGenerateMapperConfBinaryResult(String[] args) {
	JobConf jobConf = new JobConf();
	//identical...
	jobConf.setMapperClass(IntermediateResultMapperBinary.class);
	jobConf.setNumReduceTasks(0);
	//...
	return jobConf;
}

/**
 * Configuration adapted for a binary compressed result
 * @param args
 * @return
 */
private JobConf buildGenerateReducerConfBinaryResult(String[] args) {
	JobConf jobConf = new JobConf();
	//...
	jobConf.setNumMapTasks(0);
	jobConf.setReducerClass(IntermediateResultSortReducer.class);
	jobConf.setPartitionerClass(FirstPartitionner.class);
	//Inputs of the reduce tasks are sorted by DrawKey
	jobConf.setOutputKeyComparatorClass(DrawKey.Comparator.class);
	//All keys that are equal according to ScenarioKey.Comparator are sent to the same reduce() call
	jobConf.setOutputValueGroupingComparator(ScenarioKey.Comparator.class);
	jobConf.setNumReduceTasks(1);//Use 2 if you have two scenarios
	//...
	return jobConf;
}

Map output is spread among partitions. The partitioner class runs on the output of the map task, before sorting. Each partition is then processed by a different reduce task, which calls the reduce() function once per group of keys. All keys for the same scenario should be sorted together, in the same reduce task, and not split between two. Thus the best configuration is to have one partition and one reduce task per scenario.

jobConf.setPartitionerClass(FirstPartitionner.class);

So the FirstPartitionner should only take into account the scenario key (scenario id and percentile size).

@SuppressWarnings("deprecation")
public class FirstPartitionner implements Partitioner<DrawKey, CustomResultWritable> {
	@Override
	public int getPartition(DrawKey key, CustomResultWritable value, int numPartitions) {
		//Mask the sign bit instead of using Math.abs(), which stays negative for Integer.MIN_VALUE
		return ((((ScenarioKey)key).hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;
	}
	@Override
	public void configure(JobConf job) {}
}
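
The two properties a partition index needs here (it depends only on the scenario, and it always falls between 0 and numPartitions - 1) can be checked in isolation. This is a hedged plain-Java sketch: PartitionSketch, scenarioHash and partitionFor are hypothetical names, and the hash formula is only an assumption made for illustration, not the hashCode() of the real key classes. It also shows why taking Math.abs() of an int is fragile on one edge case.

```java
final class PartitionSketch {

    /** Hypothetical hash built from the scenario fields only; the price is deliberately left out. */
    static int scenarioHash(int scenarioId, int percentileSize) {
        return 31 * scenarioId + percentileSize;
    }

    /** Maps a hash to a partition index in [0, numPartitions). */
    static int partitionFor(int hash, int numPartitions) {
        // Clearing the sign bit keeps the index non-negative for every int,
        // including Integer.MIN_VALUE, for which Math.abs returns a negative value.
        return ((hash * 127) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // All draws of scenario 1 (percentile size 10) land in the same partition,
        // because their prices never enter the computation.
        System.out.println(partitionFor(scenarioHash(1, 10), 2)); // prints 1
        // Edge case: the product overflows to Integer.MIN_VALUE.
        System.out.println(Math.abs(Integer.MIN_VALUE * 127) % 3); // prints -2
        System.out.println(partitionFor(Integer.MIN_VALUE, 3));    // prints 0
    }
}
```

A negative index would be rejected by the framework, which is why the masked form is the safer choice even if the edge case is unlikely with small scenario ids.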

DrawKey.Comparator.class and ScenarioKey.Comparator.class compare instances of DrawKey and ScenarioKey. I will give you more details about that in the following paragraph. In this way, the reduce() method will take this kind of input (the syntax with parentheses is only illustrative):

1;10;0.513775910851316	( "252	84.31301373924966	120.0	0.05	0.2	0.513775910851316", "1;10;0.513775910851316	252	103.39569385168355	120.0	0.05	0.2	4.181165705822988", "1;10;0.513775910851316	252	123.11293496630553	120.0	0.05	0.2	14.414516512987014")

So the reduce() method will be called once for the scenario key 1;10, with 3 different price values in ascending order. One drawback of the reducer signature is that the reducer is called with a DrawKey: 1;10;0.513775910851316. The call price in that key, 0.513775910851316, is simply the smallest one and should be ignored from a business perspective. The real price values computed after each draw (0.513775910851316, 4.181165705822988, 14.414516512987014) are provided in the values. That's why the call price is duplicated, both in the key (for sorting) and in the value (for collecting in the reduce() function).
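
To make the two comparison levels concrete, here is a hedged plain-Java sketch. The Key class, its parse method and the two comparator constants are hypothetical: they are not the DrawKey and ScenarioKey classes of the program, and only mimic the textual key shape seen in the example above, with fields separated by ';'. The grouping comparator looks at the scenario part only, while the sorting comparator also looks at the price.

```java
import java.util.Comparator;

final class KeyComparatorSketch {

    /** Hypothetical in-memory form of a key written as "scenario id;percentile size;call price". */
    static final class Key {
        final int scenarioId;
        final int percentileSize;
        final double price;

        Key(int scenarioId, int percentileSize, double price) {
            this.scenarioId = scenarioId;
            this.percentileSize = percentileSize;
            this.price = price;
        }

        /** Parses the ';'-separated text form, for example "1;10;0.513775910851316". */
        static Key parse(String text) {
            String[] parts = text.split(";");
            return new Key(Integer.parseInt(parts[0]),
                    Integer.parseInt(parts[1]),
                    Double.parseDouble(parts[2]));
        }
    }

    /** Grouping level: two keys of the same scenario compare as equal. */
    static final Comparator<Key> GROUPING =
            Comparator.<Key>comparingInt(k -> k.scenarioId).thenComparingInt(k -> k.percentileSize);

    /** Sorting level: same scenario order, then ascending price. */
    static final Comparator<Key> SORTING = GROUPING.thenComparingDouble(k -> k.price);

    public static void main(String[] args) {
        Key low = Key.parse("1;10;0.513775910851316");
        Key high = Key.parse("1;10;4.181165705822988");
        System.out.println(GROUPING.compare(low, high) == 0); // prints true: same group
        System.out.println(SORTING.compare(low, high) < 0);   // prints true: lower price first
    }
}
```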

I used a third and last refinement for my key: putting the percentile size in it. It gives each reduce() call a simple, unambiguous way to get that value. The final shape of my key is then scenario id;percentile size;call price. We can now see the reducer implementation:

@SuppressWarnings("deprecation")
public final class IntermediateResultSortReducer extends MapReduceBase implements Reducer<DrawKey, CustomResultWritable, DrawKey, CustomResultWritable> {
	/**
	 * Prerequisite: Key are grouped by ScenarioKey and sorted by DrawKey
	 * So we have for each ScenarioKey, values sorted by Price 
	 */
	@Override
	public void reduce(DrawKey key, Iterator<CustomResultWritable> values,
			OutputCollector<DrawKey, CustomResultWritable> output, Reporter reporter)
			throws IOException {
		//Collect only the s
		//Price of the previous value (assumes getPrice() returns a double), used only to check the ordering
		double previousPrice = Double.NEGATIVE_INFINITY;
		CustomResultWritable currentResult;
		int countDown = key.getPercentileSize().get();
		while(countDown >= 0 && values.hasNext()) {
			if(countDown == 0) {
				output.collect(key, values.next());
				reporter.setStatus("Reduce ended");
				return;//Exit
			}
			else {
				currentResult = values.next();
				System.out.println(currentResult);
				if(currentResult.getPrice() < previousPrice) {
					reporter.setStatus("Data are not sorted");
					System.err.println(String.format("Previous price: %s, current price: %s", previousPrice, currentResult.getPrice()));
				}
				//Remember this price so that the next iteration compares against it
				previousPrice = currentResult.getPrice();
			}
			countDown--;
			if(!values.hasNext()) { 
				String err = String.format("Reducer: Key %s has only %d values with an expected percentile of %d", key.toString(), key.getPercentileSize().get()-countDown, key.getPercentileSize().get());
				System.err.println(err);
				reporter.setStatus(err);
			}
		}
	}
}

The reduce() function reads the percentile size (let's call it p), skips the first p values and collects the next one. The output file contains the VAR 0.191 and the corresponding parameters.

1;10000;4.1013116251405945E-10	252	78.12793687367298	120.0	0.05	0.2	0.19109501386036332
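
The countdown used by the reducer can be isolated from Hadoop to see exactly which value it picks. The sketch below is a simplified plain-Java stand-in, not the reducer itself: PercentileSketch and valueAfterSkipping are hypothetical names, and the values are plain prices rather than CustomResultWritable instances. It assumes, as the reducer does, that the iterator delivers prices in ascending order.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class PercentileSketch {

    /**
     * Skips the first percentileSize values of an ascending iterator and
     * returns the next one, or null when there are not enough values.
     */
    static Double valueAfterSkipping(Iterator<Double> sortedPrices, int percentileSize) {
        int countDown = percentileSize;
        while (sortedPrices.hasNext()) {
            double price = sortedPrices.next();
            if (countDown == 0) {
                return price; // this is the value the reducer would collect
            }
            countDown--;
        }
        return null; // fewer than percentileSize + 1 values were available
    }

    public static void main(String[] args) {
        List<Double> prices = Arrays.asList(
                0.513775910851316, 4.181165705822988, 14.414516512987014);
        // With a percentile size of 1, the lowest price is skipped and the second one is returned.
        System.out.println(valueAfterSkipping(prices.iterator(), 1));
        // With a percentile size of 3, the three prices are consumed and nothing is left to collect.
        System.out.println(valueAfterSkipping(prices.iterator(), 3)); // prints null
    }
}
```

Returning null for the short case mirrors the branch of the reducer that reports a key with fewer values than the expected percentile.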

So, as in the first implementation, I take advantage of Hadoop's sort feature. But furthermore:

  • I use a partitioner that creates one partition per scenario. It allows Hadoop to distribute the input of the reduce phase across nodes, each partition being stored in a different file;
  • I use a SecondarySortPartitionner, which lets me get all the values for one scenario in the same reduce task. That way, Hadoop can parallelize the reduce phase across scenarios by instantiating several reduce tasks.

That concludes the second implementation of the VAR calculation on Hadoop, and this part of the series. I used other optimizations in my code that I will explain in the next part. In the last articles of this series, I will give some performance figures and conclude on the benefits of using Hadoop for VAR calculation.