Why this matters
Transform steps are often the most expensive part of ETL. Poorly designed joins, window functions, and UDFs can multiply costs and break SLAs. As an ETL Developer, you routinely:
- Aggregate billions of rows on schedules with tight cutoffs.
- Join wide dimension tables to hot events data without creating skew.
- Convert row-heavy intermediate outputs into compact analytics-ready datasets.
Optimizing transforms reduces runtime, compute spend, and failure rates, and makes pipelines predictable and scalable.
Concept explained simply
Think of your transform as a factory line. Every extra item on the belt (rows), every unnecessary tool (columns), and every re-routing (shuffles) slows production. Optimization is about reducing the items, using faster tools, and minimizing re-routing.
Mental model: The 6 performance levers
- Rows: Filter early; process only what you need.
- Columns: Select only needed fields (projection pruning).
- Algorithm: Choose efficient joins/aggregations and correct window frames.
- Execution placement: Push computation to the most efficient engine (DB pushdown vs. app layer).
- Data movement: Minimize shuffles/redistribution by partitioning and smart join strategies.
- Formats & types: Use columnar formats (like Parquet) and correct data types to reduce CPU/IO.
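A minimal PySpark sketch touching three of these levers (rows, columns, formats); the table and column names are hypothetical, and an active SparkSession named spark is assumed, as in the examples below:
from pyspark.sql import functions as F
# Rows: filter early. Columns: project only what downstream steps use.
events = (spark.table("events")                      # hypothetical table
          .where(F.col("event_date") >= "2024-01-01")
          .select("user_id", "event_time"))
# Formats & types: write compact, columnar output for downstream readers.
events.write.mode("overwrite").parquet("/tmp/events_slim")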
Deep dive: Join strategies and skew
- Prefer broadcast join when one side is small enough to fit in executor memory.
- Pre-aggregate before join to reduce row count.
- Use partitioning keys that match join keys to reduce shuffle.
- Handle skew by salting keys or using skew hints when supported.
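A minimal salting sketch in PySpark, assuming fact and dimension tables like the clicks/campaigns tables in Example 2 below; the bucket count N is something you would tune to the observed skew:
from pyspark.sql import functions as F
fact = spark.table("clicks")      # large side, skewed on campaign_id (as in Example 2)
dim = spark.table("campaigns")    # small dimension
N = 8                             # salt buckets; tune to how concentrated the hot keys are
# Large side: assign each row a random salt bucket.
fact_salted = fact.withColumn("salt", (F.rand() * N).cast("int"))
# Small side: replicate once per salt value so every (key, salt) pair exists.
salts = spark.range(N).select(F.col("id").cast("int").alias("salt"))
dim_salted = dim.crossJoin(salts)
# Joining on (key, salt) spreads a single hot key across N partitions.
joined = fact_salted.join(dim_salted, ["campaign_id", "salt"])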
Deep dive: Pushdown and avoiding UDF bottlenecks
- Push filters and simple computations to the source (SQL engine) when it is faster and cheaper.
- Replace row-by-row UDFs with built-in vectorized functions wherever possible.
- When UDFs are unavoidable, batch inputs and minimize Python/Java crossing overhead.
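A short PySpark sketch of both ideas (table, column, and connection names are placeholders): the commented-out UDF crosses the Python boundary once per row, the built-in expression stays in the JVM, and the JDBC query option lets the source database filter before Spark reads anything.
from pyspark.sql import functions as F
orders = spark.table("orders")  # hypothetical table with a 'status' column
# Row-by-row Python UDF (slow: per-row serialization into Python)
# normalize = F.udf(lambda s: s.strip().upper() if s else None, "string")
# orders = orders.withColumn("status_norm", normalize("status"))
# Built-in, vectorized equivalent
orders = orders.withColumn("status_norm", F.upper(F.trim(F.col("status"))))
# Pushdown: the source engine filters and projects before rows reach Spark
jdbc_url = "jdbc:postgresql://db-host:5432/shop"  # placeholder connection string
recent = (spark.read.format("jdbc")
          .option("url", jdbc_url)
          .option("query", "SELECT order_id, status FROM orders WHERE order_date >= '2024-01-01'")
          .load())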
Worked examples
Example 1: SQL transform — early filter + minimal columns + correct window
Before (slow)
SELECT u.user_id, u.country, e.event_time,
ROW_NUMBER() OVER (PARTITION BY u.user_id ORDER BY e.event_time) rn
FROM events e
JOIN users u ON e.user_id = u.user_id
WHERE e.event_date BETWEEN DATE '2024-01-01' AND DATE '2024-01-31';
After (faster)
WITH e_filtered AS (
SELECT user_id, event_time
FROM events
WHERE event_date BETWEEN DATE '2024-01-01' AND DATE '2024-01-31'
),
proj_users AS (
SELECT user_id, country
FROM users
)
SELECT u.user_id, u.country, e.event_time,
ROW_NUMBER() OVER (PARTITION BY u.user_id ORDER BY e.event_time) AS rn
FROM e_filtered e
JOIN proj_users u USING (user_id);
- We filtered early and projected only needed columns to reduce scanned data.
- We used USING to avoid duplicate key columns and reduce width.
- Window function remains but operates on fewer rows.
Example 2: Spark transform — broadcast and partition-aware aggregation
Before (slow)
val fact = spark.table("clicks")
val dim = spark.table("campaigns")
val joined = fact.join(dim, Seq("campaign_id"))
val out = joined.groupBy("campaign_id").count()
out.write.mode("overwrite").parquet("/tmp/out")
After (faster)
import org.apache.spark.sql.functions.{broadcast, col}
val fact = spark.table("clicks").select("campaign_id")
val dim = spark.table("campaigns").select("campaign_id")
val joined = fact.join(broadcast(dim), Seq("campaign_id")) // broadcast the small side to skip a large shuffle
val prepart = joined.repartition(col("campaign_id")) // align partitioning with the aggregation key
val out = prepart.groupBy("campaign_id").count()
out.write.mode("overwrite").parquet("/tmp/out")
- Broadcast small dimension to avoid large shuffles.
- Repartition on the aggregation key so the groupBy reuses that partitioning instead of shuffling again; note this does not remove key skew (salting does).
- Project only required columns before join.
Example 3: Pandas/SQL mix — vectorization + chunking + columnar output
Before (slow)
# reads all, applies Python loops
for idx, row in df.iterrows():
    df.loc[idx, 'is_high'] = 1 if row['amount'] > 10000 else 0
# then writes CSV
df.to_csv('out.csv', index=False)
After (faster)
# vectorized boolean
df['is_high'] = (df['amount'] > 10000).astype('int8')
# optional: process in chunks if input is huge
# for chunk in pd.read_sql(query, conn, chunksize=1_000_000):
#     chunk['is_high'] = (chunk['amount'] > 10000).astype('int8')
#     chunk.to_parquet('out_dir', partition_cols=['is_high'])
# write columnar output
df.to_parquet('out_dir', partition_cols=['is_high'])
- Vectorized operations eliminate Python loops.
- Chunking keeps memory stable on large inputs.
- Parquet reduces size and speeds downstream reads.
Step-by-step tuning checklist
- Define the minimal rows and time range needed; filter as early as possible.
- Select only columns used downstream.
- Choose the right join strategy (broadcast vs shuffle; semi/anti join when appropriate).
- Aggregate or pre-aggregate before joins to shrink data.
- Ensure partitioning matches join/group keys where beneficial.
- Replace UDFs with built-in functions; push down filters/computations to the database when faster.
- Prefer columnar formats and correct data types; avoid unnecessary casts.
- Cache only when reused; unpersist after use.
- Measure: capture row counts, shuffle sizes, and runtime per step.
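A minimal profiling sketch in PySpark (the helper name and tables are illustrative): counting materializes a step so you can record rows and wall-clock time; shuffle read/write sizes are then visible per stage in the Spark UI.
import time
def profile_step(name, df):
    # Materialize the step once; record rows and wall-clock time.
    # count() triggers an extra job, so use this while tuning, not on every production run.
    start = time.time()
    rows = df.count()
    print(f"{name}: {rows:,} rows in {time.time() - start:.1f}s")
    return df
events = spark.table("events")  # hypothetical table
filtered = profile_step("filtered events", events.where("event_date >= '2024-03-01'"))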
Exercises
These mirror the tasks below. Try them before opening the solutions. Expected outputs describe the target, not exact syntax.
Exercise 1 — Optimize a SQL transform
Rewrite the query to reduce scanned data and speed the window computation.
-- Current
SELECT u.user_id, u.signup_date, e.event_time,
COUNT(*) OVER (PARTITION BY u.user_id ORDER BY e.event_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS c
FROM events e
JOIN users u ON e.user_id = u.user_id
WHERE e.event_date >= DATE '2024-03-01' AND e.event_date < DATE '2024-04-01';
Goal: Early filter, minimal columns, correct window frame, no unnecessarily wide rows.
Exercise 2 — Tune a Spark transform
Improve this job to avoid unnecessary shuffles and Python UDF overhead.
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
def is_big(x):
    return 1 if x and x > 10000 else 0
udf_is_big = F.udf(is_big, IntegerType())
sales = spark.table("sales") # columns: sale_id, country_id, amount
countries = spark.table("countries") # columns: country_id, name
out = sales.join(countries, "country_id") \
    .withColumn("is_big", udf_is_big(F.col("amount"))) \
    .groupBy("country_id").agg(F.sum("is_big").alias("big_sales"))
out.write.mode("overwrite").parquet("/tmp/sales_summary")
Exercise checklist
- Did you push filters and projections before joins?
- Did you choose an efficient join strategy (broadcast/semi/anti)?
- Did you replace UDFs with built-ins where possible?
- Did you minimize shuffles via partitioning or pre-aggregation?
Common mistakes and self-check
- Mistake: Filtering after joins. Self-check: Are you joining far more rows than needed? Move filters into subqueries/CTEs.
- Mistake: Using UDFs for simple logic. Self-check: Can this be expressed with built-in functions?
- Mistake: Grouping before join keys are aligned. Self-check: Does partitioning match your group/join keys when needed?
- Mistake: Caching everything. Self-check: Is the dataset reused multiple times? If not, avoid caching.
- Mistake: Overly wide selects. Self-check: Are you selecting columns that are never used downstream?
- Mistake: Unbounded window frames by accident. Self-check: Is your window frame the minimal frame that answers the question?
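On the last point, a small PySpark sketch (table and column names are hypothetical): with an ORDER BY but no explicit frame, most engines default to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so state the minimal frame you actually need.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = spark.table("events")  # hypothetical table with user_id, event_time, amount
# Implicit frame: ORDER BY without ROWS/RANGE defaults to
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
w_default = Window.partitionBy("user_id").orderBy("event_time")
# Explicit minimal frame: the current row plus the two preceding rows only.
w_last3 = (Window.partitionBy("user_id")
           .orderBy("event_time")
           .rowsBetween(-2, Window.currentRow))
df = df.withColumn("avg_last_3", F.avg("amount").over(w_last3))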
Practical projects
- Retrofit an existing pipeline: Profile row counts and shuffles per step; apply at least three optimizations; document runtime before/after.
- Build a dimension enrichment job: Broadcast-join a small dimension to a large fact; measure performance impact of projection pruning.
- Create an incremental transform: Replace a full refresh with a daily delta ingest and merge; verify data correctness with row-level checks.
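For the incremental transform project, one possible delta-plus-merge sketch, assuming the target is a Delta Lake table and that the key, date, and path are placeholders:
from delta.tables import DeltaTable
from pyspark.sql import functions as F
# Ingest only the new day's rows instead of reprocessing history.
updates = spark.table("events").where(F.col("event_date") == "2024-03-15")
# Upsert into the target by business key.
target = DeltaTable.forPath(spark, "/warehouse/events_curated")  # placeholder path
(target.alias("t")
 .merge(updates.alias("s"), "t.event_id = s.event_id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())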
Who this is for
- ETL Developers who manage SQL, Spark, or Python-based pipelines.
- Data Engineers aiming to cut costs and improve SLAs on batch jobs.
- Analytic Engineers optimizing warehouse transforms.
Prerequisites
- Comfort with SQL joins, aggregations, and window functions.
- Basic understanding of distributed processing (e.g., Spark concepts of partitioning and shuffle).
- Familiarity with data formats (CSV vs Parquet) and data types.
Learning path
- Master join and window patterns in SQL with attention to filters and frames.
- Learn partitioning, broadcast joins, and avoiding UDFs in Spark.
- Practice profiling: capture input/output row counts and shuffle sizes per step.
- Adopt incremental patterns to reduce processed data volume.
Next steps
- Complete the exercises and run the Quick Test.
- Apply at least two optimizations to a real or sample pipeline and record improvements.
- Share before/after metrics with your team to standardize best practices.
Mini challenge
You have a daily job joining a 500M-row events table to a 50k-row dimensions table and then computing a 7-day rolling count per user. The job runs for 2 hours. List three specific changes you would try first to cut runtime by 50%, and explain why each would help based on the six levers.
Quick Test note: The test is available to everyone; only logged-in users see saved progress.