Sunday 16 August 2015

Scala and Spark Notes.

Adding a dependency in SBT:

For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

Sample build.sbt:

name := "spark_srini_scala"

version := "1.0.0"

scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

libraryDependencies += "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.2"

libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.2.2"
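
A note on % versus %% in the dependencies above: %% asks SBT to append the Scala binary version to the artifact name, while % uses the name exactly as written. A minimal sketch, assuming the scalaVersion of 2.11.5 from the sample above (so the suffix is _2.11):

```scala
// build.sbt fragment, assuming scalaVersion := "2.11.5"

// %% appends the Scala binary version, so this resolves the artifact spark-core_2.11
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

// The same dependency with the suffix written out by hand (use one form, not both):
// libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.1"

// jackson-databind is a plain Java library without a Scala suffix, so it uses %
libraryDependencies += "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.2"
```

This is why Scala libraries such as jackson-module-scala use %% and Java libraries such as jackson-databind use %.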

Adding the SBT Eclipse plugin:

Go to the SBT folder on your system. For example, the SBT folder on my system is:

C:\Users\lenovo\.sbt\0.13

Create a folder named plugins inside this folder, and in it create a plugins.sbt file with the following line:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

Running sbt eclipse in the project folder then generates the Eclipse project files.

Building the Scala project:

sbt clean package

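Reading and writing JSON in Scala:
1) Create a top-level case class Person
2) Try the code below

import java.io.StringWriter

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Top-level case class; the Scala module maps its fields to JSON properties
case class Person(name: String, age: Int)

// Serialize: Person -> JSON string
val person = Person("fred", 25)
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)

val out = new StringWriter
mapper.writeValue(out, person)
val json = out.toString()
println(json)

// Deserialize: JSON string -> Person
val person2 = mapper.readValue(json, classOf[Person])
println(person2)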


Reading a JSON file in Spark:

package spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

/**
 * @author Srini
 */

case class JsonInput(name: String, occupation: String)

object JSONExample {
  // args(0): input path with one JSON object per line; args(1): output path
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val lines = sc.textFile(args(0))

    println("========= input file is " + args(0))

    // Parse each line, creating one mapper per partition instead of one per record
    val result = lines.mapPartitions(records => {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)

      records.map(line => {
        println("line is " + line)
        mapper.readValue(line, classOf[JsonInput])
      })
    }, true)

    // Serialize back to JSON; the Scala module is registered here too,
    // because without it Jackson does not see the case class fields
    val output = result.mapPartitions(records => {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    })
    output.saveAsTextFile(args(1))
  }
}

Saving as a Hadoop file (Java):

public static class ConvertToWritableTypes implements
    PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
  // Converts a (String, Integer) pair to Hadoop Writable types
  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
    return new Tuple2<Text, IntWritable>(new Text(record._1), new IntWritable(record._2));
  }
}

JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
    SequenceFileOutputFormat.class);
 
Reading a compressed file can be done using newAPIHadoopFile:
 
sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
 
 
Writing a Protobuf file in Spark:


  val job = new Job()
  val conf = job.getConfiguration
  LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);
  val dnaLounge = Places.Venue.newBuilder()
  dnaLounge.setId(1);
  dnaLounge.setName("DNA Lounge")
  dnaLounge.setType(Places.Venue.VenueType.CLUB)
  val data = sc.parallelize(List(dnaLounge.build()))
  val outputData = data.map { pb =>
    val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
    protoWritable.set(pb)
    (null, protoWritable)
  }
  outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text],
    classOf[ProtobufWritable[Places.Venue]],
    classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)


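Spark's textFile can read compressed files automatically, but a file compressed with a non-splittable codec such as gzip cannot be split and is read as a single partition.
If splitting is important, use newAPIHadoopFile with an input format that supports it.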

Accessing Hive data from Spark:

import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0))

JSON files can be loaded directly through the HiveContext with the jsonFile function (deprecated since Spark 1.4 in favour of hiveCtx.read.json):

val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")

JDBC load sample:

import java.sql.{DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object LoadSimpleJdbc {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage: [sparkmaster]")
      sys.exit(1)
    }
    val master = args(0)
    val sc = new SparkContext(master, "LoadSimpleJdbc", System.getenv("SPARK_HOME"))
    // The two ? placeholders are filled with each partition's lower and upper id
    val data = new JdbcRDD(sc,
      createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
      lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
    println(data.collect().toList)
  }

  // Opens a new database connection
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden")
  }

  // Converts one result row to a tuple of its first two columns
  def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
  }
}


We provide a query that reads a range of the data, along with lowerBound and
upperBound values for the two ? parameters of that query. Spark splits this range
across numPartitions, so different machines query different ranges of the data and
we don't get bottlenecked trying to load all the data on a single node.
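
To make the range splitting concrete, here is a minimal plain-Scala sketch of how an inclusive id range can be divided into contiguous sub-ranges, one per partition. The object RangeSplitSketch and the helper splitRange are hypothetical names used only for illustration; the arithmetic follows my understanding of what JdbcRDD does and is not copied from Spark.

```scala
object RangeSplitSketch {
  // Hypothetical helper: splits the inclusive range [lowerBound, upperBound]
  // into numPartitions contiguous, inclusive (start, end) sub-ranges.
  def splitRange(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(lowerBound <= upperBound, "lowerBound must not exceed upperBound")
    require(numPartitions > 0, "numPartitions must be positive")
    // BigInt avoids overflow when the bounds are far apart
    val length = BigInt(upperBound) - BigInt(lowerBound) + 1
    (0 until numPartitions).map { i =>
      val start = BigInt(lowerBound) + (length * i) / numPartitions
      val end = BigInt(lowerBound) + (length * (i + 1)) / numPartitions - 1
      (start.toLong, end.toLong)
    }
  }

  def main(args: Array[String]): Unit = {
    // Same bounds as the JDBC sample: lowerBound = 1, upperBound = 3, numPartitions = 2
    splitRange(1, 3, 2).zipWithIndex.foreach { case ((start, end), i) =>
      println(s"partition $i: SELECT * FROM panda WHERE $start <= id AND id <= $end")
    }
  }
}
```

With the bounds from the sample, one partition covers id 1 and the other covers ids 2 to 3. When the range is larger than numPartitions, every sub-range is non-empty and together they cover the whole range without overlap.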
 
NoSQL databases:
For HBase, we need to set the parameter below on the Hadoop configuration to choose the table to read:
 
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
 
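For Cassandra, set the connection host on the SparkConf:

conf.set("spark.cassandra.connection.host", "hostname")

Reading from Cassandra can then be done using sc.cassandraTable, which the DataStax connector adds to SparkContext (import com.datastax.spark.connector._).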

For Elasticsearch:
 
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2)) 
 
Creating the RDD for Elasticsearch and HBase:

Elasticsearch:

val currentTweets = sc.hadoopRDD(jobConf,
  classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
  classOf[MapWritable])

HBase:

val rdd = sc.newAPIHadoopRDD(
  conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

