Utiliser Hadoop pour le calcul de la Value At Risk Partie 4

le 05/11/2010 par Marc Bojoly
Tags: Software Engineering

Dans le premier article de cette série, j'ai introduit pourquoi le framework Hadoop peut être utile pour calculer la VAR et analyser les données intermédiaires. Dans le second et le troisième article j'ai donné deux implémentations concrètes du calcul de la VAR avec Hadoop. Je vais maintenant m'attarder sur certains détails d'optimisation utilisés dans ces implémentations.

Writable, Comparable, Comparator

Pour avoir un aperçu complet de cette implémentation, je vais vous montrer le contenu des classes DrawKey et CustomResultWritable que vous avez vues dans les signatures. Hadoop est un framework conçu pour le traitement de fichiers et le tri. La sérialisation et l'algorithme de comparaisons sont deux points essentiels pour l'optimisation. Pour ce faire, Hadoop définit un mécanisme de sérialisation propriétaire. Des types Writable (sérialisables) comme LongWritable fournissent à la fois un constructeur et une méthode set(). Des objets existants peuvent être alimentés avec de nouvelles valeurs par la méthode set(). Cette réutilisation d'objets évite d'instancier un grand nombre de nouveaux objets (dans mon précédent article j'avais noté que l'instanciation était une phase très coûteuse). Le format de sérialisation est spécialisé et optimisé pour les besoins d'Hadoop. Il fournit par exemple "out of the box" un stockage de longueur variable avec la classe LongWritable. De façon à préserver la structure complète de mon résultat (result) qui est une combinaison de champs double et int, j'ai besoin de définir une classe personnalisée qui implémente l'interface Writable de façon à correspondre aux signatures des méthodes map() et reduce(). Cette implémentation CustomResultWritable montre comment utiliser le mécanisme de sérialisation de Hadoop.

public class CustomResultWritable implements Writable {
	private static String SEPARATOR = "\t";
	/**  Time in days before the maturity */
	private int t;
	/** Spot (actual price) of the underlying */
	private double s0;
	/** Strike (target price) of the underlying */
	private double k;
	/** Risk free interest rate */
	private double r;
	/** Implicit volatility of the Equity	 */
	private double sigma;
	/** Computed prices with those parameters	 */
	private double price;
        
        //Getter and setters
	
	/** Default constructor to allow setting by reference	 */
	public CustomResultWritable() {}

	public CustomResultWritable(int t, double s0, double k, double r, double sigma,
			double price) {
		super();
		this.t = t;
		this.s0 = s0;
		this.k = k;
		this.r = r;
		this.sigma = sigma;
		this.price = price;
	}

	public void set(int t, double s0, double k, double r, double sigma,
			double price) {
		this.t = t;
		this.s0 = s0;
		this.k = k;
		this.r = r;
		this.sigma = sigma;
		this.price = price;
	}

	@Override
	public void readFields(DataInput input) throws IOException {
		this.t = input.readInt();
		this.s0 = input.readDouble();
		this.k = input.readDouble();
		this.r = input.readDouble();
		this.sigma = input.readDouble();
		this.price = input.readDouble();
	}

	@Override
	public void write(DataOutput output) throws IOException {
		output.writeInt(this.t);
		output.writeDouble(this.s0);
		output.writeDouble(this.k);
		output.writeDouble(this.r);
		output.writeDouble(this.sigma);
		output.writeDouble(this.price);
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append(t).append(SEPARATOR).append(s0).append(SEPARATOR)
				.append(k).append(SEPARATOR).append(r).append(SEPARATOR)
				.append(sigma).append(SEPARATOR).append(price);
		return builder.toString();
	}

}

La méthode toString() fournit un moyen d'écriture au format texte ce qui est nécessaire pour un format de sortie texte (text output format). Vous pouvez noter qu'Hadoop fournit des types prédéfinis pour encapsuler les types Java primitifs, les String ainsi qu'un mécanisme plus générique ObjectWritable qui permet de les combiner. Cependant, comme ce mécanisme est plus générique et un peu complexe, j'ai écrit directement un type writable personnalisé.

Au niveau de la comparaison, Hadoop permet également des développements spécifiques pour en améliorer les performances. Je vais donc vous décrire l'implémentation de ma clé. Mais avant de détailler le code, j'ai besoin de préciser que j'ai une hiérarchie de clés : ScenarioKey extend DrawKey. J'ai expliqué dans ma seconde implémentation où elles sont utilisées et pourquoi j'ai placé la taille du percentile dans la DrawKey. Je vous présente l'implémentation de ScenarioKey celle de DrawKey est très proche.

public class ScenarioKey implements WritableComparable<scenariokey> {

	// For toString() implementation
	protected static final String SEPARATOR = ";";

	/**
	 * Identifer the Scenario (ParametersValueGeneratorConfiguration)
	 */
	private VLongWritable id;
	/**
	 * Size of the percentile = totalNumberOfDraws * varPrecision
	 */
	private VIntWritable percentileSize;

	public ScenarioKey() {
		set(new VLongWritable(), new VIntWritable());
	}

	public ScenarioKey(long id, int percentileSize) {
		super();
		set(new VLongWritable(id), new VIntWritable(percentileSize));
	}

	public ScenarioKey(VLongWritable id, VIntWritable percentileSize) {
		super();
		set(id, percentileSize);
	}

	public void set(VLongWritable id, VIntWritable percentileSize) {
		this.id = id;
		this.percentileSize = percentileSize;
	}

	public VLongWritable getId() {
		return id;
	}
	
	public VIntWritable getPercentileSize() {
		return percentileSize;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((id == null) ? 0 : id.hashCode());
		result = prime * result
				+ ((percentileSize == null) ? 0 : percentileSize.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		ScenarioKey other = (ScenarioKey) obj;
		if (id == null) {
			if (other.id != null)
				return false;
		} else if (!id.equals(other.id))
			return false;
		if (percentileSize == null) {
			if (other.percentileSize != null)
				return false;
		} else if (!percentileSize.equals(other.percentileSize))
			return false;
		return true;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		id.readFields(in);
		percentileSize.readFields(in);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		id.write(out);
		percentileSize.write(out);
	}

	@Override
	public int compareTo(ScenarioKey o) {
		return ScenarioKey.compare(this, o);
	}
	
	/**
	 * Compares two scenario keys
	 * This method is required for being able to compare two subclass
	 * of Scenario key because cast don't allow to refer to an overriden method 
	 * @param k1
	 * @param k2
	 * @return
	 */
	public static int compare(ScenarioKey k1, ScenarioKey k2)  {
		if (k1 == null || k2 == null) {
			throw new NullPointerException();
		}
		int cmp = k1.id.compareTo(k2.id);
		if (cmp != 0) {
			return cmp;
		} else {
			return  k1.percentileSize.compareTo(k2.percentileSize);
		}
	}

	@Override
	public String toString() {
		return this.id + SEPARATOR + this.percentileSize;
	}

}

Cette implémentation de Writable est très proche de CustomResultWritable avec une différence notable: les champs ne sont pas des types primitifs Java mais des VLongWritable. Cela permet de minimiser la quantité de données écrites WritableComparable nécessite simplement d'implémenter la méthode compareTo(). L'implémentation lit et compare seulement les champs nécessaires pour ce faire.

Parce que trier et comparer est l'une des tâches les plus importantes d'Hadoop (voir l'explication du pattern Map/Reduce dans le premier article), une possibilité d'optimisation est fournie en définissant un RawComparator. Cette classe compare directement les octets représentant les objets évitant ainsi de désérialiser toutes les données avant de les trier. En pratique, je commence par désérialiser mon identifiant de scénario et je renvoie la réponse immédiatement s'ils sont différents. Je l'ai implémenté comme une classe interne de ScenarioKey.

static final class Comparator extends WritableComparator {

		protected Comparator() {
			super(ScenarioKey.class, true);
		}
		
		@SuppressWarnings("unchecked")
		@Override
		public final int compare(WritableComparable w1, WritableComparable w2) {
			ScenarioKey d1 = (ScenarioKey)w1;
			ScenarioKey d2 = (ScenarioKey)w2;
			int cmp = ScenarioKey.compare(d1, d2);
			return cmp;
		}

		@Override
		public final int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
				int l2) {
			// si = index of first relevant byte
			// li = total length of the byte array to compare
			long thisId;
			long thatId;
			try {
				thisId = readVLong(b1, s1);
			}
			catch(IOException ioex) {
				throw new IllegalArgumentException("First VLong is invalid", ioex);
			}
			try {
				thatId = readVLong(b2, s2);
			}
			catch(IOException ioex) {
				throw new IllegalArgumentException("Second VLong is invalid", ioex);
			}
			
			int cmp = (thisId < thatId ? -1
					: (thisId == thatId ? 0 : 1));
			if (cmp == 0) {
				int idL1 = WritableUtils.decodeVIntSize(b1[s1]);
				int idL2 = WritableUtils.decodeVIntSize(b2[s2]);
				//PercentileSize
				int thisPs = 0;
				int thatPs = 0;
				try {
					thisPs = readVInt(b1, s1+idL1);
				}
				catch(IOException ioex) {
					throw new IllegalArgumentException("First VInt is invalid", ioex);
				}
				try {
					thatPs = readVInt(b2, s2+idL2);
				}
				catch(IOException ioex) {
					throw new IllegalArgumentException("Second VInt is invalid", ioex);
				}
				cmp =  (thisPs < thatPs ? -1
						: (thisPs == thatPs ? 0 : 1));
			}
			return cmp;
		}//End compare
	}//End Comparator

Ce comparateur est enregistré par ce bloc statique :

static {
    WritableComparator.define(ScenarioKey.class, new ScenarioKey.Comparator());
}

Le framework Hadoop fournit une classe d'aide : WritableUtils pour aider à décoder la représentation en octets mais le résultat reste très technique.

Cela conclut le code d'implémentation que j'ai utilisé avec Hadoop. Je vous donnerai des chiffres de performance dans le dernier article de cette série. Mais, avant cela, je vais vous montrer dans le prochain article comment Hadoop peut également être utilisé pour réaliser de l'analyse décisionnelle sur les données intermédiaires.