Structured Streaming: Kafka integration

Arun Jijo
DataKare Solutions
Feb 10, 2019

This article explains how to integrate Spark’s stream processing engine, Structured Streaming, with Apache Kafka brokers (version 0.10 and higher), along with all the necessary configuration details.

Apache Kafka

Apache Kafka is a distributed, reliable, fault-tolerant publish-subscribe messaging system. Kafka works on top of two major client primitives: producers and consumers. Kafka stores data as topics, and for parallelism the topics are divided into partitions.

The producer client publishes messages to certain topics, which are in turn consumed by one or more consumer groups. As records are published to a Kafka topic, Kafka assigns each record a sequential ID known as its offset. Published records are retained in the Kafka cluster for a configurable period of time, irrespective of whether they have been consumed or not. All of these features make Kafka an excellent candidate for real-time streaming applications.

Kafka integration in Structured Streaming

Structured Streaming ships with both a Kafka source and a Kafka sink, i.e. it is possible to consume messages from and publish messages to Kafka brokers without hassle.

Let’s dive into the Kafka source with an example.

Adding Dependency

Add the Structured Streaming Kafka dependency to the application. For Maven/sbt projects, use the following definition.

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.4.0 // Spark version
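
In an sbt build, for instance, this corresponds to a single library dependency. This is only a sketch; Scala 2.11 and Spark 2.4.0 are assumed here and should match your cluster.

// resolves to spark-sql-kafka-0-10_2.11 when scalaVersion is 2.11.x
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"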

Reading Records from Kafka topics

To create the Kafka source, Structured Streaming internally uses ‘KafkaSourceProvider’, which requires the subscription and bootstrap server configuration properties to initiate the source.

// Construct a streaming DataFrame that reads from topic person
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "person")
.option("startingOffsets", "earliest")
.load()

The example above creates an unbounded DataFrame subscribed to the topic named “person”. It is also possible to specify topics using a regex pattern; in that case the configuration parameter “subscribe” needs to be replaced by “subscribePattern”. Structured Streaming also provides a way to consume only certain partitions of a topic by using the “assign” parameter; its value should be a JSON string, and it is possible to assign partitions from more than one topic.
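
For instance, subscribing to every topic whose name starts with “person” (an assumed pattern, just for illustration) would look like:

option("subscribePattern", "person.*")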

Specifying topic names along with the partitions to consume:

option("assign", "{"topicA":[0,1],"topicB":[2,4]}")

The ‘kafka.bootstrap.servers’ configuration parameter points to the Kafka bootstrap servers. It should be in ‘host:port’ format, comma separated if there is more than one server.
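
For example, with two brokers (the hostnames below are placeholders):

option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")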

An important note for Spark developers who have used DStreams: the option ‘auto.offset.reset’ is not supported in Structured Streaming. Instead, the configuration parameter ‘startingOffsets’ needs to be set to ‘latest’ or ‘earliest’. If this parameter is not configured, Structured Streaming uses the default value, which is ‘latest’. ‘startingOffsets’ also allows you to explicitly configure the offsets and partitions to consume from.

The configuration format should be of JSON type,

option("startingOffsets","""{"person":{"0":23,"1":-2},"cusomer": {"0":-1}}""")

The above JSON string denotes two topic names, ‘person’ and ‘customer’, along with their partitions and starting offsets. Here ‘-2’ denotes earliest and ‘-1’ denotes latest.

Another thing to note is that this parameter only takes effect when the streaming query is started for the first time; if the query is stopped and later restarted, Spark resumes from the offsets where it left off.

Other Optional Configurations

  • maxOffsetsPerTrigger :- Limits the maximum number of offsets processed per trigger interval.
  • kafkaConsumer.pollTimeoutMs :- Timeout for polling data from Kafka on the executors, in milliseconds.
  • fetchOffset.numRetries :- Number of times to retry fetching Kafka offsets before giving up.
  • fetchOffset.retryIntervalMs :- Interval between retries when fetching Kafka offsets, in milliseconds.
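
As a sketch, these options can be added to the reader created earlier; the DataFrame name and the values below are arbitrary examples rather than recommendations.

val tunedDf = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "person")
.option("maxOffsetsPerTrigger", "10000") // cap offsets processed per trigger
.option("kafkaConsumer.pollTimeoutMs", "1000") // executor poll timeout in ms
.option("fetchOffset.numRetries", "3") // retries before giving up
.option("fetchOffset.retryIntervalMs", "100") // wait between retries in ms
.load()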

Schema

The Kafka source has a fixed schema that covers all the necessary details required for traceability. Let’s take a closer look at the schema of the streaming DataFrame we created earlier.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

The schema consists of all the metadata associated with a Kafka message. Since messages in a Kafka cluster are stored in serialized form, the data type of the ‘value’ field, which holds the actual payload, is binary. Structured Streaming doesn’t have any built-in Kafka deserializers, even for common formats like string and integer; to deserialize the data we need to rely on Spark SQL functions. If one is not available for your particular data type, there is no option other than developing a custom deserializer.

Example:

For this example, we will reuse the streaming DataFrame created earlier, which is subscribed to the topic ‘person’. Each record in the topic ‘person’ has the following format:

{"name":"xyz","age":12,"address":"xyz"}

Create the schema for our data

import org.apache.spark.sql.types.{DataTypes, StructType}

val personSchema: StructType = new StructType()
.add("name", DataTypes.StringType)
.add("age", DataTypes.IntegerType)
.add("address", DataTypes.StringType)

Deserializing the value

To convert the bytes back to a String, Spark SQL provides a built-in ‘CAST’ function.

//casting from binary to String
val castedDf = df.selectExpr("CAST(value AS STRING)")

At this point the dataframe looks like,

+--------------------+
| value|
+--------------------+
|{"name":"xyz”,”age..|
+--------------------+

Spark SQL has some amazing built-in functions for working with JSON data. To deserialize JSON data it is recommended to use the from_json function, and to serialize it, the to_json function.

val jsonDf = castedDf.select(from_json($"value", personSchema).as("values"))

Now if we look at the schema, we find that the values field is of type struct.

root
|-- values: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
| |-- address: string (nullable = true)

It takes just a select query to flatten the data.

val flattenedJsonDf = jsonDf.select($"values.*")

After flattening the schema looks like,

root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: string (nullable = true)

Printing the DataFrame to the console

flattenedJsonDf.writeStream
.format("console")
.option("truncate", "false")
.option("checkpointLocation", "checkpoint/directory")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
.awaitTermination()
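
As mentioned earlier, Structured Streaming also ships with a Kafka sink, so the same flattened DataFrame could instead be serialized with to_json and written back to Kafka. The following is only a sketch; the output topic ‘person_flat’ and the checkpoint path are assumed examples.

import org.apache.spark.sql.functions.{col, struct, to_json}

flattenedJsonDf.select(to_json(struct(col("*"))).as("value")) // Kafka sink expects a "value" column
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("topic", "person_flat") // assumed output topic
.option("checkpointLocation", "checkpoint/kafka-sink") // checkpointing is mandatory for the Kafka sink
.start()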

Create a Kafka Batch Query

Spark also provides a way to fetch data from Kafka in batch mode, in which Spark consumes all the available messages at once. A Kafka batch query takes two important parameters, starting offsets and ending offsets; if they are not specified, Spark uses the default configuration, which is:

  • startingOffsets — earliest
  • endingOffsets — latest

val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "person,customer")
.option("startingOffsets", """{"person":{"0":50,"1":-2},"customer":{"0":-2}}""")
.option("endingOffsets", """{"person":{"0":1000,"1":-1},"customer":{"0":-1}}""")
.load()

An important thing to note is that ‘endingOffsets’ is only applicable to Kafka batch queries, not to streaming queries.

If you have any queries, please feel free to contact me on Twitter, and follow DataKare Solutions for more big data articles.

Arun Jijo
DataKare Solutions

Data engineer at DataKare Solutions with expertise in Apache NiFi, Kafka, and Spark, and a passion for Java.