Monday, April 29, 2019

Spark Streaming

Reading and Writing JSON

sparkSession.read.json(path_or_rdd)
- Uses a DataFrameReader to read JSON records (or other formats) from a file or RDD, infer the schema, and create a DataFrame
- The DataFrame can be converted to a Dataset[T] afterwards with .as[T]

mydataset.write.json(path)
- Uses a DataFrameWriter to write a Dataset as JSON-formatted records (or other formats)

mydataset.toJSON
- Converts the Dataset[T] to a Dataset[String] of JSON-formatted record strings
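
The three calls above can be sketched end to end as follows. This is a minimal illustration, not a definitive recipe: the Person case class, the object name, and the sample records are hypothetical, and the input is an in-memory Dataset[String] (accepted by read.json since Spark 2.2) rather than a file or RDD, so that no input file is needed.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type, used only for this illustration.
case class Person(name: String, age: Long)

object JsonRoundTrip {
  // Reads JSON strings, lets Spark infer the schema, then converts to Dataset[Person].
  def readPeople(spark: SparkSession, raw: Dataset[String]): Dataset[Person] = {
    import spark.implicits._
    spark.read.json(raw).as[Person]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("json-round-trip")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // One JSON record per element.
    val raw = Seq(
      """{"name":"Ann","age":31}""",
      """{"name":"Bob","age":45}"""
    ).toDS()

    val people = readPeople(spark, raw)

    // toJSON gives a Dataset[String] with one JSON string per record.
    people.toJSON.show(false)

    // DataFrameWriter writes JSON-lines files under the given directory.
    val outDir = Files.createTempDirectory("people-json").resolve("out").toString
    people.write.json(outDir)

    spark.stop()
  }
}
```

Schema inference reads whole numbers as long, which is why the hypothetical Person declares age as Long rather than Int.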


Handling Bad Records
- In the default PERMISSIVE mode, DataFrameReader adds an extra column _corrupt_record:
   - null if the record is valid JSON
   - the raw input string if the record is invalid JSON
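
A minimal sketch of separating good and bad records with that column is shown below. The object name, function name, and sample data are hypothetical. It assumes the schema is inferred and that at least one record is malformed, since the extra column may be absent from an inferred schema when every record parses. The parsed DataFrame is cached before filtering because newer Spark versions can reject file-based queries that reference only the corrupt-record column.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.col

object CorruptRecords {
  // Returns (valid rows without the extra column, raw text of the invalid records).
  def splitRecords(spark: SparkSession, raw: Dataset[String]): (DataFrame, DataFrame) = {
    // PERMISSIVE is the default mode; it is spelled out here for clarity.
    val parsed = spark.read.option("mode", "PERMISSIVE").json(raw).cache()

    val good = parsed.filter(col("_corrupt_record").isNull).drop("_corrupt_record")
    val bad = parsed.filter(col("_corrupt_record").isNotNull).select("_corrupt_record")
    (good, bad)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("corrupt-records")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // The second element is deliberately not JSON.
    val raw = Seq("""{"id":1,"msg":"ok"}""", "this line is not JSON").toDS()

    val (good, bad) = splitRecords(spark, raw)
    good.show(false)
    bad.show(false)

    spark.stop()
  }
}
```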

DataFrameReader and DataFrameWriter also support other formats
- csv
- jdbc
- orc
- text
- other table formats


Spark Streaming (DStreams) - Spark 1.X

Spark started as a batch-mode system, like MapReduce
- Spark Streaming is a mini-batch system
   - Each mini-batch of data (one per time interval) is stored in an RDD
   - A DStream holds the sequence of mini-batch RDDs, and also offers window functions
   - The time interval is typically 0.5 to 60+ seconds
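
The mini-batch model can be sketched with a small word count over a sliding window. This is an illustrative sketch only: the object name and sample data are made up, and a queue of RDDs (queueStream) stands in for a real source such as Kafka so that the example needs no external system. Each queued RDD becomes one mini-batch.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MiniBatchDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mini-batch-demo").setMaster("local[2]")
    // Batch interval of 1 second: one RDD per second.
    val ssc = new StreamingContext(conf, Seconds(1))

    // Test-only source: one queued RDD is consumed per batch interval.
    val queue = new mutable.Queue[RDD[String]]()
    val lines = ssc.queueStream(queue)

    // Word counts over the last 3 seconds, recomputed every second.
    val counts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(3), Seconds(1))
    counts.print()

    ssc.start()
    for (i <- 1 to 5) {
      queue.synchronized {
        queue += ssc.sparkContext.makeRDD(Seq(s"batch $i", "spark streaming"))
      }
      Thread.sleep(1000)
    }
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```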

Structured Streaming - Spark 2.X

- Started as a move away from the "mini-batch" model of Spark Streaming
   - The latency of mini-batches is too long for many latency-sensitive applications
   - Based on Datasets rather than RDDs
   - Experimental in Spark 2.0 and 2.1; the experimental tag was removed in Spark 2.2
   - Intended to replace Spark Streaming once it is mature
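
To illustrate the Dataset-based API, here is a minimal sketch that counts rows per time window on a stream. The object and function names are hypothetical. The built-in "rate" source (available since Spark 2.2) is used only because it generates test rows without any external system.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}

object StructuredRateDemo {
  // Builds a streaming DataFrame of row counts per 2-second window.
  def windowedCounts(spark: SparkSession): DataFrame = {
    // The "rate" source emits rows with a timestamp column and a value column.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 5L)
      .load()

    // Same DataFrame operations as in batch code, applied to a stream.
    stream.groupBy(window(col("timestamp"), "2 seconds")).count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-rate-demo")
      .master("local[2]")
      .getOrCreate()

    val query = windowedCounts(spark).writeStream
      .outputMode("complete") // re-emit the full aggregate on every trigger
      .format("console")
      .start()

    // Let the query run for about 10 seconds, then shut down.
    query.awaitTermination(10000L)
    query.stop()
    spark.stop()
  }
}
```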


Larger context of Spark Streaming




Spark - Streaming: Data Pipeline

- Input comes from Kafka, Flume, HDFS/S3, Kinesis and Twitter.
- Batches of input data go from Spark Streaming to Spark Engine
- Batches of processed data go to HDFS, Databases and dashboards


Checkpointing and DStreams

- For stateful DStreams, data checkpointing is done periodically and automatically once a checkpoint directory is set with StreamingContext.checkpoint(directory); otherwise the RDD lineage would grow too long
- A metadata checkpoint is written for every batch
- The data checkpoint interval is set with DStream.checkpoint(duration); duration must be a multiple of the batch interval (n * batch_interval for some integer n >= 1)
- Default interval:
   - if batch_interval >= 10 sec, checkpointing defaults to every batch
   - if batch_interval < 10 sec, it defaults to the smallest n where (n * batch_interval) >= 10 sec
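
The default-interval rule can be sketched as a small function. This is an illustration of the rule, not Spark's own code: the object and function names are hypothetical, and it assumes the default is the smallest multiple of the batch interval that is at least 10 seconds.

```scala
object CheckpointInterval {
  private val TenSecondsMillis = 10000L

  // Smallest multiple of the batch interval that is at least 10 seconds.
  def defaultCheckpointMillis(batchIntervalMillis: Long): Long = {
    require(batchIntervalMillis > 0, "batch interval must be positive")
    // Integer ceiling of (10 s / batch interval); equals 1 once the interval reaches 10 s.
    val n = (TenSecondsMillis + batchIntervalMillis - 1) / batchIntervalMillis
    n * batchIntervalMillis
  }

  def main(args: Array[String]): Unit = {
    println(defaultCheckpointMillis(2000L))  // 5 batches  -> 10000
    println(defaultCheckpointMillis(3000L))  // 4 batches  -> 12000
    println(defaultCheckpointMillis(15000L)) // every batch -> 15000
  }
}
```

An explicit interval passed to DStream.checkpoint(duration) replaces this default.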

