Deserialize Avro Kafka message in pyspark

Recently, I worked on a project to consume Kafka message and ingest into Hive using Spark Structure Streaming. I mainly used python for most of the work with data pipeline construction, and this project is not exception.

Everything moved smoothly at the beginning when launching first Spark Structure Streaming to read simple message in raw text format from Kafka cluster. The problem was rising when I tried to parse the real Kafka message serialized in Avro format.

Looking around and found that from Spark 2.4.0, it had been supporting to_avro and from_avro functions but only for Scala and Java.

Although, starting from Spark 3.0.0-preview, it natively supports pyspark, my cluster is using Spark 2.4.0 then pursuing User Define Function (UDF) is the only way.

Call java class via python function

This approach is suggested from community on Stackoverflow. It requires to attach a separate jar package during Spark submit: --packages org.apache.spark:spark-avro_2.11:2.4.0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pyspark.sql.column import Column, _to_java_column 
from pyspark.sql.functions import col, struct

def from_avro(col, jsonFormatSchema):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
return Column(f(_to_java_column(col), jsonFormatSchema))


def to_avro(col):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
return Column(f(_to_java_column(col)))

Using fastavro as a python library

With incredible fast in term of performance, fastavro is chosen as part of deserialized the message.

As denoted in below code snippet, main Kafka message is carried in values column of kafka_df. For a demonstration purpose, I use a simple avro schema with 2 columns col1 & col2. The return of deserialize_avro UDF function is a tuple respective to number of fields described within avro schema. Then write the stream out to console for debugging purpose.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
from pyspark.sql.types import *
import io
import fastavro

def deserialize_avro(serialized_msg):
bytes_io = io.BytesIO(serialized_msg)
bytes_io.seek(0)
avro_schema = {
"type": "record",
"name": "struct",
"fields": [
{"name": "col1", "type": "long"},
{"name": "col2", "type": "string"}
]
}

deserialized_msg = fastavro.schemaless_reader(bytes_io, avro_schema)

return ( deserialized_msg["col1"],
deserialized_msg["col2"]
)

if __name__=="__main__":
spark = SparkSession \
.builder \
.appName("consume kafka message") \
.getOrCreate()

kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka01-broker:9092") \
.option("subscribe", "topic_name") \
.option("stopGracefullyOnShutdown", "true") \
.load()

df_schema = StructType([
StructField("col1", LongType(), True),
StructField("col2", StringType(), True)
])

avro_deserialize_udf = psf.udf(deserialize_avro, returnType=df_schema)
parsed_df = kafka_df.withColumn("avro", avro_deserialize_udf(psf.col("value"))).select("avro.*")

query = parsed_df.writeStream.format("console").option("truncate", "true").start()
query.awaitTermination()
Spark & Kafka docker-compose

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×