Data Grid or NoSQL? Same, same but different…

For three years now, NoSQL, as one family of Big Data technologies, has spread around the world and is challenging the centralized world of the RDBMS. Distributed storage is not new, however: banks and online gaming platforms have for several years been using technologies called “data grids” to address latency and throughput issues. And to be completely frank, “Big Data” is not far from being the “new SOA”: a radical paradigm shift lost in the middle of commercial buzzwords, but that’s another story…

What are the common points? The main differences?

(more…)

Scribe: a way to aggregate data and, why not, to fill HDFS directly?

HDFS is a distributed file system, which quickly raises an issue: how do I fill this file system with all my data? There are several options, ranging from batch import to Straight Through Processing.

  • Bulk load style. The first option is to keep collecting data on the local file system and to import it in periodic batches. The second one is to use an ETL. Pentaho has announced Hadoop support for its Data Integration product. The first tests we conducted lead us to think it currently works much better for extracting data from Hadoop (using Hive) than for importing data. Yet it is just a matter of time before Pentaho fixes the few issues we encountered. The third one is to use a solution like Sqoop. Sqoop extracts (or imports) data from your RDBMS using Map/Reduce jobs. I hope we will be able to talk about that solution very soon.
  • Straight Through Processing style. In that domain, you can look at solutions like Flume, Chukwa (which is part of the Apache Hadoop distribution) or Scribe. In brief, you collect and aggregate data in a more STP style, from different sources, different applications and different machines. Flume and Chukwa globally work the same way as Scribe, but they provide more connectors, in the sense that you can, for instance, “tail” a log file. Chukwa is also much more easily integrated with the “Hadoop stack” than Scribe is.

Scribe : a kind of distributed collector

(more…)

Scribe installation

Scribe installation is a little bit tricky (I should point out that I am not what you would call a C++ compilation expert, and thanks to David for his help…).
Here is how I installed Scribe on my Ubuntu machine (Ubuntu 10.04 LTS – the Lucid Lynx – released in April 2010)
(more…)

How to “crunch” your data stored in HDFS?

HDFS stores huge amounts of data, but storing it is worthless if you cannot analyse it and extract information from it.

Option #1 : Hadoop : the Map/Reduce engine

Hadoop Overview

Hadoop is a Map/Reduce framework that works on HDFS or on HBase. The main idea is to decompose a job into several identical tasks that can be executed close to the data (on the DataNode). These tasks run in parallel: this is the Map phase. Then all the intermediate results are merged into one result: this is the Reduce phase.
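
To make the two phases concrete, here is a minimal sketch in plain Java. It does not use the Hadoop API: the class MapReduceSketch and its methods are hypothetical names, a parallel stream stands in for the tasks running on the DataNodes, and each string stands in for a block of data. It only illustrates the idea of identical tasks followed by a merge.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustrative only: a parallel stream stands in for the cluster, and each
// "split" stands in for a block of data stored on a DataNode.
class MapReduceSketch {

    // Map phase: one identical task per split, counting words in that split only.
    static Map<String, Integer> map(String split) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String word : split.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                partial.merge(word, 1, Integer::sum);
            }
        }
        return partial;
    }

    // Reduce phase: merge all the intermediate results into one result.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    // Runs the map tasks in parallel, then reduces their outputs.
    static Map<String, Integer> run(List<String> splits) {
        List<Map<String, Integer>> partials = splits.parallelStream()
                .map(MapReduceSketch::map)
                .collect(Collectors.toList());
        return reduce(partials);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("to be or not", "to be")));
    }
}
```

The point of the split is that map only ever sees its own piece of data, which is what allows a real framework to ship the task to the machine holding that piece.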

In Hadoop, the JobTracker (a Java process) is responsible for monitoring the job, (more…)

Hadoop Distributed File System : Overview & Configuration

The Hadoop Distributed File System can be considered a standard file system, except that it is distributed. From the client’s point of view, it looks like a standard file system (like the one on your laptop), but behind the scenes the file system actually runs on several machines. HDFS implements fail-over using data replication and has been designed to store and manipulate large data sets (in large files) with a write-once-read-many access model for files.
(more…)

Event Sourcing & noSQL

I have watched Greg Young’s talks about CQRS, and especially “Event Sourcing”, a couple of times, and each time I tell myself this pattern is just “génial” (as we say in French), even though Martin Fowler wrote about it back in 2005 and dealt in detail with implementation concerns and issues (especially integration with external systems).

Event Sourcing: stop thinking of your data as a stock but rather as a list of events…
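
As a rough illustration of that idea, here is a minimal Java sketch of a bank account whose balance is never stored but always recomputed from its events. The names AccountEvent and EventSourcedAccount are hypothetical and do not come from any framework or from the talks mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: a hypothetical event type for a bank account.
class AccountEvent {
    final String type;        // "DEPOSIT" or "WITHDRAWAL"
    final long amountCents;

    AccountEvent(String type, long amountCents) {
        this.type = type;
        this.amountCents = amountCents;
    }
}

class EventSourcedAccount {
    // The append-only list of events is the only thing that is stored.
    private final List<AccountEvent> events = new ArrayList<>();

    void deposit(long amountCents) {
        events.add(new AccountEvent("DEPOSIT", amountCents));
    }

    void withdraw(long amountCents) {
        events.add(new AccountEvent("WITHDRAWAL", amountCents));
    }

    // The balance (the "stock") is rebuilt by replaying every event.
    long balance() {
        return balanceAfter(events.size());
    }

    // Replaying only the first n events gives the state at an earlier point.
    long balanceAfter(int n) {
        long balance = 0;
        for (AccountEvent e : events.subList(0, n)) {
            balance += "DEPOSIT".equals(e.type) ? e.amountCents : -e.amountCents;
        }
        return balance;
    }

    List<AccountEvent> history() {
        return Collections.unmodifiableList(events);
    }
}
```

Because nothing is ever overwritten, any past state can be recomputed, which is what balanceAfter hints at.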
(more…)

Let’s play with Cassandra… (Part 3/3)

In this part, we will see a lot of Java code (the API exists in several other languages) and look at the client part of Cassandra.

Use Case #0: Open and close a connection to any node of your Cluster

Cassandra is now accessed using Thrift. The following code opens a connection to the specified node.

TTransport tr = new TSocket("192.168.216.128", 9160);
TProtocol proto = new TBinaryProtocol(tr);
tr.open();
Cassandra.Client cassandraClient = new Cassandra.Client(proto);
...
tr.close();

As I said previously, the default API does not provide any connection pooling mechanism that would be able (1) to close and reopen connections when a node has failed, (2) to load-balance requests among all the nodes of the cluster and (3) to automatically send the request to another node when the first attempt fails.
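
To give an idea of what points (2) and (3) could look like, here is a minimal sketch in plain Java. FailoverClient is a hypothetical helper, not part of the Cassandra or Thrift API: a node is just a host name, and the request is any function that talks to one node and throws when that node is unreachable. Connection reuse and reopening (point 1) are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative only: hypothetical helper, not part of Cassandra or Thrift.
class FailoverClient {
    private final List<String> nodes;
    private final AtomicInteger next = new AtomicInteger();

    FailoverClient(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    // The round-robin starting point spreads requests over the cluster; on
    // failure the same request is retried on the following nodes, each node
    // being tried at most once per request.
    <T> T execute(Function<String, T> request) {
        int start = Math.floorMod(next.getAndIncrement(), nodes.size());
        RuntimeException last = null;
        for (int i = 0; i < nodes.size(); i++) {
            String node = nodes.get((start + i) % nodes.size());
            try {
                return request.apply(node);
            } catch (RuntimeException e) {
                last = e; // this node failed for this request, try the next one
            }
        }
        throw new IllegalStateException("all nodes failed", last);
    }
}
```

In a real client, the function passed to execute would open (or borrow) a Thrift connection to the given node and run the Cassandra call on it.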

Use Case #1: Insert a customer

The following code inserts a customer into the storage space (note that the object aCustomer is the object you want to persist):

Map<String, List<ColumnOrSuperColumn>> insertClientDataMap = new HashMap<String, List<ColumnOrSuperColumn>>();
List< ColumnOrSuperColumn > clientRowData = new ArrayList< ColumnOrSuperColumn >();
 
ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(new Column("fullName".getBytes(UTF8),  
aCustomer.getName().getBytes(UTF8), timestamp));
clientRowData.add(columnOrSuperColumn);
 
columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(new Column("age".getBytes(UTF8),  
aCustomer.getAge().toString().getBytes(UTF8), timestamp));
clientRowData.add(columnOrSuperColumn);
 
columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(new Column("accountIds".getBytes(UTF8),  
aCustomer.getAccountIds().getBytes(UTF8), timestamp));
clientRowData.add(columnOrSuperColumn);

As you can see, the first line is in fact a Java representation of the structure: a map in which each key is a ColumnFamily name and each value is the list of columns to write into that ColumnFamily for the row. The rest of the code only creates and appends ColumnOrSuperColumn objects. Here, the columns have the following names: fullName, age, accountIds. You will also notice that when you create a column, you specify the timestamp at which the column is created. Remember that this timestamp is used for “read-repair”, so all your clients must have synchronized clocks (using NTP, for instance).
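
To see why the client clocks matter, here is a toy in-memory model in plain Java, not Cassandra’s storage engine. It assumes the simplest possible rule: for a given column, the value with the highest timestamp wins, and on a tie the existing value is kept (that tie-break is an assumption of this sketch). TimestampedRow and Cell are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a toy model of "highest timestamp wins" for one row.
class TimestampedRow {
    static final class Cell {
        final String value;
        final long timestamp;

        Cell(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Cell> columns = new HashMap<>();

    // The incoming value is kept only if its timestamp is strictly newer.
    void write(String name, String value, long timestamp) {
        Cell current = columns.get(name);
        if (current == null || timestamp > current.timestamp) {
            columns.put(name, new Cell(value, timestamp));
        }
    }

    // Returns the current value of the column, or null if it was never written.
    String read(String name) {
        Cell cell = columns.get(name);
        return cell == null ? null : cell.value;
    }
}
```

With this rule, a client whose clock runs late can write a value that is silently discarded, because its timestamp looks older than the one already stored.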

insertClientDataMap.put("customers", clientRowData);

The line above puts the list of Columns into the ColumnFamily named customers (so you can write to several ColumnFamilies in a single call with the batch_insert method). Then, the following line inserts the customer into Cassandra. You need to specify the keyspace, the row key (here the customer name), the map of ColumnFamilies you want to insert and the Consistency Level you have chosen for this data.

cassandraClient.batch_insert("myBank", aCustomer.getName(), insertClientDataMap,  ConsistencyLevel.DCQUORUM);

Use Case #2: Insert operations for an account

Inserting an operation uses almost the same code, except that we are using a SuperColumn.

Map<String, List<ColumnOrSuperColumn>> insertOperationDataMap = new HashMap<String, List<ColumnOrSuperColumn>>();
List<ColumnOrSuperColumn> operationRowData = new ArrayList<ColumnOrSuperColumn>();
List< Column > columns = new ArrayList< Column >();
 
// THESE ARE THE SUPERCOLUMN COLUMNS
columns.add(new Column("amount".getBytes(UTF8),  
aBankOperation.getAmount().getBytes(UTF8), timestamp));
columns.add(new Column("label".getBytes(UTF8),  
aBankOperation.getLabel().getBytes(UTF8), timestamp));
if (aBankOperation.getType() != null) {
	columns.add(new Column("type".getBytes(UTF8),  
aBankOperation.getType().getBytes(UTF8), timestamp));
}

So far, there is nothing new. A list of Columns is created with three columns: amount, label and type (withdrawal, transfer...).

// here is a superColumn
SuperColumn superColumn = new  
SuperColumn(CassandraUUIDHelper.asByteArray(CassandraUUIDHelper.getTimeUUID()),  
columns);
ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setSuper_column(superColumn);
operationRowData.add(columnOrSuperColumn);

This case is different from the previous one. Instead of adding the previously defined Columns directly to the row, we create a SuperColumn whose name is generated at runtime (a time-based UUID)… Quite dynamic, isn’t it? The three columns are then added to the super column itself.
The end of the code is similar to the previous one. The row is added to the ColumnFamily named operations and associated with the current customer’s account id.

// put the row data into the operations ColumnFamily
insertOperationDataMap.put("operations", operationRowData);
cassandraClient.batch_insert("myBank", aCustomer.getAccountIds(),  
insertOperationDataMap, ConsistencyLevel.ONE);
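
The nesting built by the code above (row key, then super column name, then columns) can be pictured with plain Java maps. This is only a sketch of the shape of the data, not of how Cassandra stores it: OperationsSketch is a hypothetical class, and a zero-padded timestamp plus a sequence number stands in for the time-based UUID so that the names sort chronologically.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: plain Java maps standing in for a super column family.
class OperationsSketch {
    // row key (account id) -> super column name (time-ordered) -> column name -> value
    private final Map<String, TreeMap<String, Map<String, String>>> rows = new TreeMap<>();
    private long sequence = 0;

    // Adds one operation as a new super column and returns its generated name.
    String addOperation(String accountId, long timeMillis, String amount, String label) {
        // Zero-padded so that lexicographic order matches chronological order.
        String name = String.format(Locale.ROOT, "%013d-%06d", timeMillis, sequence++);
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("amount", amount);
        columns.put("label", label);
        rows.computeIfAbsent(accountId, k -> new TreeMap<>()).put(name, columns);
        return name;
    }

    // Operations of one account, oldest first.
    List<Map<String, String>> operations(String accountId) {
        TreeMap<String, Map<String, String>> row = rows.get(accountId);
        return row == null ? new ArrayList<>() : new ArrayList<>(row.values());
    }
}
```

Since the super column names are time-ordered, reading a row returns the operations of an account in chronological order without any extra sorting.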

Here is what you get when reading the operations for the accountId

Use Case #3 : Removing an item

Removing a complete row is – in terms of API – as simple as the rest of the API.

cassandraClient.remove("myBank", myAccountId, new ColumnPath("operations"),  timestamp, ConsistencyLevel.ONE);

It is, however, a little more complex when you look inside. In brief, in a distributed system where node failures will occur, you can’t simply physically delete the record. So you replace it with a tombstone, and the record “marked as deleted” is effectively deleted once the tombstone is considered old enough. Alternatively, you can still use “logical deletion” and write code that ignores these flagged records.
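
The tombstone mechanism can be sketched on a single node in a few lines of Java. This is a toy model, not Cassandra’s implementation: TombstoneStore is a hypothetical class and GRACE_MILLIS is a made-up setting standing in for “the tombstone is old enough”.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative only: a toy single-node store with tombstones.
class TombstoneStore {
    static final long GRACE_MILLIS = 10_000; // hypothetical grace period

    private static final class Entry {
        final String value;   // null means "tombstone"
        final long timestamp;

        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        boolean isTombstone() {
            return value == null;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    void put(String key, String value, long timestamp) {
        apply(key, new Entry(Objects.requireNonNull(value), timestamp));
    }

    // A delete is just another write: it records a marker instead of a value.
    void remove(String key, long timestamp) {
        apply(key, new Entry(null, timestamp));
    }

    // The newest timestamp wins, whether it carries a value or a tombstone.
    private void apply(String key, Entry incoming) {
        Entry current = entries.get(key);
        if (current == null || incoming.timestamp > current.timestamp) {
            entries.put(key, incoming);
        }
    }

    // Readers never see a record hidden by a tombstone.
    String get(String key) {
        Entry e = entries.get(key);
        return (e == null || e.isTombstone()) ? null : e.value;
    }

    // Physical deletion happens later, once the tombstone has outlived the
    // grace period. Returns the number of entries actually removed.
    int compact(long now) {
        int before = entries.size();
        entries.values().removeIf(e -> e.isTombstone() && now - e.timestamp > GRACE_MILLIS);
        return before - entries.size();
    }

    int physicalSize() {
        return entries.size();
    }
}
```

Keeping the marker around for a while is what prevents an older write, arriving late from a node that was down, from bringing the deleted record back.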

To (quickly) conclude this series of articles

I really like Cassandra, which looks like a ready-to-use tool (even if NoSQL is full of great tools) and a way to build high-performance systems at a “low” (or at least lower) cost than with commercial tools. There are still topics I hope I will be able to discuss, like security (Cassandra provides authentication mechanisms…), searching (or at least getting ranges of data), monitoring (and how to monitor all the nodes of your cluster in a single tool like Ganglia, Nagios or Graphite), or even how to use Hadoop on top of Cassandra.

To be continued…

This is the story of a project…

This is the story of a project, neither more complex nor simpler than others: an application that communicates with a database and two other systems. Something quite mainstream from a technical and architectural point of view, something standard from the management side: everything must be done by yesterday and there is a lot to do… In short, “it’s gonna be hard”, as developers often say, but nobody says it too loudly.
So we build the team. Forty people are staffed, and people are specialized. The teams are organized in pools, so that a kind of contract is set up between the different pools. Each pool is responsible for handling a certain kind of request. A flow of requests appears. Some pools are under pressure and become the bottleneck: a stock of requests builds up upstream while the downstream pools are waiting… For the pools under pressure, important things therefore become urgent things. Choices must be made among the urgent things to handle the immediate ones. Task switching becomes the way of working and, in the end, the flow slows down.

Then the “go live” deadline approaches: it is in two months. The user acceptance tests are just starting, having been delayed by the tedious and painful integration of the different components. Maybe the contracts set up between the teams have complicated the integration: some mandatory parameters are missing, the dates do not respect the proper format, the error codes are only partially interpreted…
In any case, the user acceptance tests detect more bugs than the development team can fix, and not everything has been tested yet. (more…)

Let’s play with Cassandra…(Part 2/3)

In this part, we will go into more detail and work closer to the code with Cassandra. The idea is to build a kind of simplified current account system where a user has an account and the account has a balance…
This system will manipulate the following concepts:
- A client has different kinds of properties defining his identity
- A client has one account
- The account has a list of operations (withdrawals and transfers are both kinds of operations)
Here is how it would have been modeled in the relational world (or at least the UML world)

(more…)

Let’s play with Cassandra… (Part 1/3)

I have already talked about it, but NoSQL is about diversity and includes many different tools and even kinds of tools. Cassandra is one of them and is currently one of the most popular in the NoSQL ecosystem. Built by Facebook and currently in production at web giants like Digg and Twitter, Cassandra is a hybrid solution between Dynamo and BigTable.

Hybrid, firstly, because Cassandra uses a column-oriented way of modeling data (inspired by BigTable) and lets you run Hadoop Map/Reduce jobs, and secondly because it uses patterns inspired by Dynamo, such as eventual consistency, gossip protocols and a master-master way of serving both read and write requests.

Another part of Cassandra’s DNA (and in fact that of a lot of NoSQL solutions) is that it has been built to be fully decentralized, designed for failure and datacenter-aware (in the sense that you can configure Cassandra to replicate data between several datacenters…). Hence, Cassandra is currently used between Facebook’s US west and east coast datacenters and stored (around two years ago) 50+ TB of data on a 150-node cluster.
(more…)