Why Data Pipelines matter for ML Engineers
Models are only as good as the data that feeds them. As a Machine Learning Engineer, you’ll build and operate pipelines that collect, clean, transform, and deliver data for training, evaluation, and real-time inference. Strong pipeline skills unlock reliable features, reproducible experiments, faster iteration, and dependable production systems.
- Turn raw logs and events into high-quality features.
- Keep models fresh with incremental loads and streaming updates.
- Prevent data leakage and training-serving skew.
- Automate, backfill, and document everything for repeatability.
Who this is for
- ML/AI practitioners moving from notebooks to production.
- Data scientists who need reliable training datasets.
- Software/data engineers supporting ML workloads.
Prerequisites
- Comfortable with Python and basic SQL.
- Familiar with data formats (CSV, JSON, Parquet) and Git.
- Basic ML workflow knowledge (train/validate/test, features/labels).
What you’ll be able to do
- Design batch and streaming pipelines for ML training and serving.
- Implement ETL/ELT for features and labels with validation.
- Run CDC and incremental loads with idempotency and backfills.
- Orchestrate jobs on a schedule with parameters and retries.
- Manage schema changes without breaking downstream jobs.
- Build point-in-time correct training datasets and feature pipelines.
- Track lineage and document your pipelines clearly.
Learning path (practical roadmap)
- Foundations: Batch vs streaming, ETL vs ELT, file formats, partitioning.
- Data correctness: Validation rules, deduplication, null handling, constraints.
- Incrementality: CDC, watermarks, MERGE/UPSERT, exactly-once via idempotency keys.
- Orchestration: Parameterized jobs, retries, alerts, backfills.
- Feature pipelines: Point-in-time joins, windowed aggregates, avoiding leakage.
- Operations: Schema evolution, lineage, documentation, cost/performance tuning.
Design trade-offs to consider
- Batch vs streaming: latency vs cost/complexity.
- ETL vs ELT: central compute vs source/warehouse constraints.
- Storage: row vs columnar; partition depth vs the small-file problem (see the sketch after this list).
- Orchestration: time-based vs event-driven triggers.
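To make the storage trade-off concrete, here is a minimal sketch, assuming pandas with the pyarrow engine and a local ./demo directory (both illustrative), that writes the same records as a single row-oriented CSV and as date-partitioned columnar Parquet:
import os
import pandas as pd
df = pd.DataFrame({
    "event_id": ["e1", "e2", "e3"],
    "user_id": ["u1", "u2", "u1"],
    "event_date": ["2025-01-01", "2025-01-01", "2025-01-02"],
    "amount": [10.0, 5.0, 7.5],
})
os.makedirs("./demo", exist_ok=True)
# Row-oriented CSV: simple and human-readable, but readers must scan every row and column
df.to_csv("./demo/events.csv", index=False)
# Columnar Parquet partitioned by date: readers can prune partitions and read only the needed columns
# Over-deep partitioning (e.g., date/hour/user) would recreate the small-file problem
df.to_parquet("./demo/events_parquet", partition_cols=["event_date"], index=False)
Reading the partitioned directory back with pd.read_parquet("./demo/events_parquet") reassembles the partitions and restores event_date as a column.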
Worked examples
Example 1: Batch ETL with validation and incremental load
Goal: Ingest daily events CSV to Parquet, validate, deduplicate, and load incrementally by date.
import glob
import os
from datetime import datetime, timezone
import pandas as pd
RUN_DATE = os.environ.get("RUN_DATE", "2025-01-01")  # yyyy-mm-dd
src_pattern = f"/data/raw/events/date={RUN_DATE}/*.csv"
out_path = f"/data/curated/events/date={RUN_DATE}/part-000.parquet"
# Load
files = glob.glob(src_pattern)
if not files:
    raise FileNotFoundError(f"No input files match {src_pattern}")
df = pd.concat((pd.read_csv(p) for p in files), ignore_index=True)
# Validate required columns
required = {"event_id", "user_id", "event_ts", "amount"}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns: {missing}")
# Basic checks: key columns must be non-null
bad = df[df["event_id"].isna() | df["user_id"].isna()]
if len(bad) > 0:
    raise ValueError(f"Nulls in key columns: {len(bad)} rows")
# Deduplicate by event_id, keeping the latest ingest timestamp
if "ingest_ts" not in df.columns:
    df["ingest_ts"] = datetime.now(timezone.utc).isoformat()
df = df.sort_values(["event_id", "ingest_ts"]).drop_duplicates(subset=["event_id"], keep="last")
# Write to a deterministic, date-partitioned path (idempotent on rerun)
os.makedirs(os.path.dirname(out_path), exist_ok=True)
df.to_parquet(out_path, index=False)
print("Loaded:", len(df))
Notes:
- Idempotent writes: use deterministic output path by date.
- Incremental: driven by RUN_DATE; orchestration sets this per day.
Example 2: CDC and MERGE for upserts
Goal: Apply change data capture (inserts/updates/deletes) into a dimensions table.
-- Staging table: user_changes (op, user_id, name, email, updated_at, is_deleted)
MERGE INTO dim_users AS d
USING (
  -- Keep only the latest change per key so each target row matches a single source row
  SELECT user_id, name, email, updated_at, is_deleted
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY updated_at DESC) AS rn
    FROM user_changes
  ) c
  WHERE rn = 1
) AS s
ON d.user_id = s.user_id
WHEN MATCHED AND s.is_deleted = TRUE THEN DELETE
WHEN MATCHED AND s.is_deleted = FALSE AND s.updated_at >= d.updated_at THEN
  UPDATE SET name = s.name, email = s.email, updated_at = s.updated_at
WHEN NOT MATCHED AND s.is_deleted = FALSE THEN
  INSERT (user_id, name, email, updated_at) VALUES (s.user_id, s.name, s.email, s.updated_at);
Notes:
- Use updated_at as a watermark to prevent out-of-order overwrites.
- Soft deletes are handled by the DELETE clause or a tombstone flag.
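For quick local testing of insert, update, and delete handling, the same upsert semantics can be prototyped in pandas; this is an illustrative sketch with made-up rows, not a replacement for the warehouse MERGE:
import pandas as pd
dim_users = pd.DataFrame([
    {"user_id": 1, "name": "Ada", "email": "ada@example.com", "updated_at": "2025-01-01"},
    {"user_id": 2, "name": "Bob", "email": "bob@example.com", "updated_at": "2025-01-01"},
])
user_changes = pd.DataFrame([
    {"user_id": 2, "name": "Bob K", "email": "bobk@example.com", "updated_at": "2025-01-02", "is_deleted": False},  # update
    {"user_id": 3, "name": "Cat", "email": "cat@example.com", "updated_at": "2025-01-02", "is_deleted": False},     # insert
    {"user_id": 1, "name": "Ada", "email": "ada@example.com", "updated_at": "2025-01-02", "is_deleted": True},      # delete
])
# Union current rows with changes, keep the latest version per key, then drop tombstones
merged = pd.concat([dim_users.assign(is_deleted=False), user_changes], ignore_index=True)
merged = merged.sort_values(["user_id", "updated_at"]).drop_duplicates(subset=["user_id"], keep="last")
dim_users = merged[~merged["is_deleted"]].drop(columns=["is_deleted"]).reset_index(drop=True)
print(dim_users)  # Ada is deleted, Bob is updated, Cat is inserted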
Example 3: Streaming pipeline for real-time features
Goal: Compute a 10-minute rolling purchase count per user for serving.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, to_timestamp, window
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
spark = SparkSession.builder.getOrCreate()
# Read raw purchase events from Kafka
source = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "purchases")
    .load())
# Parse the JSON payload and cast the event timestamp
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_ts", StringType()),
    StructField("amount", DoubleType())
])
parsed = (source
    .select(from_json(col("value").cast("string"), schema).alias("e"))
    .select("e.*")
    .withColumn("event_ts", to_timestamp("event_ts")))
# 10-minute windowed purchase count per user; the watermark bounds late data and state
agg = (parsed
    .withWatermark("event_ts", "20 minutes")
    .groupBy(window(col("event_ts"), "10 minutes"), col("user_id"))
    .agg(count("*").alias("purchases_10m")))
query = (agg
    .writeStream
    .outputMode("update")
    .format("memory")  # demo sink; in production, write to a feature store or key-value cache
    .queryName("feat_purchases_10m")
    .start())
Notes:
- Watermarks bound state and handle late events.
- For serving, publish results to a store keyed by user_id.
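One way to do that is to replace the memory sink with a foreachBatch writer that pushes each micro-batch to an external store. The sketch below reuses the agg stream from this example; the redis client, host, key format, and checkpoint path are illustrative assumptions:
import redis  # assumed key-value client; any store with a set/put API works
def publish_features(batch_df, batch_id):
    # Runs once per micro-batch; collecting to the driver is fine for small aggregates,
    # use partition-level writes for larger ones
    cache = redis.Redis(host="redis", port=6379)  # hypothetical host
    for row in batch_df.select("user_id", "purchases_10m").collect():
        cache.set(f"feat:purchases_10m:{row['user_id']}", row["purchases_10m"])
serving_query = (agg
    .writeStream
    .outputMode("update")
    .foreachBatch(publish_features)
    .option("checkpointLocation", "/chk/feat_purchases_10m")  # enables restart without reprocessing
    .start())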
Example 4: Orchestration, parameters, and backfills
Goal: Run the same job for a date range safely.
# job.py
import os
from datetime import datetime
RUN_DATE = os.environ["RUN_DATE"] # required
# Fetch data for RUN_DATE, write to deterministic partition path
# Ensure idempotency: overwrite same partition or use MERGE keyed by primary keys
# backfill.sh
# Requires GNU date for the -I/-d flags
START=2025-01-01
END=2025-01-10
current=$START
while [ "$current" != "$(date -I -d "$END + 1 day")" ]; do
  RUN_DATE=$current python job.py || exit 1  # stop on the first failure
  current=$(date -I -d "$current + 1 day")
done
Notes:
- Pass run parameters via environment variables or CLI flags (see the sketch below).
- Idempotent writes make reruns safe.
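A minimal sketch of the parameter handling inside job.py, accepting either a CLI flag or the RUN_DATE environment variable (the flag name and paths are illustrative choices):
# job.py (parameter handling sketch)
import argparse
import os
parser = argparse.ArgumentParser(description="Process one date partition")
parser.add_argument("--run-date", default=os.environ.get("RUN_DATE"),
                    help="Partition to process, yyyy-mm-dd")
args = parser.parse_args()
if not args.run_date:
    raise SystemExit("RUN_DATE is required: pass --run-date or set the RUN_DATE env var")
out_path = f"/data/curated/events/date={args.run_date}"  # deterministic partition path for idempotent reruns
print("Processing partition:", out_path)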
Example 5: Point-in-time correct training dataset
Goal: Join features to labels without leakage.
-- events(user_id, event_ts, amount)
-- churn_labels(user_id, label_ts, churned) -- label is for period after label_ts
WITH feats AS (
  SELECT
    user_id,
    event_ts,
    SUM(amount) OVER (
      PARTITION BY user_id
      ORDER BY event_ts
      RANGE BETWEEN INTERVAL 7 DAY PRECEDING AND CURRENT ROW
    ) AS spend_7d
  FROM events
),
joined AS (
  SELECT
    l.user_id, l.label_ts, l.churned, f.spend_7d,
    ROW_NUMBER() OVER (
      PARTITION BY l.user_id, l.label_ts
      ORDER BY f.event_ts DESC
    ) AS rn
  FROM churn_labels l
  JOIN feats f
    ON f.user_id = l.user_id
   AND f.event_ts <= l.label_ts  -- no future info
)
SELECT user_id, label_ts, churned, spend_7d
FROM joined
WHERE rn = 1;  -- keep only the latest feature snapshot as of each label timestamp
Notes:
- Never join features generated after the label timestamp.
- Prefer declarative point-in-time joins or time-travel when available.
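For smaller datasets in pandas, the same backward-looking semantics can be sketched with merge_asof, which keeps, for each label, the most recent feature row at or before label_ts; the tiny frames below are illustrative:
import pandas as pd
feats = pd.DataFrame({
    "user_id": ["u2", "u1", "u1"],
    "event_ts": pd.to_datetime(["2025-01-02", "2025-01-03", "2025-01-08"]),
    "spend_7d": [12.0, 20.0, 35.0],
})
labels = pd.DataFrame({
    "user_id": ["u2", "u1"],
    "label_ts": pd.to_datetime(["2025-01-04", "2025-01-07"]),
    "churned": [1, 0],
})
# For each label, take the latest feature row at or before label_ts (never after)
training_set = pd.merge_asof(
    labels.sort_values("label_ts"),
    feats.sort_values("event_ts"),
    left_on="label_ts",
    right_on="event_ts",
    by="user_id",
    direction="backward",
)
print(training_set)  # u2 gets the Jan 2 features, u1 gets Jan 3 (Jan 8 is in the future)
This mirrors the SQL above: the ROW_NUMBER ... WHERE rn = 1 pattern and merge_asof's backward direction express the same as-of rule.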
Drills and quick exercises
- Write a validation rule set for one dataset: required columns, ranges, uniqueness keys (a starter sketch follows this list).
- Convert a daily batch job to accept a RUN_DATE parameter and rerun it for the last 7 days.
- Implement a simple CDC MERGE on a dimension table; test insert, update, and delete.
- Add a watermark to an aggregation and test behavior with late events.
- Create a backfill script that stops on the first failure and resumes from that date.
- Rename a column in a controlled manner: add new, populate, switch reads, drop old.
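For the first drill, a starter rule set can be a small dictionary of declarative checks plus one function that applies them; the rules, bounds, and sample rows below are illustrative:
import pandas as pd
RULES = {
    "required_columns": ["event_id", "user_id", "event_ts", "amount"],
    "unique_keys": ["event_id"],
    "ranges": {"amount": (0.0, 10_000.0)},  # inclusive min/max, illustrative bounds
}
def validate(df: pd.DataFrame, rules: dict) -> list:
    errors = []
    missing = set(rules["required_columns"]) - set(df.columns)
    if missing:
        errors.append(f"missing columns: {sorted(missing)}")
        return errors  # later checks need these columns to exist
    if df.duplicated(subset=rules["unique_keys"]).any():
        errors.append(f"duplicate keys on {rules['unique_keys']}")
    for col_name, (lo, hi) in rules["ranges"].items():
        out_of_range = df[(df[col_name] < lo) | (df[col_name] > hi)]
        if len(out_of_range) > 0:
            errors.append(f"{col_name}: {len(out_of_range)} rows outside [{lo}, {hi}]")
    return errors
sample = pd.DataFrame({"event_id": [1, 1], "user_id": ["u1", "u1"],
                       "event_ts": ["2025-01-01", "2025-01-01"], "amount": [5.0, -2.0]})
print(validate(sample, RULES))  # reports the duplicate key and the negative amount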
Common mistakes and debugging tips
- Leaky joins: Joining features with timestamps after the label_ts. Fix with point-in-time filters.
- Non-idempotent outputs: Appending duplicates on reruns. Fix by overwriting partitions or MERGE on keys.
- Missing watermarks: Streaming state growing unbounded. Fix with withWatermark and retention limits.
- Unstable schemas: Breaking downstream jobs. Fix with contracts, default values, and versioned schemas.
- Silent data drift: Changes in distributions unnoticed. Add statistical checks and alerts.
- Small files problem: Too many tiny output files. Coalesce or compact output periodically (see the sketch after this list).
- Over-partitioning: date/hour/user shards lead to many empty files. Choose partition columns wisely.
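For the small-files case, one low-tech fix is to compact each partition periodically; a pandas sketch, reusing the date-partitioned layout from Example 1 (the path is illustrative):
import glob
import os
import pandas as pd
partition = "/data/curated/events/date=2025-01-01"  # layout borrowed from Example 1
parts = sorted(glob.glob(f"{partition}/part-*.parquet"))
if len(parts) > 1:
    # Rewrite many small files as one file, then remove the originals only after the new file exists
    df = pd.concat((pd.read_parquet(p) for p in parts), ignore_index=True)
    df.to_parquet(f"{partition}/compacted-000.parquet", index=False)
    for p in parts:
        os.remove(p)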
Debugging checklist
- Confirm run parameters (dates, sources, paths) are correct.
- Validate row counts at each stage; reconcile with source.
- Check dedup logic and primary key definition.
- Inspect a sample of failures; capture reasons in a dead-letter path.
- Re-run a single partition locally to reproduce issues.
Mini project: End-to-end churn pipeline
Build a small pipeline that powers a churn model prototype.
- Ingest: Batch-load daily events (purchases, logins) to Parquet partitioned by date.
- Validation: Enforce required columns, deduplicate by event_id.
- Features: Compute 7d/30d spend and login counts per user (batch); a pandas sketch follows this list.
- Labels: Define churned if no login for 30 days after label_ts.
- Training set: Point-in-time join features to labels with label_ts cutoff.
- Orchestration: Parameterize RUN_DATE and backfill the last 60 days.
- Docs & lineage: Document sources, columns, owners, and dependencies.
Stretch goals
- Add streaming updates to keep 10-minute aggregates fresh.
- Implement CDC on a user dimension and integrate into features.
- Add schema evolution: introduce a new event field with defaults and monitor adoption.
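For the schema-evolution stretch goal, the simplest pattern is additive: introduce the field with a documented default and backfill it when reading older partitions; the channel field, default value, and path below are illustrative:
import pandas as pd
df = pd.read_parquet("/data/curated/events/date=2024-12-15")  # partition written before the new field existed
# Additive change: if the new field is missing, fill the documented default so downstream code can rely on it
if "channel" not in df.columns:
    df["channel"] = "unknown"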
Subskills
- Batch And Streaming Basics: Understand latency, throughput, and cost trade-offs to choose the right mode.
- ETL ELT Concepts For ML: Decide what to transform where; design storage and transformations for features/labels.
- CDC And Incremental Loads: Keep data fresh efficiently with watermarks, MERGE/UPSERT, and tombstones.
- Data Validation And Quality Checks: Enforce contracts, dedup, detect drift, and handle nulls.
- Orchestration Basics And Scheduling: Parameterize runs, handle retries, alerts, and backfills.
- Backfills And Idempotency: Make reruns safe; write deterministically and avoid duplicates.
- Managing Schema Changes: Evolve schemas without breaking downstream; version and document changes.
- Building Training Datasets And Labels: Create point-in-time correct datasets and clear labeling logic.
- Feature Engineering Pipelines: Implement windowed aggregates and transformations aligned to serving.
- Data Lineage And Documentation: Track sources-to-sinks, owners, and assumptions for trust and audits.
Next steps
- Pick one dataset and build a minimal pipeline with validation and incremental load.
- Add a point-in-time training dataset and a small feature set.
- Orchestrate it with parameters and test a week-long backfill.
- Document lineage and known limitations. Iterate as data and needs evolve.