Delta Lake Series, Part 5: Performance Optimization
Making Delta Lake queries fast — OPTIMIZE, Z-ordering, data skipping with column statistics, compaction, and partitioning strategies.
The Small Files Problem
Delta Lake’s write model — every insert creates new Parquet files — leads naturally to the small files problem. A streaming job writing every 30 seconds, or a pipeline with many small partition batches, accumulates thousands of tiny files. Each file requires a separate S3 GET request, and listing thousands of files adds overhead to every query plan.
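To see how quickly files pile up, here is a rough back-of-envelope sketch. The 30-second cadence, one file per commit, and one-week window are illustrative assumptions, not Delta defaults:

```python
# Rough estimate of file accumulation for a micro-batch streaming writer.
# Assumptions (illustrative): one commit every `commit_interval_s` seconds,
# each commit writing `files_per_commit` Parquet files, kept for `days` days.

def small_file_count(commit_interval_s: int, files_per_commit: int, days: int) -> int:
    """Number of data files accumulated before any compaction runs."""
    commits_per_day = 86_400 // commit_interval_s
    return commits_per_day * files_per_commit * days

# A single-file commit every 30 seconds for one week:
print(small_file_count(30, 1, 7))  # → 20160 files, each needing its own S3 GET
```

At that rate the file listing alone dominates query planning long before the data volume does.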
Performance optimization in Delta Lake is largely about solving this: consolidating small files, adding column-level metadata for skipping, and choosing partitioning that matches your query patterns.
OPTIMIZE: File Compaction
OPTIMIZE rewrites small files into larger ones (default target: 1 GB per file):
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://my-bucket/tables/events")
delta_table.optimize().executeCompaction()
In SQL:
OPTIMIZE events;
-- Optimize only a specific partition
OPTIMIZE events WHERE date = '2024-01-01';
Run OPTIMIZE regularly — daily for batch tables, more frequently for high-frequency streaming tables. After OPTIMIZE, run VACUUM to remove the old small files.
Z-Ordering: Multi-Dimensional Data Skipping
Z-ordering (also called Z-order clustering) co-locates related data within Parquet files based on multiple columns simultaneously. This enables data skipping across those columns — queries that filter on Z-ordered columns skip entire files without reading them.
# OPTIMIZE + Z-order by user_id and event_type
delta_table.optimize().executeZOrderBy("user_id", "event_type")
In SQL:
OPTIMIZE events ZORDER BY (user_id, event_type);
OPTIMIZE events WHERE date >= '2024-01-01' ZORDER BY (user_id, event_type);
After Z-ordering, each Parquet file contains data that is clustered by the Z-order key. The file-level minValues and maxValues statistics stored in the transaction log then cover a narrow range of user_id and event_type values per file — so queries like WHERE user_id = 42 skip most files.
When to Z-order: columns you frequently filter on that are NOT the partition key. Good candidates: user_id, event_type, session_id, country.
Limit Z-order columns: each additional column reduces the effectiveness of skipping for others. Two to four columns is typical; beyond that, the multi-dimensional curve becomes too spread out.
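The "curve" behind Z-ordering can be sketched in a few lines. This is an illustrative Morton (bit-interleaving) encoding for two integer keys, not Delta's internal implementation:

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x and y into a single Z-order (Morton) value.

    Rows sorted by this value keep records with nearby (x, y) pairs close
    together, so each file covers a narrow range of BOTH columns.
    """
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # even bit positions come from x
        z |= ((y >> i) & 1) << (2 * i + 1)  # odd bit positions come from y
    return z

# Sorting by z_value clusters on both dimensions at once:
points = [(1, 1), (1, 2), (8, 8), (2, 1), (9, 8)]
print(sorted(points, key=lambda p: z_value(*p)))
# → [(1, 1), (2, 1), (1, 2), (8, 8), (9, 8)]
```

With more columns, each key contributes fewer of the interleaved bits, which is why skipping effectiveness degrades past three or four Z-order columns.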
Data Skipping with Column Statistics
Delta Lake automatically records min/max/null statistics for each column in each Parquet file, stored in the transaction log. This enables the query planner to skip entire files for selective filters:
File: part-00000.parquet
Stats: {
"numRecords": 131072,
"minValues": {"user_id": 1, "date": "2024-01-01"},
"maxValues": {"user_id": 500, "date": "2024-01-01"},
"nullCount": {"session_id": 0}
}
For a query WHERE user_id = 9999, Delta Lake reads these stats and skips any file whose [minValues.user_id, maxValues.user_id] range cannot contain 9999 (here maxValues.user_id is 500, so the file is skipped). This happens before any Parquet data is read, purely from transaction log metadata.
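The skipping decision itself can be sketched in plain Python, using hypothetical per-file stats shaped like the log entries above:

```python
# Hypothetical file-level stats, shaped like the minValues/maxValues
# entries Delta records in the transaction log.
file_stats = [
    {"file": "part-00000.parquet", "min": {"user_id": 1},    "max": {"user_id": 500}},
    {"file": "part-00001.parquet", "min": {"user_id": 501},  "max": {"user_id": 9000}},
    {"file": "part-00002.parquet", "min": {"user_id": 9001}, "max": {"user_id": 12000}},
]

def files_to_scan(stats, column, value):
    """Keep only files whose [min, max] range could contain `value`."""
    return [
        s["file"] for s in stats
        if s["min"][column] <= value <= s["max"][column]
    ]

# WHERE user_id = 9999 touches one file out of three:
print(files_to_scan(file_stats, "user_id", 9999))  # → ['part-00002.parquet']
```

This is why compaction and clustering matter: the narrower each file's min/max range, the more files a selective filter can eliminate.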
By default, Delta Lake collects stats on the first 32 columns. For wide tables, you can control which columns are indexed:
spark.sql("""
ALTER TABLE events
SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = 5)
""")
Or collect statistics only on specific columns (Delta 2.3+):
spark.sql("""
ALTER TABLE events
SET TBLPROPERTIES (
'delta.dataSkippingStatsColumns' = 'user_id,date,event_type,country'
)
""")
Partitioning Strategy
Partitioning physically separates data into subdirectories by a column’s value. A query that filters on the partition column skips entire directories — the coarsest and cheapest form of data skipping.
# Write partitioned by date
df.write \
.format("delta") \
.partitionBy("date") \
.save("s3://my-bucket/tables/events")
events/
├── date=2024-01-01/
│ └── part-00000.parquet
├── date=2024-01-02/
│ └── part-00000.parquet
└── date=2024-01-03/
└── part-00000.parquet
Good partition columns:
- Columns used in almost every query's WHERE clause
- Low-to-medium cardinality (dates, months, regions)
- Columns that naturally segment your data lifecycle (useful for TTL or archival)
Avoid over-partitioning: partitioning by user_id or event_id (high cardinality) creates millions of tiny directories — worse than no partitioning. Rule of thumb: each partition directory should contain at least several hundred MB of data.
For high-cardinality filter columns, use Z-ordering instead of partitioning.
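The rule of thumb above can be checked before committing to a partition column. A sketch, assuming you already know the table size and the candidate column's distinct-value count (both figures below are made up for illustration):

```python
def avg_partition_mb(table_size_gb: float, distinct_values: int) -> float:
    """Average data per partition directory if we partition on this column."""
    return table_size_gb * 1024 / distinct_values

# 500 GB table partitioned by date over ~2 years (730 values): healthy.
print(round(avg_partition_mb(500, 730)))     # → 701 MB per partition
# Same table partitioned by user_id (5M users): pathological.
print(avg_partition_mb(500, 5_000_000) < 1)  # → True (~0.1 MB each)
```

If the average lands well below a few hundred MB, pick a coarser column and push the fine-grained key into Z-ordering or liquid clustering instead.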
Liquid Clustering (Delta 3.1+)
Z-ordering must be re-applied after new data arrives — it is not automatically maintained. Liquid Clustering is Delta’s next-generation replacement: it clusters data incrementally as part of the normal OPTIMIZE process, without requiring a full table rewrite.
# Create a table with liquid clustering
spark.sql("""
CREATE TABLE events (
date DATE,
user_id BIGINT,
event_type STRING,
country STRING
) USING DELTA
CLUSTER BY (user_id, event_type)
""")
-- Run incremental clustering (replaces ZORDER)
OPTIMIZE events;
With liquid clustering, OPTIMIZE automatically clusters new files incrementally. You get the data skipping benefits of Z-ordering without needing to re-Z-order the entire table periodically.
Use liquid clustering for new tables (Delta 3.1+). Use Z-ordering for existing tables on older Delta versions.
Auto-Optimize (Databricks)
On Databricks, Auto Optimize runs compaction automatically after writes:
spark.sql("""
ALTER TABLE events
SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true', -- coalesce small files at write time
'delta.autoOptimize.autoCompact' = 'true' -- compact after writes automatically
)
""")
optimizeWrite reduces the number of files produced per write by coalescing partitions before writing. autoCompact runs a lightweight OPTIMIZE in the background after each write. Together they eliminate most of the need for scheduled OPTIMIZE jobs.
Measuring Effectiveness
Check file counts and sizes before and after optimization:
-- numFiles, sizeInBytes, partitionColumns, and more from table metadata
DESCRIBE DETAIL events;
# Or via DeltaTable
detail = delta_table.detail().select("numFiles", "sizeInBytes").first()
print(f"avg file size: {detail['sizeInBytes'] / detail['numFiles'] / 1e6:.1f} MB")
Aim for average file sizes of 128 MB – 1 GB. Many files below 10 MB indicate a compaction problem.
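That target range turns into a one-line health check over the two numbers detail() returns. The thresholds below simply restate the guidance above; tune them to your workload:

```python
def compaction_needed(num_files: int, size_in_bytes: int,
                      target_min_mb: float = 128.0) -> bool:
    """Flag a table whose average file size falls below the target range."""
    avg_mb = size_in_bytes / num_files / 1e6
    return avg_mb < target_min_mb

# 10,000 files totalling 50 GB: 5 MB average, clearly under-compacted.
print(compaction_needed(10_000, 50 * 10**9))  # → True
# 60 files totalling 50 GB: ~833 MB average, fine.
print(compaction_needed(60, 50 * 10**9))      # → False
```

Wiring a check like this into a scheduled job is a cheap way to decide which tables actually need an OPTIMIZE run.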
Key Takeaways
- OPTIMIZE compacts small files into larger ones — run regularly, followed by VACUUM
- Z-ordering clusters data by multiple columns for multi-dimensional data skipping — ideal for high-cardinality filter columns
- Column statistics in the transaction log enable file-level skipping before reading any Parquet data
- Partition on low-cardinality columns queried in every WHERE clause; avoid partitioning on high-cardinality columns
- Liquid Clustering (Delta 3.1+) replaces Z-ordering with incremental maintenance — prefer it for new tables
- On Databricks, Auto Optimize eliminates most of the need for scheduled compaction jobs
Next: Streaming with Delta