Structured Streaming: Essentials

Arun Jijo
DataKare Solutions
Published in
4 min readMar 3, 2019

--

This is the second chapter under the series “Structured Streaming” which center around covering all the essential details to set up a Structured Streaming query. Peruse the previous chapter here for getting introduced to Structured Streaming.

Sources

Sources in Structured Streaming refers to the streaming data sources which brings data into Structured Streaming. As of spark 2.4 the built in data sources are as follows,

  • Kafka

Kafka source reads data from Kafka brokers and it is compatible with Kafka broker versions 0.10.0 or higher versions. Follow this link which focuses on Spark-Kafka integration for more details.

  • File source

File source reads the files as a streams of data from a directory. For ex, reading log files from the HDFS directory which was collected using flume at regular intervals.

We will leave all the nitty-gritty details of file sources and how to use it in production for later parts of this series.

  • Socket source

To read the text input from a socket connection. Socket source should be used only for testing purpose.

Schema: Socket source has two schemas,

1. Contains only one column named “value” and of string type.

2. Contains two columns named “value” and “timestamp” of which the former is of String type and the latter is of Timestamp type.

val socketDF = spark.readStream.format("socket")
.option("host", "localhost")
.option("port", 9999)
//set to false or ignore this parameter to set the schema to type 1
.option("includeTimestamp",true)
.load()
  • Rate source

Rate source is likewise utilized for testing, yet here the rate source will naturally produce some example data for testing.

Schema: Rate source has two columns named “value” of long type and “timestamp” of Timestamp type.

val socketDF = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load()

Schema Inference

For file sources, it is recommended to provide the schema so that the schema will be consistent. To automatically infer the schema, it is necessary to set the configuration parameter “spark.sql.streaming.schemaInference” to true which by default is false.

For sources like Kafka, Socket, Rate the schema is fixed and won’t possible to change.

Sinks

Structured Streaming uses sinks to add the output of each batch to a destination. Spark is shipped with the following sinks as of spark 2.4,

  • File sinks

To write the output to a file in a directory. The supported file formats are ORC, JSON, CSV, Parquet.

writeStream
.format("json")
.option("path", "destinationdir")
.start()
  • Kafka sinks

To publish the output to Kafka topics.

writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()
  • Foreach and Forechbatch sinks

Both sinks are used to run some arbitrary computations on the records. Both have their own merits. More details of each sink will be covered later in this series.

  • Memory sinks

The memory sink is designed for debugging purpose, the complete output will be stored in memory. So, it is recommended to use this sink in a low volume of data.

writeStream
.format("memory")
.queryName("tableName")
.start()
  • Console sinks

As like memory sink console sink is also designed for debugging and stores the data will be stored in memory. Instead of storing the data as in-memory table it will print it to the console.

writeStream
.format("console")
.start()

By implementing StreamSinkProvider, it is possible to develop custom sinks.

Output Modes

Output mode defines how to write out the data to the sink. Like if it just needs to add the new information (rows) or to update the rows with new information. Spark supports three output modes which needs to be specified at the time of setup the sinks, if not the default is append.

  • Append

Append mode writes only the new rows to the output table since the last trigger.

  • Update

Update mode writes only the rows which have been updated since the last trigger. If the query doesn’t contain any type of aggregations, it behaves like append mode.

  • Complete

Complete mode writes out all the output rows to the resultant table.

Triggers

Triggers defines how often structured streaming should check for new data in the source and updates the result.

· Fixed interval: The query will be executed in a micro batch mode. However, if a certain query hasn’t been processed within the interval, the streaming engine will start processing the next batch only after current batch is finished

· One-Time-Micro-Batch: The query will be executed only once and then it will shut down by itself. It is useful if it is necessary to process the data after a long interval, like schedule the application to run only two times in a day to avoid unnecessary cost.

· Continuous Processing: To achieve low latency as low as ~1 ms. But it is in the experimental stage right now, not recommended in production.

· Default: By default if any type of triggers is not explicitly specified, the streaming engine will look for the new data as soon as the previous batches continue.

df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()

In the next chapter of this series, we will dive into fault-tolerance semantics and how it is achieved in Structured Streaming.

If you have any queries feel free to contact me on twitter and follow data kare solutions for more Big data articles.

--

--

Arun Jijo
DataKare Solutions

Data engineer at DataKare Solutions who gained expertise at Apache Nifi, Kafka, Spark and passionate in Java.