Data Pipelines

Learn Data Pipelines for Machine Learning Engineers for free: roadmap, examples, subskills, and a skill exam.

Published: January 1, 2026 | Updated: January 1, 2026

Why Data Pipelines matter for ML Engineers

Models are only as good as the data that feeds them. As a Machine Learning Engineer, you’ll build and operate pipelines that collect, clean, transform, and deliver data for training, evaluation, and real-time inference. Strong pipeline skills unlock reliable features, reproducible experiments, faster iteration, and dependable production systems.

  • Turn raw logs and events into high-quality features.
  • Keep models fresh with incremental loads and streaming updates.
  • Prevent data leaks and training-serving skew.
  • Automate, backfill, and document everything for repeatability.

Who this is for

  • ML/AI practitioners moving from notebooks to production.
  • Data scientists who need reliable training datasets.
  • Software/data engineers supporting ML workloads.

Prerequisites

  • Comfortable with Python and basic SQL.
  • Familiar with data formats (CSV, JSON, Parquet) and Git.
  • Basic ML workflow knowledge (train/validate/test, features/labels).

What you’ll be able to do

  • Design batch and streaming pipelines for ML training and serving.
  • Implement ETL/ELT for features and labels with validation.
  • Run CDC and incremental loads with idempotency and backfills.
  • Orchestrate jobs on a schedule with parameters and retries.
  • Manage schema changes without breaking downstream jobs.
  • Build point-in-time correct training datasets and feature pipelines.
  • Track lineage and document your pipelines clearly.

Learning path (practical roadmap)

  1. Foundations: Batch vs streaming, ETL vs ELT, file formats, partitioning.
  2. Data correctness: Validation rules, deduplication, null handling, constraints.
  3. Incrementality: CDC, watermarks, MERGE/UPSERT, exactly-once via idempotency keys.
  4. Orchestration: Parameterized jobs, retries, alerts, backfills.
  5. Feature pipelines: Point-in-time joins, windowed aggregates, avoiding leakage.
  6. Operations: Schema evolution, lineage, documentation, cost/performance tuning.

Design trade-offs to consider

  • Batch vs streaming: latency vs cost/complexity.
  • ETL vs ELT: central compute vs source/warehouse constraints.
  • Storage: row vs columnar; partition depth vs small-file problem (see the sketch below).
  • Orchestration: time-based vs event-driven triggers.
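
The storage and partitioning trade-offs are easiest to see with a small experiment. Below is a minimal sketch (assuming pandas with the pyarrow engine and illustrative /tmp paths) that writes the same events as row-oriented CSV and as date-partitioned columnar Parquet, then compacts one partition into a single file to counter the small-file problem.

import os
import pandas as pd

# Toy events table (illustrative data)
df = pd.DataFrame({
    "event_id": ["a1", "a2", "a3"],
    "user_id": ["u1", "u2", "u1"],
    "event_date": ["2025-01-01", "2025-01-01", "2025-01-02"],
    "amount": [10.0, 5.5, 7.25],
})

# Row-oriented: one CSV; simple, but every query scans whole rows
df.to_csv("/tmp/events.csv", index=False)

# Columnar and partitioned: Parquet files split by event_date (pyarrow engine)
df.to_parquet("/tmp/events_parquet", partition_cols=["event_date"], index=False)

# Deep partitioning (date/hour/user) multiplies directories and tiny files.
# Periodic compaction: read one partition and rewrite it as a single file.
# Note: the partition value lives in the directory name, not in the file.
part = pd.read_parquet("/tmp/events_parquet/event_date=2025-01-01")
os.makedirs("/tmp/events_compacted/event_date=2025-01-01", exist_ok=True)
part.to_parquet("/tmp/events_compacted/event_date=2025-01-01/part-000.parquet", index=False)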

Worked examples

Example 1: Batch ETL with validation and incremental load

Goal: Ingest daily events CSV to Parquet, validate, deduplicate, and load incrementally by date.

import glob
import os
from datetime import datetime, timezone

import pandas as pd

RUN_DATE = os.environ.get("RUN_DATE", "2025-01-01")  # yyyy-mm-dd
src_pattern = f"/data/raw/events/date={RUN_DATE}/*.csv"
out_path = f"/data/curated/events/date={RUN_DATE}/part-000.parquet"

# Load all CSV files for the run date
files = glob.glob(src_pattern)
df = pd.concat((pd.read_csv(p) for p in files), ignore_index=True) if files else pd.DataFrame()

# Validate: required columns must be present
required = {"event_id", "user_id", "event_ts", "amount"}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns: {sorted(missing)}")

# Basic checks: no nulls in key columns
bad = df[df["event_id"].isna() | df["user_id"].isna()]
if len(bad) > 0:
    raise ValueError(f"Nulls in key columns: {len(bad)} rows")

# Deduplicate by event_id, keeping the latest ingest timestamp
if "ingest_ts" not in df.columns:
    df["ingest_ts"] = datetime.now(timezone.utc).isoformat()

df = df.sort_values(["event_id", "ingest_ts"]).drop_duplicates(subset=["event_id"], keep="last")

# Write to a deterministic partition path (idempotent per RUN_DATE)
os.makedirs(os.path.dirname(out_path), exist_ok=True)
df.to_parquet(out_path, index=False)
print("Loaded:", len(df))

Notes:

  • Idempotent writes: use deterministic output path by date.
  • Incremental: driven by RUN_DATE; orchestration sets this per day.

Example 2: CDC and MERGE for upserts

Goal: Apply change data capture (inserts/updates/deletes) into a dimensions table.

-- Staging table: user_changes (op, user_id, name, email, updated_at, is_deleted)
MERGE INTO dim_users AS d
USING (
  SELECT user_id, name, email, updated_at, is_deleted FROM user_changes
) AS s
ON d.user_id = s.user_id
WHEN MATCHED AND s.is_deleted = TRUE THEN DELETE
WHEN MATCHED AND s.is_deleted = FALSE AND s.updated_at >= d.updated_at THEN
  UPDATE SET name = s.name, email = s.email, updated_at = s.updated_at
WHEN NOT MATCHED AND s.is_deleted = FALSE THEN
  INSERT (user_id, name, email, updated_at) VALUES (s.user_id, s.name, s.email, s.updated_at);

Notes:

  • Use updated_at as a watermark to prevent out-of-order overwrites.
  • Soft deletes handled by DELETE clause or tombstone flag.
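
If the change batch lands as files instead of a warehouse table, the same upsert logic can be sketched in pandas. This is a minimal illustration (the apply_cdc helper is hypothetical) mirroring the MERGE above: keep the newest change per user_id, let it win over the current dimension row only when it is at least as new, and drop tombstoned users.

import pandas as pd

# dim_users: user_id, name, email, updated_at
# user_changes: user_id, name, email, updated_at, is_deleted (CDC batch)
def apply_cdc(dim_users: pd.DataFrame, user_changes: pd.DataFrame) -> pd.DataFrame:
    # Latest change per user_id; updated_at acts as the watermark
    latest = (user_changes
              .sort_values("updated_at")
              .drop_duplicates(subset=["user_id"], keep="last"))

    # Union current rows with changes, then keep the newest row per user_id
    merged = (pd.concat([dim_users, latest], ignore_index=True)
              .sort_values("updated_at")
              .drop_duplicates(subset=["user_id"], keep="last"))

    # Remove tombstoned users (the soft delete becomes a hard delete here)
    deleted = merged["is_deleted"].eq(True)
    return merged.loc[~deleted].drop(columns=["is_deleted"])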

Example 3: Streaming pipeline for real-time features

Goal: Compute per-user purchase counts over 10-minute windows for serving.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Read the purchases topic from Kafka
source = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "purchases")
  .load())

# Parse the JSON payload and cast the event timestamp
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_ts", StringType()),
    StructField("amount", DoubleType())
])
parsed = source.select(from_json(col("value").cast("string"), schema).alias("e")).select("e.*")
parsed = parsed.withColumn("event_ts", to_timestamp("event_ts"))

# 10-minute windows per user; pass a slide interval to window() for overlapping windows
agg = (parsed
  .withWatermark("event_ts", "20 minutes")
  .groupBy(window(col("event_ts"), "10 minutes"), col("user_id"))
  .count()
  .withColumnRenamed("count", "purchases_10m"))

query = (agg
  .writeStream
  .outputMode("update")
  .format("memory")
  .queryName("feat_purchases_10m")
  .start())

# In production, write to a feature store or key-value cache

Notes:

  • Watermarks bound state and handle late events.
  • For serving, publish results to a store keyed by user_id.
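
In production, the memory sink above is typically replaced with foreachBatch, which hands each micro-batch to ordinary batch code so you can write to any store. A minimal sketch, assuming the agg stream from this example, the redis-py client, and a hypothetical cache host:

import json
import redis  # assumes the redis-py client is available

def publish_features(batch_df, batch_id):
    # Runs per micro-batch; collect() is acceptable only for small aggregate batches
    r = redis.Redis(host="cache-host", port=6379)  # hypothetical connection details
    for row in batch_df.collect():
        key = f"feat:purchases_10m:{row['user_id']}"
        value = json.dumps({
            "window_end": str(row["window"]["end"]),
            "purchases_10m": row["purchases_10m"],
        })
        r.set(key, value, ex=1200)  # expire after 20 minutes

query = (agg.writeStream
         .outputMode("update")
         .foreachBatch(publish_features)
         .start())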

Example 4: Orchestration, parameters, and backfills

Goal: Run the same job for a date range safely.

# job.py
import os

RUN_DATE = os.environ["RUN_DATE"]  # required, yyyy-mm-dd
# Fetch data for RUN_DATE, write to a deterministic partition path
# Ensure idempotency: overwrite the same partition or use MERGE keyed by primary keys

# backfill.sh
START=2025-01-01
END=2025-01-10
current=$START
while [ "$current" != "$(date -I -d "$END + 1 day")" ]; do
  RUN_DATE=$current python job.py || exit 1
  current=$(date -I -d "$current + 1 day")
done

Notes:

  • Pass run parameters via environment or CLI flags.
  • Idempotent writes make reruns safe.
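
A small pattern that supports both styles from the notes above is a CLI flag with an environment-variable fallback. A minimal sketch (the --run-date flag name is illustrative):

# job.py parameter handling
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("--run-date", default=os.environ.get("RUN_DATE"),
                    help="Partition date to process, yyyy-mm-dd")
args = parser.parse_args()

if not args.run_date:
    raise SystemExit("Provide --run-date or set the RUN_DATE environment variable")

print(f"Processing partition date={args.run_date}")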

Example 5: Point-in-time correct training dataset

Goal: Join features to labels without leakage.

-- events(user_id, event_ts, amount)
-- churn_labels(user_id, label_ts, churned)  -- label is for period after label_ts
WITH feats AS (
  SELECT
    user_id,
    event_ts,
    SUM(amount) OVER (
      PARTITION BY user_id
      ORDER BY event_ts
      RANGE BETWEEN INTERVAL 7 DAY PRECEDING AND CURRENT ROW
    ) AS spend_7d
  FROM events
),
joined AS (
  SELECT
    l.user_id, l.label_ts, l.churned, f.spend_7d,
    ROW_NUMBER() OVER (
      PARTITION BY l.user_id, l.label_ts
      ORDER BY f.event_ts DESC
    ) AS rn
  FROM churn_labels l
  JOIN feats f
    ON f.user_id = l.user_id
   AND f.event_ts <= l.label_ts  -- no future info
)
SELECT user_id, label_ts, churned, spend_7d
FROM joined
WHERE rn = 1;  -- one row per label: the latest feature value as of label_ts

Notes:

  • Never join features generated after the label timestamp.
  • Prefer declarative point-in-time joins or time-travel when available.
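
When features and labels fit in memory, pandas.merge_asof is one declarative way to do the point-in-time join: for each label it keeps only the most recent feature row at or before label_ts. A minimal sketch, assuming feats and labels DataFrames with the columns used above:

import pandas as pd

# feats: user_id, event_ts, spend_7d | labels: user_id, label_ts, churned
def point_in_time_join(feats: pd.DataFrame, labels: pd.DataFrame) -> pd.DataFrame:
    feats = feats.sort_values("event_ts")
    labels = labels.sort_values("label_ts")
    # direction="backward": latest feature at or before each label_ts, never after
    return pd.merge_asof(
        labels, feats,
        left_on="label_ts", right_on="event_ts",
        by="user_id",
        direction="backward",
    )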

Drills and quick exercises

  • Write a validation rule set for one dataset: required columns, ranges, uniqueness keys (see the sketch after this list).
  • Convert a daily batch job to accept a RUN_DATE parameter and rerun it for the last 7 days.
  • Implement a simple CDC MERGE on a dimension table; test insert, update, and delete.
  • Add a watermark to an aggregation and test behavior with late events.
  • Create a backfill script that stops on the first failure and resumes from that date.
  • Rename a column in a controlled manner: add new, populate, switch reads, drop old.
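
For the first drill, a rule set can start as a plain dictionary of checks applied before any write. A minimal sketch (rule names, columns, and thresholds are illustrative):

import pandas as pd

RULES = {
    "required_columns": ["event_id", "user_id", "event_ts", "amount"],
    "unique_keys": ["event_id"],
    "ranges": {"amount": (0, 10_000)},
}

def validate(df: pd.DataFrame, rules: dict) -> list:
    errors = []
    missing = set(rules["required_columns"]) - set(df.columns)
    if missing:
        errors.append(f"missing columns: {sorted(missing)}")
        return errors  # later checks depend on these columns
    dupes = int(df.duplicated(subset=rules["unique_keys"]).sum())
    if dupes:
        errors.append(f"{dupes} duplicate rows on {rules['unique_keys']}")
    for col, (lo, hi) in rules["ranges"].items():
        out_of_range = int((~df[col].between(lo, hi)).sum())
        if out_of_range:
            errors.append(f"{out_of_range} rows with {col} outside [{lo}, {hi}]")
    return errors  # empty list means the dataset passed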

Common mistakes and debugging tips

  • Leaky joins: Joining features with timestamps after the label_ts. Fix with point-in-time filters.
  • Non-idempotent outputs: Appending duplicates on reruns. Fix by overwriting partitions or MERGE on keys.
  • Missing watermarks: Streaming state growing unbounded. Fix with withWatermark and retention limits.
  • Unstable schemas: Breaking downstream jobs. Fix with contracts, default values, and versioned schemas.
  • Silent data drift: Changes in distributions unnoticed. Add statistical checks and alerts.
  • Small files problem: Too many tiny output files. Coalesce or compact output periodically.
  • Over-partitioning: date/hour/user shards lead to many empty files. Choose partition columns wisely.

Debugging checklist

  • Confirm run parameters (dates, sources, paths) are correct.
  • Validate row counts at each stage; reconcile with the source (see the sketch below).
  • Check dedup logic and primary key definition.
  • Inspect a sample of failures; capture reasons in a dead-letter path.
  • Re-run a single partition locally to reproduce issues.
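
Row-count reconciliation can be a few lines at the end of a job. A minimal sketch against the Example 1 paths (the 10% tolerance is an arbitrary example; dedup legitimately removes some rows):

import glob
import pandas as pd

RUN_DATE = "2025-01-01"  # normally supplied by the orchestrator
src_files = glob.glob(f"/data/raw/events/date={RUN_DATE}/*.csv")
source_rows = sum(len(pd.read_csv(p)) for p in src_files)
curated_rows = len(pd.read_parquet(f"/data/curated/events/date={RUN_DATE}/part-000.parquet"))

# A large drop usually signals a filtering or join bug rather than dedup
if source_rows and curated_rows < 0.9 * source_rows:
    raise ValueError(f"Row count drop: {source_rows} source vs {curated_rows} curated")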

Mini project: End-to-end churn pipeline

Build a small pipeline that powers a churn model prototype.

  1. Ingest: Batch-load daily events (purchases, logins) to Parquet partitioned by date.
  2. Validation: Enforce required columns, deduplicate by event_id.
  3. Features: Compute 7d/30d spend and login counts per user (batch).
  4. Labels: Define churned as no login within the 30 days after label_ts (see the sketch after this list).
  5. Training set: Point-in-time join features to labels with label_ts cutoff.
  6. Orchestration: Parameterize RUN_DATE and backfill the last 60 days.
  7. Docs & lineage: Document sources, columns, owners, and dependencies.
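
Step 4 can be expressed as a point-in-time check over the logins data, as referenced in the list above. A minimal pandas sketch (the logins and label_dates frames and their columns are illustrative):

import pandas as pd

# logins: user_id, login_ts | label_dates: user_id, label_ts
def label_churn(logins: pd.DataFrame, label_dates: pd.DataFrame) -> pd.DataFrame:
    out = label_dates.merge(logins, on="user_id", how="left")
    # A login only counts if it falls within the 30 days after label_ts
    in_window = (out["login_ts"] > out["label_ts"]) & (
        out["login_ts"] <= out["label_ts"] + pd.Timedelta(days=30)
    )
    out["active"] = in_window
    labels = out.groupby(["user_id", "label_ts"], as_index=False)["active"].any()
    labels["churned"] = ~labels["active"]
    return labels[["user_id", "label_ts", "churned"]]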

Stretch goals

  • Add streaming updates to keep 10-minute aggregates fresh.
  • Implement CDC on a user dimension and integrate into features.
  • Add schema evolution: introduce a new event field with defaults and monitor adoption.

Subskills

  • Batch And Streaming Basics: Understand latency, throughput, and cost trade-offs to choose the right mode.
  • ETL ELT Concepts For ML: Decide what to transform where; design storage and transformations for features/labels.
  • CDC And Incremental Loads: Keep data fresh efficiently with watermarks, MERGE/UPSERT, and tombstones.
  • Data Validation And Quality Checks: Enforce contracts, dedup, detect drift, and handle nulls.
  • Orchestration Basics And Scheduling: Parameterize runs, handle retries, alerts, and backfills.
  • Backfills And Idempotency: Make reruns safe; write deterministically and avoid duplicates.
  • Managing Schema Changes: Evolve schemas without breaking downstream; version and document changes.
  • Building Training Datasets And Labels: Create point-in-time correct datasets and clear labeling logic.
  • Feature Engineering Pipelines: Implement windowed aggregates and transformations aligned to serving.
  • Data Lineage And Documentation: Track sources-to-sinks, owners, and assumptions for trust and audits.

Next steps

  • Pick one dataset and build a minimal pipeline with validation and incremental load.
  • Add a point-in-time training dataset and a small feature set.
  • Orchestrate it with parameters and test a week-long backfill.
  • Document lineage and known limitations. Iterate as data and needs evolve.

Data Pipelines — Skill Exam

This exam checks practical understanding of Data Pipelines for ML Engineers. 12 questions, mixed difficulty. You can retry any time. Everyone can take the exam for free. Only logged-in users have their progress and results saved. Tips: Read carefully and watch for point-in-time correctness, idempotency, and incremental logic.

12 questions | 70% to pass
