18 June 2016

A Word Count Example with Cached Partitions

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val lines = sc.textFile("hdfs:///user/raj/data.txt", 3)
lines: org.apache.spark.rdd.RDD[String] = hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28

scala> // No of partitions

scala> lines.partitions.size
res0: Int = 3

scala> // flatMap() : One of many transformations

scala> val words = lines.flatMap(x => x.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:30

scala> // Persist the data

scala> val units = words.map ( word => (word, 1) ).
     |                persist(StorageLevel.MEMORY_ONLY)
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:32

scala> 

scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:34

// Text file is read to compute the 'counts' RDD
scala> counts.toDebugString
res1: String = 
(3) ShuffledRDD[4] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []

scala> // First Action

scala> counts.collect()
res2: Array[(String, Int)] = Array((another,1), (This,2), (is,2), (a,1), (test,2))

scala> val counts2 = units.reduceByKey((x, y) => x * y)
counts2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:34

// Cache value is read to compute the 'counts2' RDD
scala> counts2.toDebugString
res3: String = 
(3) ShuffledRDD[5] at reduceByKey at <console>:34 []
 +-(3) MapPartitionsRDD[3] at map at <console>:32 []
    |      CachedPartitions: 3; MemorySize: 696.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  MapPartitionsRDD[2] at flatMap at <console>:30 []
    |  hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []

scala> // Second Action

scala> counts2.collect()
res4: Array[(String, Int)] = Array((another,1), (This,1), (is,1), (a,1), (test,1))
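
Once the cached data is no longer needed, it can be released explicitly; a minimal follow-up sketch (not run in the original session):

// unpersist() drops the MEMORY_ONLY blocks shown in the debug string above,
// so a later action on 'counts2' would read the text file from HDFS again
units.unpersist()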

Broadcast Variable Example

scala> // Sending a value from Driver to Worker Nodes without

scala> // using Broadcast variable

scala> val input = sc.parallelize(List(1, 2, 3))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val localVal = 2
localVal: Int = 2

scala> val added = input.map( x => x + localVal)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[18] at map at <console>:31

scala> added.foreach(println)
4
3
5

scala> //** Local variable is once again transferred to the worker nodes

scala> //   for the next operation

scala> val multiplied = input.map( x => x * 2)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:29

scala> multiplied.foreach(println)
4
6
2

scala> // Sending a read-only value using Broadcast variable

scala> // Can be used to send large read-only values to all worker

scala> // nodes efficiently

scala> val broadcastVar = sc.broadcast(2)
broadcastVar: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(14)

scala> val added = input.map(x => broadcastVar.value + x)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at map at <console>:31

scala> added.foreach(println)
5
3
4

scala> val multiplied = input.map(x => broadcastVar.value * x)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at map at <console>:31

scala> multiplied.foreach(println)
6
4
2
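
The same mechanism pays off for larger read-only values, as the comments above note. A minimal sketch (the lookup Map is a made-up example value, not part of the original session):

// Ship a read-only lookup table to every worker once
val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))

// Tasks read the broadcast value; the Map is not re-sent with every task
val named = input.map(x => lookup.value.getOrElse(x, "unknown"))
named.foreach(println)   // prints one, two, three (in some order)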

scala> 

28 May 2016

A Word Count Example using 'spark-shell'

[raj@Rajkumars-MacBook-Pro ~]$spark-shell --master local[*] 
2016-05-28 15:37:24.325 java[3907:6309927] Unable to load realm info from SCDynamicStore
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val lines = sc.parallelize(List("This is a word", "This is another word"), 7)
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> // No of partitions

scala> lines.partitions.size
res0: Int = 7

scala> // flatMap() : One of many transformations

scala> val words = lines.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:29

scala> val units = words.map ( word => (word, 1) )
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:31

scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:33

scala> counts.toDebugString
res1: String = 
(7) ShuffledRDD[3] at reduceByKey at <console>:33 []
 +-(7) MapPartitionsRDD[2] at map at <console>:31 []
    |  MapPartitionsRDD[1] at flatMap at <console>:29 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

scala> // collect() : One of many actions

scala> counts.collect()
res2: Array[(String, Int)] = Array((This,2), (is,2), (another,1), (a,1), (word,2))
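
As a small extension (not in the original transcript), the result could be ordered by frequency before collecting; sortBy() is another transformation, so it would also appear in the lineage:

// Sort by count, highest first; ties may come back in any order
counts.sortBy(_._2, ascending = false).collect()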


04 May 2016

Accumulator Example

Note : Update an Accumulator only inside an action if you need a correct value. Updates made inside a transformation can be applied more than once when a stage is re-executed, so use Accumulators there only for debugging (a sketch of this pitfall follows the transcript below).

scala> val input = sc.parallelize(List(1, 2, 3, 4, 5,
     | 6, 7, 8, 9, 10,
     | 11, 12, 13, 14, 15
     | ))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> println("No of partitions -> " + input.partitions.size)
No of partitions -> 8

scala> val myAccum = sc.accumulator(0, "My Accumulator")
myAccum: org.apache.spark.Accumulator[Int] = 0

scala> // Used inside an action

scala> input.foreach{ x =>
     | //Thread.sleep(50000)
     | myAccum += 1
     | }

scala> println("myAccum -> " + myAccum.value)
myAccum -> 15
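
The note above can be illustrated with a short sketch (not from the original session): an accumulator updated inside a transformation is incremented again every time the RDD is re-evaluated, so its value depends on how many actions have run.

val badAccum = sc.accumulator(0, "Updated in a transformation")

// The update happens inside map(), a transformation
val mapped = input.map { x => badAccum += 1; x }

mapped.count()              // first evaluation: badAccum reaches 15
mapped.count()              // 'mapped' is not cached, so it is recomputed: badAccum reaches 30
println(badAccum.value)     // 30, not the 15 one might expect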