Creatively Technical
Creative solutions to Technical problems in Big Data & Java
Monday, April 29, 2019
SparkStreaming
Reading and Writing JSON
sparkSession.read.json(path_or_rdd)
- Uses a DataFrameReader to read json records (and other formats) from a file or RDD, infer the schema and create a DataFrame
- Can convert to DataSet[T] afterwards
mydataset.write.json
- Uses a DataFrameWriter to write a Dataset as json formatted records (or other formats)
mydataset.toJSON
- Converts the Dataset[T] to Dataset[String] of Json-formatted record strings
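A minimal sketch of the three calls above, assuming a hypothetical people.json file with name and age fields:
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)

val spark = SparkSession.builder().appName("json-example").getOrCreate()
import spark.implicits._

// DataFrameReader: infer the schema from the JSON records and build a DataFrame
val df = spark.read.json("/tmp/people.json")

// convert to a typed Dataset afterwards
val people = df.as[Person]

// DataFrameWriter: write the Dataset back out as JSON-formatted records
people.write.json("/tmp/people_out")

// Dataset[Person] -> Dataset[String] of JSON-formatted record strings
val jsonStrings = people.toJSON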
Handling Bad Records
- DataFrameReader adds an extra column _corrupt_record:
- null if the record is valid JSON
- the raw JSON string if the record is invalid
DataFrameReader and DataFrameWriter also support other formats (see the sketch after this list)
- csv
- jdbc
- orc
- text
- other table formats
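For example, a CSV file can be read and written the same way (a sketch, assuming a SparkSession named spark and a hypothetical trips.csv file):
val tripsDf = spark.read
  .option("header", "true")        // first line contains the column names
  .option("inferSchema", "true")   // infer column types instead of reading all strings
  .csv("/tmp/trips.csv")

tripsDf.write.orc("/tmp/trips_orc")  // the same data written out in another format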
Spark Streaming - Spark 1.X
- Spark started as batch mode system, like MapReduce
- Spark streaming is a mini batch system
- Each mini-batch of data (for each time interval) is stored in an RDD
- DStream holds the sequence of mini-batch RDDs, and also offers window functions
- Time interval typically 1/2 to 60+ seconds
Structured Streaming - Spark 2.X
- Moves away from the "mini-batch" model
- The latency of Spark Streaming's mini-batches is too long for many latency-sensitive applications
- Based on Datasets rather than RDDs
- Still experimental in Spark 2.x
- Will replace Spark Streaming once it is mature
Larger context of Spark Streaming
Spark - Streaming: Data Pipeline
- Input comes from Kafka, Flume, HDFS/S3, Kinesis and Twitter.
- Batches of input data go from Spark Streaming to Spark Engine
- Batches of processed data go to HDFS, Databases and dashboards
Checkpointing and D-streaming
- For DStreams, checkpointing is done automatically; otherwise the RDD lineage would grow too long
- A metadata checkpoint is written for every batch
- A data checkpoint is configured with DStream.checkpoint(duration), and the duration must be (n * batch_interval) for some n
- By default, if batch_interval > 10 sec, checkpointing happens every batch;
if batch_interval < 10 sec, it defaults to the closest n where (n * batch_interval) > 10 sec
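A minimal sketch of wiring this up (assuming a socket text stream and a hypothetical HDFS checkpoint directory):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-example")
val ssc = new StreamingContext(conf, Seconds(5))   // 5 second batch interval

// metadata and data checkpoints are written under this directory
ssc.checkpoint("hdfs:///tmp/checkpoints")

val lines = ssc.socketTextStream("localhost", 9999)

// batch_interval is 5 sec (< 10 sec), so checkpoint every 10 sec (n = 2)
lines.checkpoint(Seconds(10))

ssc.start()
ssc.awaitTermination()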
Spark - Datasets Components and Optimizer
SparkSQL Components
- Catalyst
- Execution Core
- Query planner that translates the logical queries into actual Dataset operations
- SparkSession (2.X) or SQLContext (1.X) is defined in core
- Hive Integration
Catalyst Optimizer
- Backend agnostic - supports both SQL and Dataset code
- Manipulation of trees of relational operators and expressions
Execution Core: SparkSession
Spark 1.X - SQLContext
val sc = new SparkContext(master, appName)
val sqlContext = new SQLContext(sc)
Spark 2.X - SparkSession which also creates the SparkContext
val spark = SparkSession.builder().master(master).appName(name).getOrCreate()
val sc = spark.sparkContext
Hive Integration
- Interoperate with Hadoop Hive tables and metastore
- Spark 1.x: use HiveContext, an extension of SQLContext
- Spark 2.x: enable Hive support in SparkSession (see the sketch below)
- Create, read and delete Hive tables
- Use Hive SerDe and UDFs
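A minimal sketch of enabling Hive support in Spark 2.x (the table and query are just placeholders):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-example")
  .enableHiveSupport()   // wires SparkSession to the Hive metastore
  .getOrCreate()

// create, query and drop a Hive table through Spark SQL
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) STORED AS TEXTFILE")
spark.sql("SELECT key, value FROM src LIMIT 10").show()
spark.sql("DROP TABLE IF EXISTS src")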
Friday, April 26, 2019
Spark Datasets
Datasets
- Distributed collections of structured data, analogous to relational tables.
- Examples of similar dataset APIs are Python Pandas and R data frames.
- Deliver better performance than RDDs.
- Supported by Scala, Java, Python and R
- Uses a new query optimizer, Catalyst, for aggressive internal optimizations
Datasets vs Dataframes
type DataFrame = Dataset[Row]
- The original API was called DataFrames, which uses Row to represent the rows in the data frame, but Row loses type safety - each column is effectively untyped
- Dataset was introduced in Spark 1.6 to restore type safety - for catching errors at compile time
- We can still work with DataFrames as if they are regular typed collections, but in reality we are working with Dataset[Row] instances (see the sketch below)
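A minimal sketch of the difference, assuming a SparkSession named spark and a hypothetical people.json file with name and age fields:
import org.apache.spark.sql.{DataFrame, Dataset}

case class Person(name: String, age: Long)
import spark.implicits._

// a DataFrame is just Dataset[Row]; columns are untyped at compile time
val df: DataFrame = spark.read.json("/tmp/people.json")

// restore type safety: a misspelled field is now caught at compile time
val people: Dataset[Person] = df.as[Person]
val adults = people.filter(_.age >= 18)   // typed lambda instead of a string expression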
Thursday, April 25, 2019
Scala - Catching Exceptions & Optional Values
Catching Exceptions
- No Checked Exception in Scala
- Use try-catch expressions to catch an exception
- In the catch clause - define one or more match alternatives
- The finally clause is optional. Scala provides a useful alternative: Try
def toInt( s:String ) : Int = try {
| s.toInt
| } catch {
| case _: NumberFormatException => 0
| }
Using Try
- Try executes a given expression and returns Success or Failure
scala> import scala.util.{Try,Success,Failure}
import scala.util.{Try, Success, Failure}
scala> Try("100".toInt)
res60: scala.util.Try[Int] = Success(100)
scala> Try("Martin".toInt)
res61: scala.util.Try[Int] = Failure(java.lang.NumberFormatException: For input string: "Martin")
Pattern Matching on Try
- Use Try in the same way as Option. One way is with pattern matching
scala> def makeInt(s:String): Int = Try(s.toInt) match {
| case Success(n) => n
| case Failure(_) => 0
| }
makeInt: (s: String)Int
scala> makeInt("100")
res62: Int = 100
scala> makeInt("Hello")
res63: Int = 0
Higher Order Functions on Try
- getOrElse extracts the wrapped value or returns the default
- Try offers Higher Order Functions known from collections
scala> def makeInt(s:String) = Try(s.toInt).getOrElse(0)
makeInt: (s: String)Int
scala> Success("scala").map(_.reverse)
res64: scala.util.Try[String] = Success(alacs)
scala> for {
| lang <- Success("Scala")
| behav <- Success("Rocks")
| } yield s"$lang $behav"
res65: scala.util.Try[String] = Success(Scala Rocks)
Optional Values
The Option Class - It is an ADT with two possible types
- Some case class, wrapping a value
- None singleton object
scala> val langs = Map( "s" -> "Scala", "j" -> "Java" )
langs: scala.collection.immutable.Map[String,String] = Map(s -> Scala, j -> Java)
scala> langs.get("s")
res41: Option[String] = Some(Scala)
scala> langs.get("c")
res42: Option[String] = None
Using Option Instances
Some and None can be used directly
Some("Scala")
Option(null)
Pattern matching on Option
- The type system enforces us to handle an optional value
- One way is with Pattern matching
scala> def langsByFirst( s: String ): String = langs.get(s) match {
| case Some(language) => language
| case None => s"No language starting with $s"
| }
langsByFirst: (s: String)String
scala> langsByFirst("s")
res47: String = Scala
scala> langsByFirst("c")
res48: String = No language starting with c
Higher Order Functions on Option
- Option offers higher order functions known from collections
- Using higher order functions such as map and flatMap on Option makes it easy to compose a chain of calls that handles missing values gracefully
scala> Option("Scala").map(_.reverse)
res49: Option[String] = Some(alacS)
scala> for {
| lang <- Some("Scala")
| behaviour <- Some("rocks")
| } yield s"$lang $behaviour"
res53: Option[String] = Some(Scala rocks)
scala> val langs = Map("s" -> "Scala", "j" -> "Java")
langs: scala.collection.immutable.Map[String,String] = Map(s -> Scala, j -> Java)
- foreach calls a function only if a value is present
scala> langs.get("s").foreach(println)
Scala
scala> langs.get("c").foreach(println)
Obtaining the value
- The getOrElse extracts the wrapped value or returns a default
scala> def langsByFirst(s:String): String = langs.get(s).getOrElse(s"No langs starting with $s")
langsByFirst: (s: String)String
scala> langsByFirst("c")
res56: String = No langs starting with c
Scala Match Expressions
- Expressions are matched against a pattern
- Difference to the switch statement of C or Java
- No fall through to the next alternative
- Match expressions return a value
- MatchError is thrown if no pattern matches
expr match {
case pattern1 => result1
case pattern2 => result2
}
Match pattern
case pattern => result
- case declares a match pattern
- If pattern matches, result will be evaluated and returned
Pattern 1
Wildcard Pattern
_ is used as a wildcard to match everything
scala> def whatTimeIsIt(any: Any): String = any match {
| case _ => s"any is no time"
| }
whatTimeIsIt: (any: Any)String
scala> whatTimeIsIt(20)
res18: String = any is no time
Variable Pattern
scala> def whatTimeIsIt(any: Any): String = any match {
| case any => s"$any is no time"
| }
whatTimeIsIt: (any: Any)String
scala> whatTimeIsIt("winter")
res21: String = winter is no time
Typed Pattern
- Use a type annotation to match only values of a certain type
- The typed pattern is composed with the wildcard and variable patterns
scala> def whatTimeIsIt( any:Any ): String = any match {
| case time: Time => s"Time is ${time.hours}:${time.minutes}"
| case _ => s"$any is no time"
| }
scala> whatTimeIsIt(Time(9,30))
Constant Pattern
- Use a stable identifier to match something constant
scala> def whatTimeIsIt( any:Any ): String = any match {
| case "12:00" => "High Noon"
| case _ => s"$any is no time"
| }
scala> whatTimeIsIt("12:00")
res23: String = High Noon
scala> whatTimeIsIt("10:00")
res24: String = 10:00 is no time
Pattern 2
Tuple Pattern
- Use tuple syntax to match and decompose tuples
- Tuple pattern is composed with other patterns like the constant pattern or variable pattern
def whatTimeIsIt( any:Any ): String = any match {
case (x, "12:00") => s"From $x to noon"
case _ => s"$any is no time"
}
scala> whatTimeIsIt("5")
res27: String = 5 is no time
scala> whatTimeIsIt("5","12:00")
res28: String = From 5 to noon
Constructor Pattern
- Use constructor syntax to match and decompose case classes
- The constructor pattern is composed with other patterns
- We can build deeply nested structures
def whatTimeIsIt( any:Any ) = any match {
case Time(12,0) => "High Noon"
case Time(12,m) => s"It is $m minutes past 12"
case _ => s"$any is no time"
}
>whatTimeIsIt(Time(12,15))
>It is 15 minutes past 12
Sequence Pattern
Use sequence constructors or the append/prepend operators to match and decompose sequences
scala> def matchSeq[A]( seq: Seq[A] ): String = seq match {
| case Seq(1,2,3) => "1 to 3"
| case x +: Nil => s"Only element is $x"
| case _+:x => s"Last element is $x"
| case Nil => "Empty sequence"
| }
matchSeq: [A](seq: Seq[A])String
scala> matchSeq(1 to 3)
res29: String = 1 to 3
scala> matchSeq(Vector(1))
res30: String = Only element is 1
scala> matchSeq(Array(1,2,3,4))
res31: String = Last element is WrappedArray(2, 3, 4)
Pattern Alternatives
- Use | symbol to combine various pattern alternatives
def whatTimeIsIt( any:Any ): String = any match {
case "00:00" | "12:00" => s"Midnight or high noon"
case _ => s"$any is no time"
}
Pattern Binders
- Use @ to bind a name to a matched pattern
def whatTimeIsIt(any:Any) : String = any match {
case time @ Time(_, 0) => s"$time with 0 minutes"
case time @ Time(_, m) => s"$time with $m minutes"
case _ => s"$any is no time"
}
Pattern Guards
- Composing patterns gives more control over matching
- If that's not enough, use the if keyword to define pattern guard
def isAfternoon(any:Any) : String = any match {
case Time(h,m) if h>=12 => s"Yes, it is ${h-12}:$m pm"
case Time(h,m) => s"No, it is $h:$m am"
case _ => s"$any is no time"
}
Scala Traits
The JVM supports only single inheritance of classes - traits in Scala overcome this limitation.
abstract class Animal
class Bird extends Animal {
def fly: String = "I am flying"
}
class Fish extends Animal {
def swimming: String = "I am swimming"
}
class Duck // stuck here: a Duck needs both fly and swimming, but can only extend one class
trait Swimmer {
def swimming: String = "I am swimming"
}
- Traits may contain concrete and abstract members
- Traits are abstract and cannot have parameters
- Like classes, traits extend exactly one superclass
Mix-In composition
- The extends either creates a subclass or mixes in the first trait
- The with mixes in subsequent traits
- The first keyword has to be extends, regardless of whether a class or a trait follows; use with for further mix-ins
- Mixing in a trait means extending the superclass of the trait
class Fish extends Animal with Swimmer
class Duck extends Bird with Swimmer
trait Foo extends AnyRef
trait Bar extends AnyRef with Foo
Traits Linearization
- Scala takes the class and all of its inherited classes and mixed-in traits and puts them in a single, linear order. Therefore it is always clear what super means.
Inheritance Hierarchy
- Traits must respect the inheritance hierarchy - i.e. a class must not extend two incompatible super classes
scala> class Foo; class Bar; trait Baz extends Foo
defined class Foo
defined class Bar
defined trait Baz
scala> class Qux extends Bar
defined class Qux
scala> class Qux extends Bar with Baz
<console>:14: error: illegal inheritance; superclass Bar
is not a subclass of the superclass Foo
of the mixin trait Baz
class Qux extends Bar with Baz
^
Concrete Members
- If multiple traits define the same member, you must use override (see the sketch below)
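A minimal sketch (using an illustrative Flyer trait alongside Swimmer):
trait Swimmer {
  def move: String = "I am swimming"
}
trait Flyer {
  def move: String = "I am flying"
}
// without the override, mixing in both traits is a compile error
class Duck extends Swimmer with Flyer {
  override def move: String = "I swim and fly"
}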
Processed Strings in Scala
Digression - String Interpolation
Since Scala 2.10, we can define processed strings
- String starting with s embeds expression using $id or ${ }
- String starting with f formats the results of the expressions
- %02d means format the integer with 2 digits, left padding it with zeroes (%02x does the same in hexadecimal, as below)
scala> val n = 20
n: Int = 20
scala> s"Value = $n"
res6: String = Value = 20
scala> f"Hex Value = $n%02x"
res8: String = Hex Value = 14
Inheritance in Scala
Scala supports inheritance
- Code Reuse
- Specialization
Each class except for Any has exactly one super-class
- Non private members are inherited
- Non final members can be overridden
Subtype polymorphism
- Polymorphic means having more than one type
- The Subclass type conforms to the superclass type
Subclass
Bird extends Animal
- default superclass is AnyRef
- can only extend one class
Final and Sealed Classes
- Use final to prevent a class from being extended
final class Animal
class Bird extends Animal // error: cannot extend a final class
- Sealed classes can only be extended within the same source file
- Use it to create an algebraic data type (ADT), also called a tagged union
- Example: Option with some and none
sealed class Animal
class Bird extends Animal
final class Fish extends Animal
Accessing superclass members
- Use super to access superclass members
Uniform Access Principle
For a client it should make no difference whether a property is implemented through storage or computation
- Properties can be implemented as def or as val
class Animal {
  def name: String = "Lion"   // computed on each access
  // or, equivalently for clients:
  // val name: String = "Lion" // stored once
}
Overriding defs with vals
- A val is stable, but a parameterless def could return different results on different calls
- We can decide to make it stable and override a def with a val
import scala.util.Random
class Animal {
  def name: Seq[Char] = Random.shuffle("Lion".toSeq)
}
class Bird extends Animal {
override val name: Seq[Char] = "Dog"
}
Lazy Vals
- All vals are initialized during object construction
- Use lazy keyword to defer initialization until first usage
- Lazy vals are not free - the initialization check might show some performance drawbacks
lazy val lazyVal = {
println("I am very lazy")
}
Wednesday, April 24, 2019
Functional Collections - Higher Order Functions
- Higher order functions on collections take functions as parameters
- Methods taking functions as arguments
- Methods returning a function
- Higher order functions provide more abstraction
- Declarative programming (say what to do, not how to do it)
scala> val numbers = Vector(1,2,3)
numbers: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)
scala> numbers.map(number=>number+1)
res51: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)
Function Literals
Short notation: _ stands for one function parameter
scala> numbers.map((number:Int)=>number+1)
res52: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)
scala> numbers.map(number=>number+1)
res53: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)
scala> numbers.map(_+1)
res54: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)
Function Values
Scala has first class functions
- Function values are objects
- Assign function values to variables
- Pass function values as arguments to higher order functions
scala> val addOne = (n:Int)=>n+1
addOne: Int => Int = <function1>
scala> numbers.map(addOne)
res55: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)
Function Types
If functions are objects, what are their types? The REPL shows Int => Int, which is syntactic sugar for Function1[Int, Int]. All function types define an apply method
scala> addOne(7)
res56: Int = 8
Methods and functions
Methods and functions share a lot in common but they are different. If the compiler expects a function and you give a method, the method gets lifted to a function
scala> def addOne(n:Int) = n+1
addOne: (n: Int)Int
scala> val addOneFn:Int => Int = addOne
addOneFn: Int => Int = <function1>
Higher Order Function: Map
- map transforms a collection by applying a function to each element
- The collection type remains the same, but the element type may change
scala> val languages = Vector("Scala","Java","Python")
languages: scala.collection.immutable.Vector[String] = Vector(Scala, Java, Python)
scala> languages.map(_.toLowerCase)
res59: scala.collection.immutable.Vector[String] = Vector(scala, java, python)
scala> languages.map(_.length)
res60: scala.collection.immutable.Vector[Int] = Vector(5, 4, 6)
Higher Order Function: FlatMap
- flatMap transforms a collection by applying a function to each element
- The collection returned for each element is flattened into the result
scala> Seq("now is", "the time").map(_.split(" "))
res63: Seq[Array[String]] = List(Array(now, is), Array(the, time))
scala> Seq("now is", "the time").flatMap(_.split(" "))
res64: Seq[String] = List(now, is, the, time)
Higher Order Function: Filter
- The predicate is a unary (one argument) function returning a boolean value
- filter selects the elements which satisfy the predicate
- filterNot selects the elements which do not satisfy the predicate (see the sketch below)
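For example:
val numbers = Vector(1, 2, 3, 4, 5)
numbers.filter(_ % 2 == 0)     // Vector(2, 4) - elements satisfying the predicate
numbers.filterNot(_ % 2 == 0)  // Vector(1, 3, 5) - elements not satisfying it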
Scala Tuples
Tuples Are
- Not collections but finite heterogeneous containers
- case classes parameterized in each field
- Standard library contains Tuple1 to Tuple22
Tuple fields are accessed as _1, _2, etc.
Tuple2 pairs can be created with the -> operator
scala> Tuple2(1,"a")
res41: (Int, String) = (1,a)
scala> Tuple2(1,2)
res42: (Int, Int) = (1,2)
scala> (1,"a")
res44: (Int, String) = (1,a)
scala> val pair = (1,"a")
pair: (Int, String) = (1,a)
scala> pair._2
res45: String = a
scala> 1->"a"
res46: (Int, String) = (1,a)
Scala Collections
Useful operations on scala collections:
scala> Vector(1,2,3)
res0: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)
scala> Seq(1,2,3)
res1: Seq[Int] = List(1, 2, 3)
scala> Set(1,2,3)
res2: scala.collection.immutable.Set[Int] = Set(1, 2, 3)
scala> Set(1,2,"3")
res3: scala.collection.immutable.Set[Any] = Set(1, 2, 3)
Collections have a default apply method
scala> Vector(1,2,3)
res4: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)
scala> Vector.apply(1,2,3)
res5: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)
The element type of a collection can be passed as a type parameter in []
scala> Vector[Int](1,2,3)
res6: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)
scala> Vector[String]("a","b","c")
res8: scala.collection.immutable.Vector[String] = Vector(a, b, c)
scala> Vector("a","b","c")
res9: scala.collection.immutable.Vector[String] = Vector(a, b, c)
Collection operations include +, toSet, toSeq, contains, head, last and tail
scala> val a = Seq(1,2)
a: Seq[Int] = List(1, 2)
scala> val b = Seq(3,4)
b: Seq[Int] = List(3, 4)
scala> a.toSet
res16: scala.collection.immutable.Set[Int] = Set(1, 2)
scala> Map((1,"a"),(2,"b"))
res17: scala.collection.immutable.Map[Int,String] = Map(1 -> a, 2 -> b)
scala> val map = Map((1,"a"),(2,"b"))
map: scala.collection.immutable.Map[Int,String] = Map(1 -> a, 2 -> b)
scala> map.toSet
res18: scala.collection.immutable.Set[(Int, String)] = Set((1,a), (2,b))
scala> map.toSeq
res20: Seq[(Int, String)] = ArrayBuffer((1,a), (2,b))
scala> a.contains(1)
res21: Boolean = true
scala> a.head
res22: Int = 1
scala> a.tail
res23: Seq[Int] = List(2)
Sequence
scala> val numbers = (1,2,3,4)
numbers: (Int, Int, Int, Int) = (1,2,3,4)
scala> val numbers = Seq(1,2,3,4)
numbers: Seq[Int] = List(1, 2, 3, 4)
scala> numbers(0)
res26: Int = 1
scala> numbers:+4
res27: Seq[Int] = List(1, 2, 3, 4, 4)
scala> 0+:numbers
res28: Seq[Int] = List(0, 1, 2, 3, 4)
scala> Seq(1,2,1,3).distinct
res30: Seq[Int] = List(1, 2, 3)
scala> numbers
res31: Seq[Int] = List(1, 2, 3, 4)
scala> val numers = numbers:+4
numers: Seq[Int] = List(1, 2, 3, 4, 4)
scala> numers.distinct
res33: Seq[Int] = List(1, 2, 3, 4)
scala> val numers0 = 0+:numbers:+6
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 6)
scala> val numers0 = 0+:numbers:+6+7
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 13)
scala> val numers0 = 0+:numbers:+6:+7
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 6, 7)
scala> val numers0 = 0+:numbers:+6:+7
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 6, 7)
scala> numbers.+:(0)
res34: Seq[Int] = List(0, 1, 2, 3, 4)
Set has no duplicates
scala> val numbers = Set(1,2,3)
numbers: scala.collection.immutable.Set[Int] = Set(1, 2, 3)
scala> numbers+7
res35: scala.collection.immutable.Set[Int] = Set(1, 2, 3, 7)
scala> numbers(1)
res38: Boolean = true
scala> numbers contains 1
res40: Boolean = true
Maps
Collection of key value pairs with unique keys
scala> val map = Map( 1->"a", 2->"b" )
map: scala.collection.immutable.Map[Int,String] = Map(1 -> a, 2 -> b)
scala> map(1)
res47: String = a
scala> map.get(9)
res48: Option[String] = None
scala> map.getOrElse(1,"z")
res49: String = a
scala> map.getOrElse(9,"z")
res50: String = z
Scala collections come in 3 flavors (the immutable flavor is the default, used without any imports)
- scala.collection package (base)
- scala.collection.immutable
- scala.collection.mutable
Exception - scala.collection.Seq (because of Arrays)
Immutable collections are persistent
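A small sketch of the difference (immutable by default versus an explicit mutable import):
val v = Vector(1, 2, 3)   // scala.collection.immutable.Vector by default
val v2 = v :+ 4           // returns a new Vector; v itself is unchanged (persistent)

import scala.collection.mutable
val buf = mutable.ArrayBuffer(1, 2, 3)
buf += 4                  // modifies buf in place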
Monday, January 9, 2017
Query space utilization in Vertica
SELECT /*+ label(compressed_table_size)*/
anchor_table_schema,
SUM(used_bytes) / ( 1024^3 ) AS used_compressed_gb
FROM v_monitor.projection_storage
GROUP BY anchor_table_schema
ORDER BY SUM(used_bytes) DESC;
Output:
A_uat 83.9939151955768
B_test 0.3224210254848
Wednesday, January 4, 2017
Spark Fundamentals - RDD
The main abstraction Spark provides is a resilient distributed dataset (RDD)
Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
An RDD is a logical reference to a dataset which is partitioned across many server machines in the cluster. RDDs are immutable and are self-recovered in case of failure. The dataset could be data loaded externally by the user - a json file, a csv file or a text file with no specific data structure.
RDDs have the following properties –
- Immutability and partitioning: RDDs are composed of a collection of records which are partitioned. A partition is the basic unit of parallelism in an RDD, and each partition is one logical division of data which is immutable and created through some transformations on existing partitions. Immutability helps to achieve consistency in computations. Users can define their own criteria for partitioning, based on keys on which they want to join multiple datasets if needed.
- Coarse grained operations: coarse grained operations are applied to all elements in a dataset. For example - a map, filter or groupBy operation which will be performed on all elements in a partition of an RDD.
- Fault Tolerance: since RDDs are created over a set of transformations, Spark logs those transformations rather than the actual data. The graph of these transformations used to produce one RDD is called the lineage graph.
For example –
firstRDD=sc.textFile("hdfs://...")
secondRDD=firstRDD.filter(someFunction);
thirdRDD = secondRDD.map(someFunction);
result = thirdRDD.count()
In case we lose some partition of an RDD, we can replay the transformations on that partition from the lineage to achieve the same computation, rather than doing data replication across multiple nodes. This characteristic is the biggest benefit of RDDs, because it saves a lot of effort in data management and replication and thus achieves faster computations.
- Lazy evaluations: Spark computes RDDs lazily, the first time they are used in an action, so that it can pipeline transformations. So, in the above example, the RDDs will be evaluated only when the count() action is invoked.
- Persistence: Users can indicate which RDDs they will reuse and choose a storage strategy for them (e.g., in-memory storage or on Disk etc.)
These properties of RDDs make them useful for fast computations.
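A small sketch of persistence, reusing the elided path style from the lineage example above (the filter and map functions here are just placeholders for someFunction):
import org.apache.spark.storage.StorageLevel

val firstRDD = sc.textFile("hdfs://...")
val secondRDD = firstRDD.filter(_.nonEmpty)        // placeholder transformation
secondRDD.persist(StorageLevel.MEMORY_AND_DISK)    // keep this RDD around for reuse
// secondRDD.cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)

secondRDD.count()             // first action materializes and caches the partitions
secondRDD.map(_.length).sum() // later actions reuse the cached partitions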
Monday, January 2, 2017
Setting up Spark-scala development environment
Pre-requisites:
1. Oracle Virtualbox
Download and install from https://www.virtualbox.org/ and create a ubuntu VM
2. Java1.7 or 1.8
3. Scala 2.x.x (any version)
Download and install scala. Setup home and PATH
https://www.scala-lang.org/download/install.html
4. Install sbt from http://www.scala-sbt.org/0.13/docs/Using-sbt.html
Setup home and path
5. Install and setup spark from
http://creativelytechie.blogspot.ca/2016/07/steps-to-installing-spark.html
Ensure version compatibility of spark with scala
6. Eclipse IDE in VM
Install eclipse IDE (new or upgrade existing eclipse for scala) from
http://scala-ide.org/download/prev-stable.html
Tuesday, November 1, 2016
Spark - Broadcast Joins
In continuation of the previous post, using the same example of stations and trips:
scala> val bcStations = sc.broadcast(station.keyBy(_.id).collectAsMap)
bcStations: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Station]] = Broadcast(19)
scala> val bcStations = sc.broadcast(station.keyBy(_.id))
WARN MemoryStore: Not enough space to store block broadcast_20 in memory! Free memory is 252747804 bytes.
WARN MemoryStore: Persisting block broadcast_20 to disk instead.
bcStations: org.apache.spark.broadcast.Broadcast[org.apache.spark.rdd.RDD[(Int, Station)]] = Broadcast(20)
scala> bcStations.value.take(3).foreach(println)
(83,Station(83,Mezes Park,37.491269,-122.236234,15,Redwood City,2/20/2014))
(77,Station(77,Market at Sansome,37.789625,-122.400811,27,San Francisco,8/25/2013))
(23,Station(23,San Mateo County Center,37.488501,-122.231061,15,Redwood City,8/15/2013))
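A sketch of how the broadcast map can then be used for a map-side join (assuming the trip RDD and Trip case class from the related aggregateByKey post, where startTerminal holds the station id as a string):
val tripsWithStationName = trip.map { t =>
  // local lookup in the broadcast map - no shuffle, unlike a regular join
  val stationName = bcStations.value.get(t.startTerminal.toInt)
    .map(_.name)
    .getOrElse("unknown station")
  (t.id, stationName)
}
tripsWithStationName.take(3).foreach(println)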
Spark - aggregateByKey and groupByKey Example
Consider an example of trips and stations
Before we begin with aggregateByKey or groupByKey, let's load the data from the text files, create the RDDs and print the duration of trips.
Create case classes for station and trip
---------
case class Trip(id:Int, duration:Int, startDate:String, startStation:String, startTerminal:String, endDate:String, endStation:String, endTerminal:String, bike:String,subscriberType:String);
case class Station(id:Int,name:String,lat:String,lon:String,docks:String,landmark:String,installDate:String)
Load and map trip to case class
------
val tripWithHeader = sc.textFile("file:////home/vm4learning/Downloads/data/trips/trip_data.csv")
val tripHeader = tripWithHeader.first()
val trips = tripWithHeader.filter(_!=tripHeader).map(_.split(","))
val trip = trips.map( t=>Trip(t(0).toInt,t(1).toInt,t(2),t(3),t(4),t(5),t(6),t(7),t(8),t(9)) )
Load and map station to case class
------
val stationsWithHeader = sc.textFile("file:////home/vm4learning/Downloads/data/stations/station_data.csv")
val stationHeader = stationsWithHeader.first()
val stations = stationsWithHeader.filter(_!=stationHeader).map(_.split(","))
val station = stations.map( s=>Station(s(0).toInt,s(1),s(2),s(3),s(4),s(5),s(6)) )
Calculate durationByStartStation
--------
val byStartTerminal = trip.keyBy(_.startStation)
byStartTerminal.take(2).foreach(println)
(South Van Ness at Market,Trip(4576,63,8/29/2013 14:13,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber))
(San Jose City Hall,Trip(4607,70,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber))
val durationByStart = byStartTerminal.mapValues(_.duration)
durationByStart: org.apache.spark.rdd.RDD[(String, Int)] = MappedValuesRDD[11] at mapValues at <console>:24
scala> durationByStart.take(2).foreach(println)
(South Van Ness at Market,63)
(San Jose City Hall,70)
Now calculate average using groupByKey
val grouped = durationByStart.groupByKey.mapValues(list=>list.sum/list.size)
scala> grouped.take(3).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
groupByKey shuffles keys across all partitions.
aggregateByKey, on the other hand, uses the combiner concept of MapReduce: it combines all values within a partition first, and then reduces the combined results from the multiple partitions. Because only the combined output of each partition is shuffled, much less data gets transferred.
scala> val results = durationByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
results: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[14] at aggregateByKey at <console>:26
scala> results.take(3).foreach(println)
(Mountain View City Hall,(2330658,1401))
(California Ave Caltrain Station,(3144492,626))
(San Jose Civic Center,(2483057,820))
scala> val finalAvg = results.mapValues(i=>i._1/i._2)
scala> finalAvg.take(3).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
Let's look at aggregateByKey in detail.
val results = durationByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +
value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
The (0, 0) is the zeroValue, the initial value the aggregation starts from.
val results = durationByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +
value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
The first function (exactly like a combiner in MapReduce) aggregates the values of each key within a partition. It keeps a running tally of sum and count so that we can calculate the averages later. It performs the reduction in the current partition before the data is shuffled out, which keeps unnecessary data from being transferred.
val results = durationByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +
value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
The second function aggregates the output of the first function -- it runs after the shuffle to do the final aggregation, adding up all the sums and counts. This is a lot more efficient because everything arrives as compact as possible.
Also, in the same example, try running both and compare the time taken:
scala> finalAvg.take(10).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
(Yerba Buena Center of the Arts (3rd @ Howard),970)
(SJSU 4th at San Carlos,1966)
(Commercial at Montgomery,761)
(2nd at South Park,683)
(University and Emerson,6545)
(Embarcadero at Sansome,1549)
(Townsend at 7th,746)
scala> grouped.take(10).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
(Yerba Buena Center of the Arts (3rd @ Howard),970)
(Commercial at Montgomery,761)
(SJSU 4th at San Carlos,1966)
(2nd at South Park,683)
(University and Emerson,6545)
(Embarcadero at Sansome,1549)
(Townsend at 7th,746)
Find the first trip starting at each terminal
val firstTrips = byStartTerminal.reduceByKey((a,b)=> { a.startDate>b.startDate match {case true=>a case false=>b}} )
firstTrips.take(2).foreach(println)
(Mountain View City Hall,Trip(15865,1072,9/9/2013 17:47,Mountain View City Hall,27,9/9/2013 18:05,Rengstorff Avenue / California Street,33,151,Subscriber))
(California Ave Caltrain Station,Trip(15583,824,9/9/2013 13:23,California Ave Caltrain Station,36,9/9/2013 13:37,University and Emerson,35,59,Customer))
Wednesday, July 20, 2016
Creating an Oozie coordinator job for scheduling a Shell script
Step 1: Create coordinator.properties file
oozie.use.system.libpath=true
nameNode=hdfs://<name>:8020
jobTracker=hdfs://<name>:8021
dfs.namenode.kerberos.principal=<value from hdfs-site.xml>
yarn.resourcemanager.principal=<value from hdfs-site.xml>
dfs.web.authentication.kerberos.keytab=<value from hdfs-site.xml>
dfs.web.authentication.kerberos.principal=<value from hdfs-site.xml>
nfs.kerberos.principal=<value from hdfs-site.xml>
nfs.keytab.file=/<value from hdfs-site.xml>
queueName=default
rootDirectory=<path>
oozie.coord.application.path=${nameNode}/${rootDirectory}/coordinator.xml
shellScript=<shell script.sh name>
shellScriptPath=<shell script path>
emailToAddress=<email address>
oozie.email.smtp.host=<value from oozie-site.xml>
oozie.email.smtp.port=<value from oozie-site.xml>
oozie_web=<value from oozie-site.xml>
Step 2: Create coordinator.xml file
<coordinator-app name="name" frequency="${coord:days(1)}" start="2015-07-13T20:20Z" end="2015-07-13T20:59Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">
<action>
<workflow>
<app-path>${nameNode}/${rootDirectory}/workflow.xml</app-path>
</workflow>
</action>
</coordinator-app>
Step 3: Create workflow.xml
This workflow.xml file sends an email on success and on failure to the respective addresses configured in coordinator.properties
<workflow-app xmlns='uri:oozie:workflow:0.3' name='shell-wf'>
<start to='shell1' />
<action name='shell1'>
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>${shellScript}</exec>
<argument>D1</argument>
<argument>D2</argument>
<file>${shellScript}#${shellScript}</file> <!--Copy the executable to compute node's current working directory -->
</shell>
<ok to="sendEmail"/>
<error to="kill-email"/>
</action>
<action name="sendEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Email notifications for job ${wf:id()} success</subject>
<body>The job wf ${wf:id()} successfully completed.</body>
</email>
<ok to="end"/>
<error to="end"/>
</action>
<action name="kill-email">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Email notifications for job ${wf:id()} failure</subject>
<body>The job wf ${wf:id()} Failed. More details about the job : ${oozie_web}/${wf:id()}//</body>
</email>
<ok to="killAction"/>
<error to="killAction"/>
</action>
<kill name="killAction">
<message>Script failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>
Saturday, July 16, 2016
Steps to Installing Spark
Check java version else install jdk
$ sudo apt-get -y update
$ sudo apt-get -y install openjdk-7-jdk
Download pre-built spark
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop1.tgz
$ tar -zxf spark-1.1.0-bin-hadoop1.tgz
Test Installation
$ cd spark-1.1.0-bin-hadoop1
$ ./bin/spark-shell
spark> val textFile = sc.textFile("README.md")
spark> textFile.count()
Monday, May 9, 2016
Ubuntu - List groups and adding users
To list all groups in Ubuntu:
cat /etc/group
To view the groups a particular user belongs to:
cat /etc/group | grep [username]
To add a user to a specific group
sudo usermod -G name-of-group -a 'name-of-user'
Saturday, February 20, 2016
Compression Settings in MapReduce
1. To read compressed input files and also compress the output
In the driver class, add the below:
jobConf.setBoolean("mapred.output.compress", true);
jobConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
Running the program over compressed input:
% hadoop jar MaxTempWithCompression input/tanu/input.txt.gz output
% gunzip -c output/part-r-00000.gz
1949 111
1950 20
2. To compress the mapper output
jobConf.setCompressMapOutput(true);
jobConf.setMapOutputCompressorClass(GzipCodec.class);
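Putting both settings together, below is a minimal driver sketch using the old mapred API. The class name matches the MaxTempWithCompression jar used above, but the mapper and reducer are placeholders and would need to be filled in; treat this as a sketch rather than the exact driver.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTempWithCompression {
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf(MaxTempWithCompression.class);
        jobConf.setJobName("max-temp-with-compression");

        // Input may be a compressed file such as input.txt.gz; Hadoop picks the codec by extension
        FileInputFormat.addInputPath(jobConf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

        // 1. Compress the final job output with gzip
        jobConf.setBoolean("mapred.output.compress", true);
        jobConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

        // 2. Compress the intermediate map output as well
        jobConf.setCompressMapOutput(true);
        jobConf.setMapOutputCompressorClass(GzipCodec.class);

        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);
        // jobConf.setMapperClass(...);  // your mapper
        // jobConf.setReducerClass(...); // your reducer

        JobClient.runJob(jobConf);
    }
}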
Thursday, February 18, 2016
Designing MR Jobs to Improve Debugging
When dealing with tons of jobs each day, with terabytes, petabytes, or more of data in XMLs and other files, designing our MR jobs efficiently becomes crucial to handling Big Data.
I will discuss certain ways that can help us design and develop an efficient MR job.
1. Implement Exception Handling
The most basic step is to implement exception handling for all possible exceptions, with catch blocks ordered from the most specific to the most generic. Do not leave a catch block empty; write proper code that will help in debugging issues. A few helpful System.err.println statements can be added and viewed in the YARN logs (How to view sysouts in yarn logs) for debugging.
If you want the job to run to completion, do not throw the exception; rather, catch it.
2. Use Counters
Consider a real-world scenario where we receive thousands of XMLs each day, and one or two invalid XMLs cause the complete job to fail.
A solution to this problem is using Counters. They record how many XMLs failed (and at which line numbers) while the job continues successfully. The invalid XMLs can be processed later, after correction if needed.
Steps:
1. Create an enum in your mapper class (outside the map method):
enum InvalidXML{
Counter;
}
2. Write your XML processing code, and in the catch block:
catch(XMLStreamException e){
context.setStatus("Detected Invalid Record in Xml. Check Logs);
context.getCounter(InvalidXML.Counter).increment(1);
}
We can also print the input file name using
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
context.setStatus("Detected Invalid Record in Xml " + fileName+ ". Check Logs");
3. When this job runs, it will complete successfully, but the job logs will display the status set above.
On clicking the logs, we can see the counter values.
Another use of Counters is to display the total number of files or records processed. For that, the counters need to be incremented in the try block, as in the mapper sketch below.
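As a consolidated sketch of steps 1 and 2 above: the enum, the counter increment, the status message and the file name, all in one mapper. The class name XmlRecordMapper and the parseAndEmit helper are hypothetical placeholders for your real processing code.
import java.io.IOException;
import javax.xml.stream.XMLStreamException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class XmlRecordMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Enum declared in the mapper class, outside the map method
    enum InvalidXML {
        Counter
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            parseAndEmit(value.toString(), context); // placeholder for the real XML processing
        } catch (XMLStreamException e) {
            // Record the failure without failing the whole job
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            context.setStatus("Detected Invalid Record in Xml " + fileName + ". Check Logs");
            context.getCounter(InvalidXML.Counter).increment(1);
        }
    }

    private void parseAndEmit(String xml, Context context)
            throws XMLStreamException, IOException, InterruptedException {
        // ... real StAX parsing and context.write(...) calls would go here ...
    }
}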
Counters show files and issues for the current run (how many records failed, etc.), but sometimes we need to debug issues from an older date. For that, we can do the following:
3. Use MultiOutputFormat in the catch block to write errors to separate text files, tagged with the date, etc.
The MultiOutputFormat class simplifies writing output data to multiple outputs.
write(String alias, K key, V value, org.apache.hadoop.mapreduce.TaskInputOutputContext context)
In the catch block, we can write something like below
catch(SomeException e){
MultiOutputFormat.write("XMLErrorLogs"+date, storeId, businessDate+e.getMessage(), context);
}
The name of the text files can be something like ErrorLogs along with the date.
The key can be the ID on which you would like to search in Hive.
The value can be the content you wish to search for, along with the error message.
Once these logs are loaded into Hive, we can easily query which files from which stores are giving the most issues, or what errors we got on an earlier business date, in which files and for which stores.
A lot of relevant information can be stored and queried for valuable analysis, which really helps in debugging and supporting a huge Big Data application. A sketch of the same idea using Hadoop's bundled MultipleOutputs API is shown below.
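The write signature quoted above belongs to the MultiOutputFormat helper; as an alternative sketch of the same idea with Hadoop's bundled org.apache.hadoop.mapreduce.lib.output.MultipleOutputs class, something like the following could be used. The class name, the errorlogs named output and the storeId/businessDate fields are hypothetical, and the named output must be registered in the driver.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Driver side (not shown): MultipleOutputs.addNamedOutput(job, "errorlogs",
// TextOutputFormat.class, Text.class, Text.class);
public class ErrorLoggingMapper extends Mapper<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void map(Text storeId, Text xmlRecord, Context context)
            throws IOException, InterruptedException {
        String businessDate = "2016-02-18"; // would normally come from the record or the job config
        try {
            process(storeId, xmlRecord, context); // placeholder for the real XML processing
        } catch (Exception e) {
            // Route the bad record to a separate file under XMLErrorLogs/<date>,
            // keyed on storeId so it can be loaded into Hive and queried later
            multipleOutputs.write("errorlogs", storeId,
                    new Text(businessDate + "\t" + e.getMessage()),
                    "XMLErrorLogs/" + businessDate);
        }
    }

    private void process(Text storeId, Text xmlRecord, Context context) throws Exception {
        // ... real parsing and context.write(...) calls would go here ...
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}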
Hope this article will help many.
Wednesday, February 17, 2016
Sqoop - Export (Session 5)
HDFS -> Sqoop -> RDBMS
Before export, prepare the database by creating the target table.
Example: Exporting zip_profits table from hive to mysql
mysql> Create table ms_zip_profits (zip INT, profit DECIMAL(8,2));
OK
% sqoop export --connect jdbc:mysql://localhost/hadoopguide -m 1 \
--table ms_zip_profits --export-dir /user/hive/warehouse/zip_profits \
--input-fields-terminated-by '\t'
OK
% mysql hadoopguide -e 'select * from ms_zip_profits'
How Sqoop Export Works
The architecture of Sqoop export is very similar to how Sqoop import works:
- Sqoop picks a strategy based on the connect string (JDBC or another)
- It then generates a Java class based on the target table definition
* This class can parse input records from text files
* Insert values of appropriate types into tables
* Ability to read columns from ResultSet
- Launches a MapReduce job that
* Reads source data files from HDFS
* Parses records using generated Java class
* Executes the chosen JDBC strategy
Sqoop's JDBC-based strategy builds up batch INSERT statements, each adding multiple records to the target table.
Separate threads are used to read from HDFS and to communicate with the database. A rough sketch of what the generated class conceptually does follows below.
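To make that more concrete, here is a rough, hypothetical sketch of what such a generated class conceptually does for the ms_zip_profits table above: parse a tab-delimited record from the export directory and add it to a batched JDBC INSERT. The real class is produced by Sqoop's code generation and is considerably larger.
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MsZipProfitsRecord {
    private int zip;
    private BigDecimal profit;

    // Parse one line from the export directory, e.g. "90210\t1250.00"
    public void parse(String line) {
        String[] fields = line.split("\t");
        this.zip = Integer.parseInt(fields[0]);
        this.profit = new BigDecimal(fields[1]);
    }

    // Bind this record's fields and add it to the batch; Sqoop flushes batches periodically
    public void addToBatch(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, zip);
        stmt.setBigDecimal(2, profit);
        stmt.addBatch();
    }

    public static PreparedStatement prepareInsert(Connection conn) throws SQLException {
        return conn.prepareStatement("INSERT INTO ms_zip_profits (zip, profit) VALUES (?, ?)");
    }
}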
Parallelism
While most MR jobs pick the degree of parallelism based on the number and size of the files to process, Sqoop uses CombineFileInputFormat to group the input files into a smaller number of map tasks.
Exports
Sqoop can also export records that are not in a Hive table. It can export delimited text files that are the output of MapReduce jobs, and it can export the records in a SequenceFile to an output table, though some restrictions apply.
Example of sequence file import-export
% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets -m 1 --class-name WidgetHolder --as-sequencefile \
> --target-dir widget_sequence_files --bindir .
...
10/07/05 17:09:13 INFO mapreduce.ImportJobBase: Retrieved 3 records.
% mysql hadoopguide
mysql> CREATE TABLE widgets2(id INT, widget_name VARCHAR(100),
-> price DOUBLE, designed DATE, version INT, notes VARCHAR(200));
Query OK, 0 rows affected (0.03 sec)
mysql> exit;
% sqoop export --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets2 -m 1 --class-name WidgetHolder \
> --jar-file widgets.jar --export-dir widget_sequence_files
...
10/07/05 17:26:44 INFO mapreduce.ExportJobBase: Exported 3 records.
More Examples of Sqoop Export
sqoop list-tables --driver com.teradata.jdbc.TeraDriver --connect connectString --username hue --password hue
sqoop eval --driver com.teradata.jdbc.TeraDriver --connect connectString --username hue --password hue --query "delete STG.POS_TABLE all"
sqoop export -D sqoop.export.records.per.statement=5000 --driver com.teradata.jdbc.TeraDriver --connect connectString --username hue --password hue --columns "TERR_CD,POS_BUSN_DT,gbal_id_nu,pos_area_typ_shrt_ds,pos_prd_dlvr_meth_cd,dypt_id_nu,dypt_ds,dy_of_cal_wk_ds,cd,digl_offr_trn_cnt_qt,digl_offr_trn_itm_cnt_qt,digl_offr_net_trn_am,digl_offr_grss_trn_am,non_digl_offr_trn_cnt_qt,non_digl_offr_trn_itm_cnt_qt,non_digl_offr_net_trn_am,non_digl_offr_grss_trn_am,tot_trn_cnt_qt,tot_trn_itm_cnt_qt,tot_net_trn_am,tot_grss_trn_am,digl_offr_tot_ord_sc_qt,non_digl_offr_tot_ord_sc_qt,tot_ord_tm_sc_qt,curn_iso_nu" \
--table STG.TABLE --input-fields-terminated-by '\t' -m 1 --export-dir /user/hive/warehouse/DLY_REC --batch
The above export was not working with a batch size of 10,000; reducing the number worked fine.