Adding a dependency in SBT:
E.g.:
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
Sample SBT:
name := "spark_srini_scala"

version := "1.0.0"

scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

libraryDependencies += "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.2"

libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.2.2"
Adding SBT Eclipse Plugin:
Go to the global SBT folder on your system, e.g. the SBT folder on my machine:
C:\Users\lenovo\.sbt\0.13
Create a folder named plugins in this folder, and create a plugins.sbt file there with the following line:
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
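With the plugin in place, the Eclipse project files can be generated by running the below from the project root (assuming sbteclipse 4.0.0 as configured above):
sbt eclipse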
Building the Scala Project: Command
sbt clean package
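With the sample build above, the packaged jar should land under target/scala-2.11/; the exact file name below is an assumption based on the name, version, and scalaVersion settings shown earlier:
target/scala-2.11/spark_srini_scala_2.11-1.0.0.jar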
Reading and Writing JSON in Scala
1) Create a top-level case class named Person
2) Try the code below:
import java.io.StringWriter
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class Person(name: String, age: Int)

val person = Person("fred", 25)
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)   // lets Jackson handle Scala case classes
val out = new StringWriter
mapper.writeValue(out, person)
val json = out.toString()
println(json)
val person2 = mapper.readValue(json, classOf[Person])
println(person2)
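Running this should print the JSON form of the person, something like {"name":"fred","age":25}, followed by the deserialized case class, Person(fred,25).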
Reading a JSON File in Spark:
package spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

/**
 * @author Srini
 */
case class JsonInput(name: String, occupation: String)

object JSONExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val lines = sc.textFile(args(0))
    println("========= input file is " + args(0))
    // parse each partition with a single ObjectMapper rather than one per record
    val result = lines.mapPartitions(y => {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      y.flatMap(x => { println("line is " + x); Some(mapper.readValue(x, classOf[JsonInput])) })
    }, true)
    // write the parsed records back out as JSON strings
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)   // needed to serialize the case class
    val y = result.map(mapper.writeValueAsString(_))
    y.saveAsTextFile(args(1))
  }
}
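A minimal sketch of submitting this job; the jar name assumes the sample build settings above, and the master, input path, and output path are placeholders:
spark-submit --class spark.JSONExample --master local[2] target/scala-2.11/spark_srini_scala_2.11-1.0.0.jar input/people.json output/people_parsed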
Saving as Hadoop File:
public static class ConvertToWritableTypes implements
    PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
    return new Tuple2<>(new Text(record._1), new IntWritable(record._2));
  }
}

JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
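For comparison, the same save is shorter in Scala, where the implicit conversions on pair RDDs handle the Writable types; a minimal sketch with made-up data and a placeholder output path:
val data = sc.parallelize(List(("panda", 3), ("kay", 6)))
// keys and values are converted to Text/IntWritable automatically
data.saveAsSequenceFile("output/pandaSeq")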
Reading a compressed file can be done using newAPIHadoopFile:
sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
Writing a ProtoBuf File in Spark:
val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf)
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1)
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map { pb =>
  val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue])
  protoWritable.set(pb)
  (null, protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text],
  classOf[ProtobufWritable[Places.Venue]],
  classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)
Spark's textFile can read compressed files automatically, but doing so disables splitting. If splitting is important, go for newAPIHadoopFile.
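For plain text through the new Hadoop API, a minimal sketch looks like the below (standard mapreduce TextInputFormat; inputFile is a placeholder path):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val pairs = sc.newAPIHadoopFile(inputFile, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])
val textLines = pairs.map { case (_, text) => text.toString }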
Accessing Hive data from Spark:
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0))
JSON can simply be accessed using the HiveContext, with the jsonFile function:
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
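The result of hiveCtx.sql is again a set of Row objects, so fields can be pulled out the same way as in the Hive example above, e.g.:
results.collect().foreach(row => println(row.getString(0) + ": " + row.getString(1)))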
JDBC Load Sample
import java.sql.{DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object LoadSimpleJdbc {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: [sparkmaster]")
      sys.exit(1)
    }
    val master = args(0)
    val sc = new SparkContext(master, "LoadSimpleJdbc", System.getenv("SPARK_HOME"))
    val data = new JdbcRDD(sc,
      createConnection, "SELECT * FROM panda WHERE ? <= id AND ID <= ?",
      lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
    println(data.collect().toList)
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden")
  }

  def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
  }
}
We provide a query that can read a range of the data, as well as a lowerBound and upperBound value for the parameters of this query. These parameters allow Spark to query different ranges of the data on different machines, so we don't get bottlenecked trying to load all the data on a single node.
NoSQL DBs:
For HBase we need to set the below parameter:
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
For Cassandra:
conf.set("spark.cassandra.connection.host", "hostname")
Reading from Cassandra can be done using sc.cassandraTable:
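A minimal sketch of that read using the DataStax spark-cassandra-connector (the keyspace and table names are placeholders, and the connector is assumed to be on the classpath):
import com.datastax.spark.connector._
// the host comes from the spark.cassandra.connection.host setting above
val cassandraRdd = sc.cassandraTable("test_keyspace", "kv")
println(cassandraRdd.first())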
For Elasticsearch:
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
Reading from HBase and Elasticsearch:
Elasticsearch:
val currentTweets = sc.hadoopRDD(jobConf,
classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
classOf[MapWritable])
HBase:
val rdd = sc.newAPIHadoopRDD(
conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
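A small follow-up sketch pulling the row keys out of the (ImmutableBytesWritable, Result) pairs returned above; Bytes is the HBase utility class:
import org.apache.hadoop.hbase.util.Bytes
val rowKeys = rdd.map { case (key, _) => Bytes.toString(key.get()) }
println(rowKeys.count())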