Accumulators:
Accumulators are shared variables that tasks running on the worker nodes can only add to, while the driver program reads the final result. Using one involves three steps (a minimal sketch follows the notes below):
1) Declare an accumulator using sc.accumulator(<initialValue>).
2) Add to it from your tasks: in Scala use += (e.g. accum += 1); in Java use the add method (e.g. accum.add(1)).
3) Read the value on the driver: accum.value in Scala, accum.value() in Java.
Note: accumulators are generally updated inside transformations, which are lazy, so you need to run an action (saveAsTextFile, count, etc.) before the value is actually populated.
From a task's point of view accumulators are write-only; only the driver can read the value.
Updates made inside transformations may be applied more than once if a task fails and is re-executed or a stage is recomputed, so for exact counts it is safer to perform the updates inside an action such as foreach.
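A minimal sketch of the three steps in Scala (the file name and the blank-line counting are just for illustration; it assumes an existing SparkContext named sc):
val blankLines = sc.accumulator(0)                  // 1) declare with an initial value

val lines = sc.textFile("input.txt")
val trimmed = lines.map { line =>
  if (line.trim.isEmpty) blankLines += 1            // 2) add from inside a transformation
  line.trim
}

trimmed.count()                                     // run an action so the updates are applied
println("blank lines: " + blankLines.value)         // 3) read the value on the driver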
Custom Accumulators:
A custom accumulator has to extend AccumulatorParam[T] and implement two methods:
1) for initialization - method zero, which returns the starting value for worker-side copies
2) for addition - method addInPlace, which merges two partial results
Scala Example for a Custom Accumulator:
import org.apache.spark.{SparkConf, SparkContext, AccumulatorParam}

object CustomAccumulatorExmp {

  // Tells Spark how to initialise and merge List[Int] accumulators.
  object ListAccumulator extends AccumulatorParam[List[Int]] {
    // zero supplies the starting value for worker-side copies of the accumulator.
    def zero(initialValue: List[Int]): List[Int] = initialValue
    // addInPlace merges two partial results by list concatenation.
    def addInPlace(v1: List[Int], v2: List[Int]): List[Int] = v1 ::: v2
  }

  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    // Pass the custom AccumulatorParam explicitly in the second parameter list.
    val acc = sc.accumulator(List(0))(ListAccumulator)

    val lines = sc.textFile(args(0))
    val lengths = lines.map { string =>
      acc += List(string.length)              // accumulate each line's length
      println("length is " + string.length)
      string.length
    }

    lengths.count()                            // a single action triggers the accumulator updates
    acc.value.foreach { x => println(" -- " + x) }

    sc.stop()
  }
}
Broadcast Variables:
Broadcast variables are read-only shared variables that Spark ships to each worker node once and caches there, so they can be reused across one or more operations. They come in handy for lookup tables, map-side joins, and similar cases where every task needs the same data.
Since the broadcast value is serialized and sent to every executor, it is important to choose a serialization format that is fast and compact (Kryo, for example).
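A minimal sketch of the lookup / map-side-join use in Scala (the country-code map and the RDD contents are made up for illustration; it assumes an existing SparkContext named sc):
val countryNames = Map("IN" -> "India", "US" -> "United States", "DE" -> "Germany")
val bcNames = sc.broadcast(countryNames)          // shipped to each executor once and cached

val orders = sc.parallelize(Seq(("IN", 250.0), ("US", 99.0), ("FR", 10.0)))

// Map-side join: each task looks the code up in the broadcast map locally, no shuffle needed.
val withNames = orders.map { case (code, amount) =>
  (bcNames.value.getOrElse(code, "Unknown"), amount)
}

withNames.collect().foreach(println)
// (India,250.0), (United States,99.0), (Unknown,10.0)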
Example Code in Java:
Below is an example that performs a very simple sentiment calculation in Java using broadcast variables and accumulators.
Note: this use case exists only to demonstrate accumulators and broadcast variables, so please do not read too much into it.
The code analyzes the sentiment for two Pro Kabaddi League teams, Hyderabad and Bangalore, in the given text. It simply tokenizes the text, looks each token up in a pre-defined set of words mapped to each team, and accumulates a point whenever a match is found.
At the end, the percentage is calculated from the total points.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

public class SentimentAnalysisWithBroadcast {
    public static void main(String... args) {
        // Words that count towards the Hyderabad (Telugu Titans) side.
        Map<String, Integer> teluguMap = new HashMap<String, Integer>();
        teluguMap.put("Titans", 1);
        teluguMap.put("Hyderabad", 1);
        teluguMap.put("Telugu", 1);
        teluguMap.put("Andhra", 1);
        teluguMap.put("Nizam", 1);
        teluguMap.put("Rahul", 1);

        // Words that count towards the Bangalore (Bengaluru Bulls) side.
        Map<String, Integer> bangloreMap = new HashMap<String, Integer>();
        bangloreMap.put("Bulls", 1);
        bangloreMap.put("Blore", 1);
        bangloreMap.put("Banglore", 1);
        bangloreMap.put("Mohit", 1);
        bangloreMap.put("Chillar", 1);
        bangloreMap.put("Karnataka", 1);

        SparkConf conf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Ship the lookup maps to every executor once; tasks read them with value().
        final Broadcast<Map<String, Integer>> telugu = sc.broadcast(teluguMap);
        final Broadcast<Map<String, Integer>> banglore = sc.broadcast(bangloreMap);

        // Accumulators collect the points added by the tasks; the driver reads them at the end.
        final Accumulator<Integer> teluguPoints = sc.accumulator(0);
        final Accumulator<Integer> banglorePoints = sc.accumulator(0);

        JavaRDD<String> lines = sc.textFile(args[0]);

        // Tokenize each line on spaces.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String input) {
                return Arrays.asList(input.split(" "));
            }
        });

        // foreach is an action, so the accumulator updates are applied exactly once.
        words.foreach(new VoidFunction<String>() {
            public void call(String input) {
                if (telugu.value().get(input) != null) {
                    teluguPoints.add(1);
                }
                if (banglore.value().get(input) != null) {
                    banglorePoints.add(1);
                }
            }
        });

        int totalPoints = banglorePoints.value() + teluguPoints.value();
        System.out.printf("Percentage of Bangalore Bulls sentiment is %.2f%n",
                (banglorePoints.value() / (totalPoints * 1.00)) * 100);

        sc.close();
    }
}
Pipes:
The pipe() transformation runs each partition of an RDD through an external executable (a shell or Perl script, for example): each element is written to the process's stdin, one per line, and each line the process writes to stdout becomes an element of the resulting RDD.
Example
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkFiles

object PipesExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    // Distribute the script to every worker; SparkFiles.get resolves its local path there.
    val distScript = "/usr/local/test.pl"
    sc.addFile(distScript)

    val rdd = sc.parallelize(Array("37.75889318222431"))

    // Each element is fed to the script on stdin; its stdout lines become the piped RDD.
    // The Map supplies extra environment variables for the child process.
    val piped = rdd.pipe(Seq(SparkFiles.get("test.pl")), Map("SEPARATOR" -> ","))

    println(" -- " + piped.collect().mkString(" "))

    sc.stop()
  }
}
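The contents of test.pl are not shown here; any executable that reads lines from stdin and writes results to stdout will work. As a minimal sketch that needs no extra script, pipe can also invoke a standard command assumed to be on the workers' PATH, for example grep:
val nums = sc.parallelize(Seq("1", "12", "123"))
val matched = nums.pipe("grep 12")       // each element goes to grep's stdin; matching lines come back
matched.collect()                        // Array("12", "123")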