Joining HBase Tables using Spark:
I had to join two HBase tables to get the results for one of my projects, and I could not find a concrete solution for it.
So I tried to solve it on my own.
Here is how the solution works.
I am using the classic example of joining Users with Departments, because I cannot present my actual project work for security reasons.
Let's say the User table has the structure below:
ColumnFamily Qualifier
------------------------------------
user userid
user name
user deptId
and the Dept table has the structure below:
ColumnFamily Qualifier
------------------------------------
department deptid
department department
department description
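Suppose you have to join on the department ID, which is available in both tables. Here is how the code works. Note that HBase column families and qualifiers are case-sensitive: the code reads the `user:deptid` and `dept:deptid` columns, so adjust these names to match your actual schema.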
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory
object HBaseJoin {
  // Local imports: only the output helper needs these, so they are kept inside
  // the object to leave the file-level import block untouched.
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.rdd.RDD

  val log = LoggerFactory.getLogger(HBaseJoin.getClass)

  // Column families read/written by this job. HBase names are case-sensitive.
  // NOTE(review): the schema described in the surrounding text calls the
  // department family "department" and the user qualifier "deptId", while the
  // code uses "dept" / "deptid" — confirm against the real tables.
  private val UserFamily = "user"
  private val DeptFamily = "dept"
  private val JoinedFamily = "joined"

  /**
   * Joins the `Users` and `depts` HBase tables on their `deptid` column and
   * writes one row per matching (user, department) pair into `joined_data`.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Result is not java.io.Serializable; registering it lets Kryo cache and shuffle it.
    conf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")

    // newAPIHadoopRDD broadcasts the Configuration it receives, so each table
    // gets its own copy rather than mutating one shared conf between reads.
    val usersConf = HBaseConfiguration.create(hbaseConf)
    usersConf.set(TableInputFormat.INPUT_TABLE, "Users")
    // Keep only the Result: the record reader may reuse the key object, so the
    // raw (key, value) pairs should not be cached directly.
    val userRDD = sc
      .newAPIHadoopRDD(usersConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map(_._2)
      .persist(StorageLevel.MEMORY_AND_DISK_SER) // reused by count() and the keying below
    log.info("Number of Records found : " + userRDD.count())

    // Pair RDDs keyed by the join column value.
    val userPairs = keyByColumn(userRDD, UserFamily, "deptid")

    val deptsConf = HBaseConfiguration.create(hbaseConf)
    deptsConf.set(TableInputFormat.INPUT_TABLE, "depts")
    val deptRDD = sc
      .newAPIHadoopRDD(deptsConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map(_._2)
    val deptPairs = keyByColumn(deptRDD, DeptFamily, "deptid")

    // Inner join, then turn each matched pair into a Put for the target table.
    val targetTableRDD = userPairs.join(deptPairs).map {
      case (_, (userData, deptData)) => toJoinedPut(userData, deptData)
    }

    saveToHbase(targetTableRDD, hbaseConf, "joined_data")
    userRDD.unpersist()
    sc.stop()
  }

  /**
   * Keys each row by the string value of `family:qualifier`.
   * Rows without that cell are dropped: a null key would otherwise "join"
   * with every other row that also lacks it.
   */
  private def keyByColumn(rows: RDD[Result], family: String, qualifier: String): RDD[(String, Result)] =
    rows.flatMap { row =>
      Option(row.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))
        .map(value => (Bytes.toString(value), row))
        .iterator
    }

  /**
   * Builds the output row for one (user, department) match.
   * Row key is "<userid>,<deptid>"; absent source cells are not written.
   */
  private def toJoinedPut(userData: Result, deptData: Result): (ImmutableBytesWritable, Put) = {
    val userCF = Bytes.toBytes(UserFamily)
    val deptCF = Bytes.toBytes(DeptFamily)
    val joinCF = Bytes.toBytes(JoinedFamily)
    val deptId = deptData.getValue(deptCF, Bytes.toBytes("deptid"))
    val description = deptData.getValue(deptCF, Bytes.toBytes("description"))
    val userid = userData.getValue(userCF, Bytes.toBytes("userid"))
    val name = userData.getValue(userCF, Bytes.toBytes("name"))
    // Decode before concatenating: `Array[Byte] + String` would embed the
    // arrays' identity strings (e.g. "[B@1a2b3c") in the key.
    // NOTE(review): a missing userid becomes the text "null" in the key.
    val rowKey = Bytes.toBytes(Bytes.toString(userid) + "," + Bytes.toString(deptId))
    val put = new Put(rowKey)
    Seq("deptid" -> deptId, "userid" -> userid, "description" -> description, "name" -> name)
      .foreach { case (qualifier, value) =>
        if (value != null) put.addColumn(joinCF, Bytes.toBytes(qualifier), value)
      }
    (new ImmutableBytesWritable(rowKey), put)
  }

  /** Writes the (rowkey, Put) pairs into `tableName` via TableOutputFormat. */
  private def saveToHbase(rdd: RDD[(ImmutableBytesWritable, Put)], baseConf: Configuration, tableName: String): Unit = {
    val outConf = HBaseConfiguration.create(baseConf)
    outConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val job = Job.getInstance(outConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
I had to join two HBase tables to get the results for one of my projects, and I could not find a concrete solution for it.
So I tried to solve it on my own.
Here is how the solution works.
I am using the classic example of joining Users with Departments, because I cannot present my actual project work for security reasons.
Let's say the User table has the structure below:
ColumnFamily Qualifier
------------------------------------
user userid
user name
user deptId
and the Dept table has the structure below:
ColumnFamily Qualifier
------------------------------------
department deptid
department department
department description
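Suppose you have to join on the department ID, which is available in both tables. Here is how the code works. Note that HBase column families and qualifiers are case-sensitive: the code reads the `user:deptid` and `dept:deptid` columns, so adjust these names to match your actual schema.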
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory
object HBaseJoin {
  // Local imports: only the output helper needs these, so they are kept inside
  // the object to leave the file-level import block untouched.
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.rdd.RDD

  val log = LoggerFactory.getLogger(HBaseJoin.getClass)

  // Column families read/written by this job. HBase names are case-sensitive.
  // NOTE(review): the schema described in the surrounding text calls the
  // department family "department" and the user qualifier "deptId", while the
  // code uses "dept" / "deptid" — confirm against the real tables.
  private val UserFamily = "user"
  private val DeptFamily = "dept"
  private val JoinedFamily = "joined"

  /**
   * Joins the `Users` and `depts` HBase tables on their `deptid` column and
   * writes one row per matching (user, department) pair into `joined_data`.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Result is not java.io.Serializable; registering it lets Kryo cache and shuffle it.
    conf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")

    // newAPIHadoopRDD broadcasts the Configuration it receives, so each table
    // gets its own copy rather than mutating one shared conf between reads.
    val usersConf = HBaseConfiguration.create(hbaseConf)
    usersConf.set(TableInputFormat.INPUT_TABLE, "Users")
    // Keep only the Result: the record reader may reuse the key object, so the
    // raw (key, value) pairs should not be cached directly.
    val userRDD = sc
      .newAPIHadoopRDD(usersConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map(_._2)
      .persist(StorageLevel.MEMORY_AND_DISK_SER) // reused by count() and the keying below
    log.info("Number of Records found : " + userRDD.count())

    // Pair RDDs keyed by the join column value.
    val userPairs = keyByColumn(userRDD, UserFamily, "deptid")

    val deptsConf = HBaseConfiguration.create(hbaseConf)
    deptsConf.set(TableInputFormat.INPUT_TABLE, "depts")
    val deptRDD = sc
      .newAPIHadoopRDD(deptsConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map(_._2)
    val deptPairs = keyByColumn(deptRDD, DeptFamily, "deptid")

    // Inner join, then turn each matched pair into a Put for the target table.
    val targetTableRDD = userPairs.join(deptPairs).map {
      case (_, (userData, deptData)) => toJoinedPut(userData, deptData)
    }

    saveToHbase(targetTableRDD, hbaseConf, "joined_data")
    userRDD.unpersist()
    sc.stop()
  }

  /**
   * Keys each row by the string value of `family:qualifier`.
   * Rows without that cell are dropped: a null key would otherwise "join"
   * with every other row that also lacks it.
   */
  private def keyByColumn(rows: RDD[Result], family: String, qualifier: String): RDD[(String, Result)] =
    rows.flatMap { row =>
      Option(row.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))
        .map(value => (Bytes.toString(value), row))
        .iterator
    }

  /**
   * Builds the output row for one (user, department) match.
   * Row key is "<userid>,<deptid>"; absent source cells are not written.
   */
  private def toJoinedPut(userData: Result, deptData: Result): (ImmutableBytesWritable, Put) = {
    val userCF = Bytes.toBytes(UserFamily)
    val deptCF = Bytes.toBytes(DeptFamily)
    val joinCF = Bytes.toBytes(JoinedFamily)
    val deptId = deptData.getValue(deptCF, Bytes.toBytes("deptid"))
    val description = deptData.getValue(deptCF, Bytes.toBytes("description"))
    val userid = userData.getValue(userCF, Bytes.toBytes("userid"))
    val name = userData.getValue(userCF, Bytes.toBytes("name"))
    // Decode before concatenating: `Array[Byte] + String` would embed the
    // arrays' identity strings (e.g. "[B@1a2b3c") in the key.
    // NOTE(review): a missing userid becomes the text "null" in the key.
    val rowKey = Bytes.toBytes(Bytes.toString(userid) + "," + Bytes.toString(deptId))
    val put = new Put(rowKey)
    Seq("deptid" -> deptId, "userid" -> userid, "description" -> description, "name" -> name)
      .foreach { case (qualifier, value) =>
        if (value != null) put.addColumn(joinCF, Bytes.toBytes(qualifier), value)
      }
    (new ImmutableBytesWritable(rowKey), put)
  }

  /** Writes the (rowkey, Put) pairs into `tableName` via TableOutputFormat. */
  private def saveToHbase(rdd: RDD[(ImmutableBytesWritable, Put)], baseConf: Configuration, tableName: String): Unit = {
    val outConf = HBaseConfiguration.create(baseConf)
    outConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val job = Job.getInstance(outConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
Hi, I have a question related to your answer to the following question: http://stackoverflow.com/questions/34601839/parse-nested-json-stringified-column-in-spark-streaming-sql. Do you know whether the same thing can be achieved in Java?
Reply | Delete
Thanks
What are routerRDD and routerCF? They are not defined anywhere; they are only used while joining the tables. Also, if the HBase tables contain a large number of records, will this code be efficient?
Reply | Delete
Really nice blog post; it provided helpful information. I hope that you will post more updates like this. Big Data Hadoop Online Training Bangalore
Reply | Delete
Very nice article; keep sharing more posts with us.
Reply | Delete
Thank you.
Big data and hadoop course
Big data training