Recently, I worked on a project to consume Kafka messages and ingest them into Hive using Spark Structured Streaming. I use Python for most of my data pipeline work, and this project was no exception.
Everything went smoothly at the beginning, when I launched my first Spark Structured Streaming job to read simple messages in raw text format from the Kafka cluster. The problem arose when I tried to parse the real Kafka messages, which are serialized in Avro format.
After looking around, I found that Spark has supported the to_avro and from_avro functions since 2.4.0, but only for Scala and Java.
Although PySpark supports them natively starting from Spark 3.0.0-preview, my cluster runs Spark 2.4.0, so pursuing a User Defined Function (UDF) was the only way.
Call a Java class via a Python function
This approach was suggested by the community on Stack Overflow. It requires attaching a separate JAR package when submitting the Spark job: --packages org.apache.spark:spark-avro_2.11:2.4.0
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct
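To make the idea of calling the JVM function from Python concrete, here is a minimal sketch of the commonly cited community pattern. It is not necessarily the exact listing used in this project. It assumes Spark 2.4.x with the spark-avro package on the classpath, and the schema, the record name Example, and the helper schema_as_json are hypothetical names introduced only for illustration.

```python
import json

# Hypothetical two-field schema, used only for illustration.
AVRO_SCHEMA = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "col1", "type": "string"},
        {"name": "col2", "type": "string"},
    ],
}


def schema_as_json(schema):
    """Serialize the schema dict to the JSON string the JVM from_avro takes."""
    return json.dumps(schema)


def from_avro(column, json_schema):
    """Call the Scala from_avro through the py4j gateway.

    Assumes an active SparkContext and spark-avro on the classpath.
    """
    # Imported lazily so this module can be loaded without Spark installed.
    from pyspark import SparkContext
    from pyspark.sql.column import Column, _to_java_column

    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    # In Spark 2.4, from_avro lives on the Scala package object, which the
    # JVM exposes as the class "package$" with a MODULE$ singleton.
    jvm_from_avro = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(jvm_from_avro(_to_java_column(column), json_schema))
```

With a running session, the result of from_avro(col(...), schema_as_json(AVRO_SCHEMA)) can be used in a select like any other column expression, applied to the column that holds the message bytes.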
For the UDF approach, I chose fastavro to deserialize the messages because of its speed.
In this approach, the main Kafka message is carried in the values column of kafka_df. For demonstration purposes, I use a simple Avro schema with two fields, col1 and col2. The deserialize_avro UDF returns a tuple with one element per field described in the Avro schema. The stream is then written out to the console for debugging.
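The UDF described above can be sketched roughly as follows. This is a sketch under assumptions, not the original listing: both fields are assumed to be strings, the messages are assumed to be plain schemaless Avro records, and the helper names make_deserializer and with_decoded_columns are hypothetical. The reader is passed in as a parameter so the tuple-building logic can be exercised without Spark or fastavro; in the pipeline it would be fastavro.schemaless_reader.

```python
import io

# Hypothetical two-field schema; the field types are an assumption.
SCHEMA = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "col1", "type": "string"},
        {"name": "col2", "type": "string"},
    ],
}


def make_deserializer(schema, reader):
    """Build deserialize_avro, which maps message bytes to a tuple of fields.

    reader is any callable with the shape reader(file_object, schema) that
    returns a dict, for example fastavro.schemaless_reader.
    """
    field_names = [field["name"] for field in schema["fields"]]

    def deserialize_avro(payload):
        if payload is None:  # Kafka values can be null
            return None
        record = reader(io.BytesIO(payload), schema)
        # Return the fields in schema order, one tuple element per field.
        return tuple(record[name] for name in field_names)

    return deserialize_avro


def with_decoded_columns(kafka_df, schema, column="values"):
    """Apply the UDF to the message column and expand the struct into columns."""
    # Imported lazily so the pure-Python part above has no Spark dependency.
    import fastavro
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType, StructField, StructType

    return_type = StructType(
        [StructField(field["name"], StringType()) for field in schema["fields"]]
    )
    deserialize = udf(
        make_deserializer(schema, fastavro.schemaless_reader), return_type
    )
    return kafka_df.select(deserialize(col(column)).alias("record")).select("record.*")
```

For debugging, the DataFrame returned by with_decoded_columns(kafka_df, SCHEMA) can be written out with writeStream.format("console").start().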