Using Hadoop for Value At Risk calculation Part 4

Posted on 05/11/2010 by Marc Bojoly
Tags: Software Engineering

In the first part of this series, I introduced why the Hadoop framework could be useful to compute the VAR and analyze intermediate values. In the second and third parts I gave two concrete implementations of VAR calculation with Hadoop. I will now give some details about the optimizations used in those implementations.

Writable, Comparable, Comparator

To give a full overview of the implementation, here is the content of the DrawKey and CustomResultWritable classes that appeared in the signatures. Hadoop is a framework designed for file processing and sorting, so serialization and comparison are two key points to optimize. For that purpose Hadoop defines its own serialization mechanism. Writable types such as LongWritable provide both a constructor and a set() method, so an existing object can be refilled with new values through set(). Reusing objects this way avoids instantiating a large number of new objects (in my previous article I noticed that object instantiation was a time-consuming phase). The serialization format is specialized and optimized for Hadoop's requirements. For example, it provides variable-length values "out of the box" with the VLongWritable class.

To preserve the complete structure of my result, which combines double and int fields, I need a custom class that implements the Writable interface so that it fits the map() and reduce() signatures. The CustomResultWritable implementation below shows how to use the Hadoop serialization mechanism.

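import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class CustomResultWritable implements Writable {
	private static final String SEPARATOR = "\t";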
	/**  Time in days before the maturity */
	private int t;
	/** Spot (actual price) of the underlying */
	private double s0;
	/** Strike (target price) of the underlying */
	private double k;
	/** Risk free interest rate */
	private double r;
	/** Implicit volatility of the Equity	 */
	private double sigma;
	/** Computed prices with those parameters	 */
	private double price;
        
	// Getters and setters omitted
	
	/** Default constructor to allow setting by reference	 */
	public CustomResultWritable() {}

	public CustomResultWritable(int t, double s0, double k, double r, double sigma,
			double price) {
		super();
		this.t = t;
		this.s0 = s0;
		this.k = k;
		this.r = r;
		this.sigma = sigma;
		this.price = price;
	}

	public void set(int t, double s0, double k, double r, double sigma,
			double price) {
		this.t = t;
		this.s0 = s0;
		this.k = k;
		this.r = r;
		this.sigma = sigma;
		this.price = price;
	}

	@Override
	public void readFields(DataInput input) throws IOException {
		this.t = input.readInt();
		this.s0 = input.readDouble();
		this.k = input.readDouble();
		this.r = input.readDouble();
		this.sigma = input.readDouble();
		this.price = input.readDouble();
	}

	@Override
	public void write(DataOutput output) throws IOException {
		output.writeInt(this.t);
		output.writeDouble(this.s0);
		output.writeDouble(this.k);
		output.writeDouble(this.r);
		output.writeDouble(this.sigma);
		output.writeDouble(this.price);
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append(t).append(SEPARATOR).append(s0).append(SEPARATOR)
				.append(k).append(SEPARATOR).append(r).append(SEPARATOR)
				.append(sigma).append(SEPARATOR).append(price);
		return builder.toString();
	}

}

The toString() method provides a text representation, which is required for a text output format. Note that Hadoop provides predefined types that wrap Java primitives and String, as well as a more generic mechanism, ObjectWritable, that allows combining them. However, since that mechanism is more generic and quite complex, I wrote a custom Writable type directly.
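The object reuse described above can be tried in isolation. The following is a minimal sketch, not the code used in the job: SketchWritable is a local stand-in that mirrors the two methods of Hadoop's Writable interface so that the example runs without Hadoop on the classpath, and PriceResult is a hypothetical two-field reduction of CustomResultWritable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Local stand-in for org.apache.hadoop.io.Writable (assumption, for a Hadoop-free sketch). */
interface SketchWritable {
	void write(DataOutput out) throws IOException;
	void readFields(DataInput in) throws IOException;
}

/** Hypothetical two-field reduction of CustomResultWritable. */
class PriceResult implements SketchWritable {
	private int t;
	private double price;

	void set(int t, double price) {
		this.t = t;
		this.price = price;
	}

	double getPrice() {
		return price;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(t);
		out.writeDouble(price);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// Fields are read in exactly the order they were written
		t = in.readInt();
		price = in.readDouble();
	}
}

class WritableReuseSketch {
	/** Serializes every (t, price) pair through one reused instance. */
	static byte[] writeAll(int[] ts, double[] prices) {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (DataOutputStream out = new DataOutputStream(bytes)) {
			PriceResult reused = new PriceResult();
			for (int i = 0; i < ts.length; i++) {
				reused.set(ts[i], prices[i]); // refill instead of allocating
				reused.write(out);
			}
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return bytes.toByteArray();
	}

	/** Reads the records back into a single instance and sums the prices. */
	static double sumPrices(byte[] data, int count) {
		try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
			PriceResult reused = new PriceResult();
			double sum = 0.0;
			for (int i = 0; i < count; i++) {
				reused.readFields(in); // overwrites the previous values
				sum += reused.getPrice();
			}
			return sum;
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}
}
```

Each record takes 12 bytes here (a 4-byte int and an 8-byte double), and only one PriceResult is allocated per loop, however many records are written or read.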

Hadoop also allows custom code to optimize comparison. The default implementation for my key class follows. Before showing the code, I should point out that I have a hierarchy of keys: ScenarioKey extends DrawKey. I explained in my second implementation where they are used and why I put the percentile size in DrawKey. Here is the ScenarioKey implementation; the DrawKey one is very close to it.

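import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

public class ScenarioKey implements WritableComparable<ScenarioKey> {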

	// For toString() implementation
	protected static final String SEPARATOR = ";";

	/**
	 * Identifier of the scenario (ParametersValueGeneratorConfiguration)
	 */
	private VLongWritable id;
	/**
	 * Size of the percentile = totalNumberOfDraws * varPrecision
	 */
	private VIntWritable percentileSize;

	public ScenarioKey() {
		set(new VLongWritable(), new VIntWritable());
	}

	public ScenarioKey(long id, int percentileSize) {
		super();
		set(new VLongWritable(id), new VIntWritable(percentileSize));
	}

	public ScenarioKey(VLongWritable id, VIntWritable percentileSize) {
		super();
		set(id, percentileSize);
	}

	public void set(VLongWritable id, VIntWritable percentileSize) {
		this.id = id;
		this.percentileSize = percentileSize;
	}

	public VLongWritable getId() {
		return id;
	}
	
	public VIntWritable getPercentileSize() {
		return percentileSize;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((id == null) ? 0 : id.hashCode());
		result = prime * result
				+ ((percentileSize == null) ? 0 : percentileSize.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		ScenarioKey other = (ScenarioKey) obj;
		if (id == null) {
			if (other.id != null)
				return false;
		} else if (!id.equals(other.id))
			return false;
		if (percentileSize == null) {
			if (other.percentileSize != null)
				return false;
		} else if (!percentileSize.equals(other.percentileSize))
			return false;
		return true;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		id.readFields(in);
		percentileSize.readFields(in);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		id.write(out);
		percentileSize.write(out);
	}

	@Override
	public int compareTo(ScenarioKey o) {
		return ScenarioKey.compare(this, o);
	}
	
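	/**
	 * Compares two scenario keys.
	 * This static method is needed to compare two subclasses of ScenarioKey,
	 * because a cast cannot be used to reach an overridden method.
	 * @param k1 first key, must not be null
	 * @param k2 second key, must not be null
	 * @return a negative, zero or positive value as k1 is less than, equal to or greater than k2
	 */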
	public static int compare(ScenarioKey k1, ScenarioKey k2)  {
		if (k1 == null || k2 == null) {
			throw new NullPointerException();
		}
		int cmp = k1.id.compareTo(k2.id);
		if (cmp != 0) {
			return cmp;
		} else {
			return  k1.percentileSize.compareTo(k2.percentileSize);
		}
	}

	@Override
	public String toString() {
		return this.id + SEPARATOR + this.percentileSize;
	}

}

The Writable implementation is close to that of CustomResultWritable, with one notable difference: the fields are not Java primitives but VLongWritable and VIntWritable instances, which minimizes the amount of data written. WritableComparable only requires implementing the compareTo() method. The implementation reads and compares only the fields it needs.
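To get a feel for how much a variable-length field can save, here is a small sketch that counts the bytes produced for a few values. It is written without Hadoop on the classpath: the writeVLong() method below is modeled on the layout I understand WritableUtils.writeVLong() to use (a single byte for small values, otherwise a marker byte followed by the significant bytes), so treat it as an illustration rather than as the library code. VLongSizeSketch and encodedSize() are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class VLongSizeSketch {
	/** Writes a long with a variable-length layout modeled on Hadoop's VLong (assumption). */
	static void writeVLong(DataOutputStream out, long value) throws IOException {
		if (value >= -112 && value <= 127) {
			out.writeByte((int) value); // small values fit in a single byte
			return;
		}
		int len = -112;
		if (value < 0) {
			value ^= -1L; // one's complement, so the loop below sees a non-negative value
			len = -120;
		}
		for (long tmp = value; tmp != 0; tmp >>= 8) {
			len--; // one step per significant byte
		}
		out.writeByte(len); // marker byte: encodes the sign and the number of bytes
		int count = (len < -120) ? -(len + 120) : -(len + 112);
		for (int idx = count; idx != 0; idx--) {
			int shift = (idx - 1) * 8;
			out.writeByte((int) ((value >> shift) & 0xFF)); // most significant byte first
		}
	}

	/** Returns the number of bytes the sketch encoding uses for the given value. */
	static int encodedSize(long value) {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (DataOutputStream out = new DataOutputStream(bytes)) {
			writeVLong(out, value);
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return bytes.size();
	}
}
```

With this layout a scenario id of 5 takes 1 byte and an id of 1000 takes 3 bytes, against 8 bytes for a fixed-width long. Only very large values cost more (9 bytes for Long.MAX_VALUE), which is an acceptable trade-off when most ids are small.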

Because sorting and comparing data is one of Hadoop's most important tasks (see the Map/Reduce pattern description in the first article), Hadoop offers a further optimization: defining a RawComparator. Such a class compares the byte representations of the objects directly, which avoids deserializing all the data before sorting it. In practice, I decode the scenario id first and return immediately if the ids differ. I implemented it as a nested class of ScenarioKey.

static final class Comparator extends WritableComparator {

		protected Comparator() {
			super(ScenarioKey.class, true);
		}
		
		@SuppressWarnings("unchecked")
		@Override
		public final int compare(WritableComparable w1, WritableComparable w2) {
			ScenarioKey d1 = (ScenarioKey)w1;
			ScenarioKey d2 = (ScenarioKey)w2;
			int cmp = ScenarioKey.compare(d1, d2);
			return cmp;
		}

		@Override
		public final int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
				int l2) {
			// s1, s2 = index of the first relevant byte in b1, b2
			// l1, l2 = length in bytes of each serialized key
			long thisId;
			long thatId;
			try {
				thisId = readVLong(b1, s1);
			}
			catch(IOException ioex) {
				throw new IllegalArgumentException("First VLong is invalid", ioex);
			}
			try {
				thatId = readVLong(b2, s2);
			}
			catch(IOException ioex) {
				throw new IllegalArgumentException("Second VLong is invalid", ioex);
			}
			
			int cmp = (thisId < thatId ? -1
					: (thisId == thatId ? 0 : 1));
			if (cmp == 0) {
				int idL1 = WritableUtils.decodeVIntSize(b1[s1]);
				int idL2 = WritableUtils.decodeVIntSize(b2[s2]);
				//PercentileSize
				int thisPs = 0;
				int thatPs = 0;
				try {
					thisPs = readVInt(b1, s1+idL1);
				}
				catch(IOException ioex) {
					throw new IllegalArgumentException("First VInt is invalid", ioex);
				}
				try {
					thatPs = readVInt(b2, s2+idL2);
				}
				catch(IOException ioex) {
					throw new IllegalArgumentException("Second VInt is invalid", ioex);
				}
				cmp =  (thisPs < thatPs ? -1
						: (thisPs == thatPs ? 0 : 1));
			}
			return cmp;
		}//End compare
	}//End Comparator

This comparator is registered through this static block:

static {
    WritableComparator.define(ScenarioKey.class, new ScenarioKey.Comparator());
}

The Hadoop framework provides a helper class, WritableUtils, to help decode byte representations, but the resulting code remains very technical.
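The principle of raw comparison is easier to see once the variable-length decoding is stripped away. The sketch below is a simplification under stated assumptions: it uses a fixed-width layout (8 bytes for the id, 4 bytes for the percentile size) instead of the VLong and VInt encoding of ScenarioKey, and it relies only on java.nio. RawKeyCompareSketch, serialize() and compareRaw() are hypothetical names, not part of Hadoop.

```java
import java.nio.ByteBuffer;

class RawKeyCompareSketch {
	/** Serializes a key as 8 bytes of id followed by 4 bytes of percentile size. */
	static byte[] serialize(long id, int percentileSize) {
		return ByteBuffer.allocate(12).putLong(id).putInt(percentileSize).array();
	}

	/** Compares two serialized keys in place, starting at offsets s1 and s2. */
	static int compareRaw(byte[] b1, int s1, byte[] b2, int s2) {
		long thisId = ByteBuffer.wrap(b1, s1, 8).getLong();
		long thatId = ByteBuffer.wrap(b2, s2, 8).getLong();
		int cmp = Long.compare(thisId, thatId);
		if (cmp != 0) {
			return cmp; // ids differ: the percentile size is never decoded
		}
		int thisPs = ByteBuffer.wrap(b1, s1 + 8, 4).getInt();
		int thatPs = ByteBuffer.wrap(b2, s2 + 8, 4).getInt();
		return Integer.compare(thisPs, thatPs);
	}
}
```

No key object is created during the comparison; the fields are decoded straight from the byte arrays and only as far as needed. With variable-length fields the offset of the second field is no longer constant, which is why the real comparator has to ask WritableUtils.decodeVIntSize() for the length of the id first.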

That concludes the implementation code I used with Hadoop. I will give performance figures in the last part of this series. Before that, in the next part, I will show how Hadoop can also be used for Business Intelligence.