Monday, 31 August 2015

How to see Logger Debug messages in Hadoop Map Reduce program while executing.

If your programing has Logger like below.

final static org.slf4j.Logger log = LoggerFactor.getLogger(MyMapperProgram.class);

and if you have written any debug messages,

using logger.debug("Hey i am here")

Now, if you want to see this message, normal logger properties or overriding any logger property from command will not be sufficient,

You can execute below command, before running the job in command line.

export HADOOP_ROOT_LOGGER="DEBUG,console"

With this you will be able to see the debug messages.

Wednesday, 26 August 2015

Return Types for all important RDD and Pair RDD Functions with Examples

I have collected some important RDD Functions and collected their return types for reference. 

They are as follows:

Taking two Sequences as inputs:

rdd1 = {"sr","sr2","sr3","sr4"}
rdd2 = {"cs1","cs2","cs3","cs4"} 

cartesian = RDD[(T, U)] - Pairs with each element against all the elements of other RDD

eg: rdd1.cartesian(rdd2)
result: RDD[(String, String)]

(sr,cs3)
(sr,cs4)
(sr,cs1)
(sr,cs2)
(sr2,cs1)
(sr2,cs2)
(sr2,cs3)
(sr2,cs4)
(sr3,cs1)
(sr3,cs2)
(sr4,cs1)
(sr4,cs2)
(sr3,cs3)
(sr3,cs4)
(sr4,cs3)
(sr4,cs4)


collect = Array[T]

eg: rdd1.collect()
result: Array(sr, sr2, sr3, sr4)

count = Long

rdd1.count()
Long = 4


countApprox = PartialResult[BoundedDouble]

rdd1.countApprox(1,0.95)
(final: [4.000, 4.000])

countByValue = Map[T, Long]

rdd1.countByValue()
Map(sr3 -> 1, sr -> 1, sr4 -> 1, sr2 -> 1)

dependencies = Seq[spark.Dependency[_]]

glom = RDD[Array[T]]
Returns number of Arrays as many as number of partitions, for each Partition, and with elements in it as Array

group By = RDD[(K, Seq[T])]

rdd1.groupBy(x => x.length)

result:
(2,CompactBuffer(sr))
(3,CompactBuffer(sr2, sr3, sr4))
 


key By = RDD[(K, T)]

rdd1.keyBy(x => x.length)

(2,sr)
(3,sr2)
(3,sr3)
(3,sr4)


paritioner = Option[Partitioner]

partitions = Array[Partition]

zip = RDD[(T, U)] - RDDs with same number of elements can only be zipped to gether. 

rdd1.zip(rdd2)

(sr,cs1)
(sr2,cs2)
(sr3,cs3)
(sr4,cs4)



Pairs RDD Functions and thier Return Types: 

 
p1 = {(1,sr1),(1,sri1),(2,sr2),(3,sr3),(5,sr5)}
p2 = {(2,cs2),(4,cs4),(1,cs1)}

ip1 = {(1,1),(2,2),(3,3),(1,5),(2,6),(3,7)}
ip2 = {(1,6),(2,7),(3,8),(4,9),(1,11)}
 

cogroup : RDD[(K, (Seq[V], Seq[W]))]

p1.cogroup(p2)
(5,(CompactBuffer(sr5),CompactBuffer()))
(3,(CompactBuffer(sr3),CompactBuffer()))
(1,(CompactBuffer(sr1, sri1),CompactBuffer(cs1)))
(4,(CompactBuffer(),CompactBuffer(cs4)))
(2,(CompactBuffer(sr2),CompactBuffer(cs2)))


p1.cogrou(ip1)

(2,(CompactBuffer(sr2),CompactBuffer(2, 6)))
(1,(CompactBuffer(sr1, sri1),CompactBuffer(1, 5)))
(3,(CompactBuffer(sr3),CompactBuffer(3, 7)))
(5,(CompactBuffer(sr5),CompactBuffer()))

p1.cogroup(p2, ip1)
(4,(CompactBuffer(),CompactBuffer(cs4),CompactBuffer()))
(1,(CompactBuffer(sr1, sri1),CompactBuffer(cs1),CompactBuffer(1, 5)))
(3,(CompactBuffer(sr3),CompactBuffer(),CompactBuffer(3, 7)))
(5,(CompactBuffer(sr5),CompactBuffer(),CompactBuffer()))
(2,(CompactBuffer(sr2),CompactBuffer(cs2),CompactBuffer(2, 6)))
 

groupByKey(): RDD[(K, Seq[V])]

(2,CompactBuffer(sr2))
(1,CompactBuffer(sr1, sri1))
(3,CompactBuffer(sr3))
(5,CompactBuffer(sr5))

groupWith - RDD[(K, (Seq[V], Seq[W]))]

p1.groupWith(ip1)
(2,(CompactBuffer(sr2),CompactBuffer(2, 6)))
(1,(CompactBuffer(sr1, sri1),CompactBuffer(1, 5)))
(3,(CompactBuffer(sr3),CompactBuffer(3, 7)))
(5,(CompactBuffer(sr5),CompactBuffer()))


Join  - RDD[(K, (V, W))]

p1.join(p2)

(2,(sr2,cs2))
(1,(sr1,cs1))
(1,(sri1,cs1))

ip1.join(ip2)

(2,(2,7))
(2,(6,7))
(1,(1,6))
(1,(1,11))
(1,(5,6))
(1,(5,11))
(3,(3,8))
(3,(7,8))


Left Outer Join = RDD[(K, (V, Option[W]))]

ip1.leftOuterJoin(ip2)

(1,(1,Some(6)))
(1,(1,Some(11)))
(1,(5,Some(6)))
(1,(5,Some(11)))
(3,(3,Some(8)))
(3,(7,Some(8)))
(2,(2,Some(7)))
(2,(6,Some(7)))

 Right Outer Join = RDD[(K, (Option[V], W))
(1,(Some(1),6))
(1,(Some(1),11))
(1,(Some(5),6))
(1,(Some(5),11))
(3,(Some(3),8))
(3,(Some(7),8))
(4,(None,9))
(2,(Some(2),7))
(2,(Some(6),7))

Look Up = Seq[V]

ip1.lookup(1)
Seq[Int] = WrappedArray(1, 5)

ip1.lookup(4)
Seq[Int] = WrappedArray()

Thursday, 20 August 2015

Learning Spark - Chapter 8 Notes

Configuration precedence for Spark

1) 1st Priority to configuration used with set() in code
2) 2nd Priority to Command line arguments
3) 3rd Priority to Properties File
4) 4th Priority to Default Properties.




Configuration Property
Default Value
Description
spark.executor.memory
(--executor-memory)
-512m
Amount of memory to use per executor process, in the same format as JVM memory strings (e.g., 512m, 2g)
spark.executor.cores

(--executor-cores
--totalexecutor-cores)
1
Configurations for bounding the number of cores used by the application. In YARN mode spark.executor.cores will assign a specific number of cores to each executor. In standalone and Mesos modes, you can upper-bound the total number of cores across all executors using spark.cores.max.
spark.speculation
False
Setting to true will enable speculative execution of tasks. This means tasks that are running slowly will have a second copy launched on another node. Enabling this can help cut down on straggler tasks in large clusters.
spark.storage.
blockManager
TimeoutIntervalMs
45000
An internal timeout used for tracking the liveness of executors. For jobs that have long garbage collection pauses, tuning this to be 100 seconds (a value of 100000) or higher can prevent thrashing. In future versions of Spark this may be replaced with a general timeout setting, so check current documentation.

spark.executor.
extraJavaOptions                           

spark.executor.
extraClassPath

spark.executor.
extraLibraryPath


These three options allow you to customize the launch behavior of executor JVMs. The three flags add extra Java options, classpath entries, or path entries for the JVM library path. These parameters should be specified as strings (e.g., spark.executor.extraJavaOptions="- XX:+PrintGCDetails-XX:+PrintGCTi meStamps"). Note that while this allows you to manually augment the executor classpath, the recommended way to add dependencies is through the --jars flag to spark-submit (not using this option)
spark.serializer
org.apache.spark.serializer.JavaSerializer
Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java Serialization works with any serializable Java object but is quite slow, so we recommend using org.apache.spark.seri alizer.KryoSerializer and configuring Kryo serialization when speed is necessary. Can be any subclass of org.apache.spark.Serial izer.
spark.eventLog.enabled
False
Set to true to enable event logging, which allows completed Spark jobs to be viewed using a history server.
spark.eventLog.dir
file:///<file_path>          
The storage location used for event logging, if enabled. This can set to any File System eg:HDFS.





SPARK_LOCAL_DIRS     Local Directory that spark should use for Shuffle. This property cannot be set using the configuration object.

Spark's default cache operation persists using MEMORY_ONLY storage level.

MEMORY_AND_DESK persists RDDs to Memory and if the storage is not sufficient, then drops the old RDDs to DISK and reads them back to memory, when they are needed.


Some Key points about Spark Shuffle Operation

What is Shuffle.?

The shuffle is Spark’s mechanism for re-distributing data so that is grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.


Performance Impact

  • The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. 
  •  Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side and 'ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.
  •  Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark’s temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. This is done so the shuffle doesn’t need to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.
  • Operations that involve Shuffle 
                      repartition, coalesce, By Key Operations Except Count By Key, Join, Co Group

  
ReduceByKey vs. GroupByKey

Reduce By Key is more efficient than Group By Key, as Reduce involves grouping operations at each partition. A concept similar to Hadoop's Combiner


Word Count example to understand the difference between Reduce By Key and Group By Key.



Reduce By Key:  The reduce operation happens for each partition, and then the results will be shuffled.



Group By Key: Each and every element gets suffled first, and then grouping operation happens. 

Conclusion: Reduce By Key might not fit or solve all the Group By Operations, but wherever possible, it is better to opt for Reduce By Key, Fold By Key or Combine By Key, instead of Group By.

Sunday, 16 August 2015

Learning Spark - Chapter 7 Notes

Spark has pluggable cluster manager. We can configure it with Yarn, Mesos Cluster and can work with Spark's Stand Alone Spark Cluster.


In distributed mode, Spark uses a master/slave architecture with one central coordinator

and many distributed workers. The central coordinator is called the driver.


The driver communicates with a potentially large number of distributed workers called

executors. The driver runs in its own Java process and each executor is a separate Java

process. A driver and its executors are together termed a Spark application.

Driver is process where main() method of your program runs and it is the process running user code that creates Spark Context, RDDs and performs transformations and actions.




When the driver runs, it performs two duties:



1) It is responsible for converting a user program into units of physical execution called tasks.

 At a high level, all Spark programs follow the same structure:

    They create RDDs from some input, derive new RDDs from those using transformations, and perform actions to collect or save data. A Spark program implicitly creates a logical directed acyclic graph (DAG) of operations.When the driver runs, it converts this logical graph into a physical execution


Spark performs several optimizations, such as “pipelining” map transformations together to merge them, and converts the execution graph into a set of stages. Each stage, in turn, consists of multiple tasks. The tasks are bundled up and prepared to be sent to the cluster. Tasks are the smallest unit of work in Spark; a typical user program can launch hundreds or thousands of individual tasks.


2)Scheduling tasks on executors


Given a physical execution plan, a Spark driver must coordinate the scheduling of individual tasks on executors. When executors are started they register themselves with the driver, so it has a complete view of the application’s executors at all times. Each executor represents a process capable of running tasks and storing RDD data. The Spark driver will look at the current set of executors and try to schedule each task in an appropriate location, based on data placement. When tasks execute, they may have a side effect of storing cached data. The driver also tracks the location of cached data and uses it to schedule future tasks that access that data.






The driver exposes information about the running Spark application through a web interface, which by default is available at port 4040. For instance, in local mode, this UI is available at http://localhost:4040


Executors
Spark executors are worker processes responsible for running the individual tasks in
a given Spark job. Executors are launched once at the beginning of a Spark application
and typically run for the entire lifetime of an application, though Spark applications
can continue if executors fail. Executors have two roles. First, they run the tasks
that make up the application and return results to the driver. Second, they provide
in-memory storage for RDDs that are cached by user programs, through a service
called the Block Manager that lives within each executor. Because RDDs are cached
directly inside of executors, tasks can run alongside the cached data.


Values for master

spark://host:port -> connect to a Spark Standalone cluster at the specified port. By default Spark Standalone masters use port 7077.
mesos://host:port  -> Connect to a Mesos cluster master at the specified port. By default Mesos masters listen on port 5050.
yarn -> Connect to a YARN cluster. When running on YARN you’ll need to set the HADOOP_CONF_DIR environment variable to point the location of your Hadoop configuration directory, which contains information about the cluster.

local Run in local mode with a single core.
local[N] Run in local mode with N cores.
local[*] Run in local mode and use as many cores as the machine has.

Deploy Mode Importance:

Whether to launch the driver program locally (“client”) or on one of the worker machines inside the cluster (“cluster”). In client mode spark-submit will run your driver on the same machine where spark-submit is itself being invoked. In cluster mode, the driver will be shipped to execute on a worker node in the cluster. The default is client mode. 

Few Points about Mesos:
 
Unlike the other cluster managers, Mesos offers two modes to share resources between executors on the same cluster.

1) Fine Grained Mode: In “fine-grained” mode, which is the default, executors scale up and down the number of CPUs they claim from Mesos as they execute tasks, and so a machine running multiple executors can dynamically share CPU resources between them.

2) “coarse-grained” mode, Spark allocates a fixed number of CPUs to each executor in advance and never releases them until the application ends, even if the executor is not currently running tasks.

You can enable coarse-grained: mode by passing --conf spark.mesos.coarse=true to spark-submit

The fine-grained Mesos mode is attractive when multiple users share a cluster to run interactive workloads such as shells, because applications will scale down their number of cores when they’re not doing work and still allow other users’ programs to use the cluster. The downside, however, is that scheduling tasks through fine-grained mode adds more latency

you can use a mix of scheduling modes in the same Mesos cluster (i.e., some of your Spark applications might have spark.mesos.coarse set to true and some might not).

Spark on Mesos supports running applications only in the “client” deploy mode—that is, with the driver running on the machine that submitted the application. If you would like to run your driver in the Mesos cluster as well, frameworks like Aurora and Chronos allow you to submit arbitrary scripts to run on Mesos and monitor them.