Thursday, June 20, 2019

Spark: Narrow and Wide Dependencies

Narrow Dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD

Wide Dependencies: (Requires a shuffle) Multiple child partitions may depend on a single parent partition (see the sketch below)
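
A minimal sketch of the difference, assuming an existing SparkContext sc (the RDD names are made up):

val nums = sc.parallelize(1 to 100, 4)

// Narrow dependency: each output partition depends on exactly one parent partition, no shuffle
val doubled = nums.map(_ * 2)

// Wide dependency: values with the same key may live in different parent partitions,
// so a shuffle is required to bring them together
val grouped = nums.map(n => (n % 3, n)).groupByKey()

// toDebugString shows the shuffle boundary in the lineage
println(grouped.toDebugString)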


Monday, April 29, 2019

SparkStreaming

Reading and Writing JSON

sparkSession.read.json(path_or_rdd)
- Uses a DataFrameReader to read JSON records (and other formats) from a file or RDD, infer the schema and create a DataFrame
- Can convert to a Dataset[T] afterwards

mydataset.write.json
- Uses a DataFrameWriter to write a Dataset as JSON-formatted records (or other formats)

mydataset.toJSON
- Converts the Dataset[T] to a Dataset[String] of JSON-formatted record strings
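
A minimal sketch tying these together (the file paths and the Person case class are assumptions; spark is an existing SparkSession):

case class Person(name: String, age: Long)
import spark.implicits._

val peopleDF = spark.read.json("/tmp/people.json")   // infer schema, build a DataFrame
val peopleDS = peopleDF.as[Person]                   // convert to a typed Dataset
peopleDS.write.json("/tmp/people_out")               // write back as JSON records
val jsonStrings = peopleDS.toJSON                    // Dataset[String] of JSON strings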


Handling Bad Records
- DataFrameReader prepends an extra column _corrupt_record:
   - null if the record is valid JSON
   - the raw JSON string if it is invalid JSON
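
A hedged sketch of inspecting bad records (the file path is an assumption; the input is expected to mix valid and malformed JSON lines):

import org.apache.spark.sql.functions.col

val df = spark.read.json("/tmp/mixed.json")
df.filter(col("_corrupt_record").isNotNull).show(false)   // the malformed lines
df.filter(col("_corrupt_record").isNull).show(false)      // the valid records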

DataFrameReader and DataFrameWriter also support other formats
- csv
- jdbc
- orc
- text
- other table formats


Spark Streaming - Spark 1.X

Spark started as a batch-mode system, like MapReduce
- Spark Streaming is a mini-batch system
   - Each mini-batch of data (for each time interval) is stored in an RDD
   - A DStream holds the sequence of mini-batch RDDs, and also offers window functions
   - The time interval is typically 1/2 to 60+ seconds

Structured Streaming - Spark 2.X

- Moves away from the mini-batch model of Spark Streaming
   - The latency of mini-batches is too long for many latency-sensitive applications
   - Based on Datasets rather than RDDs
   - Still experimental in Spark 2.x
   - Will replace Spark Streaming once it is mature


Larger context of Spark Streaming




Spark - Streaming: Data Pipeline

- Input comes from Kafka, Flume, HDFS/S3, Kinesis and Twitter.
- Batches of input data go from Spark Streaming to Spark Engine
- Batches of processed data go to HDFS, Databases and dashboards


Checkpointing and DStreams

- For DStreams, checkpointing is done automatically; otherwise the RDD lineage would grow too long
- A metadata checkpoint is written for every batch
- A data checkpoint is configured with DStream.checkpoint(duration), where the duration must be (n * batch_interval) for some n
- Defaults: if batch_interval > 10 sec, checkpointing defaults to every batch;
            if batch_interval < 10 sec, it defaults to the closest n where (n * batch_interval) > 10 sec
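
A minimal sketch, assuming an existing SparkContext sc (the checkpoint directory, socket source and 5-second batch interval are made up):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("hdfs:///checkpoints/myapp")        // where metadata and data checkpoints go

val lines = ssc.socketTextStream("localhost", 9999)
// With a 5-second batch interval the data checkpoint defaults to the closest multiple
// above 10 seconds; it can also be set explicitly to n * batch_interval:
lines.checkpoint(Seconds(15))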


Spark - Datasets Components and Optimizer

SparkSQL Components
- Catalyst
- Execution Core
    - Query planner that translates the logical queries into actual Dataset operations
    - SparkSession (2.X) or SQLContext (1.X) is defined in core
- Hive Integration

Catalyst Optimizer
- Backend agnostic - supports both SQL and Dataset code
- Manipulation of trees of relational operators and expressions

Execution Core: SparkSession

Spark 1.X - SQLContext

val sc = new SparkContext(master, appName)
val sqlContext = new SQLContext(sc)

Spark 2.X - SparkSession which also creates the SparkContext

val spark = SparkSession.builder().master(master).appName(name).getOrCreate()
val sc = spark.sparkContext


Hive Integration

- Interoperate with Hadoop Hive tables and the metastore
   - Spark 1.x: use HiveContext, an extension of SQLContext
   - Spark 2.x: enable Hive support in SparkSession
- Create, read and delete Hive tables
- Use Hive SerDe and UDFs
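
A minimal sketch of enabling Hive support in Spark 2.x (the app and table names are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-example")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("SELECT COUNT(*) FROM src").show()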

Friday, April 26, 2019

Spark Datasets

Datasets
- A distributed collection of structured data, analogous to relational tables
- Similar APIs include Python pandas and R data frames
- Deliver better performance than RDDs
- Supported by Scala, Java, Python and R
- Use a new query optimizer, Catalyst, with aggressive internal optimizations


Datasets vs Dataframes

type DataFrame = Dataset[Row]

- The original API was called DataFrames and uses Row to represent the rows of the data frame, but Row loses type safety: each column is effectively untyped
- Dataset was introduced in Spark 1.6 to restore type safety - for catching errors at compile time
- We can still work with DataFrames as if they were a regular type, but in reality we are working with Dataset[Row] instances
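
A minimal sketch of the difference (the Language case class and the JSON path are assumptions; spark is an existing SparkSession):

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

case class Language(name: String, users: Long)

val df: DataFrame = spark.read.json("/tmp/languages.json")   // really a Dataset[Row]
val ds: Dataset[Language] = df.as[Language]                  // typed view, checked at compile time

ds.map(_.name.toUpperCase)   // a typo in the field name would not compile
df.select("nmae")            // a typo in a column name only fails at runtime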

Thursday, April 25, 2019

Scala - Catching Exceptions & Optional Values

Catching Exceptions

- There are no checked exceptions in Scala
- Use a try-catch expression to catch an exception
- In the catch clause, define one or more match alternatives
- The finally clause is optional; Scala also provides a useful alternative, Try

def toInt(s: String): Int = try {
  s.toInt
} catch {
  case _: NumberFormatException => 0
}


Using Try

- Try executes a given expression and returns Success or Failure

scala> import scala.util.{Try,Success,Failure}
import scala.util.{Try, Success, Failure}

scala> Try("100".toInt)
res60: scala.util.Try[Int] = Success(100)

scala> Try("Martin".toInt)
res61: scala.util.Try[Int] = Failure(java.lang.NumberFormatException: For input string: "Martin")


Pattern Matching on Try

- Use Try in the same way as Option. One way is with pattern matching

scala> def makeInt(s:String): Int = Try(s.toInt) match {
     | case Success(n) => n
     | case Failure(_) => 0
     | }
makeInt: (s: String)Int

scala> makeInt("100")
res62: Int = 100

scala> makeInt("Hello")
res63: Int = 0


Higher Order Functions on Try

- getOrElse extracts the wrapped value or returns the default
- Try offers higher-order functions known from collections

scala> def makeInt(s:String) = Try(s.toInt).getOrElse(0)
makeInt: (s: String)Int

scala> Success("scala").map(_.reverse)
res64: scala.util.Try[String] = Success(alacs)

scala> for {
     | lang <- Success("Scala")
     | behav <- Success("Rocks")
     | } yield s"$lang $behav"
res65: scala.util.Try[String] = Success(Scala Rocks)


Optional Values

The Option class is an ADT (algebraic data type) with two cases
- Some case class, wrapping a value
- None singleton object

scala> val langs = Map( "s" -> "Scala", "j" -> "Java"  )
langs: scala.collection.immutable.Map[String,String] = Map(s -> Scala, j -> Java)

scala> langs.get("s")
res41: Option[String] = Some(Scala)

scala> langs.get("c")
res42: Option[String] = None


Using Option Instances

Some and None can be used directly 

Some("Scala")
Option(null)

Pattern matching on Option

- The type system forces us to handle the optional value
- One way is with Pattern matching

scala> def langsByFirst( s: String ): String = langs.get(s) match {
     | case Some(language) => language
     | case None => s"No language starting with $s"
     | }
langsByFirst: (s: String)String

scala> langsByFirst("s")
res47: String = Scala

scala> langsByFirst("c")
res48: String = No language starting with c


Higher Order Functions on Option

- Option offers higher-order functions known from collections
- Using higher-order functions such as map and flatMap on Option allows us to easily compose a chain of calls that handles missing values gracefully

scala> Option("Scala").map(_.reverse)
res49: Option[String] = Some(alacS)

scala> for {
     | lang <- Some("Scala")
     | behaviour <- Some("rocks")
     | } yield s"$lang $behaviour"
res53: Option[String] = Some(Scala rocks)

scala> val langs = Map("s" -> "Scala", "j" -> "Java")
langs: scala.collection.immutable.Map[String,String] = Map(s -> Scala, j -> Java)

- foreach calls a function only if a value is present

scala> langs.get("s").foreach(println)
Scala

scala> langs.get("c").foreach(println)


Obtaining the value 

- The getOrElse extracts the wrapped value or returns a default

scala> def langsByFirst(s:String): String = langs.get(s).getOrElse(s"No langs starting with $s")
langsByFirst: (s: String)String

scala> langsByFirst("c")
res56: String = No langs starting with c

Scala Match Expressions

- Expressions are matched against a pattern
- Differences from the switch statement of C or Java:
   - No fall-through to the next alternative
   - Match expressions return a value
- A MatchError is thrown if no pattern matches

expr match {
  case pattern1 => result1
  case pattern2 => result2
}

Match pattern 
case pattern => result

- case declares a match pattern
- If pattern matches, result will be evaluated and returned

Pattern 1

Wildcard Pattern

 _ is used as a wildcard to match everything

scala> def whatTimeIsIt(any: Any): String = any match {
     | case _ => s"any is no time"
     | }
whatTimeIsIt: (any: Any)String

scala> whatTimeIsIt(20)
res18: String = any is no time


Variable Pattern

scala> def whatTimeIsIt(any: Any): String = any match {
     | case any => s"$any is no time"
     | }
whatTimeIsIt: (any: Any)String

scala> whatTimeIsIt("winter")
res21: String = winter is no time


Typed Pattern

- Use a type annotation to match only certain types
- The typed pattern is composed with the wildcard and variable patterns

scala> def whatTimeIsIt( any:Any ): String = any match {
     | case time: Time => s"Time is ${time.hours}:${time.minutes}"
     | case _ => s"$any is no time"
     | }

scala> whatTimeIsIt(Time(9,30))

Constant Pattern

- Use a stable identifier to match something constant

scala> def whatTimeIsIt( any:Any ): String = any match {
     | case "12:00" =>  "High Noon"
     | case _ => s"$any is no time"
     | }

scala> whatTimeIsIt("12:00")
res23: String = High Noon

scala> whatTimeIsIt("10:00")
res24: String = 10:00 is no time

Pattern 2

Tuple Pattern 

- Use tuple syntax to match and decompose tuples
- Tuple pattern is composed with other patterns like the constant pattern or variable pattern

def whatTimeIsIt( any:Any ): String = any match {
   case (x, "12:00") => s"From $x to noon"
   case _ => s"$any is no time"
}

scala> whatTimeIsIt("5")
res27: String = 5 is no time

scala> whatTimeIsIt("5","12:00")
res28: String = From 5 to noon


Constructor Pattern

- Use constructor syntax to match and decompose case classes
- The constructor pattern is  composed with other patterns
- We can build deeply nested structures

def whatTimeIsIt( any:Any ): String = any match {
   case Time(12,0) => "High Noon"
   case Time(12,m) => s"It is $m past 12"
   case _ => s"$any is no time"
}

>whatTimeIsIt(Time(12,15))
>It is 15 past 12


Sequence Pattern

Use sequence constructors or the append/prepend operators to match and decompose sequences

scala> def matchSeq[A]( seq: Seq[A] ): String = seq match {
     | case Seq(1,2,3) => "1 to 3"
     | case x +: Nil => s"Only element is $x"
     | case _+:x => s"Last element is $x"
     | case Nil => "Empty sequence"
     | }
matchSeq: [A](seq: Seq[A])String

scala> matchSeq(1 to 3)
res29: String = 1 to 3

scala> matchSeq(Vector(1))
res30: String = Only element is 1

scala> matchSeq(Array(1,2,3,4))
res31: String = Last element is WrappedArray(2, 3, 4)


Pattern Alternatives

- Use | symbol to combine various pattern alternatives

def whatTimeIsIt( any:Any ): String = any match {
   case  "00:00" | "12:00" => s"Midnight or high noon"
   case _ => s"$any is no time"
}

Pattern Binders

- Use @ to bind a name to a matched pattern

def whatTimeIsIt(any:Any) : String = any match {
  case time @ Time(_, 0) => s"$time with 0 minutes"
  case time @ Time(_, m) => s"$time with $m minutes"
  case _ => s"$any is no time"
}

Pattern Guards

- Composing patterns gives more control over matching
- If that is not enough, use the if keyword to define a pattern guard

def isAfternoon(any:Any) : String = any match {
  case Time(h,m) if h >= 12 => s"Yes, it is ${h - 12}:$m pm"
  case Time(h,m) => s"No, it is $h:$m am"
  case _ => s"$any is no time"
}


Scala Traits

The JVM only supports single inheritance of classes - traits in Scala overcome this limitation.

abstract class Animal

class Bird extends Animal {
    def fly: String = "I am flying"
}

class Fish extends Animal {
   def swimming: String = "I am swimming"
}

class Duck // stuck here: a Duck should both fly and swim, but can only extend one class

trait Swimmer {
  def swimming: String = "I am swimming"
}

- Traits may contain concrete and abstract members
- Traits are abstract and cannot have parameters
- Like classes, traits extend exactly one superclass


Mix-In composition

- The extends keyword either creates a subclass or mixes in the first trait
- The with keyword mixes in subsequent traits
- The first keyword has to be extends, regardless of whether a class or a trait follows; use with for further mix-ins
- Mixing in a trait means extending the superclass of that trait

class Fish extends Animal with Swimmer
class Duck extends Bird with Swimmer

trait Foo extends AnyRef
trait Bar extends AnyRef with Foo


Traits Linearization

- Scala takes the class and all of its inherited classes and mixed-in traits and puts them in a single, linear order. Therefore it is always clear what super means:
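
A minimal sketch of linearization (the trait and class names are made up for illustration):

trait Greeter { def greet: String = "hello" }
trait Loud extends Greeter { override def greet: String = super.greet.toUpperCase }
trait Polite extends Greeter { override def greet: String = super.greet + ", please" }

class Speaker extends Greeter with Loud with Polite

// Linearization: Speaker -> Polite -> Loud -> Greeter, so each super call
// resolves to the next type to the right in that order:
new Speaker().greet   // "HELLO, please"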



Inheritance Hierarchy

- Traits must respect the inheritance hierarchy - i.e. a class must not extend two incompatible superclasses

scala> class Foo; class Bar; trait Baz extends Foo
defined class Foo
defined class Bar
defined trait Baz

scala> class Qux extends Bar
defined class Qux

scala> class Qux extends Bar with Baz
<console>:14: error: illegal inheritance; superclass Bar
 is not a subclass of the superclass Foo
 of the mixin trait Baz
       class Qux extends Bar with Baz
                                  ^

Concrete Members

- If multiple traits define the same concrete member, the class must use override to resolve the conflict, as in the sketch below
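
A minimal sketch (the trait and class names are made up):

trait A { def message: String = "from A" }
trait B { def message: String = "from B" }

// Without the override this does not compile: C inherits conflicting members
class C extends A with B {
  override def message: String = super[A].message + " and " + super[B].message
}

new C().message   // "from A and from B"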




Scala Type Hierarchy

- AnyVal is the parent of value types
- AnyRef is the parent of reference types
- Null is the bottom type for reference types
- Nothing is the general bottom type

Processed Strings in Scala

Digression - String Interpolation

Since Scala 2.10, we can define processed (interpolated) strings
- A string starting with s embeds expressions using $id or ${ }
- A string starting with f formats the results of the expressions
- %02d means format the integer with 2 digits, left-padding it with zeroes (%02x does the same in hexadecimal)

scala> val n = 20
n: Int = 20

scala> s"Value = $n"
res6: String = Value = 20

scala> f"Hex Value = $n%02x"
res8: String = Hex Value = 14


Inheritance in Scala

Scala supports inheritance
- Code Reuse
- Specialization

Each class except for Any has exactly one super-class
- Non private members are inherited
- Non final members can be overridden

Subtype polymorphism
- Polymorphic means having more than one type
- The Subclass type conforms to the superclass type

Subclass
Bird extends Animal

- default superclass is AnyRef
- can only extend one class

Final and Sealed Classes

- Use final to prevent a class from being extended

final class Animal

class Bird extends Animal // compile error: illegal inheritance from final class Animal

Sealed classes can only be extended within the same source file
- Use them to create an algebraic data type (ADT), also called a tagged union
- Example: Option with Some and None

sealed class Animal

class Bird extends Animal
final class Fish extends Animal

Accessing superclass members

- Use super to access superclass members, as in the sketch below
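
A minimal sketch (the class names and strings are made up):

class Animal {
  def describe: String = "An animal"
}

class Bird extends Animal {
  override def describe: String = super.describe + " that flies"
}

new Bird().describe   // "An animal that flies"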


Uniform Access Principle
For a client, it should make no difference whether a property is implemented through storage or computation

- Properties can be implemented as def or as val

class Animal {
     def name: String = "Lion"    // computed on each access
     // or, equivalently for the client:
     // val name: String = "Lion" // stored once
}

Overriding vals with def

- A val is stable, but a parameterless def could return different results on different calls
- A subclass can decide to become stable and override a def with a val

import scala.util.Random

class Animal {
    def name: Seq[Char] = Random.shuffle("Lion".toSeq)
}

class Bird extends Animal {
    override val name: Seq[Char] = "Dog"
}
 

Lazy Vals

- Ordinary vals are initialized during object construction
- Use the lazy keyword to defer initialization until first usage
- Lazy vals are not compiled to final fields and add an initialization check on each access, so they might show some performance drawbacks

lazy val lazyVal = {
   println("I am very lazy")
}


Wednesday, April 24, 2019

Functional Collections - Higher Order Functions

- Higher-order functions take functions as parameters or return them:
   - Methods taking functions as arguments
   - Methods returning a function
- Higher-order functions provide more abstraction
- Declarative programming (say what to do, not how to do it)

scala> val numbers = Vector(1,2,3)
numbers: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)

scala> numbers.map(number=>number+1)
res51: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)

Function Literals
Short notation: _ stands for one function parameter

scala> numbers.map((number:Int)=>number+1)
res52: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)

scala> numbers.map(number=>number+1)
res53: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)

scala> numbers.map(_+1)
res54: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)


Function Values
Scala has first class functions 

- Function values are objects
- Assign function values to variables
- Pass function values as arguments to higher order functions

scala> val addOne = (n:Int)=>n+1
addOne: Int => Int = <function1>

scala> numbers.map(addOne)
res55: scala.collection.immutable.Vector[Int] = Vector(2, 3, 4)


Function Types

If functions are objects, what are their types? The REPL shows Int => Int, which is syntactic sugar for Function1[Int, Int]. All function types define an apply method.

scala> addOne(7)
res56: Int = 8


Methods and functions

Methods and functions have a lot in common, but they are different. If the compiler expects a function and you supply a method, the method gets lifted to a function.

scala> def addOne(n:Int) = n+1
addOne: (n: Int)Int

scala> val addOneFn:Int => Int = addOne
addOneFn: Int => Int = <function1>


Higher Order Function: Map

- map transforms a collection by applying a function to each element
- The collection type stays the same, but the element type may change

scala> val languages = Vector("Scala","Java","Python")
languages: scala.collection.immutable.Vector[String] = Vector(Scala, Java, Python)

scala> languages.map(_.toLowerCase)
res59: scala.collection.immutable.Vector[String] = Vector(scala, java, python)

scala> languages.map(_.length)
res60: scala.collection.immutable.Vector[Int] = Vector(5, 4, 6)


Higher Order Function: FlatMap

- flatMap transforms a collection by applying a function to each element
- The collection produced for each element is flattened into the result

scala> Seq("now is", "the time").map(_.split(" "))
res63: Seq[Array[String]] = List(Array(now, is), Array(the, time))

scala> Seq("now is", "the time").flatMap(_.split(" "))
res64: Seq[String] = List(now, is, the, time)

Higher Order Function: Filter

- A predicate is a unary (one-argument) function returning a Boolean value
- filter selects the elements which satisfy the predicate
- filterNot selects the elements which do not satisfy the predicate
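
A small sketch following the style of the examples above (results shown in comments):

val numbers = Vector(1, 2, 3, 4, 5)
numbers.filter(_ % 2 == 0)     // Vector(2, 4)    - elements satisfying the predicate
numbers.filterNot(_ % 2 == 0)  // Vector(1, 3, 5) - elements that do not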


Scala Tuples

Tuples are
- Not collections, but finite heterogeneous containers
- Case classes parameterized in each field
- The standard library contains Tuple1 to Tuple22

Tuple fields are accessed as _1, _2, etc.
Tuple2 (pairs) can be created with the -> operator

scala> Tuple2(1,"a")
res41: (Int, String) = (1,a)

scala> Tuple2(1,2)
res42: (Int, Int) = (1,2)

scala> (1,"a")
res44: (Int, String) = (1,a)

scala> val pair = (1,"a")
pair: (Int, String) = (1,a)

scala> pair._2
res45: String = a

scala> 1->"a"
res46: (Int, String) = (1,a)

Scala Collections

Useful operations on scala collections:

scala> Vector(1,2,3)
res0: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)

scala> Seq(1,2,3)
res1: Seq[Int] = List(1, 2, 3)

scala> Set(1,2,3)
res2: scala.collection.immutable.Set[Int] = Set(1, 2, 3)

scala> Set(1,2,"3")
res3: scala.collection.immutable.Set[Any] = Set(1, 2, 3)

Collections have a default apply method

scala> Vector(1,2,3)
res4: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)

scala> Vector.apply(1,2,3)
res5: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)

The element type of a collection can be passed as a type parameter in []

scala> Vector[Int](1,2,3)
res6: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)

scala> Vector[String]("a","b","c")
res8: scala.collection.immutable.Vector[String] = Vector(a, b, c)

scala> Vector("a","b","c")
res9: scala.collection.immutable.Vector[String] = Vector(a, b, c)

                 
Collection operations include +, toSet, toSeq, contains, head, last and tail:

scala> val a = Seq(1,2)
a: Seq[Int] = List(1, 2)

scala> val b = Seq(3,4)
b: Seq[Int] = List(3, 4)

scala> a.toSet
res16: scala.collection.immutable.Set[Int] = Set(1, 2)

scala> Map((1,"a"),(2,"b"))
res17: scala.collection.immutable.Map[Int,String] = Map(1 -> a, 2 -> b)

scala> val map = Map((1,"a"),(2,"b"))
map: scala.collection.immutable.Map[Int,String] = Map(1 -> a, 2 -> b)

scala> map.toSet
res18: scala.collection.immutable.Set[(Int, String)] = Set((1,a), (2,b))

scala> map.toSeq
res20: Seq[(Int, String)] = ArrayBuffer((1,a), (2,b))

scala> a.contains(1)
res21: Boolean = true

scala> a.head
res22: Int = 1

scala> a.tail
res23: Seq[Int] = List(2)

Sequence

scala> val numbers = (1,2,3,4)
numbers: (Int, Int, Int, Int) = (1,2,3,4)

scala> val numbers = Seq(1,2,3,4)
numbers: Seq[Int] = List(1, 2, 3, 4)

scala> numbers(0)
res26: Int = 1

scala> numbers:+4
res27: Seq[Int] = List(1, 2, 3, 4, 4)

scala> 0+:numbers
res28: Seq[Int] = List(0, 1, 2, 3, 4)

scala> Seq(1,2,1,3).distinct
res30: Seq[Int] = List(1, 2, 3)

scala> numbers
res31: Seq[Int] = List(1, 2, 3, 4)

scala> val numers = numbers:+4
numers: Seq[Int] = List(1, 2, 3, 4, 4)

scala> numers.distinct
res33: Seq[Int] = List(1, 2, 3, 4)

scala> val numers0 = 0+:numbers:+6
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 6)

scala> val numers0 = 0+:numbers:+6+7
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 13)

scala> val numers0 = 0+:numbers:+6:+7
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 6, 7)


scala> val numers0 = 0+:numbers:+6:+7
numers0: Seq[Int] = List(0, 1, 2, 3, 4, 6, 7)

scala> numbers.+:(0)
res34: Seq[Int] = List(0, 1, 2, 3, 4)


Set has no duplicates

scala> val numbers = Set(1,2,3)
numbers: scala.collection.immutable.Set[Int] = Set(1, 2, 3)

scala> numbers+7
res35: scala.collection.immutable.Set[Int] = Set(1, 2, 3, 7)

scala> numbers(1)
res38: Boolean = true

scala> numbers contains 1
res40: Boolean = true


Maps
A collection of key-value pairs with unique keys

scala> val map = Map( 1->"a", 2->"b" )
map: scala.collection.immutable.Map[Int,String] = Map(1 -> a, 2 -> b)

scala> map(1)
res47: String = a

scala> map.get(9)
res48: Option[String] = None

scala> map.getOrElse(1,"z")
res49: String = a

scala> map.getOrElse(9,"z")
res50: String = z


Scala collections come in 3 flavors (the immutable versions are used by default, without imports)
- scala.collection package (base)
- scala.collection.immutable
- scala.collection.mutable

Exception: scala.collection.Seq (because of Arrays)
Immutable collections are persistent, as in the sketch below
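
A small sketch of persistence: "adding" to an immutable collection returns a new collection and leaves the original untouched (the two share structure internally).

val v1 = Vector(1, 2, 3)
val v2 = v1 :+ 4

v1   // Vector(1, 2, 3)    - unchanged
v2   // Vector(1, 2, 3, 4)

// The mutable variant must be named explicitly:
val buf = scala.collection.mutable.ArrayBuffer(1, 2, 3)
buf += 4   // modifies buf in place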

Monday, January 9, 2017

Query space utilization in Vertica

SELECT /*+ label(compressed_table_size) */
       anchor_table_schema,
       SUM(used_bytes) / ( 1024^3 ) AS used_compressed_gb
FROM   v_monitor.projection_storage
GROUP  BY anchor_table_schema
ORDER  BY SUM(used_bytes) DESC;


Output:
A_uat 83.9939151955768
B_test 0.3224210254848

Wednesday, January 4, 2017

Spark Fundamentals - RDD

The main abstraction Spark provides is a resilient distributed dataset (RDD)

Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster.


An RDD is a logical reference to a dataset which is partitioned across many server machines in the cluster. RDDs are immutable and recover themselves in case of failure.
The dataset could be data loaded externally by the user - a JSON file, a CSV file, or a text file with no specific structure.

RDDs have the following properties:
  1. Immutability and partitioning: RDDs are composed of a collection of records which are partitioned. A partition is the basic unit of parallelism in an RDD, and each partition is one logical division of the data, immutable and created through some transformation on existing partitions. Immutability helps to achieve consistency in computations.
     Users can define their own criteria for partitioning, based on the keys on which they want to join multiple datasets, if needed.
  2. Coarse-grained operations: Coarse-grained operations are applied to all elements in a dataset. For example, a map, filter or groupBy operation is performed on all elements in a partition of the RDD.
  3. Fault tolerance: Since RDDs are created over a set of transformations, Spark logs those transformations rather than the actual data. The graph of transformations that produces an RDD is called its lineage graph.
For example:

firstRDD = sc.textFile("hdfs://...")

secondRDD = firstRDD.filter(someFunction)

thirdRDD = secondRDD.map(someFunction)

result = thirdRDD.count()

In case we lose some partition of an RDD, we can replay the transformations on that partition from the lineage to recompute it, rather than replicating data across multiple nodes. This characteristic is the biggest benefit of RDDs, because it saves a lot of effort in data management and replication and thus achieves faster computations.
  4. Lazy evaluation: Spark computes RDDs lazily, the first time they are used in an action, so that it can pipeline transformations. So, in the example above, the RDDs are evaluated only when the count() action is invoked.
  5. Persistence: Users can indicate which RDDs they will reuse and choose a storage strategy for them (e.g., in-memory storage or on disk), as in the sketch below.
These properties of RDDs make them useful for fast computations.
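
A minimal sketch of persistence and lazy evaluation together (the HDFS path is an assumption):

val logs = sc.textFile("hdfs:///data/logs")
val errors = logs.filter(_.contains("ERROR"))

errors.cache()          // mark for in-memory reuse; nothing is computed yet (lazy)
errors.count()          // first action: reads the file, filters, and caches the result
errors.filter(_.contains("timeout")).count()   // reuses the cached partitions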

Monday, January 2, 2017

Setting up Spark-scala development environment

Pre-requisites:

1. Oracle Virtualbox
    Download and install from https://www.virtualbox.org/ and create a ubuntu VM

2. Java1.7 or 1.8

3. Scala 2.x.x (any version)
    Download and install scala. Setup home and PATH
    https://www.scala-lang.org/download/install.html

4. Install sbt from http://www.scala-sbt.org/0.13/docs/Using-sbt.html
    Setup home and path

5. Install and setup spark from
    http://creativelytechie.blogspot.ca/2016/07/steps-to-installing-spark.html
    Ensure version compatibility of spark with scala


6. Eclipse IDE in the VM
    Install eclipse IDE (new or upgrade existing eclipse for scala) from 
    http://scala-ide.org/download/prev-stable.html



Tuesday, November 1, 2016

Spark - Broadcast Joins

Continuing from the previous post, using the same example of stations and trips:

scala> val bcStations = sc.broadcast(station.keyBy(_.id).collectAsMap)
bcStations: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Station]] = Broadcast(19)

scala> val bcStations = sc.broadcast(station.keyBy(_.id))
 WARN MemoryStore: Not enough space to store block broadcast_20 in memory! Free memory is 252747804 bytes.
 WARN MemoryStore: Persisting block broadcast_20 to disk instead.
bcStations: org.apache.spark.broadcast.Broadcast[org.apache.spark.rdd.RDD[(Int, Station)]] = Broadcast(20)

scala> bcStations.value.take(3).foreach(println)
(83,Station(83,Mezes Park,37.491269,-122.236234,15,Redwood City,2/20/2014))
(77,Station(77,Market at Sansome,37.789625,-122.400811,27,San Francisco,8/25/2013))
(23,Station(23,San Mateo County Center,37.488501,-122.231061,15,Redwood City,8/15/2013))
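
A hedged sketch of using the broadcast map for a map-side join (assuming bcStations holds the collectAsMap broadcast from the first command above, and that trip and its startTerminal field come from the previous post):

val joined = trip.map { t =>
  val startStation = bcStations.value.get(t.startTerminal.toInt)   // Option[Station], no shuffle
  (t.id, t.duration, startStation.map(_.name))
}
joined.take(3).foreach(println)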

Spark - aggregateByKey and groupByKey Example

Consider an example of trips and stations

Before we begin with aggregateByKey or groupByKey, let's load the data from text files, create RDDs and print the duration of trips.

Create case classes for station and trip
---------
case class Trip(id:Int, duration:Int, startDate:String, startStation:String, startTerminal:String, endDate:String, endStation:String, endTerminal:String, bike:String,subscriberType:String);

case class Station(id:Int,name:String,lat:String,lon:String,docks:String,landmark:String,installDate:String)

Load and map trip to case class
------

val tripWithHeader = sc.textFile("file:////home/vm4learning/Downloads/data/trips/trip_data.csv")

val tripHeader = tripWithHeader.first()

val trips = tripWithHeader.filter(_!=tripHeader).map(_.split(","))

val trip = trips.map( t=>Trip(t(0).toInt,t(1).toInt,t(2),t(3),t(4),t(5),t(6),t(7),t(8),t(9)) )


Load and map station to case class
------

val stationsWithHeader = sc.textFile("file:////home/vm4learning/Downloads/data/stations/station_data.csv")

val stationHeader = stationsWithHeader.first()

val stations = stationsWithHeader.filter(_!=stationHeader).map(_.split(","))

val station = stations.map( s=>Station(s(0).toInt,s(1),s(2),s(3),s(4),s(5),s(6)) )


Calculate durationByStartStation
--------

val byStartTerminal = trip.keyBy(_.startStation)

byStartTerminal.take(2).foreach(println)
(South Van Ness at Market,Trip(4576,63,8/29/2013 14:13,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber))
(San Jose City Hall,Trip(4607,70,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber))

val durationByStart = byStartTerminal.mapValues(_.duration)
durationByStart: org.apache.spark.rdd.RDD[(String, Int)] = MappedValuesRDD[11] at mapValues at <console>:24
scala> durationByStart.take(2).foreach(println)
(South Van Ness at Market,63)
(San Jose City Hall,70)

Now calculate average using groupByKey

val grouped = durationByStart.groupByKey.mapValues(list=>list.sum/list.size)

scala> grouped.take(3).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)

(San Jose Civic Center,3028)


groupByKey shuffles all values across partitions by key.

aggregateByKey, on the other hand, uses the combiner concept from MapReduce: it combines all values within a partition first, and then reduces the results from multiple partitions. The combined output from all partitions, when reduced, involves much less shuffling.

scala> val results = durationByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
results: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[14] at aggregateByKey at <console>:26

scala> results.take(3).foreach(println)
(Mountain View City Hall,(2330658,1401))
(California Ave Caltrain Station,(3144492,626))
(San Jose Civic Center,(2483057,820))

scala> val finalAvg = results.mapValues(i=>i._1/i._2)

scala> finalAvg.take(3).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)


Let's look at aggregateByKey in detail.
val results = durationsByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +
value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
The (0, 0) here is the zeroValue, the initial value from which the aggregation starts.

val results = durationsByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +
value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
This is the sequence function (just like the combiner in MR) that aggregates the values of each key within a partition. It keeps a running tally of (sum, count) so that we can calculate the averages later. It performs the reduction in the current partition before the data is shuffled out, which keeps unnecessary data from being transferred.

val results = durationsByStart.aggregateByKey((0, 0))((acc, value) => (acc._1 +
value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
This is the combine function that aggregates the output of the first function; it runs after the shuffle to do the final aggregation, adding up all the sums and counts. It is a lot more efficient because everything arrives as compact as possible.

Also, in the same example, try running both versions and observe the time difference:

scala> finalAvg.take(10).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
(Yerba Buena Center of the Arts (3rd @ Howard),970)
(SJSU 4th at San Carlos,1966)
(Commercial at Montgomery,761)
(2nd at South Park,683)
(University and Emerson,6545)
(Embarcadero at Sansome,1549)
(Townsend at 7th,746)

scala> grouped.take(10).foreach(println)
(Mountain View City Hall,1663)
(California Ave Caltrain Station,5023)
(San Jose Civic Center,3028)
(Yerba Buena Center of the Arts (3rd @ Howard),970)
(Commercial at Montgomery,761)
(SJSU 4th at San Carlos,1966)
(2nd at South Park,683)
(University and Emerson,6545)
(Embarcadero at Sansome,1549)
(Townsend at 7th,746)

Find the first trip starting at each terminal
val firstTrips = byStartTerminal.reduceByKey((a, b) => if (a.startDate > b.startDate) a else b)

firstTrips.take(2).foreach(println)
(Mountain View City Hall,Trip(15865,1072,9/9/2013 17:47,Mountain View City Hall,27,9/9/2013 18:05,Rengstorff Avenue / California Street,33,151,Subscriber))

(California Ave Caltrain Station,Trip(15583,824,9/9/2013 13:23,California Ave Caltrain Station,36,9/9/2013 13:37,University and Emerson,35,59,Customer))

Wednesday, July 20, 2016

Creating an Oozie coordinator job for scheduling a Shell script

Step 1: Create coordinator.properties file

oozie.use.system.libpath=true
nameNode=hdfs://<name>:8020
jobTracker=hdfs://<name>:8021
dfs.namenode.kerberos.principal=<value from hdfs-site.xml>
yarn.resourcemanager.principal=<value from hdfs-site.xml>
dfs.web.authentication.kerberos.keytab=<value from hdfs-site.xml>
dfs.web.authentication.kerberos.principal=<value from hdfs-site.xml>
nfs.kerberos.principal=<value from hdfs-site.xml>
nfs.keytab.file=/<value from hdfs-site.xml>
queueName=default
rootDirectory=<path>
oozie.coord.application.path=${nameNode}/${rootDirectory}/coordinator.xml
shellScript=<shell script.sh name>
shellScriptPath=<shell script path>
emailToAddress=<email address>
oozie.email.smtp.host=<value from oozie-site.xml>
oozie.email.smtp.port=<value from oozie-site.xml>
oozie_web=<value from oozie-site.xml>


Step 2: Create coordinator.xml file

<coordinator-app name="name" frequency="${coord:days(1)}" start="2015-07-13T20:20Z" end="2015-07-13T20:59Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">
   <action>
      <workflow>
         <app-path>${nameNode}/${rootDirectory}/workflow.xml</app-path>
      </workflow>
   </action>

</coordinator-app>

Step 3: Create workflow.xml

This workflow.xml sends an email on success and on failure to the addresses configured in coordinator.properties.

<workflow-app xmlns='uri:oozie:workflow:0.3' name='shell-wf'>
    <start to='shell1' />
    <action name='shell1'>
        <shell xmlns="uri:oozie:shell-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${shellScript}</exec>
            <argument>D1</argument>
            <argument>D2</argument>
            <file>${shellScript}#${shellScript}</file> <!--Copy the executable to compute node's current working directory -->
        </shell>
       <ok to="sendEmail"/>
        <error to="kill-email"/>
    </action>
    <action name="sendEmail">
        <email xmlns="uri:oozie:email-action:0.1">
              <to>${emailToAddress}</to>
              <subject>Email notifications for job ${wf:id()} success</subject>
              <body>The job wf ${wf:id()} successfully completed.</body>
        </email>
        <ok to="end"/>
        <error to="end"/>
    </action>
     <action name="kill-email">
        <email xmlns="uri:oozie:email-action:0.1">
              <to>${emailToAddress}</to>
              <subject>Email notifications for job ${wf:id()} failure</subject>
              <body>The job wf ${wf:id()} Failed. More details about the job : ${oozie_web}/${wf:id()}//</body>
        </email>
        <ok to="killAction"/>
        <error to="killAction"/>
    </action>
    <kill name="killAction">
        <message>Script failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>
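
A hedged sketch of submitting and monitoring the coordinator with the Oozie CLI (the Oozie server URL and the job id are placeholders):

oozie job -oozie http://<oozie-host>:11000/oozie -config coordinator.properties -run
oozie job -oozie http://<oozie-host>:11000/oozie -info <coordinator-job-id>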



Saturday, July 16, 2016

Steps to Installing Spark

Check the java version, else install the JDK:

$ sudo apt-get -y update
$ sudo apt-get -y install openjdk-7-jdk

Download pre-built Spark:

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop1.tgz
$ tar -zxf spark-1.1.0-bin-hadoop1.tgz

Test the installation:

$ cd spark-1.1.0-bin-hadoop1
$ ./bin/spark-shell
spark> val textFile = sc.textFile("README.md")
spark> textFile.count()

Monday, May 9, 2016

Ubuntu - List groups and adding users

To list all groups in ubuntu

cat /etc/group

To view a particular user

cat /etc/group | grep [username]

To add a user to a specific group

sudo usermod -a -G name-of-group name-of-user

Saturday, February 20, 2016

Compression Settings in Mapreduce

1. To read compressed input files, and the output is also compressed
   
     In the driver class, add the below:

 jobConf.setBoolean("mapred.output.compress", true);
jobConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

    Running the program over compressed input:

    % hadoop jar MaxTempWithCompression input/tanu/input.txt.gz output

    % gunzip -c output/part-r-00000.gz
         1949    111
         1950    20

2. To compress the mapper output
 
 jobConf.setCompressMapOutput(true);
 jobConf.setMapOutputCompressorClass(GzipCodec.class);

Thursday, February 18, 2016

Designing MR Jobs to Improve Debugging

When dealing with tons of jobs each day, with terabytes or petabytes of data in XMLs and other files, designing our MR jobs efficiently becomes crucial to handling big data.

I will discuss some approaches that can help us design and develop an efficient MR job.

1. Implement Exception Handling

The most basic practice is to implement exception handling with all possible exceptions in the catch block, in order from the most specific to the most generic. Also, do not leave the catch block empty; write proper handling code that will help in debugging issues. A few helpful System.err.println statements can be added, to be viewed in the YARN logs (How to view sysouts in yarn logs) for debugging.

If you want the job to run successfully, do not rethrow the exception; catch and handle it instead.

2. Use Counters

Consider a real-world scenario where we have thousands of XMLs each day, and due to 1 or 2 invalid XMLs the complete job fails.
A solution to such a problem is using counters. They can record how many XMLs failed and on which line number, while the job still completes successfully. The invalid XMLs can be processed later, after correction if needed.

Steps:
1. Create an enum in your mapper class ( outside map method)

enum InvalidXML{
     Counter;
}

2. Write you XML processing code and in catch block

catch(XMLStreamException e){
      context.setStatus("Detected Invalid Record in Xml. Check Logs");
      context.getCounter(InvalidXML.Counter).increment(1);
}

We can also print the input file name using
      String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
      context.setStatus("Detected Invalid Record in Xml " + fileName+ ". Check Logs");

3. When this job runs, it will complete successfully, but the job logs will display the counter status; on clicking the logs, the InvalidXML counter value can be seen.


Another use of counters is displaying the total number of files or records processed. For that, the counters need to be incremented in the try block.

Counters display files and issues for the current run (how many records failed, etc.), but sometimes we need to debug issues for an older date. For that, we can do the following:

3. Use MultiOutputFormat in the catch block to write errors to separate text files, organized by date etc.

The MultiOutputFormat class simplifies writing output data to multiple outputs.
write(String alias, K key, V value, org.apache.hadoop.mapreduce.TaskInputOutputContext context)

In the catch block, we can write something like below
catch(SomeException e){
     MultiOutputFormat.write("XMLErrorLogs"+date, storeId,  businessDate+e.getMessage(), context);
}

The name of the text files can be something like ErrorLogs plus the date.
The key can be the id on which you would like to search in Hive.
The value can be the content you wish to search for, along with the error message.

Once these logs are loaded into Hive, we can easily query to see which files from which stores are giving more issues, or, for an earlier business date, what errors we got in which files and for which stores.
A lot of relevant information can be stored and queried for valuable analysis. This can really help to debug and support a huge big data application.

Hope this article will help many.

Wednesday, February 17, 2016

Sqoop - Export (Session 5)

HDFS -> Sqoop -> RDBMS

Before export, prepare the database by creating the target table.

Example: Exporting the zip_profits table from Hive to MySQL

mysql> Create table ms_zip_profits (zip INT, profit DECIMAL(8,2));
OK

% sqoop export --connect jdbc:mysql://localhost/hadoopguide -m 1
--table ms_zip_profits --export-dir /user/hive/warehouse/zip_profits 
--input-fields-terminated-by '\t'

OK

% mysql hadoopguide -e 'select * from ms_zip_profits'

How Sqoop Export Works
The architecture of Sqoop export is very similar to how Sqoop import works:

- Sqoop picks a strategy based on the connect string, jdbc or other
- Then generates a Java class based on target table definition 
            * This class can parse input records from text files 
            * Insert values of appropriate types into tables
            * Ability to read columns from ResultSet
- Launches mapreduce job that
            * Reads source data files from HDFS
            * Parses records using generated Java class
            * Executes the chosen JDBC strategy


Sqoop's JDBC-based strategy builds up batch INSERT statements, each adding multiple records to the target table.
Separate threads are used to read from HDFS and to communicate with the database.

Parallelism
Most MR jobs pick the degree of parallelism based on the number and size of files to process; Sqoop uses CombineFileInputFormat to group the input files into a smaller number of map tasks.

Exports
Sqoop can also export records that are not in a Hive table. It can export delimited text files that are the output of MapReduce jobs, and it can export records in a SequenceFile to an output table, though some restrictions apply.

Example of sequence file import-export

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets -m 1 --class-name WidgetHolder --as-sequencefile \
> --target-dir widget_sequence_files --bindir .
...
10/07/05 17:09:13 INFO mapreduce.ImportJobBase: Retrieved 3 records.

% mysql hadoopguide
mysql> CREATE TABLE widgets2(id INT, widget_name VARCHAR(100),
-> price DOUBLE, designed DATE, version INT, notes VARCHAR(200));
Query OK, 0 rows affected (0.03 sec)

mysql> exit;

% sqoop export --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets2 -m 1 --class-name WidgetHolder \
> --jar-file widgets.jar --export-dir widget_sequence_files
...
10/07/05 17:26:44 INFO mapreduce.ExportJobBase: Exported 3 records.

More Examples of Sqoop Export

sqoop list-tables --driver com.teradata.jdbc.TeraDriver --connect connectString --username hue --password hue 

sqoop eval --driver com.teradata.jdbc.TeraDriver --connect connectString --username hue --password hue --query "delete STG.POS_TABLE all"

sqoop export -Dsqoop.export.records.per.statement=5000 --driver com.teradata.jdbc.TeraDriver --connect connectString --username hue --password hue --columns "TERR_CD,POS_BUSN_DT,gbal_id_nu,pos_area_typ_shrt_ds,pos_prd_dlvr_meth_cd,dypt_id_nu,dypt_ds,dy_of_cal_wk_ds,cd,digl_offr_trn_cnt_qt,digl_offr_trn_itm_cnt_qt,digl_offr_net_trn_am,digl_offr_grss_trn_am,non_digl_offr_trn_cnt_qt,non_digl_offr_trn_itm_cnt_qt,non_digl_offr_net_trn_am,non_digl_offr_grss_trn_am,tot_trn_cnt_qt,tot_trn_itm_cnt_qt,tot_net_trn_am,tot_grss_trn_am,digl_offr_tot_ord_sc_qt,non_digl_offr_tot_ord_sc_qt,tot_ord_tm_sc_qt,curn_iso_nu"
--table STG.TABLE --input-fields-terminated-by '\t' -m 1 --export-dir /user/hive/warehouse/DLY_REC --batch 

The above export was not working with a batch size of 10,000; reducing the number made it work fine.