Spark Streaming Series, Part 2: Sources and Sinks
Reading from Kafka and files, writing to Delta Lake and databases — the connectors that power real-time pipelines.
The Source-Processor-Sink Pattern
A streaming pipeline has three parts:
- Source — where data comes from (Kafka, files, a rate generator)
- Processor — your Spark logic (filters, aggregations, joins)
- Sink — where results go (Delta Lake, Kafka, databases)
This part covers sources and sinks. The processor is just Spark SQL — you already know that.
Sources: Reading Streams
Kafka Source
Kafka is the standard source for streaming data. Every Kafka topic is an unbounded stream of messages.
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "user-events,page-views") \
.option("startingOffsets", "latest") \
.load()
The Kafka source delivers each message as:
- key (bytes)
- value (bytes)
- topic (string)
- partition (int)
- offset (long)
- timestamp (timestamp)
- timestampType (int)
Key Options
subscribe vs assign: Subscribe to topics by name (most common). Assign to specific partitions (rare, for advanced control).
# Subscribe to topics (consumer group manages offsets)
.option("subscribe", "topic1,topic2")
# Assign to specific partitions (manual offset management)
.option("assign", '{"topic1":[0,1], "topic2":[2]}')
startingOffsets: Where to begin when the job starts.
.option("startingOffsets", "latest") # from the end (newest data)
.option("startingOffsets", "earliest") # from the beginning (reprocess all history)
.option("startingOffsets", '{"topic1":{"0":100}}') # specific offset per partition
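The per-partition form is a JSON string mapping topic to partition to offset, where the special offsets -2 and -1 mean earliest and latest. Building it with json.dumps avoids quoting mistakes; a plain-Python sketch (the helper name is ours, not a Spark API):

```python
import json

def starting_offsets(offsets):
    """Build a startingOffsets JSON string from {topic: {partition: offset}}.

    Special offsets in this JSON: -2 means earliest, -1 means latest.
    """
    # JSON object keys must be strings, so stringify the partition numbers
    return json.dumps({t: {str(p): o for p, o in parts.items()}
                       for t, parts in offsets.items()})

opts = starting_offsets({"topic1": {0: 100, 1: -2}})
# pass as: .option("startingOffsets", opts)
```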
failOnDataLoss: Whether to fail the query when data may have been lost — for example, when the broker has already deleted the requested offsets under its retention policy.
.option("failOnDataLoss", "false") # log and continue instead of failing (risky)
Parsing Kafka Messages
Kafka messages are raw bytes. You must parse them:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import functions as F
schema = StructType([
StructField("user_id", LongType()),
StructField("event_type", StringType()),
StructField("value", DoubleType()),
StructField("timestamp", LongType()),
])
# Kafka delivers value as bytes; cast to string, parse JSON
events = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "events") \
.load()
parsed = events.select(
F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")
For other formats, Spark ships dedicated parsers — from_avro (in the spark-avro package) and, since Spark 3.4, from_protobuf. JSON is the most common.
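Conceptually, the cast-and-parse step above does the following to each raw message. A plain-Python imitation of from_json for a single record, using the field names from the schema defined earlier (this is a sketch of the behavior, not Spark code):

```python
import json

schema_fields = ["user_id", "event_type", "value", "timestamp"]

def parse_value(raw: bytes):
    """Mimic value.cast('string') + from_json for one Kafka message."""
    obj = json.loads(raw.decode("utf-8"))
    # like from_json, missing fields become null instead of failing
    return {f: obj.get(f) for f in schema_fields}

row = parse_value(b'{"user_id": 42, "event_type": "click", "value": 1.5}')
# row["timestamp"] is None because the field was absent
```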
File Source
Read new files as they arrive in a directory. Streaming file sources require an explicit schema (unless spark.sql.streaming.schemaInference is enabled):
# schema defined as in the Kafka example above
files = spark.readStream \
.format("parquet") \
.schema(schema) \
.option("path", "/data/incoming") \
.option("maxFilesPerTrigger", 1) \
.load()
The file source watches a directory and processes new files in each micro-batch. Useful for data dumps from external systems.
maxFilesPerTrigger: How many files to read per batch. Smaller = lower latency, more batches.
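Under the hood, the source keeps a log of files it has already processed and picks up only unseen ones each trigger. A toy sketch of that bookkeeping (not Spark's actual implementation):

```python
def plan_batch(listing, seen, max_files_per_trigger=1):
    """Pick at most max_files_per_trigger files that haven't been processed."""
    new = sorted(f for f in listing if f not in seen)
    batch = new[:max_files_per_trigger]
    seen.update(batch)   # remember them so the next trigger skips them
    return batch

seen = set()
first = plan_batch(["b.parquet", "a.parquet"], seen)   # picks a.parquet
second = plan_batch(["b.parquet", "a.parquet"], seen)  # picks b.parquet
```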
Rate Source (Testing)
Generate synthetic events at a fixed rate:
events = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 100) \
.option("numPartitions", 4) \
.load()
Generates rowsPerSecond rows each second, each with a timestamp column and a sequential long value column. Perfect for testing without Kafka.
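The rows it produces look like (timestamp, value) pairs with value increasing monotonically. A plain-Python imitation, handy for quick local tests when no Spark session is around (a sketch, not the rate source itself):

```python
from datetime import datetime, timedelta

def rate_rows(start: datetime, rows_per_second: int, seconds: int):
    """Generate (timestamp, value) pairs like Spark's rate source."""
    rows = []
    for i in range(rows_per_second * seconds):
        # spread rows evenly within each second
        ts = start + timedelta(seconds=i / rows_per_second)
        rows.append((ts, i))
    return rows

rows = rate_rows(datetime(2024, 1, 1), rows_per_second=100, seconds=2)
# 200 rows with sequential ids 0..199
```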
Sinks: Writing Results
Console Sink (Testing)
Write to stdout:
result.writeStream \
.format("console") \
.outputMode("append") \
.start()
Useful for debugging. Shows results as they are computed.
Memory Sink (Testing)
Store results in memory:
result.writeStream \
.format("memory") \
.outputMode("append") \
.queryName("my_result") \
.start()
# Later, query the in-memory table
spark.sql("SELECT * FROM my_result").show()
Good for unit tests. The table lives in the driver's memory and persists for the session.
Kafka Sink (Event Streaming)
Write results back to Kafka:
result.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("topic", "results") \
.option("checkpointLocation", "s3://bucket/checkpoint/") \
.start()
The sink does not serialize rows automatically: the DataFrame must already contain a string or binary value column, and an optional key column sets the message key. Typically you build them with to_json:
result_keyed = result.select(
F.col("user_id").cast("string").alias("key"),
F.to_json(F.struct("*")).alias("value")
)
result_keyed.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("topic", "results") \
.option("checkpointLocation", "s3://bucket/checkpoint/") \
.start()
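What the key/value selection above produces for one row can be shown in plain Python, mimicking cast("string") and to_json(struct("*")) for a single record (the helper is illustrative, not a Spark API):

```python
import json

def to_kafka_record(row: dict, key_field: str = "user_id"):
    """Mimic key = user_id cast to string, value = to_json(struct('*'))."""
    key = str(row[key_field])     # message key: partitioning + compaction
    value = json.dumps(row)       # message value: the full row as JSON
    return key, value

key, value = to_kafka_record({"user_id": 42, "total": 7})
# key is "42"; value is the row serialized as a JSON string
```

Keying by user_id means all events for a user land on the same Kafka partition, preserving per-user ordering for downstream consumers.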
Delta Lake Sink (Production)
Write to a Delta table:
result.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "s3://bucket/checkpoint/") \
.option("path", "s3://bucket/data/results") \
.start()
Or using a named table:
result.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "s3://bucket/checkpoint/") \
.toTable("results") # creates or appends to the table
Delta Lake is the production standard for Spark. It provides:
- ACID transactions — updates are atomic
- Time travel — query historical snapshots
- Schema enforcement — reject incompatible writes
- Compaction — small files merged via OPTIMIZE (automatic on some platforms)
- Efficient deletes — deletion vectors mark rows deleted without rewriting files
Always use Delta for production sinks.
JDBC Sink (Databases)
Write to a SQL database via JDBC. Structured Streaming has no native JDBC sink, so use foreachBatch:
def write_to_db(df, epoch_id):
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://localhost/mydb") \
.option("dbtable", "results") \
.option("user", "postgres") \
.option("password", "...") \
.mode("append") \
.save()
result.writeStream \
.foreachBatch(write_to_db) \
.outputMode("append") \
.start()
The .foreachBatch() sink hands your function each micro-batch as a regular DataFrame plus an epoch_id, so any batch write logic works inside it. A failed batch can be retried, so the guarantee is at-least-once unless your write is idempotent.
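Because a micro-batch can be retried after a failure, a common pattern is to record the epoch_id alongside the data and skip epochs that were already committed. A sketch, with a hypothetical in-memory committed set standing in for a database table:

```python
committed = set()   # in practice: a table or column keyed by epoch_id
written = []        # stand-in for the destination table

def write_idempotent(rows, epoch_id):
    """Skip micro-batches already written by a previous attempt."""
    if epoch_id in committed:
        return                      # retry of an already-committed batch
    written.extend(rows)
    committed.add(epoch_id)

write_idempotent([{"user_id": 1}], epoch_id=0)
write_idempotent([{"user_id": 1}], epoch_id=0)   # retried batch: no duplicate
```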
Foreach Sink (Custom Logic)
For custom processing (HTTP calls, logging, caching):
def process_row(row):
# Send to an API, log, etc.
print(f"Processing: {row}")
result.writeStream \
.foreach(process_row) \
.outputMode("append") \
.start()
This calls process_row for each individual row. Useful but slower than batch-oriented sinks.
Complete Example: Kafka to Delta Lake
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType
spark = SparkSession.builder.appName("KafkaToDeltaLake").getOrCreate()
schema = StructType([
StructField("user_id", LongType()),
StructField("event_type", StringType()),
])
# Read from Kafka
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.load()
# Parse JSON
parsed = events.select(
F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")
# Transform
enriched = parsed \
.withColumn("ingestion_time", F.current_timestamp()) \
.filter(F.col("event_type") != "test")
# Write to Delta Lake
query = enriched.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "s3://bucket/ck/events/") \
.option("path", "s3://bucket/data/events/") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
Run this and every 30 seconds, new events from Kafka are parsed, transformed, and written to Delta Lake with exactly-once semantics.
Key Takeaways
- Kafka is the standard unbounded source; configure with subscribe and startingOffsets
- Kafka messages are bytes; parse with from_json(value.cast("string"), schema)
- Sources: Kafka, files, rate (for testing)
- Sinks: Delta Lake (production), Kafka (streaming), console/memory (testing), JDBC (via foreachBatch)
- Delta Lake provides ACID, time travel, and schema enforcement — use it for production
- Every production writeStream needs a checkpointLocation
Next: Time Windows