Delta Lake Series, Part 6: Streaming & CDC
Writing to Delta with Structured Streaming, exactly-once guarantees, reading Delta as a stream, and Change Data Feed for downstream propagation.
Delta Lake as a Streaming Sink and Source
One of Delta Lake’s most powerful properties is that the same table works for both batch and streaming workloads. A dashboard batch job and a Kafka-consuming stream processor can read and write the same Delta table concurrently — with full ACID guarantees.
Delta Lake integrates natively with Spark’s Structured Streaming API. You can write a stream to Delta and read a Delta table as a stream, treating each new commit as a micro-batch of changes.
Writing a Stream to Delta
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, LongType, TimestampType
spark = SparkSession.builder.appName("streaming-to-delta").getOrCreate()
# Define schema of incoming Kafka events
schema = StructType() \
.add("user_id", LongType()) \
.add("event_type", StringType()) \
.add("country", StringType()) \
.add("timestamp", TimestampType())
# Read from Kafka
kafka_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user-events") \
.load()
events = kafka_stream \
.select(from_json(col("value").cast("string"), schema).alias("data")) \
.select("data.*")
# Write to Delta
query = events.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "s3://my-bucket/checkpoints/events") \
.trigger(processingTime="30 seconds") \
.start("s3://my-bucket/tables/events")
query.awaitTermination()
The checkpoint location is critical: Structured Streaming stores its progress (Kafka offsets and Delta commit versions) in this directory. On restart, the stream resumes exactly from where it left off.
Exactly-Once Writes
Delta Lake + Structured Streaming provides end-to-end exactly-once guarantees, even across failures:
- Structured Streaming tracks the Kafka offset of each micro-batch in the checkpoint
- Delta Lake writes each micro-batch as an atomic commit
- If the job fails mid-write, the partial Parquet files are not referenced in the transaction log — they are invisible
- On restart, the stream re-reads from the last committed Kafka offset and re-writes the micro-batch as a new Delta commit
Delta Lake uses idempotent writes to handle the case where a micro-batch was written to Parquet but the Delta commit failed: it checks whether the transaction ID already exists in the log and skips the write if so.
Enable idempotent writes explicitly:
query = events.writeStream \
.format("delta") \
.option("checkpointLocation", "s3://...") \
.option("txnAppId", "kafka-to-delta-events") \
.option("txnVersion", "1") \
.start("s3://my-bucket/tables/events")
Merge Streaming (Upsert Stream)
For streaming CDC — where events represent inserts, updates, and deletes — use foreachBatch to apply a MERGE on each micro-batch:
from delta.tables import DeltaTable
def upsert_to_delta(micro_batch_df, batch_id):
target = DeltaTable.forPath(spark, "s3://my-bucket/tables/users")
target.alias("t").merge(
micro_batch_df.alias("s"),
"t.user_id = s.user_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.whenNotMatchedBySourceDelete() \ # delete rows not in source
.execute()
query = cdc_stream.writeStream \
.foreachBatch(upsert_to_delta) \
.option("checkpointLocation", "s3://my-bucket/checkpoints/users") \
.trigger(processingTime="1 minute") \
.start()
foreachBatch passes each micro-batch as a regular DataFrame to your function. This gives full control over the write logic while preserving exactly-once semantics through the checkpoint.
Reading Delta as a Stream
A Delta table is itself a valid streaming source. Each new commit to the table is treated as a micro-batch:
# Read new rows as they are appended to the Delta table
stream = spark.readStream \
.format("delta") \
.load("s3://my-bucket/tables/events")
# Process and write downstream
stream.writeStream \
.format("delta") \
.option("checkpointLocation", "s3://my-bucket/checkpoints/enriched") \
.start("s3://my-bucket/tables/enriched-events")
This enables Delta-to-Delta pipelines: a chain of streaming jobs where each stage reads from an upstream Delta table and writes to a downstream one. Every stage is independently restartable from its checkpoint.
Control how much history to start from:
spark.readStream \
.format("delta") \
.option("startingVersion", 10) \ # start from version 10
.load("s3://my-bucket/tables/events")
spark.readStream \
.format("delta") \
.option("startingTimestamp", "2024-01-01") \ # start from this timestamp
.load("s3://my-bucket/tables/events")
Change Data Feed (CDF) Streaming
The standard Delta stream source emits all rows in new commits (both inserts and rewrites from UPDATE/MERGE). Change Data Feed is more precise: it emits only the actual changed rows with their change type (insert, update_preimage, update_postimage, delete).
Enable CDF on the source table:
ALTER TABLE events SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
Read it as a stream:
cdf_stream = spark.readStream \
.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", 0) \
.load("s3://my-bucket/tables/events")
# Each row includes: _change_type, _commit_version, _commit_timestamp
cdf_stream.select("_change_type", "user_id", "country", "_commit_version").show()
CDF is the foundation for:
- Propagating changes downstream: replicate only changed rows to Elasticsearch, a cache, or another Delta table
- Audit logs: stream every row-level change to an audit table
- Incremental ML feature computation: recompute features only for changed entities
Streaming Triggers
Control how frequently the stream processes data:
# Process every 30 seconds (continuous micro-batching)
.trigger(processingTime="30 seconds")
# Process once and stop (useful for scheduled batch-style runs)
.trigger(once=True)
# Process all available data, then stop (Delta 2.0+, more efficient than once=True)
.trigger(availableNow=True)
# Continuous processing (experimental, sub-second latency)
.trigger(continuous="1 second")
availableNow=True is useful for migrating batch pipelines to incremental: it processes all new data since the last checkpoint and exits, behaving like a scheduled batch job but with streaming’s incremental semantics.
Compacting Streaming Tables
Streaming writes produce many small files (one or a few files per micro-batch). Run OPTIMIZE regularly to compact them:
# Schedule this as a separate job, separate from the streaming job
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://my-bucket/tables/events")
delta_table.optimize().executeZOrderBy("user_id", "event_type")
On Databricks, enable autoCompact to compact automatically:
ALTER TABLE events SET TBLPROPERTIES ('delta.autoOptimize.autoCompact' = 'true');
Key Takeaways
- Delta Lake integrates natively with Structured Streaming — write streams as Delta commits and read Delta as a stream
- Exactly-once is guaranteed by combining Structured Streaming’s checkpoint (Kafka offset tracking) with Delta’s atomic commits and idempotent writes
- Use
foreachBatch+ MERGE for streaming CDC upserts — full control over write semantics with exactly-once checkpoint guarantees - Change Data Feed exposes row-level
insert,update, anddeleteevents — ideal for downstream propagation and audit logs availableNow=Truetrigger runs incremental batch-style processing — a clean middle ground between streaming and batch- Stream tables accumulate small files — compact with OPTIMIZE or enable
autoCompact
That wraps the Delta Lake Series. You now have the foundation to build reliable, governed, high-performance data lakes: from understanding why plain Parquet fails at scale, through ACID transactions and schema enforcement, to time travel for auditability, performance tuning with Z-ordering and compaction, and unified batch-streaming pipelines with Structured Streaming and Change Data Feed.