Friday, 23 December 2016

Replacing a dead Cassandra node and resolving Host ID issues

This post describes how to replace a dead Cassandra node. I ran into this situation recently and had to struggle with a few issues, so I would like to describe each of them and point out the right resolution for each.

1. First, the normal replace procedure. This should be your first try. It only works in ideal situations; nevertheless, try it.

a. Check the status of the nodes using "nodetool status". A node that is down is shown with the status "DN".


Assuming that you have 6 nodes, here is how it looks. I have masked the details and renamed the host IDs and IPs.

|/ State=Normal/Leaving/Joining/Moving
--  Address         Load       Tokens       Owns (effective)  Host ID                               Rack
UN   7.57 GiB   256          50.2%             2ssbaar-9dbdaf-4r324-8697-f80b9351e7  --
UN    7.4 GiB    256          50.1%             9ssbaar-9dbdaf-4r324-8697-f80b9351e7  --
UN   7.82 GiB   256          51.3%             1ssbaar-9dbdaf-4r324-8697-f80b9351e7  --
UN    7.62 GiB   256          48.9%             2f71f53-9dbdaf-4r324-8697-f80b9351e7  --
DN   6.91 GiB   256          47.8%             38abaar-9dbdaf-4r324-8697-f80b9351e7  --
UN    7.92 GiB   256          51.7%             7cddaar-9dbdaf-4r324-8697-f80b9351e7  --

In the status above, the node marked DN is down.
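If you want to script the DN check instead of reading the output by eye, here is a minimal Java sketch. It is only an illustration: the class name DownNodeFinder is made up, and it assumes that Host ID is the second-to-last column and Rack the last one, as in the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DownNodeFinder {

    /** Returns the Host ID of every row whose state column is "DN". */
    static List<String> downHostIds(List<String> statusLines) {
        List<String> ids = new ArrayList<>();
        for (String line : statusLines) {
            String[] cols = line.trim().split("\\s+");
            // Assumption: Host ID is the second-to-last column, Rack is the last one.
            if (cols.length >= 3 && cols[0].equals("DN")) {
                ids.add(cols[cols.length - 2]);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "UN   7.57 GiB   256   50.2%   2ssbaar-9dbdaf-4r324-8697-f80b9351e7  --",
            "DN   6.91 GiB   256   47.8%   38abaar-9dbdaf-4r324-8697-f80b9351e7  --");
        // Prints [38abaar-9dbdaf-4r324-8697-f80b9351e7]
        System.out.println(downHostIds(sample));
    }
}
```

The Host ID it returns is the one the replacement node is expected to take over.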

Assuming we have to replace it with a new machine, here are the steps.

1) Install Cassandra on the new node, but do not start it.

2) Make sure the seed details and all the other settings in the new Cassandra installation are correct.

3) Start Cassandra with the following command, setting replace_address_first_boot to the IP address of the dead node (assuming your Cassandra installation directory is /usr/lib/cassandra).

/usr/lib/cassandra/bin/cassandra -Dcassandra.replace_address_first_boot=

Now the story begins.

Case 1: If it starts without any problems, you are lucky. Run a repair on every node in the cluster and that should be it.

Case 2: You get a warning saying that it is unsafe to replace the node and that you should use cassandra.allow_unsafe_replace.

Then run: /usr/lib/cassandra/bin/cassandra -Dcassandra.replace_address_first_boot= -Dcassandra.allow_unsafe_replace=true

If it starts after that, you can still consider yourself lucky. Go ahead with the repair on each node and you are done.

Case 3: It fails with the error
java.lang.RuntimeException: Host ID collision between active endpoint
This means the cluster information obtained from the seeds still has the dead machine in its gossip or system information. If you get into this situation, proceed as follows.

i) nodetool removenode --host

Then start Cassandra with the replace option again, as in case 1 or case 2.

ii) If it still fails,

go to the Cassandra data folder. Its location is configured in cassandra.yaml; by default it is <cassandra_installation_directory>/data.

Check the system directory inside the data directory. It holds the system information collected from all the machines. Once it has been created with the old machine's details, you end up in this situation.

Run the following command on the new/fresh node that is replacing the dead one.

P.S. Do not run this command on any existing machine. It will destroy the complete cluster information if misused.

rm -r <data_directory>/system/*

What it means: it removes all the system table data from the new Cassandra node.

Now start Cassandra with the replace_address_first_boot option as above. If you hit case 2, run it with the unsafe replace option set to true.

The node should now join the cluster without any issues. When you check nodetool status,
you should see only the new node, with the same Host ID as the old machine, like below.

|/ State=Normal/Leaving/Joining/Moving
--  Address         Load       Tokens       Owns (effective)  Host ID                               Rack
UN   7.57 GiB   256          50.2%             2ssbaar-9dbdaf-4r324-8697-f80b9351e7  --
UN    7.4 GiB    256          50.1%             9ssbaar-9dbdaf-4r324-8697-f80b9351e7  --
UN   7.82 GiB   256          51.3%             1ssbaar-9dbdaf-4r324-8697-f80b9351e7  --
UN    7.62 GiB   256          48.9%             2f71f53-9dbdaf-4r324-8697-f80b9351e7  --
UN   6.91 GiB   256          47.8%             38abaar-9dbdaf-4r324-8697-f80b9351e7  --
UN    7.92 GiB   256          51.7%             7cddaar-9dbdaf-4r324-8697-f80b9351e7  --

Run repair on all the machines, and you are ready to go.

Thank You.

Friday, 22 July 2016

Dynamically Changing the Dependency based on condition in SBT

I recently came across this question on Stack Overflow and answered it. Many people have already done far more advanced things with Scala and SBT, but I felt this was worth sharing.

The Problem:

The user wants to change a dependency in the SBT build file based on a condition. Teams often use different versions of an API in different environments such as Dev and Test, and the question asked how to pick a different dependency depending on the environment in use.


In my example I build against two different Spark versions and choose between them based on a condition.

You can do that in two ways. Either way the choice has to come in as input, so both use command-line parameters.

1) Using build.sbt itself.

a) Define a parameter with the name "sparkVersion".

b) Read that parameter in build.sbt (you can write Scala code in build.sbt; it is compiled as Scala at build time anyway).

c) Set the dependencies conditionally, as below.

val sparkVersion = Option(System.getProperty("sparkVersion")).getOrElse("default")

if (sparkVersion == "newer") {
    println(" newer one")
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
} else {
    println(" default one")
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
}

You can play with all the build options as you like.

2) Using a build.scala file. You can create it in the project directory, as project/build.scala.

You can write the code below.

import sbt._
import Keys._

object MyBuild extends Build {
  val myOptionValue = Option(System.getProperty("scalaTestVersion")).getOrElse("defaultValue")

  val depVersion = if (myOptionValue == "newer") {
    println(" asked for newer version")
    "<newer ScalaTest version>" // placeholder: put the newer version string here
  } else {
    println(" asked for older/default version")
    "<default ScalaTest version>" // placeholder: put the older/default version string here
  }

  val dependencies = Seq(
    "org.scalatest" %% "scalatest" % depVersion % "test"
  )

  lazy val exampleProject = Project("SbtExample", file(".")).settings(
    version       := "1.2",
    scalaVersion  := "2.10.4",
    libraryDependencies ++= dependencies
  )
}

After this, just run the build command as below.

sbt clean compile -DsparkVersion=newer -DscalaTestVersion=newer

With this you can see how good and flexible sbt is.

Tuesday, 3 May 2016

Schema evolution with Avro

What is schema evolution
Schema evolution is the term used for how the store behaves when the schema is changed after data has been written to the store using an older version of that schema.
The modifications one can safely make to a schema without any concerns are:
> A field with a default value is added.
> A field that was previously defined with a default value is removed.
> A field's doc attribute is changed, added or removed.
> A field's order attribute is changed, added or removed.
> A field's default value is added, or changed.
> Field or type aliases are added, or removed.

Rules for Changing Schema:

1. For best results, always provide a default value for the fields in your schema. This makes it possible to delete fields later on if you decide it is necessary. If you do not provide a default value for a field, you cannot delete that field from your schema.

2. You cannot change a field's data type. If you have decided that a field should be some data type other than what it was originally created using, then add a whole new field to your schema that uses the appropriate data type.

3. You cannot rename an existing field. However, if you want to access the field by some name other than what it was originally created using, add and use aliases for the field.

4. A non-union type may be changed to a union that contains only the original type, or vice-versa.

How do you handle schema evolution with AVRO:

Schema evolution is the automatic transformation of Avro schema. This transformation is between the version of the schema that the client is using (its local copy), and what is currently contained in the store. When the local copy of the schema is not identical to the schema used to write the value (that is, when the reader schema is different from the writer schema), this data transformation is performed. When the reader schema matches the schema used to write the value, no transformation is necessary.
Schema evolution is applied only during deserialization. If the reader schema is different from the value's writer schema, then the value is automatically modified during deserialization to conform to the reader schema. To do this, default values are used.
There are two cases to consider when using schema evolution: when you add a field and when you delete a field. Schema evolution takes care of both scenarios, so long as you originally assigned default values to the fields that were deleted, and assigned default values to the fields that were added.
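To make the add/delete rules concrete, here is a minimal sketch in plain Java. It does not use the Avro library at all; it only imitates the resolution rule with maps. The class name SchemaResolutionSketch, the method resolve and the sample field names are made up for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SchemaResolutionSketch {

    /**
     * Builds the record as the reader sees it.
     * readerDefaults maps each reader field to its default (null means no default).
     * readerAliases maps a reader field to the older name it may have been written under.
     */
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Object> readerDefaults,
                                       Map<String, String> readerAliases) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            String alias = readerAliases.get(name);
            if (written.containsKey(name)) {
                result.put(name, written.get(name));       // field exists in both schemas
            } else if (alias != null && written.containsKey(alias)) {
                result.put(name, written.get(alias));      // field was renamed, found via alias
            } else if (field.getValue() != null) {
                result.put(name, field.getValue());        // field was added, use its default
            } else {
                throw new IllegalStateException("No value and no default for field " + name);
            }
        }
        return result; // fields known only to the writer are dropped
    }

    public static void main(String[] args) {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("name", "Spark meetup");
        written.put("going", 25);
        written.put("organizer", "unknown");

        Map<String, Object> readerDefaults = new LinkedHashMap<>();
        readerDefaults.put("name", null);
        readerDefaults.put("attendance", null);
        readerDefaults.put("location", "unknown");

        Map<String, String> aliases = new LinkedHashMap<>();
        aliases.put("attendance", "going");

        // Prints {name=Spark meetup, attendance=25, location=unknown}
        System.out.println(resolve(written, readerDefaults, aliases));
    }
}
```

The deleted field (organizer) disappears on read, the added field (location) gets its default, and a field with neither a written value nor a default is an error. That is why the rules insist on defaults.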

Avro schemas can be written in two ways, either in a JSON format:
{
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "userName",        "type": "string"},
        {"name": "favouriteNumber", "type": ["null", "long"]},
        {"name": "interests",       "type": {"type": "array", "items": "string"}}
    ]
}
…or in an IDL:

record Person {
    string               userName;
    union { null, long } favouriteNumber;
    array<string>        interests;
}

The following are the key advantages of Avro:
* Schema evolution – Avro requires schemas when data is written or read. Most interesting is that you can use different schemas for serialization and deserialization, and Avro will handle the missing/extra/modified fields.
* Untagged data – Providing a schema with binary data allows each datum to be written without overhead. The result is more compact data encoding, and faster data processing.
* Dynamic typing – This refers to serialization and deserialization without code generation. It complements the code generation, which is available in Avro for statically typed languages as an optional optimization.

Example code to handle schema evolution

Create an external table with an Avro schema (this can be tried with an external or a managed table)

-- Note: the avro.schema.url below is an assumed placeholder path for the old schema file
CREATE EXTERNAL TABLE avro_external_table
  STORED AS AVRO
  LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/avro_data/'
  TBLPROPERTIES ('avro.schema.url'='hdfs://localhost:8020/user/cloudera/old_schema.avsc');

-- Select query to check the data
SELECT * FROM avro_external_table;

-- Alter table statement to point at the new schema file, to check the schema evolution
ALTER TABLE avro_external_table SET TBLPROPERTIES ('avro.schema.url'='hdfs://localhost:8020/user/cloudera/new_schema.avsc');

-- Select query
SELECT * FROM avro_external_table;


Old_Schema:

{
  "type": "record",
  "name": "Meetup",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "meetup_date",
    "type": "string"
  }, {
    "name": "going",
    "type": "int"
  }, {
    "name": "organizer",
    "type": "string",
    "default": "unknown"
  }, {
    "name": "topics",
    "type": {
      "type": "array",
      "items": "string"
    }
  }]
}

New_Schema:
(renames "going" to "attendance" and adds a new "location" field):

{
  "type": "record",
  "name": "Meetup",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "meetup_date",
    "type": "string",
    "java-class": "java.util.Date"
  }, {
    "name": "attendance",
    "type": "int",
    "aliases": ["going"]
  }, {
    "name": "location",
    "type": "string",
    "default": "unknown"
  }]
}

Wednesday, 20 April 2016

Hourglass Sum Hacker Rank problem solution

HourGlass Sum problem:

You are given a 6×6 2D array. An hourglass in an array is a portion shaped like this:

a b c
  d
e f g

For example, if we create an hourglass using the number 1 within an array full of zeros, it may look like this:

1 1 1 0 0 0
0 1 0 0 0 0
1 1 1 0 0 0
0 0 0 0 0 0
0 0 0 0 0 0
0 0 0 0 0 0

Actually there are many hourglasses in the array above. The three leftmost hourglasses are the following:

1 1 1     1 1 0     1 0 0
  1         0         0
1 1 1     1 1 0     1 0 0

The sum of an hourglass is the sum of all the numbers within it. The sums for the hourglasses above are 7, 4, and 2, respectively.

In this problem you have to print the largest sum among all the hourglasses in the array.


My solution uses an iterative approach for now. I am working on a recursive solution as well.

import java.util.Scanner;

public class Solution {

    public static void main(String[] args) {
        Scanner in = new Scanner(System.in);
        int arr[][] = new int[6][6];
        for (int i = 0; i < 6; i++) {
            for (int j = 0; j < 6; j++) {
                arr[i][j] = in.nextInt();
            }
        }
        int max = Integer.MIN_VALUE;
        // (i, j) is the bottom-right corner of each 3x3 window
        for (int i = 2; i < 6; i++) {
            for (int j = 2; j < 6; j++) {
                int sum = 0;
                for (int k = i - 2; k <= i; k++) {
                    for (int l = j - 2; l <= j; l++) {
                        // In the middle row only the centre cell belongs to the hourglass
                        if (k == i - 1 && l != j - 1) {
                            continue;
                        }
                        sum = sum + arr[k][l];
                    }
                }
                if (sum > max) {
                    max = sum;
                }
            }
        }
        System.out.println(max);
    }
}

Tuesday, 9 February 2016

How to join two HBase Tables using Spark

Joining HBase Tables using Spark:

I had to join two HBase tables to get the result for one of my projects, and I could not find a concrete solution for this.
So, I tried to solve it on my own.

Here is how the solution works:
I am using the classic case of a User and Dept join, as I cannot present my project work for security reasons.

Let's say,

the User table has the structure below

ColumnFamily    Qualifier
user            userid
user            name
user            deptId

and the Dept table has the structure below

ColumnFamily    Qualifier
department      deptid
department      department
department      description

Suppose you have to join on the DeptID that is available in both tables. Here is how the code works.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory

object HBaseJoin {
  val log = LoggerFactory.getLogger(HBaseJoin.getClass)

  def main(args: Array[String]) {
    val conf = new SparkConf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "Users")
    val userRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]).persist(StorageLevel.MEMORY_AND_DISK_SER)
    println("Number of Records found : " + userRDD.count())
    // Creating a pair RDD that has the required column value (deptid) as key.
    val userPairs = userRDD.map({ case (rowkey: ImmutableBytesWritable, values: Result) => (Bytes.toString(values.getValue(Bytes.toBytes("user"), Bytes.toBytes("deptid"))), values) }).persist(StorageLevel.MEMORY_AND_DISK_SER)
    // Use a copy of the configuration for the second table, so the first scan is not affected.
    val deptConf = HBaseConfiguration.create(hbaseConf)
    deptConf.set(TableInputFormat.INPUT_TABLE, "depts")
    val deptRDD = sc.newAPIHadoopRDD(deptConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]).persist(StorageLevel.MEMORY_AND_DISK_SER)
    // Creating a pair RDD from the other table as well, keyed by the same column (deptid).
    val deptPairs = deptRDD.map({ case (rowkey: ImmutableBytesWritable, values: Result) => (Bytes.toString(values.getValue(Bytes.toBytes("dept"), Bytes.toBytes("deptid"))), values) }).persist(StorageLevel.MEMORY_AND_DISK_SER)
    // Join them -- the code below can be modified according to your requirement.
    // My requirement is to get the data and save it into another HBase table, so I had to prepare an RDD of Puts.
    val joinedRDD = userPairs.join(deptPairs).persist(StorageLevel.MEMORY_AND_DISK_SER)
    val targetTableRDD = joinedRDD.map({ case (key: String, results: (Result, Result)) => {
      val userData = results._1
      val deptData = results._2
      val userCF = Bytes.toBytes("user")
      val deptCF = Bytes.toBytes("dept")
      val joinCF = Bytes.toBytes("joined")
      val deptId = deptData.getValue(deptCF, Bytes.toBytes("deptid"))
      val description = deptData.getValue(deptCF, Bytes.toBytes("description"))
      val userid = userData.getValue(userCF, Bytes.toBytes("userid"))
      val name = userData.getValue(userCF, Bytes.toBytes("name"))
      // Row key of the joined row: "<userid>,<deptid>"
      val rowKey = Bytes.toBytes(Bytes.toString(userid) + "," + Bytes.toString(deptId))
      val put = new Put(rowKey)
      put.addColumn(joinCF, Bytes.toBytes("deptid"), deptId)
      put.addColumn(joinCF, Bytes.toBytes("userid"), userid)
      put.addColumn(joinCF, Bytes.toBytes("description"), description)
      put.addColumn(joinCF, Bytes.toBytes("name"), name)
      (new ImmutableBytesWritable(rowKey), put)
    }})
    saveToHbase(targetTableRDD, hbaseConf, "joined_data")
  }

  // The saveToHbase helper is not shown in this post; the version below is an assumption
  // that writes the Puts through TableOutputFormat.
  def saveToHbase(rdd: RDD[(ImmutableBytesWritable, Put)], hbaseConf: Configuration, tableName: String) {
    val job = Job.getInstance(hbaseConf)
    job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}



Sunday, 31 January 2016

Spark Streaming Notes

What is Spark Streaming:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.

In short: processing huge volumes of data in real time, in terms of both processing and results.

Much like Spark is built on the concept of RDDs, Spark Streaming provides an
abstraction called DStreams, or discretized streams. A DStream is a sequence of data
arriving over time.

Spark Streaming uses a “micro-batch” architecture, where the streaming computation is treated as a continuous series of batch computations on small batches of data. Spark Streaming receives data from various input sources and groups it into small batches. New batches are created at regular time intervals. At the beginning of each time interval a new batch is created, and any data that arrives during that interval gets added to that batch. At the end of the time interval the batch is done growing. The size of the time intervals is determined by a parameter called the batch interval. The batch interval is typically between 500 milliseconds and several seconds, as configured by the application developer. Each input batch forms an RDD, and is processed using Spark jobs to create other RDDs. The processed results can then be pushed out to external systems in batches.
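The batching idea itself can be sketched without Spark. The plain Java below is only an illustration (MicroBatchSketch and toBatches are made-up names): it assigns each arrival time to the interval it falls in, assuming non-negative timestamps in milliseconds. Unlike Spark Streaming, it does not produce empty batches for intervals in which nothing arrived.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MicroBatchSketch {

    /** Groups event arrival times (ms) into batches keyed by the batch start time. */
    static Map<Long, List<Long>> toBatches(List<Long> arrivalTimesMs, long batchIntervalMs) {
        Map<Long, List<Long>> batches = new TreeMap<>();
        for (long t : arrivalTimesMs) {
            // Every event lands in the interval [batchStart, batchStart + batchIntervalMs)
            long batchStart = (t / batchIntervalMs) * batchIntervalMs;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(t);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(100L, 900L, 1000L, 1500L, 2700L);
        // Prints {0=[100, 900], 1000=[1000, 1500], 2000=[2700]}
        System.out.println(toBatches(arrivals, 1000L));
    }
}
```

With a batch interval of 1000 ms, the five events end up in three batches, and each batch would then be processed as one RDD.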
DStream: Core Concept of Spark Streaming
Internally, each DStream is represented as a sequence of RDDs arriving at each time step (hence the name “discretized”). DStreams can be created from various input sources, such as Flume, Kafka, or HDFS. Once built, they offer two types of operations: transformations, which yield a new DStream, and output operations, which write data to an external system.
DStreams provide many of the same operations available on RDDs, plus new operations related to time, such as sliding windows.

Socket Listening Example:
Socket listening means the application listens continuously on a specific TCP port, and when a message/event arrives at that port, it gets picked up and processed.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object SocketListening {
   def main(args:Array[String]){
     val conf = new SparkConf();
     val sc = new SparkContext(conf);
     val streamingContext = new StreamingContext(sc, Seconds(1));
     val dStream = streamingContext.socketTextStream("localhost", 9999);
     // Keep only the lines that contain "error" and print them for every batch
     val errorLines = dStream.filter(_.contains("error"))
     errorLines.print()
     streamingContext.start()
     streamingContext.awaitTermination()
   }
}
To start receiving data, we must explicitly call start() on the StreamingContext. Then, Spark Streaming will start to schedule Spark jobs on the underlying SparkContext. This will occur in a separate thread, so to keep our application from exiting, we also need to call awaitTermination to wait for the streaming computation to finish.
Note that a streaming context can be started only once, and must be started after we
set up all the DStreams and output operations we want.
Transformations on DStreams
They can be grouped into either stateless or stateful:
• Stateless: In stateless transformations the processing of each batch does not depend on the
data of its previous batches. They include common RDD transformations
like map(), filter(), and reduceByKey().
• Stateful: Stateful transformations, in contrast, use data or intermediate results from previous
batches to compute the results of the current batch. They include transformations
based on sliding windows and on tracking state across time.
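The difference between the two kinds can be sketched in plain Java, without Spark. This is only an illustration with made-up names (StatefulCountSketch, countBatch, runningTotals). The update method has the same shape as the function you would hand to updateStateByKey: new values plus optional previous state in, new state out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class StatefulCountSketch {

    /** New values for a key plus its previous state (if any) give the new state. */
    static Optional<Integer> update(List<Integer> newValues, Optional<Integer> state) {
        int sum = 0;
        for (int v : newValues) {
            sum += v;
        }
        return Optional.of(sum + state.orElse(0));
    }

    /** Stateless: counts the words of one batch only. */
    static Map<String, Integer> countBatch(List<String> batch) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : batch) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    /** Stateful: folds the counts of every batch into totals carried across batches. */
    static Map<String, Integer> runningTotals(List<List<String>> batches) {
        Map<String, Integer> totals = new HashMap<>();
        for (List<String> batch : batches) {
            for (Map.Entry<String, Integer> e : countBatch(batch).entrySet()) {
                Optional<Integer> previous = Optional.ofNullable(totals.get(e.getKey()));
                totals.put(e.getKey(), update(Arrays.asList(e.getValue()), previous).get());
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("spark", "hbase", "spark"),
            Arrays.asList("spark"));
        System.out.println("second batch alone: " + countBatch(batches.get(1)));
        System.out.println("running totals:     " + runningTotals(batches));
    }
}
```

For the second batch alone, "spark" counts as 1; in the running totals it ends at 3, because the state from the first batch is carried over.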
Transform
It is not like regular transformations. The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.
map: works on each row
mapPartitions: works on each partition
transform: works on each RDD

Scala Code:


To clarify further:
yourDStream.map(record => yourFunction(record)) will do something on every record in every RDD in the DStream, which essentially means every record in the DStream. But yourDStream.transform(rdd => anotherFunction(rdd)) allows you to do arbitrary stuff on every RDD in the DStream.
For example, yourDStream.transform(rdd => rdd.map(record => yourFunction(record))) is exactly the same as the one in the first line: only a map function.
However, you can also do
yourDStream.transform(rdd => ...) with a whole chain of RDD operations in place of the dots, which can involve multiple stages of shuffles by key. So transform is a far more general operation than map, one that allows arbitrary computations on each RDD of a DStream. For example, say you want to sort every batch of data by a key. Currently, there is no DStream.sortByKey() to do that. However, you can easily use transform to do DStream.transform(rdd => rdd.sortByKey()).
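The same distinction can be shown with a toy model in plain Java, where a "stream" is just a list of batches. Nothing here is Spark API; TransformSketch and its methods are made-up names for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class TransformSketch {

    /** map: the function sees one record at a time. */
    static <A, B> List<List<B>> map(List<List<A>> stream, Function<A, B> f) {
        List<List<B>> out = new ArrayList<>();
        for (List<A> batch : stream) {
            List<B> mapped = new ArrayList<>();
            for (A record : batch) {
                mapped.add(f.apply(record));
            }
            out.add(mapped);
        }
        return out;
    }

    /** transform: the function sees a whole batch, so it can sort, join, filter and so on. */
    static <A, B> List<List<B>> transform(List<List<A>> stream, Function<List<A>, List<B>> f) {
        List<List<B>> out = new ArrayList<>();
        for (List<A> batch : stream) {
            out.add(f.apply(batch));
        }
        return out;
    }

    /** A batch-level operation that cannot be written as a per-record map. */
    static List<String> sorted(List<String> batch) {
        List<String> copy = new ArrayList<>(batch);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<List<String>> stream = Arrays.asList(Arrays.asList("b", "a"), Arrays.asList("d", "c"));
        List<List<String>> upper = map(stream, String::toUpperCase);
        List<List<String>> sortedBatches = transform(stream, TransformSketch::sorted);
        System.out.println(upper);          // prints [[B, A], [D, C]]
        System.out.println(sortedBatches);  // prints [[a, b], [c, d]]
    }
}
```

Sorting needs to see the whole batch at once, which is exactly what transform gives you and map does not.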

Transform Code Example:
package stream
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SQLContext

/**
 * @author lenovo
 */
case class Word(word:String, count:Int);
object TransformExample {
  def main(args:Array[String]){
    val conf = new SparkConf();
    val sc = new SparkContext(conf);
    val sqlContext = new SQLContext(sc);
    val df = sqlContext.read.format("json").load(args(0));
    val streamingContext = new StreamingContext(sc, Seconds(1));
    val lines = streamingContext.socketTextStream("localhost", 9999)
    val virusRDD = sc.parallelize(Seq(("sr",1),("srini",2),("sujji",4),("virus",6)));
    val wordCounts = lines.flatMap { x => x.split("\\s+") }.map { x => (x,1) }.reduceByKey{_+_}
    // Here we cannot join wordCounts with virusRDD using map or other functions,
    // because wordCounts is a DStream and the virus data is an RDD.
    // To do this, we can use transform, which acts on the RDDs of the DStream.

    val finalData = wordCounts.transform{x => {
      x.sortBy(_._1, true);
    }}
    finalData.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Update State By Key Example:

package stream

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._

object UpdateState {
  def main(args: Array[String]) {
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      // state is None the first time a key is seen, so print the Option itself instead of calling state.get
      println("state is " + state)
      values.foreach { x => {println("value  is " + x)} }
      val currentCount = values.sum
      println("current sum is " + currentCount)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // updateStateByKey needs a checkpoint directory
    ssc.checkpoint(".")

    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    val output = wordDstream.updateStateByKey[Int](updateFunc)
    output.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Simplest way to run Spark on a Windows machine

I posted this solution as an answer to a Stack Overflow question.

You can run Spark jobs on a Windows machine, but it needs a few additional things. A couple of files are required in the Hadoop home directory:

1) winutils.exe
2) winutils.dll


1) Download the latest version of Hadoop from the Hadoop website.
2) Download winutils.exe and winutils.dll from the link below.
3) Copy winutils.exe and winutils.dll from that folder to your $HADOOP_HOME/bin.

4) Set HADOOP_HOME, either in the system environment variables or at the command prompt, and add HADOOP_HOME/bin to PATH. For the environment variables, you can go to Advanced System settings and choose Environment Variables.

After this, run the Spark jobs.
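Before running a job, you can quickly verify the setup from code. The Java sketch below is only an illustration (HadoopHomeCheck is a made-up name): it reads HADOOP_HOME from the environment and checks that bin/winutils.exe exists under it.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class HadoopHomeCheck {

    /** True when <hadoopHome>/bin/winutils.exe exists as a regular file. */
    static boolean hasWinutils(String hadoopHome) {
        if (hadoopHome == null || hadoopHome.isEmpty()) {
            return false;
        }
        Path winutils = Paths.get(hadoopHome, "bin", "winutils.exe");
        return Files.isRegularFile(winutils);
    }

    public static void main(String[] args) {
        String home = System.getenv("HADOOP_HOME");
        System.out.println("HADOOP_HOME = " + home);
        System.out.println("winutils.exe found: " + hasWinutils(home));
    }
}
```

If it reports false, recheck steps 3 and 4 before starting Spark.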