Friday, August 20, 2010

ConsistencyLevel in Hector and Cassandra

I started playing with failover in Cassandra the other day and rapidly found myself in a bit of a pickle. I had a two-node development environment that let me experiment with Cassandra and start developing Hector programs, so I decided to turn one machine off and see whether my program would carry on as normal.

It didn’t. Not much of a failover, I thought. To make matters more confusing, when I attached via the Cassandra CLI client I could still retrieve data from my single remaining node. What was going on?

It turns out this was all to do with the consistency level of my requests. Take a look at the consistency section of:

http://wiki.apache.org/cassandra/API

There are multiple consistency levels available in a Cassandra cluster, and they can be set differently for read and write operations. What I hadn’t realized is that Hector defaults to a consistency level of QUORUM, whereas the CLI defaults to ONE (I believe). QUORUM needs a majority of the replicas (replication factor / 2 + 1) to respond, so in a two-node cluster where every row is stored on both nodes, both must be up, and Hector’s reads fail as soon as one goes down.
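The arithmetic behind this can be sketched in a few lines of Java. This is an illustration, not Hector code: it assumes Cassandra’s rule that QUORUM needs replicationFactor / 2 + 1 replicas (integer division), and it assumes the two-node cluster kept a copy of every row on both nodes (a replication factor of 2), which the post does not state explicitly. The class name is hypothetical.

```java
// Sketch: how many replicas QUORUM needs, and how many can be down,
// for a given replication factor (Cassandra's majority rule).
public class QuorumMath {

    /** Replicas that must respond for a QUORUM read or write. */
    public static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1; // integer division gives a strict majority
    }

    /** Replicas that can be unavailable while QUORUM still succeeds. */
    public static int toleratedFailures(int replicationFactor) {
        return replicationFactor - quorum(replicationFactor);
    }

    public static void main(String[] args) {
        for (int rf = 1; rf <= 4; rf++) {
            System.out.println("RF=" + rf + ": QUORUM needs " + quorum(rf)
                    + ", can lose " + toleratedFailures(rf));
        }
    }
}
```

With a replication factor of 2 a quorum is both replicas, so no failure is tolerated; with a replication factor of 4 a quorum is 3, so one replica can be lost. ONE, by contrast, only needs a single live replica.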

Adding Nodes

One way to get round the problem is to add more nodes and raise the replication factor in the conf files to 4. I added two nodes that were not part of the seeding process and used the AutoBootstrap option in the conf file to have them join the cluster. This has partly solved the problem: I can now read from one of the bootstrapped nodes if any of the other nodes goes down, but not from one of the seed nodes. I think I need to do more work on the configuration and layout of the cluster.

Changing the consistency in Hector

You can, of course, change the consistency level in Hector. The code here refers to Hector 0.6.15 and above (for now). The first thing we need to do is implement a ConsistencyLevelPolicy. Here’s an example based on the default consistency implementation in Hector:

import me.prettyprint.cassandra.model.*;
import org.apache.cassandra.thrift.ConsistencyLevel;

public final class MyConsistancyLevel implements ConsistencyLevelPolicy {

    // Consistency level chosen per operation type
    @Override
    public ConsistencyLevel get(OperationType op) {
        switch (op) {
            case READ:  return ConsistencyLevel.QUORUM;
            case WRITE: return ConsistencyLevel.ONE;
            default:    return ConsistencyLevel.QUORUM; // just in case
        }
    }

    // Per-column-family variant: apply the same rules as above
    // (returning QUORUM here would silently override the WRITE level)
    @Override
    public ConsistencyLevel get(OperationType op, String cfName) {
        return get(op);
    }
}

In this example we set the READ consistency to QUORUM and the WRITE consistency to ONE (I’ve also included a default just in case!). Once we’ve got this class, we can set the consistency level policy for the keyspace like this, where ko is the keyspace object we are working with:

ConsistencyLevelPolicy mcl = new MyConsistancyLevel();
ko.setConsistencyLevelPolicy(mcl);

And that’s it!

Many thanks to Ran Tavory and Colin Vipurs for their help on this.

As ever, comments and so on are gratefully received.

Wednesday, August 18, 2010

A brief note about using Clusters in Hector V2 API

Recently Hector, the Java library for accessing the Cassandra dB, was updated to version 2. Now it’s time to explore moving jBloggyAppy over to the new API. In this post I’m briefly going to look at Hector V2’s clustering options, which greatly improve on version 1. To create a cluster we use getOrCreateCluster from the Hector factory (me.prettyprint.cassandra.model.HFactory). Ran Tavory recommends a static import to reduce typing:

import static me.prettyprint.cassandra.model.HFactory.*;


Once we’ve done that, we create the cluster:
Cluster c = HFactory.getOrCreateCluster("MyCluster", "154.36.xx.yyy:9160");

And that’s it!

However, note that we are now connected only to the machine in the cluster that we named. We can get a list of all the machines in the cluster like this:

Set<String> hosts = c.getClusterHosts(true);
for (String host : hosts) {
   System.out.println(host);
}

The problem here is that we don't get the port number of each machine in the cluster, nor can we find out about the topology. Thanks to the Hector mailing list folks for pointing this out.

If we want to know the cluster’s name:
System.out.println(c.describeClusterName());

Finally, should we want to use the cluster with a V1 pool, we borrow a client:
CassandraClient client = c.borrowClient();

and release it in a finally clause:
try {
   // ... use the client ...
} finally {
   c.releaseClient(client);
}

Tuesday, August 10, 2010

Using Java reflection to render a JSON feed

In today’s post we are going to look at a method for rendering JSON output from our Java web app. It’s important to note that what I’m going to show relies on three things:

1: I like to use Java beans to encapsulate data between elements of the MVC model. If more than one element is needed the beans are added to a list.

2: These beans use standard accessor methods starting with “get” to read the bean’s private variables.

3: Our controller, a servlet, adds the beans to the request by setting an attribute and then forwards to the view (usually a JSP page) using a RequestDispatcher.

If you look at the code on GitHub you’ll see that we have four of these bean stores, each with different accessor methods and private variables. Herein lies the problem. We could write a JSON encoder for each bean, but that would duplicate a lot of work. The answer is to use Java reflection to “look” into the bean and extract its accessor methods. This allows us to write one servlet that can process any bean, or list of beans, and convert it to JSON notation.
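The core idea can be sketched without the servlet or the JSON.org library. In this minimal sketch (the class names, including the AuthorBean example, are hypothetical), reflection walks a bean’s declared methods, calls each public no-argument getXxx method, and collects the non-null results into a map keyed by property name. It illustrates the technique; it is not the jBloggyAppy code.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

// Sketch: collect a bean's non-null getter values by reflection,
// keyed by property name with the "get" prefix stripped.
public class BeanReflector {

    /** Hypothetical bean standing in for one of the app's data beans. */
    public static class AuthorBean {
        private String name = "Andy";
        private String email = null; // null values are left out
        private long posts = 3;

        public String getName() { return name; }
        public String getEmail() { return email; }
        public long getPosts() { return posts; }
    }

    public static Map<String, Object> toMap(Object bean) {
        Map<String, Object> result = new TreeMap<>(); // sorted keys for stable output
        for (Method m : bean.getClass().getDeclaredMethods()) {
            String name = m.getName();
            // Treat public, no-argument methods named getXxx as accessors
            boolean isAccessor = name.startsWith("get") && name.length() > 3
                    && m.getParameterCount() == 0
                    && Modifier.isPublic(m.getModifiers());
            if (!isAccessor) {
                continue;
            }
            try {
                Object value = m.invoke(bean); // primitives come back boxed
                if (value != null) {
                    result.put(name.substring(3), value); // "getName" -> "Name"
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("could not call " + name, e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(toMap(new AuthorBean())); // {Name=Andy, Posts=3}
    }
}
```

Filtering on public, zero-argument methods matters: invoking a getter that takes parameters, or a private one, through reflection would throw instead of returning a value.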

For more information on reflection, look at Java Reflection.

We will use the JSON library from JSON.org to make life easy.

Our servlet that generates the JSON will not know what type of bean is being sent to it, or whether it is a list of beans. So rather than deal with a distinct class, we just deal with the Object class. We can get the beans and test for a list like this:

Object temp=request.getAttribute("Data");
Class c = temp.getClass();
String className=c.getName();
if (className.compareTo("java.util.LinkedList")==0){

If it’s not a linked list, we get the Object, find all its methods and call the accessor methods. Here’s our method for doing this; it returns a JSONObject containing only the values from the bean that actually have data. In this example “Value” has been obtained from the request using getAttribute.

private JSONObject ProcessObject(Object Value) { // Value has been passed to the servlet
    // needs java.lang.reflect.Method, java.lang.reflect.Modifier and org.json.JSONObject
    JSONObject Record = new JSONObject();
    try {
        Class c = Value.getClass();
        for (Method m : c.getDeclaredMethods()) {
            String mName = m.getName();
            // Only call public, no-argument "get" methods; anything else makes invoke() fail
            if (mName.startsWith("get") && m.getParameterTypes().length == 0
                    && Modifier.isPublic(m.getModifiers())) {
                String Name = mName.substring(3); // "getTitle" -> "Title"
                Object rt = m.invoke(Value);
                if (rt != null) { // only keep values that actually have data
                    try {
                        Record.put(Name, rt);
                    } catch (Exception JSONet) {
                        System.out.println("JSON Fault" + JSONet);
                        return null;
                    }
                }
            }
        }
    } catch (Exception e) {
        System.err.println(e);
    }
    return Record;
}
Dealing with a linked list of objects is just a case of iterating through the list and calling the above method for each bean we want to encode:
if (className.compareTo("java.util.LinkedList") == 0) { // Deal with a linked list
    List Data = (List) request.getAttribute("Data");
    JSONObject JSONObj = new JSONObject();
    JSONArray Parts = new JSONArray();
    Iterator iterator = Data.iterator();
    while (iterator.hasNext()) {
        Object Value = iterator.next();
        JSONObject obj = ProcessObject(Value); // encode one bean
        try {
            Parts.put(obj);
        } catch (Exception JSONet) {
            System.out.println("JSON Fault" + JSONet);
        }
    }
    try {
        JSONObj.put("Data", Parts);
    } catch (Exception JSONet) {
        System.out.println("JSON Fault" + JSONet);
    }
    PrintWriter out = response.getWriter();
    out.print(JSONObj);
}
Our simple code could be extended. Although it will deal with simple types stored in beans (strings, ints, longs, dates, etc.), if your bean stores more complex data (arrays or lists), handling the reflection will need to be a lot more complicated!

Monday, August 9, 2010

Hector V2 API announced

One of the problems of using “cutting edge” software is that things are always evolving, and occasionally the work you are doing can get left behind. This has kind of happened with the code I've been working on at http://github.com/acobley/jBoggyAppy

I’ve been using Hector as the API between Java and Cassandra, building models to encapsulate data stored in the database. However, Hector has just undergone a major overhaul, upgrading the interface to version 2. This is great and really to be appreciated: Ran and the team have made things easier and moved away from the complexities of Thrift.

I’ve decided I’ll carry on with the code using the V1 API and then come back and redo it in V2 later. This should be relatively easy: I’m keeping all the Hector code in one place (the Java bean connectors), and with luck I’ll just override the current methods with new V2 methods. The only changes I may need to make to the controllers are in the dB connection method. The V2 API implements clustering (which in itself is a useful step forward), but this may change how my code needs to interact with the database.

I’m looking forward to using V2 once I’ve got comments implemented in my jBloggyAppy code!

Converting a long to a byte array and back again

One thing that may not be immediately obvious is that Cassandra stores all values as byte arrays. This is not really a problem if you are storing a string (converting it to a byte array is relatively easy), but what if you need to store a date? Dates in Java are essentially of type long and represent the number of milliseconds since midnight, January 1, 1970 UTC. If you want to store a date, you need to convert it to a byte array and back again when going into and out of the key store. You could use serialization to achieve this, but as this article:

http://java.sun.com/developer/technicalArticles/Programming/serialization/

points out, serialization can be a slow process. So here I present two methods to convert a date (held as a long) to and from a byte array. I welcome comments on these; I’m sure they can be sped up and generally improved.

Converting to a byte array

Longs are stored as 8 bytes, so converting one is a case of masking off the bottom byte, storing it in the array, then shifting the original number right 8 bits, and repeating until the array has been filled from the right.

private byte[] longToByteArray(long value)
{
    byte[] buffer = new byte[8]; // a Java long is 64 bits, i.e. 8 bytes
    for (int i = 7; i >= 0; i--) { // fill from the right
        buffer[i] = (byte) (value & 0xff); // get the bottom byte
        value = value >>> 8; // shift the value right 8 bits (unsigned shift)
    }
    return buffer;
}

Converting back from a byte array to a long


Converting back the other way, we use a multiplier to give each byte its correct place value. The multiplier is shifted left 8 bits each time round the loop.

private long byteArrayToLong(byte[] buffer) {
    long value = 0;
    long multiplier = 1;
    for (int i = 7; i >= 0; i--) { // read from the right
        value = value + (buffer[i] & 0xff) * multiplier; // add the byte times its place value
        multiplier = multiplier << 8;
    }
    return value;
}

A simple test case

The following code shows these methods in use, converting a Date to a byte array and back again while maintaining its value. (displayByteArrayAsHex is a small helper, not shown, that prints each byte in hex.)
long tempnow = System.currentTimeMillis();
Date tempDate= new Date(tempnow);
System.out.println("now "+tempnow);
System.out.println("Native Date "+tempDate);
        
//Convert to Byte Array and print
byte btempnow[]=longToByteArray(tempnow);
System.out.println();
System.out.print("Byte Array ");
displayByteArrayAsHex(btempnow);
        
//and Convert it back again
long converted =byteArrayToLong(btempnow);
tempDate=new Date(converted);
System.out.println("converted now "+converted);
System.out.println("converted  Date "+tempDate);
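To run the test case on its own, the pieces can be put in one class. The post does not show displayByteArrayAsHex, so its body below is an assumption (two hex digits per byte, comma-separated), and byteArrayToLong uses shift-and-OR, which is equivalent to the multiplier loop. The class name is hypothetical.

```java
import java.util.Date;

// Sketch: a self-contained version of the Date round-trip test.
public class DateBytesDemo {

    public static byte[] longToByteArray(long value) {
        byte[] buffer = new byte[8];
        for (int i = 7; i >= 0; i--) {       // fill from the right
            buffer[i] = (byte) (value & 0xff);
            value >>>= 8;
        }
        return buffer;
    }

    public static long byteArrayToLong(byte[] buffer) {
        long value = 0;
        for (int i = 0; i < 8; i++) {        // most significant byte first
            value = (value << 8) | (buffer[i] & 0xff);
        }
        return value;
    }

    /** Two lower-case hex digits per byte, separated by commas. */
    public static String toHex(byte[] buffer) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < buffer.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(String.format("%02x", buffer[i] & 0xff));
        }
        return sb.toString();
    }

    /** Hypothetical implementation of the helper the post calls. */
    public static void displayByteArrayAsHex(byte[] buffer) {
        System.out.println(toHex(buffer));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        byte[] bytes = longToByteArray(now);
        System.out.print("Byte Array ");
        displayByteArrayAsHex(bytes);
        long converted = byteArrayToLong(bytes);
        System.out.println("Native Date    " + new Date(now));
        System.out.println("converted Date " + new Date(converted));
    }
}
```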

Friday, August 6, 2010

jBloggyAppy on Github

I've started work on integrating the Cassandra code into an actual app. All the code will be available on GitHub:

jBloggyAppy

The goals of the exercise are:

1: To write a Cassandra-based app in Java
2: The app should use the MVC programming model
3: The app should implement a RESTful interface
4: There will be no CSS (this is not a design exercise) and only minimal JavaScript for any AJAX calls.
5: Use OpenID to facilitate logins

The code already deals with creating a new user, listing all users and listing the details for one user.

Wednesday, August 4, 2010

Reading column name value pairs from a Supercolumn for the BloggyAppy

In the last blog post we looked at reading from normal column families; in this part we are going to look at reading from super columns. We are still following the BloggyAppy design from Arin Sarkissian. In this design we have a column family called Comments. Each entry has a key (the title slug from before). Each super column uses a TimeUUID as its key/name, and the details of the comment are stored as columns underneath it. Here’s the structure:

Comments: {
  Blog-Slug: {
    Time_UUID_1: {
      Comment: A Comment,
      Email: andy@abc.com
    },
    Time_UUID_2: {
      Comment: A Comment,
      Email: andy@abc.com
    }
  }
}
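One way to picture this layout in plain Java is as nested maps: row key (blog slug) to super column name (UUID) to column name to value. This is only an in-memory analogy with hypothetical names: Cassandra keeps super columns sorted by the column family’s comparator, which a LinkedHashMap does not reproduce, and random UUIDs stand in for real TimeUUIDs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Sketch: the Comments super column family pictured as nested maps.
// slug -> super column name (UUID) -> column name -> value
public class CommentsModel {

    public final Map<String, Map<UUID, Map<String, String>>> rows = new LinkedHashMap<>();

    public void addComment(String slug, UUID timeUuid, String email, String comment) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("Comment", comment);
        columns.put("Email", email);
        // Create the row for this slug on first use, then add the super column
        rows.computeIfAbsent(slug, k -> new LinkedHashMap<>()).put(timeUuid, columns);
    }

    public static void main(String[] args) {
        CommentsModel comments = new CommentsModel();
        // Random UUIDs stand in for the TimeUUIDs a real app would generate
        comments.addComment("First-Blog", UUID.randomUUID(), "andy@abc.com", "A Comment");
        comments.addComment("First-Blog", UUID.randomUUID(), "andy@abc.com", "Another Comment");
        System.out.println(comments.rows.get("First-Blog").size()); // 2
    }
}
```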


The first job is to get the comment keys for a particular title slug. Remember that what we are fetching are the Time_UUIDs, which are the super columns. The process is very similar to getting columns in a normal column family. There is only one major difference: instead of calling getRangeSlices on the keyspace, we use its getSuperRangeSlices method. The KeyRange and SlicePredicate work in exactly the same way as before. So to get the map of super columns we use:


Keyspace ks = client.getKeyspace("BloggyAppy");
ColumnParent columnParent = new ColumnParent("Comments");

SlicePredicate slicePredicate = new SlicePredicate();
SliceRange supercolumnRange = new SliceRange();
supercolumnRange.setStart(new byte[0]);  // empty: no lower bound
supercolumnRange.setFinish(new byte[0]); // empty: no upper bound
supercolumnRange.setReversed(true);      // reverse the super column order
supercolumnRange.setCount(1000);         // at most 1000 super columns per key
slicePredicate.setSlice_range(supercolumnRange);

KeyRange titlesRange = new KeyRange(200);
titlesRange.setStart_key("First-Blog");  // start == end: only this slug
titlesRange.setEnd_key("First-Blog");

Map<String, List<SuperColumn>> supermap = ks.getSuperRangeSlices(columnParent, slicePredicate, titlesRange);

Now that we have the map, we can step through the keys (in our example there will be only one) and, for each, get the list of super columns underneath it. The only thing to remember here is to convert the column name to type UUID:

for (String key : supermap.keySet()) {
    List<SuperColumn> columns = supermap.get(key);

    System.out.println("Key " + key);
    for (SuperColumn column : columns) {
        // print each super column's name as a UUID
        java.util.UUID Name = toUUID(column.getName());
        System.out.println("Name " + Name);
    }
}
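If you would rather not depend on com.eaio.uuid just to convert a column name, java.util.UUID has a constructor that takes the two 64-bit halves directly. A minimal sketch, assuming the column name is the standard 16-byte big-endian UUID layout; the class name is hypothetical.

```java
import java.util.UUID;

// Sketch: turn a 16-byte column name into a java.util.UUID using only
// the standard library.
public class UuidBytes {

    public static UUID toUUID(byte[] uuid) {
        if (uuid.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + uuid.length);
        }
        long msb = 0;
        long lsb = 0;
        for (int i = 0; i < 8; i++) {
            msb = (msb << 8) | (uuid[i] & 0xff); // bytes 0-7: most significant half
        }
        for (int i = 8; i < 16; i++) {
            lsb = (lsb << 8) | (uuid[i] & 0xff); // bytes 8-15: least significant half
        }
        return new UUID(msb, lsb);
    }

    public static void main(String[] args) {
        byte[] raw = new byte[16];
        for (int i = 0; i < 16; i++) {
            raw[i] = (byte) (i + 1);
        }
        System.out.println(toUUID(raw)); // 01020304-0506-0708-090a-0b0c0d0e0f10
    }
}
```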


Finally, we want to get the names and values of the columns inside each super column. The trick here is to fetch the actual SuperColumn from the keyspace. Here’s one way to do it (note that column.getName() returns the TimeUUID, as bytes, in our case):

ColumnPath cp = new ColumnPath("Comments");
cp.setSuper_column(column.getName());
SuperColumn sc = ks.getSuperColumn(key, cp);

Now we have the Supercolumn we can just get a list of columns it contains and iterate through them:

List<Column> cols=sc.getColumns();
Iterator<Column>itr = cols.iterator(); 
while(itr.hasNext()) {
 Column col = itr.next(); 
 System.out.println("\t\t"+string(col.getName()) + "\t ==\t" + string(col.getValue()));
}


We should now have all we need to read columns and super columns from the keyspaces. The next step will be to encapsulate all this into models for our web application to use. We’ll start that in our next post.

Tuesday, August 3, 2010

Implementing the comments: Writing a comment as a supercolumn

So far we have looked at simple column families but now it’s time to tackle super columns. We are still working with the BloggyAppy design from Arin Sarkissian. We are using some code from Hector’s test code.

Following Arin’s specification, the Comments column family is going to look like this:

Comments: {
  Blog-Slug: {
    Time_UUID_1: {
      Comment: A Comment,
      Email: andy@abc.com
    },
    Time_UUID_2: {
      Comment: A Comment,
      Email: andy@abc.com
    }
  }
}

To be clear, Blog-Slug will be the slug entry for the blog post that is being commented on; this is the key we will be looking for. Under it are the super column entries, each named by a TimeUUID, and each of these contains the columns that hold the comment’s details. So to add a comment we can do this:
Keyspace ks = client.getKeyspace("BloggyAppy");
ColumnPath cp = new ColumnPath("Comments");
java.util.UUID timeUUID=getTimeUUID();              
cp.setSuper_column(asByteArray(timeUUID));
cp.setColumn(bytes("email"));
ks.insert(slugValue, cp, bytes("andy@abc.com"));
cp.setColumn(bytes("Comment"));
ks.insert(slugValue, cp, bytes("AComment"));


slugValue is essentially the title. Note that getTimeUUID is defined as:

public static java.util.UUID getTimeUUID()
{
    // com.eaio.uuid.UUID generates a time-based UUID; convert it via its string form
    return java.util.UUID.fromString(new com.eaio.uuid.UUID().toString());
}
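The insert code above calls asByteArray(timeUUID) without showing it. As a hypothetical stand-in (not necessarily the helper the post used), here is a minimal sketch that writes a java.util.UUID as 16 big-endian bytes, most significant half first:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Sketch of an asByteArray helper (assumed body): the UUID's 128 bits
// written as 16 big-endian bytes.
public class UuidToBytes {

    public static byte[] asByteArray(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())  // bytes 0-7
                .putLong(uuid.getLeastSignificantBits()) // bytes 8-15
                .array();
    }

    public static void main(String[] args) {
        UUID u = UUID.fromString("01020304-0506-0708-090a-0b0c0d0e0f10");
        byte[] b = asByteArray(u);
        System.out.println(b.length + " bytes, first=" + b[0] + ", last=" + b[15]);
        // prints "16 bytes, first=1, last=16"
    }
}
```

Reading the bytes back in the same order (first 8 bytes as the most significant half) recovers the original UUID.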

Reading keys and columns from a simple column family in a Cassandra dB

In the first part of these posts I’m going to look at retrieving data from a Cassandra keyspace. We are working with the BloggyAppy design from Arin Sarkissian, and the code is heavily based on the Hector examples on the Wiki.

First up, let’s look at getting the author details from the Authors column family. Assuming we have borrowed a connection called “client” from the Hector pool, we can set the column family like this:

Keyspace ks = client.getKeyspace("BloggyAppy");
//retrieve sample data
ColumnParent columnParent = new ColumnParent("Authors");


We are going to use the getRangeSlices method of the Keyspace class to search for a single author or to list all authors’ details. This requires a ColumnParent (set above), a SlicePredicate and a KeyRange. The KeyRange is used to limit the number of keys that are returned and to restrict the key search to a certain range. The code looks like this:

KeyRange keyRange = new KeyRange();
keyRange.setStart_key("");
keyRange.setEnd_key("");


These settings will get all keys in the column family, up to KeyRange’s default count of 100. If you want a different limit, you can put a numerical value in the KeyRange constructor:

KeyRange keyRange = new KeyRange(1);


This will get just one result. If you want to look for one key only (such as the author Andy), you can do this:

KeyRange keyRange = new KeyRange(1);
keyRange.setStart_key("Andy");
keyRange.setEnd_key("");


Hopefully you can see that:

KeyRange keyRange = new KeyRange(100);
keyRange.setStart_key("Andy");
keyRange.setEnd_key("Dave");


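will get at most 100 keys between “Andy” and “Dave” (both inclusive). Note that keys only come back in this sorted order with an order-preserving partitioner; with the RandomPartitioner, a key range follows the hashed token order instead.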
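The selection rules can be modelled on a sorted set of row keys. This is an analogy, not Hector code: it assumes an order-preserving partitioner (so keys are stored sorted), inclusive start and end keys, and empty strings meaning “no bound” as in the snippets above. Names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch: which row keys a KeyRange(count) with start/end keys selects,
// modelled on a sorted set of keys.
public class KeyRangeDemo {

    public static List<String> keyRange(NavigableSet<String> keys, String start,
                                        String end, int count) {
        NavigableSet<String> view = keys;
        if (!start.isEmpty()) {
            view = view.tailSet(start, true);   // start key is inclusive
        }
        if (!end.isEmpty()) {
            view = view.headSet(end, true);     // end key is inclusive
        }
        List<String> result = new ArrayList<>();
        for (String key : view) {
            if (result.size() == count) {
                break;                          // return at most count keys
            }
            result.add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableSet<String> keys =
                new TreeSet<>(Arrays.asList("Alice", "Andy", "Bob", "Dave", "Eve"));
        System.out.println(keyRange(keys, "Andy", "Dave", 100)); // [Andy, Bob, Dave]
        System.out.println(keyRange(keys, "Andy", "", 1));       // [Andy]
    }
}
```

Note that KeyRange(1) with a start key of “Andy” returns the first key at or after “Andy”, which is only Andy if that key exists.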

So we can see how to restrict the number of keys that are returned. We now need to look at limiting which columns are returned for each requested key. Suppose we have the following:

Andy
 Tel  == 01382 345078
 Email  == andy@r2-dvd.org
 Address  == QMB

We may only need the first column (Tel), all the columns, or a slice in between. We will use a SliceRange (which looks a lot like the KeyRange!):

SliceRange columnRange = new SliceRange();
columnRange.setCount(4);
columnRange.setStart(new byte[0]);
columnRange.setFinish(new byte[0]);
columnRange.setReversed(true);


Some differences to note here: we can change the order in which the columns are returned using setReversed, and the start and finish of the column range are byte arrays (so the column names need not be strings in the dB). If you want to search for a string (if that makes sense in your app), you can of course do this:


String start = "Email";
byte bStart[] = start.getBytes(java.nio.charset.StandardCharsets.UTF_8); // explicit encoding
columnRange.setStart(bStart);
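How start, finish, reversed and count combine can be modelled on one row’s sorted column names. This is an analogy with hypothetical names: strings stand in for the byte-array names, and natural String order stands in for the column family’s comparator. In a reversed slice the walk starts from the largest name, so start should sort after finish.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch: which column names a SliceRange selects from one row.
// Empty strings mean "no bound", like new byte[0] in the post.
public class SliceRangeDemo {

    public static List<String> slice(NavigableSet<String> names, String start,
                                     String finish, boolean reversed, int count) {
        // Reversed slices walk from the largest name down
        NavigableSet<String> view = reversed ? names.descendingSet() : names;
        if (!start.isEmpty()) {
            view = view.tailSet(start, true);   // from start (inclusive) onwards
        }
        if (!finish.isEmpty()) {
            view = view.headSet(finish, true);  // up to finish (inclusive)
        }
        List<String> result = new ArrayList<>();
        for (String name : view) {
            if (result.size() == count) {
                break;                          // at most count columns
            }
            result.add(name);
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableSet<String> row = new TreeSet<>(Arrays.asList("Tel", "Email", "Address"));
        System.out.println(slice(row, "", "", true, 4));      // [Tel, Email, Address]
        System.out.println(slice(row, "Email", "", true, 4)); // [Email, Address]
        System.out.println(slice(row, "", "", false, 1));     // [Address]
    }
}
```

The first call reproduces the Tel, Email, Address ordering of the example row, which is what setReversed(true) gives on names sorted alphabetically.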


Finally, we can create a SlicePredicate from the columnRange and get the keys and columns from the keyspace:

SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(columnRange);
Map<String, List<Column>> map = ks.getRangeSlices(columnParent, slicePredicate, keyRange);


Now we’ve got the map, we’ll just read through it and display the columns:

for (String key : map.keySet()) {
    List<Column> columns = map.get(key);
    // print key
    System.out.println(key);
    for (Column column : columns) {
        // print columns with values
        System.out.println("\t" + string(column.getName()) + "\t ==\t" + string(column.getValue()));
    }
}


This should be all we need to get “records” from a simple column family inside a keyspace.

If we want to get all posts by a particular author, all we need to do is use our AuthorPosts column family and the above code to display the details for each post. Here’s my complete code for displaying all posts for the author “Andy”:

public class ReadAuthorPosts {
 public static void main(String[] args) throws Exception{
        CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
         CassandraClient client = pool.borrowClient("xxx.yy.36.151", 9160);
       
         try {
             Keyspace ks = client.getKeyspace("BloggyAppy");
             //retrieve sample data
             ColumnParent columnParent = new ColumnParent("AuthorPosts");

             SlicePredicate slicePredicate = new SlicePredicate();

             /**
              * this affects how many columns we want to retrieve
              * also see slicePredicate.setColumn_names(java.util.List<byte[]> column_names)
              * .setColumn_names(new ArrayList<byte[]>()); retrieves no columns at all
              */
             SliceRange columnRange = new SliceRange();
             String Start="s";
             //For these beware of the reversed state
             //columnRange.setStart(Start.getBytes());  //Sets the first column name to get
             columnRange.setStart(new byte[0]);  //We'll get them all.
             columnRange.setFinish(new byte[0]); //Sets the last column name to get
             //affects column order
             columnRange.setReversed(false); //Changes order of columns returned in keyset
             columnRange.setCount(1000); //Maximum number of columns in a key
 
             slicePredicate.setSlice_range(columnRange);

             //count of max retrieving keys
             KeyRange keyRange = new KeyRange(1);  //Maximum number of keys to get
             keyRange.setStart_key("Andy");
             keyRange.setEnd_key("");
             Map<String, List<Column>> map = ks.getRangeSlices(columnParent, slicePredicate, keyRange);

             //printing keys with columns
             for (String key : map.keySet()) {
                 List<Column> columns = map.get(key);
                 //print key
                 System.out.println(key);
                 for (Column column : columns) {
                     //print columns with values
                  java.util.UUID Name=toUUID(column.getName()) ;
              
                     System.out.println("\t" + Name + "\t ==\t" + string(column.getValue()));
                    DisplayPost(string(column.getValue()));
                 
                 }
             }

             // This line makes sure that even if the client had failures and recovered, a correct
             // releaseClient is called, on the up to date client.
             client = ks.getClient();

         } finally {
             pool.releaseClient(client);
         }
 }
 
 
 public static java.util.UUID toUUID( byte[] uuid )
    {
    long msb = 0;
    long lsb = 0;
    assert uuid.length == 16;
    for (int i=0; i<8; i++)
        msb = (msb << 8) | (uuid[i] & 0xff);
    for (int i=8; i<16; i++)
        lsb = (lsb << 8) | (uuid[i] & 0xff);

    com.eaio.uuid.UUID u = new com.eaio.uuid.UUID(msb,lsb);
    return java.util.UUID.fromString(u.toString());
    }
  
  
  private static void DisplayPost(String sKey){
   CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
   CassandraClient client=null;
   try{
        client = pool.borrowClient("xxx.yy.36.151", 9160);
   

        
            Keyspace ks = client.getKeyspace("BloggyAppy");
            //retrieve sample data
            ColumnParent columnParent = new ColumnParent("BlogEntries");

            SlicePredicate slicePredicate = new SlicePredicate();

            /**
             * this affects how many columns we want to retrieve
             * also see slicePredicate.setColumn_names(java.util.List<byte[]> column_names)
             * .setColumn_names(new ArrayList<byte[]>()); retrieves no columns at all
             */
            SliceRange columnRange = new SliceRange();
            String Start="s";
            //For these beware of the reversed state
            //columnRange.setStart(Start.getBytes());  //Sets the first column name to get
            columnRange.setStart(new byte[0]);  //We'll get them all.
            columnRange.setFinish(new byte[0]); //Sets the last column name to get
            //affects column order
            columnRange.setReversed(false); //Changes order of columns returned in keyset
            columnRange.setCount(10); //Maximum number of columns in a key

            slicePredicate.setSlice_range(columnRange);

            //count of max retrieving keys
            KeyRange keyRange = new KeyRange(200);  //Maximum number of keys to get
            keyRange.setStart_key(sKey);
            keyRange.setEnd_key(sKey);
            Map<String, List<Column>> map = ks.getRangeSlices(columnParent, slicePredicate, keyRange);

            //printing keys with columns
            for (String key : map.keySet()) {
                List<Column> columns = map.get(key);
                //print key
                System.out.println(key);
                for (Column column : columns) {
                    //print columns with values
                 String Name=string(column.getName()) ;
             
                    System.out.println("\t" + Name + "\t ==\t" + string(column.getValue()));
                
                }
            }

            // This line makes sure that even if the client had failures and recovered, a correct
            // releaseClient is called, on the up to date client.
            client = ks.getClient();
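        } catch (Exception et) {
            System.out.println("Can't connect to server " + et);
        } finally {
            // Release the client even if the read failed, so it is not leaked
            if (client != null) {
                try {
                    pool.releaseClient(client);
                } catch (Exception et) {
                    System.out.println("Can't release pool " + et);
                }
            }
        }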
       
        
  }
 
}