Sunday, 16 August 2015

Learning Spark Notes: Advanced Spark Programming - Chapter 6

Accumulators:

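Accumulators are shared variables that tasks running on the worker nodes can add to, while the driver program reads the combined result. They work in distributed mode as well as in local mode.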

Using an accumulator takes three steps:

1) Declare the accumulator using sc.accumulator(<initialValue>)
2) Add to it inside a task: in Scala use += (eg: accum += 1), in Java use the add method (eg: accum.add(1))
3) Read the result in the driver: accum.value in Scala, accum.value() in Java

Note: Accumulators are generally updated inside transformations. Transformations are lazy, so in order to get the actual value you need to perform an action first, such as saveAsTextFile or count.

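Accumulators are write-only variables from the point of view of the tasks: worker code can only add to them, and only the driver can read the value.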

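Accumulator updates made inside transformations may be applied more than once, because Spark re-runs a task when it fails or when the RDD has to be recomputed. Updates made inside actions are applied only once per task, so it is always advisable to update accumulators in actions (eg: foreach) when the value must be exact.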
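
The two points above (nothing is counted before an action, and re-running a transformation counts again) can be illustrated without a cluster. The sketch below is only an analogy using java.util.stream, not Spark code: the AtomicInteger stands in for the accumulator, the Supplier stands in for an uncached RDD lineage, and the class and method names are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java streams are lazy in the same spirit as Spark transformations.
final class LazyCounterAnalogy {

    // Returns the counter value seen at three points:
    // [pipeline defined, after one full run, after a second partial run].
    static int[] observe(List<String> lines) {
        AtomicInteger counter = new AtomicInteger(0); // stands in for the accumulator

        // Rebuilds the same pipeline on every call, like an RDD that is not cached.
        Supplier<Stream<Integer>> lengths = () -> lines.stream().map(s -> {
            counter.incrementAndGet(); // side effect inside the "transformation"
            return s.length();
        });

        Stream<Integer> firstRun = lengths.get();
        int afterDefinition = counter.get();     // map has not run yet

        firstRun.collect(Collectors.toList());   // terminal operation, like an action
        int afterFirstRun = counter.get();

        lengths.get().findFirst();               // second run, touches one element
        int afterSecondRun = counter.get();

        return new int[] {afterDefinition, afterFirstRun, afterSecondRun};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe(Arrays.asList("a", "bb", "ccc"))));
    }
}
```

Running main prints [0, 3, 4]: the counter stays at 0 until a terminal operation runs, and the second run adds to it again instead of starting over.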

Custom Accumulators:

A custom accumulator has to extend AccumulatorParam and implement two methods:
1) for Initialization - method zero
2) for Addition - method addInPlace

Scala Example for Custom Accumulator

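import org.apache.spark.AccumulatorParam
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object CustomAccumulatorExmp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    val sc = new SparkContext(conf)

    // Start from an empty list so the result holds only line lengths
    val acc = sc.accumulator(List.empty[Int])(ListAccumulator)
    val lines = sc.textFile(args(0))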

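    val lengths = lines.map { string =>
      acc += List(string.length) // side effect: record this line's length
      println("length is " + string.length)
      string.length
    }

    lengths.count()
    // lengths is not cached, so first() re-runs the map on at least the
    // first line and that length is added to the accumulator a second time
    lengths.first()
    acc.value.foreach { x => println(" -- " + x) }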

  }

  object ListAccumulator extends AccumulatorParam[List[Int]] {
    // zero must be the identity for addInPlace: each task starts from it
    def zero(initialValue: List[Int]): List[Int] = Nil

    def addInPlace(v1: List[Int], v2: List[Int]): List[Int] = {
      v1 ::: v2
    }
  }
}
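
The zero/addInPlace contract can also be modelled in plain Java without Spark. The sketch below is only a model under stated assumptions: the Param interface and the run helper are hypothetical names that mirror AccumulatorParam, and the per-partition loop is a simplified picture of how each task's local value is merged into the driver's value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a list accumulator; not Spark's API.
final class ListAccumulatorModel {

    // Hypothetical interface mirroring the two AccumulatorParam methods.
    interface Param<T> {
        T zero(T initialValue);
        T addInPlace(T v1, T v2);
    }

    static final Param<List<Integer>> LIST_PARAM = new Param<List<Integer>>() {
        public List<Integer> zero(List<Integer> initialValue) {
            return new ArrayList<Integer>(); // identity element: the empty list
        }

        public List<Integer> addInPlace(List<Integer> v1, List<Integer> v2) {
            List<Integer> merged = new ArrayList<Integer>(v1);
            merged.addAll(v2); // concatenation, like v1 ::: v2 in Scala
            return merged;
        }
    };

    // Each partition starts from zero, collects its line lengths locally,
    // and the local result is then merged into the driver-side value.
    static List<Integer> run(List<List<String>> partitions, List<Integer> initialValue) {
        List<Integer> driverValue = initialValue;
        for (List<String> partition : partitions) {
            List<Integer> local = LIST_PARAM.zero(initialValue);
            for (String line : partition) {
                local = LIST_PARAM.addInPlace(local, Arrays.asList(line.length()));
            }
            driverValue = LIST_PARAM.addInPlace(driverValue, local);
        }
        return driverValue;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("a", "bb"), Arrays.asList("ccc"));
        System.out.println(run(partitions, new ArrayList<Integer>()));
    }
}
```

Running main prints [1, 2, 3]. If zero returned a non-empty list instead, every partition would contribute those extra elements to the merged result, which is why zero should be the identity for addInPlace.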


Broadcast Variables:
Broadcast variables are read-only variables that Spark ships to each worker node once and shares across one or more operations. They come in handy for lookup-style operations, map-side joins etc.

It is important to choose a serialization format that is both fast and compact, because the broadcast value has to be sent over the network to every node.

Example Code in Java:

I have written an example that does a simple sentiment analysis in Java using broadcast variables and accumulators.

Note: This use case is only meant to demonstrate accumulators and broadcast variables, so please do not read too much into the use case itself.

The code analyzes the sentiment for two Pro Kabaddi League teams, Hyderabad and Bangalore, in a given text. It simply tokenizes the whole text, searches for a pre-defined set of words mapped to each team and, whenever a match is found, adds a point to that team's accumulator.

The percentage is calculated from the total points at the end.


import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

public class SentimentAnalysisWithBoradCast {
   
    public static void main(String... args){
       
        Map<String, Integer> teluguMap = new HashMap<String, Integer>();
        teluguMap.put("Titans",1);
        teluguMap.put("Hyderabad",1);
        teluguMap.put("Telugu",1);
        teluguMap.put("Andhra",1);
        teluguMap.put("Nizam",1);
        teluguMap.put("Rahul",1);
       
        Map<String, Integer> bangloreMap = new HashMap<String, Integer>();
        bangloreMap.put("Bulls",1);
        bangloreMap.put("Blore",1);
        bangloreMap.put("Banglore",1);
        bangloreMap.put("Mohit",1);
        bangloreMap.put("Chillar",1);
        bangloreMap.put("Karnataka",1);
       
        SparkConf conf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext(conf);
        final Broadcast<Map<String, Integer>> telugu = sc.broadcast(teluguMap);
        final Broadcast<Map<String, Integer>> banglore = sc.broadcast(bangloreMap);
       
        final Accumulator<Integer> teluguPoints= sc.accumulator(0);
        final Accumulator<Integer> banglorePoints= sc.accumulator(0);
       
        JavaRDD<String> lines = sc.textFile(args[0]);
       
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){
            public Iterable<String> call(String input){
                return Arrays.asList(input.split(" "));
            }
        });
       
        words.foreach(new VoidFunction<String>(){
            public void call(String input){
               
                if(telugu.value().get(input) != null)
                {
                    teluguPoints.add(1);
                }
               
                if(banglore.value().get(input) != null)
                {
                    banglorePoints.add(1);
                }
            }
        });
       
        int totalPoints = banglorePoints.value() + teluguPoints.value();
       
        // Pass the number as a printf argument instead of concatenating it into the format string
        System.out.printf("Percentage of Banglore Bulls sentiment is %.2f%n", (banglorePoints.value() / (totalPoints * 1.00)) * 100);
        sc.close(); 
    }
}
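
The scoring arithmetic at the end of the program can be checked on its own, without Spark. This is a minimal sketch: the class name, the method name and the sample sentence are made up for illustration, and returning 0.0 when no word matches is an assumption of this sketch rather than something the program above does.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Plain-Java check of the point counting and percentage calculation.
final class SentimentPercentage {

    // Returns the share (0 to 100) of matched words that belong to the second
    // team, or 0.0 when no word matches either team (assumption of this sketch).
    static double secondTeamPercentage(String text, Set<String> firstTeam, Set<String> secondTeam) {
        int firstPoints = 0;
        int secondPoints = 0;
        for (String word : text.split(" ")) {
            if (firstTeam.contains(word)) {
                firstPoints++;
            }
            if (secondTeam.contains(word)) {
                secondPoints++;
            }
        }
        int totalPoints = firstPoints + secondPoints;
        if (totalPoints == 0) {
            return 0.0; // avoid dividing by zero
        }
        return secondPoints * 100.0 / totalPoints;
    }

    public static void main(String[] args) {
        Set<String> telugu = new HashSet<String>(Arrays.asList("Titans", "Hyderabad"));
        Set<String> banglore = new HashSet<String>(Arrays.asList("Bulls", "Banglore"));
        double pct = secondTeamPercentage("Titans beat Bulls and Bulls won", telugu, banglore);
        System.out.printf("%.2f%n", pct);
    }
}
```

In the sample sentence, one word matches the first team and two words match the second, so the second team gets 2 of 3 points, roughly 66.67 percent.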



Pipes:

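The pipe() method on an RDD can run any executable file: each element of the RDD is written to the program's standard input as a line, and every line the program writes to standard output becomes an element of the resulting RDD of strings. Arguments can be passed as part of the command, and environment variables as a map.
Example: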

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkFiles



object PipesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    // Ship the script to every node so each task can run a local copy
    val distScript = "/usr/local/test.pl"
    sc.addFile(distScript)

    val rdd = sc.parallelize(Array("37.75889318222431"))

    // Run the script over the RDD; SEPARATOR is set in its environment
    val piped = rdd.pipe(Seq(SparkFiles.get("test.pl")), Map("SEPARATOR" -> ","))

    println(" -- " + piped.collect().mkString(" "))
  }
}
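
What pipe does for a single partition can be sketched in plain Java with ProcessBuilder. This is only a model, not Spark's implementation: the class and method names are made up, it assumes a Unix-like system with sh on the PATH, and the inline shell loop stands in for test.pl, whose contents are not shown in this post.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Plain-Java model of piping one partition through an external program.
final class PipeModel {

    static List<String> pipe(List<String> elements, List<String> command, Map<String, String> env)
            throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.environment().putAll(env); // extra environment variables
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);
        Process process = builder.start();

        // Write every element as one line to the program's standard input.
        // Fine for small inputs; large inputs would need a separate writer thread.
        try (BufferedWriter stdin = new BufferedWriter(
                new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String element : elements) {
                stdin.write(element);
                stdin.newLine();
            }
        }

        // Every line on standard output becomes one output element.
        List<String> output = new ArrayList<String>();
        try (BufferedReader stdout = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = stdout.readLine()) != null) {
                output.add(line);
            }
        }
        process.waitFor();
        return output;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the script: append $SEPARATOR to every input line.
        List<String> command = Arrays.asList(
            "sh", "-c", "while read line; do echo \"$line$SEPARATOR\"; done");
        System.out.println(pipe(Arrays.asList("37.75889318222431"),
            command, Collections.singletonMap("SEPARATOR", ",")));
    }
}
```

The env map plays the same role as Map("SEPARATOR" -> ",") in the Scala example: the external program reads the value from its environment instead of from its arguments.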





 
