-
Notifications
You must be signed in to change notification settings - Fork 0
/
TestCombineByKey.scala
51 lines (35 loc) · 1.85 KB
/
TestCombineByKey.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.apache.spark.{ SparkConf, SparkContext, HashPartitioner }
/**
* Sample for using PairRDDFunctions.combineByKey(...)
*/
/**
 * Immutable record of one student's score in one subject.
 *
 * @param studentName student identifier; used as the pair-RDD key downstream
 * @param subject     subject name, e.g. "Math" or "English"
 * @param score       score obtained in that subject
 */
final case class ScoreDetail(studentName: String, subject: String, score: Float)
object TestCombineByKey {

  /**
   * Entry point: builds a small (studentName -> ScoreDetail) pair RDD
   * partitioned across 3 partitions, prints partition diagnostics, then
   * uses combineByKey to compute the average score per student.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val scores = List(
      ScoreDetail("A", "Math", 98), ScoreDetail("A", "English", 88),
      ScoreDetail("B", "Math", 75), ScoreDetail("B", "English", 78),
      ScoreDetail("C", "Math", 90), ScoreDetail("C", "English", 80),
      ScoreDetail("D", "Math", 91), ScoreDetail("D", "English", 80))

    // Key each record by student name: (studentName, ScoreDetail)
    val scoresWithKey = for { i <- scores } yield (i.studentName, i)

    // Master is expected to be supplied externally (e.g. via spark-submit).
    val sc = new SparkContext(new SparkConf().setAppName("TestCombineByKeyJob"))
    try {
      // Cache because this RDD is reused by several actions below.
      val scoresWithKeyRDD =
        sc.parallelize(scoresWithKey).partitionBy(new HashPartitioner(3)).cache()

      println(">>>> Number of partitions: " + scoresWithKeyRDD.getNumPartitions)
      println(">>>> Elements in each partition")
      // partition is an Iterator; .size consumes it, which is fine because
      // it is not reused within this task.
      scoresWithKeyRDD.foreachPartition(partition => println(partition.size))

      // explore each partition...
      println(">>>> Exploring partitions' data...")
      scoresWithKeyRDD.foreachPartition(
        partition => partition.foreach(
          item => println(item._2)))

      // combineByKey accumulates a (sum, count) pair per student, which is
      // then mapped to sum / count to yield the average score.
      val avgScoresRDD = scoresWithKeyRDD.combineByKey(
        // createCombiner: first value seen for a key starts a (score, 1) pair
        (x: ScoreDetail) => (x.score, 1),
        // mergeValue: fold another value into an existing combiner
        (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1),
        // mergeCombiners: merge per-partition combiners for the same key
        (acc1: (Float, Int), acc2: (Float, Int)) =>
          (acc1._1 + acc2._1, acc1._2 + acc2._2)
      ).map { case (key, (sum, count)) => (key, sum / count) }

      avgScoresRDD.collect().foreach(println)
    } finally {
      // Always release the SparkContext, even if a job above fails;
      // the original leaked it on any exception.
      sc.stop()
    }
  }
}