Sunday 31 January 2016

Spark Streaming Notes

What is Spark Streaming:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.

In short: processing huge amounts of data in real time, both in terms of processing and results.

Much like Spark is built on the concept of RDDs, Spark Streaming provides an
abstraction called DStreams, or discretized streams. A DStream is a sequence of data
arriving over time.

Spark Streaming uses a “micro-batch” architecture, where the streaming computation is treated as a continuous series of batch computations on small batches of data. Spark Streaming receives data from various input sources and groups it into small batches. New batches are created at regular time intervals. At the beginning of each time interval a new batch is created, and any data that arrives during that interval gets added to that batch. At the end of the time interval the batch is done growing. The size of the time intervals is determined by a parameter called the batch interval. The batch interval is typically between 500 milliseconds and several seconds, as configured by the application developer. Each input batch forms an RDD, and is processed using Spark jobs to create other RDDs. The processed results can then be pushed out to external systems in batches.
DStream: Core Concept of Spark Streaming

Internally, each DStream is represented as a sequence of RDDs arriving at each time step (hence the name "discretized"). DStreams can be created from various input sources, such as Flume, Kafka, or HDFS. Once built, they offer two types of operations: transformations, which yield a new DStream, and output operations, which write data to an external system.
DStreams provide many of the same operations available on RDDs, plus new operations related to time, such as sliding windows.
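
As a minimal sketch of the two kinds of operations, assuming a DStream[String] named lines (for example obtained from socketTextStream):

val errorLines = lines.filter(_.contains("error"))  // transformation: returns a new DStream
errorLines.print()                                  // output operation: prints the first 10 records of each batch
errorLines.saveAsTextFiles("errors")                // output operation: saves each batch as a set of text files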

Socket Listening Example:
Socket listening means the application listens on a specific TCP port continuously; when a message/event arrives on that port, it gets picked up and processed.


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds


object SocketListening {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SocketListening")
    val sc = new SparkContext(conf)                              // pass the conf to the SparkContext
    val streamingContext = new StreamingContext(sc, Seconds(1))  // 1-second batch interval

    // Listen on TCP port 9999 of localhost; each line received becomes a record in the DStream
    val dStream = streamingContext.socketTextStream("localhost", 9999)

    // Keep only the lines that contain the word "error"
    val errorLines = dStream.filter(_.contains("error"))

    errorLines.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
To start receiving data, we must explicitly call start() on the StreamingContext. Then, Spark Streaming will start to schedule Spark jobs on the underlying SparkContext. This will occur in a separate thread, so to keep our application from exiting, we also need to call awaitTermination to wait for the streaming computation to finish.
Note that a streaming context can be started only once, and must be started after we
set up all the DStreams and output operations we want.
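To try this locally, you can run a simple TCP server in another terminal (for example, nc -lk 9999 if netcat is available) and type a few lines containing "error"; the matching lines are printed with each one-second batch.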
Transformations on DStreams:

They can be grouped into either stateless or stateful:
• Stateless: the processing of each batch does not depend on the data of its previous batches. These include the common RDD transformations such as map(), filter(), and reduceByKey().
• Stateful: these transformations, in contrast, use data or intermediate results from previous batches to compute the results of the current batch. They include transformations based on sliding windows and on tracking state across time (see the sketch after this list).
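
A minimal sketch of the difference, assuming a DStream[String] named lines (for example from socketTextStream) and a 1-second batch interval:

// Stateless: each batch is counted on its own.
val perBatchCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

// Stateful: counts are aggregated over a sliding 30-second window that moves every 10 seconds,
// so each result depends on several previous batches.
val windowedCounts = lines.flatMap(_.split(" ")).map(word => (word, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))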
Transform:

It is not like regular transformations. The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API; however, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (perhaps also generated with Spark) and then filtering based on it.
• map: works on each record (row)
• mapPartitions: works on each partition
• transform: works on each RDD (i.e. each batch) of the DStream
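
A small sketch of the contrast, assuming a DStream[String] named lines:

val upperCased  = lines.map(_.toUpperCase)                          // runs on every record
val sizePerPart = lines.mapPartitions(iter => Iterator(iter.size))  // runs once per partition
val sortedBatch = lines.transform(rdd => rdd.sortBy(identity))      // runs on the whole RDD of each batch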



To clarify further:
yourDStream.map(record => yourFunction(record)) does something to every record in every RDD of the DStream, which essentially means every record in the DStream. But yourDStream.transform(rdd => anotherFunction(rdd)) allows you to do arbitrary things to every RDD in the DStream.
For example, yourDStream.transform(rdd => rdd.map(record => yourFunction(record))) is exactly the same as the first line: only a map function.
However, you can also do
yourDStream.transform(rdd => rdd.map(...).reduceByKey(...).filter(...).flatMap(...).sortByKey(...)), which obviously involves multiple stages of shuffles by key. So transform is a far more general operation than map; it allows arbitrary computations on each RDD of a DStream. For example, say you want to sort every batch of data by a key. Currently there is no DStream.sortByKey() to do that, but you can easily use transform: DStream.transform(rdd => rdd.sortByKey()).

Transform Code Example:
package stream

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object TransformExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TransformExample")
    val sc = new SparkContext(conf)
    val streamingContext = new StreamingContext(sc, Seconds(1))

    val lines = streamingContext.socketTextStream("localhost", 9999)

    // A static RDD that we want to join every batch against
    val virusRDD = sc.parallelize(Seq(("sr", 1), ("srini", 2), ("sujji", 4), ("virus", 6)))

    // Word counts per batch
    val wordCounts = lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _)

    // Here we cannot join wordCounts with virusRDD using map or other DStream functions,
    // because wordCounts is a DStream while the virus data is a plain RDD.
    // transform lets us operate on the underlying RDD of each batch.
    val finalData = wordCounts.transform { rdd =>
      rdd.join(virusRDD).sortByKey()  // join each batch with the static RDD, then sort by key
    }

    finalData.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}


updateStateByKey Example:
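
updateStateByKey maintains arbitrary state for each key across batches: for every key, Spark calls the update function with the new values seen for that key in the current batch and the previous state (if any), and whatever the function returns becomes the new state. Because the state must survive across batches, a checkpoint directory has to be set (ssc.checkpoint(...) in the code below).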

package stream

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._

object UpdateState {
  def main(args: Array[String]) {
    
    // Called for each key: `values` holds the new counts for the key in the current batch,
    // `state` holds the running total from previous batches (None the first time the key is seen).
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {

      println("state is " + state.getOrElse(0))  // state.get would throw for a brand-new key

      values.foreach { x => println("value is " + x) }

      val currentCount = values.sum

      println("current sum is " + currentCount)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)         // the returned value becomes the new state
    }
    
    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Optional initial state; it can be passed via an updateStateByKey overload (not used in this simple example)
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    val output = wordDstream.updateStateByKey(updateFunc)
    
    output.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Simplest way for Running Spark on Windows machine

I have posted this solution as an answer to one of the Stack Overflow questions.

You can run Spark jobs on a Windows machine, but it needs a few additional things. There are a couple of files which are required in the Hadoop home directory:

1) winutils.exe
2) winutils.dll


Steps:

1) Download the latest version of Hadoop from the Hadoop website.
2) Download winutils.exe and winutils.dll from the link below.
3) Copy winutils.exe and winutils.dll from that folder to your $HADOOP_HOME/bin.
4) Set HADOOP_HOME as an environment variable (or set it at the command prompt) and add HADOOP_HOME/bin to PATH. You can do this by going to Advanced System Settings and choosing Environment Variables.

After this, run the Spark jobs.
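
Alternatively, the Hadoop home can be set from code before the SparkContext is created. A sketch, assuming winutils.exe has been copied to C:\hadoop\bin (a hypothetical path):

// Point hadoop.home.dir at the directory whose bin\ folder contains winutils.exe (hypothetical path).
System.setProperty("hadoop.home.dir", "C:\\hadoop")

val conf = new SparkConf().setAppName("WindowsLocalTest").setMaster("local[2]")
val sc = new SparkContext(conf)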