Tuesday, November 1, 2016

Spark - aggregateByKey and groupByKey Example

Consider an example of trips and stations from a bike-share dataset.

Before we begin with aggregateByKey or groupByKey, let's load the data from text files, create RDDs, and print the durations of trips.

Create case classes for station and trip
---------
case class Trip(id: Int, duration: Int, startDate: String, startStation: String, startTerminal: String, endDate: String, endStation: String, endTerminal: String, bike: String, subscriberType: String)

case class Station(id: Int, name: String, lat: String, lon: String, docks: String, landmark: String, installDate: String)

Load and map trip to case class
------

val tripWithHeader = sc.textFile("file:///home/vm4learning/Downloads/data/trips/trip_data.csv")

val tripHeader = tripWithHeader.first()

val trips = tripWithHeader.filter(_ != tripHeader).map(_.split(","))

val trip = trips.map(t => Trip(t(0).toInt, t(1).toInt, t(2), t(3), t(4), t(5), t(6), t(7), t(8), t(9)))


Load and map station to case class
------

val stationsWithHeader = sc.textFile("file:///home/vm4learning/Downloads/data/stations/station_data.csv")

val stationHeader = stationsWithHeader.first()

val stations = stationsWithHeader.filter(_ != stationHeader).map(_.split(","))

val station = stations.map(s => Station(s(0).toInt, s(1), s(2), s(3), s(4), s(5), s(6)))


Calculate duration by start station
--------

val byStartTerminal = trip.keyBy(_.startStation)

byStartTerminal.take(2).foreach(println)
(South Van Ness at Market,Trip(4576,63,8/29/2013 14:13,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber))
(San Jose City Hall,Trip(4607,70,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber))

val durationByStart = byStartTerminal.mapValues(_.duration)
durationByStart: org.apache.spark.rdd.RDD[(String, Int)] = MappedValuesRDD[11] at mapValues at <console>:24
scala> durationByStart.take(2).foreach(println)
(South Van Ness at Market,63)
(San Jose City Hall,70)

Now calculate average using groupByKey

val grouped = durationByStart.groupByKey.mapValues(list => list.sum / list.size)

scala> grouped.take(3).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
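
One caveat: duration is an Int, so list.sum / list.size is integer division and the averages above are truncated. If a fractional average is wanted, convert first (groupedAvgDouble is an illustrative name introduced here):

val groupedAvgDouble = durationByStart.groupByKey.mapValues(list => list.sum.toDouble / list.size)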


groupByKey shuffles every value across the network so that all values for a key land on one partition; nothing is combined beforehand.

aggregateByKey, on the other hand, uses the combiner concept from MapReduce: it first combines the values within each partition, and only then merges the per-partition results. Since each partition ships one combined record per key instead of every raw value, far less data is shuffled.
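
The difference shows up in the RDD lineage. Here is a minimal sketch on toy data (toy, viaGroup and viaAgg are illustrative names):

// Toy pair RDD spread over two partitions
val toy = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5), ("b", 7)), 2)

// groupByKey: every raw value crosses the network before the average is computed
val viaGroup = toy.groupByKey.mapValues(vs => vs.sum / vs.size)

// aggregateByKey: each partition first reduces to one (sum, count) per key,
// then only those compact records cross the network
val viaAgg = toy.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)).mapValues(p => p._1 / p._2)

println(viaGroup.toDebugString)   // both plans show a ShuffledRDD, but viaAgg ships pre-combined records
println(viaAgg.toDebugString)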

scala> val results = durationByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
results: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[14] at aggregateByKey at <console>:26

scala> results.take(3).foreach(println)
(Mountain View City Hall,(2330658,1401))
(California Ave Caltrain Station,(3144492,626))
(San Jose Civic Center,(2483057,820))

scala> val finalAvg = results.mapValues(i => i._1 / i._2)

scala> finalAvg.take(3).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)


Let's see aggregateByKey in detail.

val results = durationByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

(0, 0) is the zeroValue: the initial accumulator each per-key aggregation starts from, here a (sum, count) pair of zeros.

(acc, value) => (acc._1 + value, acc._2 + 1) is the function (exactly like a combiner in MapReduce) that aggregates the values of each key. It keeps a running tally of sum and count so that we can calculate the averages later, and it runs within the current partition before the data is shuffled out, which keeps unnecessary data from being transferred.

(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) is the function that aggregates the output of the first function; it runs after the shuffle to do the final aggregation, adding all the sums and counts together. This is a lot more efficient because everything arrives as compact as possible.
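
Written with named pieces, the three arguments are easier to see. This is an equivalent, illustrative refactor (zero, seqOp, combOp and resultsNamed are names introduced here):

// zeroValue: the (sum, count) accumulator each key starts from
val zero = (0, 0)

// seqOp: folds one duration into the partition-local accumulator (runs before the shuffle)
def seqOp(acc: (Int, Int), duration: Int): (Int, Int) = (acc._1 + duration, acc._2 + 1)

// combOp: merges accumulators coming from different partitions (runs after the shuffle)
def combOp(a: (Int, Int), b: (Int, Int)): (Int, Int) = (a._1 + b._1, a._2 + b._2)

val resultsNamed = durationByStart.aggregateByKey(zero)(seqOp, combOp)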

Also, run both versions on the same data and compare the wall-clock time; as the outputs below show, the results are identical, but the aggregateByKey version shuffles less and typically finishes faster.
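
A crude way to time the two jobs in the shell is sketched below (the time helper is illustrative, not a Spark API; the Spark UI also reports per-job durations):

// Crude wall-clock timing around an action (illustrative helper)
def time[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

time("groupByKey")     { grouped.count() }
time("aggregateByKey") { finalAvg.count() }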

scala> finalAvg.take(10).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
(Yerba Buena Center of the Arts (3rd @ Howard),970)
(SJSU 4th at San Carlos,1966)
(Commercial at Montgomery,761)
(2nd at South Park,683)
(University and Emerson,6545)
(Embarcadero at Sansome,1549)
(Townsend at 7th,746)

scala> grouped.take(10).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
(Yerba Buena Center of the Arts (3rd @ Howard),970)
(Commercial at Montgomery,761)
(SJSU 4th at San Carlos,1966)
(2nd at South Park,683)
(University and Emerson,6545)
(Embarcadero at Sansome,1549)
(Townsend at 7th,746)
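
Incidentally, the same sum-and-count average can be written with reduceByKey, which also combines map-side before the shuffle (avgViaReduce is an illustrative name):

val avgViaReduce = durationByStart
  .mapValues(d => (d, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues(p => p._1 / p._2)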

Find the latest trip starting at each terminal. Note that startDate is a plain String here, so the comparison below is lexicographic rather than strictly chronological.

val latestTrips = byStartTerminal.reduceByKey((a, b) => if (a.startDate > b.startDate) a else b)

latestTrips.take(2).foreach(println)
(Mountain View City Hall,Trip(15865,1072,9/9/2013 17:47,Mountain View City Hall,27,9/9/2013 18:05,Rengstorff Avenue / California Street,33,151,Subscriber))
(California Ave Caltrain Station,Trip(15583,824,9/9/2013 13:23,California Ave Caltrain Station,36,9/9/2013 13:37,University and Emerson,35,59,Customer))
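
To compare the dates chronologically instead, parse startDate into a timestamp first. A minimal sketch, assuming the M/d/yyyy H:mm format seen in the sample rows (toMillis and firstTrips are names introduced here):

import java.text.SimpleDateFormat

// Parse strings like "8/29/2013 14:13" into epoch millis; a fresh formatter per call avoids thread-safety issues
def toMillis(s: String): Long = new SimpleDateFormat("M/d/yyyy H:mm").parse(s).getTime

// The truly first trip per start terminal
val firstTrips = byStartTerminal.reduceByKey((a, b) => if (toMillis(a.startDate) < toMillis(b.startDate)) a else b)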
