Tuesday 9 February 2016

How to join two HBase Tables using Spark

Joining HBase Tables using Spark:

I had to join two HBase tables to get the result for one of my projects, and I could not find a concrete solution to this problem.
So, I tried to solve it on my own.

Here is how the solution works:
I am taking the classic case of a User and Dept join, as I cannot present my actual project work due to security reasons.

Let's say,

The User table has the below structure:

ColumnFamily    Qualifier
------------------------------------
user            userid
user            name
user            deptid

The Dept table has the below structure:

ColumnFamily    Qualifier
------------------------------------
department      deptid
department      department
department      description
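
If you want to try this end to end, the two source tables and the target table (joined_data, used at the end of the post) can be created and seeded with the HBase client API. The sketch below is only setup code and is not part of the join solution itself; the object name, the sample row keys, and the sample values are my own placeholders, and it assumes an HBase 1.x client with ZooKeeper on localhost.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object CreateJoinTables {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin

    // Users table with a single "user" column family
    val users = new HTableDescriptor(TableName.valueOf("Users"))
    users.addFamily(new HColumnDescriptor("user"))
    if (!admin.tableExists(users.getTableName)) admin.createTable(users)

    // depts table with a single "department" column family
    val depts = new HTableDescriptor(TableName.valueOf("depts"))
    depts.addFamily(new HColumnDescriptor("department"))
    if (!admin.tableExists(depts.getTableName)) admin.createTable(depts)

    // Target table that will hold the joined rows
    val joined = new HTableDescriptor(TableName.valueOf("joined_data"))
    joined.addFamily(new HColumnDescriptor("joined"))
    if (!admin.tableExists(joined.getTableName)) admin.createTable(joined)

    // One sample row per source table so the join has something to match on
    val userTable = connection.getTable(TableName.valueOf("Users"))
    val u = new Put(Bytes.toBytes("u1"))
    u.addColumn(Bytes.toBytes("user"), Bytes.toBytes("userid"), Bytes.toBytes("u1"))
    u.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("Alice"))
    u.addColumn(Bytes.toBytes("user"), Bytes.toBytes("deptid"), Bytes.toBytes("d1"))
    userTable.put(u)

    val deptTable = connection.getTable(TableName.valueOf("depts"))
    val d = new Put(Bytes.toBytes("d1"))
    d.addColumn(Bytes.toBytes("department"), Bytes.toBytes("deptid"), Bytes.toBytes("d1"))
    d.addColumn(Bytes.toBytes("department"), Bytes.toBytes("department"), Bytes.toBytes("Engineering"))
    d.addColumn(Bytes.toBytes("department"), Bytes.toBytes("description"), Bytes.toBytes("Builds the product"))
    deptTable.put(d)

    userTable.close()
    deptTable.close()
    admin.close()
    connection.close()
  }
}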

Consider that you have to join based on deptid, which is available in both tables. Here is how the code works.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory

object HBaseJoin {
  val log = LoggerFactory.getLogger(HBaseJoin.getClass)

  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
    
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create();
    hbaseConf.set("hbase.zookeeper.quorum", "localhost");
    
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "Users")
    
    // Read the Users table as an RDD of (rowkey, Result) pairs
    val userRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]).persist(StorageLevel.MEMORY_AND_DISK_SER)
    println("Number of user records found : " + userRDD.count())
    
    // Create a pair RDD that has the join column value (deptid) as the key
    val userPairs = userRDD.map({ case (rowkey: ImmutableBytesWritable, values: Result) =>
      (Bytes.toString(values.getValue(Bytes.toBytes("user"), Bytes.toBytes("deptid"))), values)
    }).persist(StorageLevel.MEMORY_AND_DISK_SER)
    
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "depts")
    
    val deptRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]).persist(StorageLevel.MEMORY_AND_DISK_SER)
    
    // Create a pair RDD from the Dept table as well, keyed by the same join column (deptid)
    val deptPairs = deptRDD.map({ case (rowkey: ImmutableBytesWritable, values: Result) =>
      (Bytes.toString(values.getValue(Bytes.toBytes("department"), Bytes.toBytes("deptid"))), values)
    }).persist(StorageLevel.MEMORY_AND_DISK_SER)
    
    
    // Join them -- the code below can be adapted to your requirement.
    // My requirement is to save the joined data into another HBase table, so I prepare an RDD of Puts to save.
    val joinedRDD = userPairs.join(deptPairs).persist(StorageLevel.MEMORY_AND_DISK_SER)
    
    // Build the rows for the target table from the joined records
    val targetTableRDD = joinedRDD.map({ case (key: String, results: (Result, Result)) =>

      val userData = results._1
      val deptData = results._2
      val userCF = Bytes.toBytes("user")
      val deptCF = Bytes.toBytes("department")
      val joinCF = Bytes.toBytes("joined")

      val deptId = deptData.getValue(deptCF, Bytes.toBytes("deptid"))
      val description = deptData.getValue(deptCF, Bytes.toBytes("description"))
      val userid = userData.getValue(userCF, Bytes.toBytes("userid"))
      val name = userData.getValue(userCF, Bytes.toBytes("name"))

      // Row key of the target table: "<userid>,<deptid>"
      val rowkey = Bytes.toString(userid) + "," + Bytes.toString(deptId)

      val put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(joinCF, Bytes.toBytes("deptid"), deptId)
      put.addColumn(joinCF, Bytes.toBytes("userid"), userid)
      put.addColumn(joinCF, Bytes.toBytes("description"), description)
      put.addColumn(joinCF, Bytes.toBytes("name"), name)

      (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
    }).persist(StorageLevel.MEMORY_AND_DISK_SER)
    
    saveToHbase(targetTableRDD, hbaseConf, "joined_data")
    sc.stop()

  }
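
  // saveToHbase is used above but was not included in the original listing.
  // Below is a minimal sketch of one possible implementation that writes the
  // (rowkey, Put) pairs into the given table through TableOutputFormat; the
  // exact job setup here is an assumption, not confirmed by the post.
  def saveToHbase(rdd: org.apache.spark.rdd.RDD[(ImmutableBytesWritable, Put)],
                  conf: org.apache.hadoop.conf.Configuration,
                  tableName: String): Unit = {
    val job = org.apache.hadoop.mapreduce.Job.getInstance(conf)
    job.getConfiguration.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
    job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
    // saveAsNewAPIHadoopDataset issues one Put per record against the target HBase table
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }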

}
