
Data Ingestion

Learn Data Ingestion for Data Engineers for free: roadmap, examples, subskills, and a skill exam.

Published: January 8, 2026 | Updated: January 8, 2026

Why Data Ingestion matters for Data Engineers

Data ingestion is how raw data enters your platform: files, APIs, streams, and databases. As a Data Engineer, you design reliable pipelines that bring data in continuously, accurately, and safely, so analysts and ML systems can trust it. Strong ingestion skills unlock dependable dashboards, reproducible ML features, and auditable decision-making.

  • Turn business questions into data sourcing plans.
  • Ingest from APIs, files, message queues, and databases.
  • Handle late data, schema changes, and reprocessing without creating duplicates.
  • Meet SLAs for freshness and availability.

What you’ll be able to do

  • Choose the right ingestion pattern (batch vs streaming vs CDC).
  • Implement incremental loads with watermarks and idempotency.
  • Design backfills and reprocessing plans safely.
  • Handle schema evolution without breaking downstream jobs.

Who this is for

  • Aspiring and junior Data Engineers building their first pipelines.
  • Analytics Engineers and ML Engineers who need production-grade ingestion.
  • Software Engineers moving into data platforms.

Prerequisites

  • Comfort with basic SQL (SELECT, JOIN, GROUP BY).
  • Intro Python scripting (requests, file I/O) or similar language skills.
  • Familiarity with cloud storage concepts (e.g., buckets, object paths) and message queues.

Learning path

Step 1: Source discovery and contracts — list systems, data owners, SLAs, volumes, and data quality constraints.
Step 2: Pick an ingestion pattern — batch files, API pull, streaming, or CDC based on latency, cost, and reliability needs.
Step 3: Implement incremental load — choose keys, set watermarks (timestamps, IDs), and ensure idempotency.
Step 4: Add schema evolution handling — nullable-first changes, schema registry or validations.
Step 5: Backfills and reprocessing — plan safe re-runs, deduping, and late-arrival windows.
Step 6: Observability — add metrics (lag, freshness), alerts, and data quality checks.
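
Step 6 can start small. Below is a minimal sketch of a freshness and volume check over a bronze partition like the one produced in Example 1 further down; the partition path, the 2-hour freshness target, and the ISO-8601 updated_at format are illustrative assumptions.

# Python 3.x — minimal freshness and volume check (illustrative path, SLA, and timestamp format)
import csv, datetime, pathlib

PARTITION = pathlib.Path("./bronze/orders/ingest_date=2026-01-08")  # placeholder partition
FRESHNESS_SLA = datetime.timedelta(hours=2)                         # assumed freshness target

row_count = 0
max_updated = None
for path in PARTITION.glob("*.csv"):
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            row_count += 1
            if max_updated is None or row["updated_at"] > max_updated:
                max_updated = row["updated_at"]

now = datetime.datetime.now(datetime.timezone.utc)
lag = None
if max_updated:
    lag = now - datetime.datetime.fromisoformat(max_updated.replace("Z", "+00:00"))
print({"row_count": row_count, "max_updated_at": max_updated, "lag": str(lag)})
if lag is None or lag > FRESHNESS_SLA:
    print("ALERT: partition is empty or stale")  # replace with a real alerting hook
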
Milestone checklist
  • Have a written ingestion spec with SLAs and data contract.
  • One batch pipeline with incremental loads and dedupe.
  • One streaming or CDC feed with at-least-once semantics.
  • Schema changes tested (add column, type widening).
  • Backfill runbook documented.

Worked examples

Example 1 — Batch ingest CSV to object storage with validation

Goal: Load a daily CSV drop into a data lake folder, validate columns, and write date-partitioned files.

# Python 3.x
import csv, datetime
from pathlib import Path

# Simulate landing directory and validated output
LANDING = Path("./landing")
BRONZE = Path("./bronze/orders")
BRONZE.mkdir(parents=True, exist_ok=True)

expected_cols = ["order_id","user_id","amount","updated_at"]

def validate_row(row):
    # basic checks
    assert set(row.keys()) == set(expected_cols)
    float(row["amount"])  # numeric check

# pick today's file
today = datetime.date.today().strftime("%Y-%m-%d")
source_file = LANDING / f"orders_{today}.csv"

partition_dir = BRONZE / f"ingest_date={today}"
partition_dir.mkdir(parents=True, exist_ok=True)

with open(source_file, newline="") as f, open(partition_dir / "part-000.csv", "w", newline="") as out:
    reader = csv.DictReader(f)
    writer = csv.DictWriter(out, fieldnames=expected_cols)
    writer.writeheader()
    count = 0
    for row in reader:
        validate_row(row)
        writer.writerow(row)
        count += 1
print(f"Wrote {count} rows to {partition_dir}")

Notes: Partition by ingest_date for reproducibility. Keep the original file in landing for audits.
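
A lightweight way to support that audit trail is to write a small manifest next to each partition recording the source file and row count. This sketch is meant to run at the end of the Example 1 script (it reuses source_file, count, and partition_dir); the manifest layout itself is an arbitrary choice.

# Python 3.x — audit manifest for the partition (sketch; layout is an arbitrary choice)
import json, datetime

manifest = {
    "source_file": str(source_file),   # original landing file is kept for audits
    "row_count": count,                # rows written to the partition
    "ingested_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
}
(partition_dir / "_manifest.json").write_text(json.dumps(manifest, indent=2))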

Example 2 — API incremental ingestion with pagination and watermarks

Goal: Pull only new/changed records since last run, paginated, with safe re-runs.

import requests, json, pathlib

STATE = pathlib.Path("./state_api.json")
OUT = pathlib.Path("./bronze/api_orders")
OUT.mkdir(parents=True, exist_ok=True)

state = {"last_updated_at": "1970-01-01T00:00:00Z"}
if STATE.exists():
    state = json.loads(STATE.read_text())

params = {"updated_since": state["last_updated_at"], "limit": 100}
url = "https://example.api/orders"
all_rows = []
while True:
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    data = r.json()
    rows = data["results"]
    if not rows:
        break
    all_rows.extend(rows)
    # use server-provided next token if available
    if data.get("next_cursor"):
        params["cursor"] = data["next_cursor"]
    else:
        break

# Write the batch out first, then move the watermark forward to the max updated_at seen
out_file = OUT / f"pull_{state['last_updated_at'].replace(':', '')}.jsonl"
out_file.write_text("\n".join(json.dumps(row) for row in all_rows))

max_updated = state["last_updated_at"]
for row in all_rows:
    if row["updated_at"] > max_updated:
        max_updated = row["updated_at"]

# Important: the watermark advances only AFTER a successful write,
# and the next run filters with strictly greater-than (>) while keeping idempotent dedupe downstream.
STATE.write_text(json.dumps({"last_updated_at": max_updated}))

Notes: Use strict ordering on updated_at. Downstream, dedupe by natural key and latest updated_at.
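
The downstream dedupe can be a simple latest-wins pass over the pulled rows before loading. A sketch that follows on from the script above, assuming order_id is the natural key and updated_at is ISO-8601 so string comparison preserves time order:

# Python 3.x — latest-wins dedupe by natural key before loading (sketch)
latest = {}
for row in all_rows:
    key = row["order_id"]
    if key not in latest or row["updated_at"] > latest[key]["updated_at"]:
        latest[key] = row
deduped_rows = list(latest.values())
print(f"{len(all_rows)} pulled, {len(deduped_rows)} after dedupe")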

Example 3 — Streaming: consume Kafka and write rolling files

Goal: Ingest from a Kafka topic with at-least-once semantics and compact small files hourly.

from kafka import KafkaConsumer
import json, datetime, pathlib

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='orders_ingestors',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

out_dir = pathlib.Path('./stream/bronze')
out_dir.mkdir(parents=True, exist_ok=True)

current_bucket = None
file = None

try:
    for msg in consumer:
        now = datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%d%H')
        if current_bucket != now:
            if file:
                file.close()
            current_bucket = now
            file = open(out_dir / f'orders_{now}.jsonl', 'a')
        file.write(json.dumps(msg.value) + "\n")
        # flush buffered rows to disk, then commit periodically for at-least-once delivery
        if msg.offset % 1000 == 0:
            file.flush()
            consumer.commit()
finally:
    if file:
        file.close()

Notes: This pipeline is at-least-once. Downstream, dedupe by event_id to guarantee idempotency.
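
To cover the "compact small files hourly" part of the goal, a separate job can merge an hour's output and drop duplicate event_ids. A sketch, assuming each payload carries an event_id field:

# Python 3.x — dedupe/compact one hourly bucket by event_id (sketch)
import json, pathlib

bucket = "2026010812"  # placeholder hour bucket in YYYYMMDDHH form
src_files = sorted(pathlib.Path("./stream/bronze").glob(f"orders_{bucket}*.jsonl"))
dst_dir = pathlib.Path("./stream/silver")
dst_dir.mkdir(parents=True, exist_ok=True)

seen = set()
with open(dst_dir / f"orders_{bucket}.jsonl", "w") as out:
    for path in src_files:
        with open(path) as f:
            for line in f:
                event = json.loads(line)
                if event["event_id"] in seen:   # drop at-least-once duplicates
                    continue
                seen.add(event["event_id"])
                out.write(json.dumps(event) + "\n")
print(f"Compacted {len(src_files)} file(s), {len(seen)} unique events")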

Example 4 — CDC connector config (MySQL to Kafka)

Goal: Capture inserts/updates/deletes via binlog CDC.

{
  "name": "mysql-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "cdc_user",
    "database.password": "*****",
    "database.server.id": "184054",
    "database.server.name": "src_mysql",
    "table.include.list": "shop.orders,shop.order_items",
    "include.schema.changes": "false",
    "tombstones.on.delete": "false",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "max.batch.size": "2048"
  }
}

Notes: Use an unwrap transform to emit the latest row state per change event. Plan downstream merges that handle deletes (soft delete flag or separate topic).

Example 5 — Idempotent upsert with SQL MERGE and backfill

Goal: Upsert bronze rows into a warehouse table without duplicates; safe to re-run.

MERGE INTO warehouse.orders AS t
USING (
  SELECT order_id, user_id, amount, updated_at
  FROM (
    SELECT order_id, user_id, amount, updated_at,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
    FROM bronze.orders_daily
    WHERE ingest_date = CURRENT_DATE
  ) ranked
  WHERE rn = 1
) AS s
ON t.order_id = s.order_id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
  UPDATE SET user_id = s.user_id, amount = s.amount, updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (order_id, user_id, amount, updated_at)
  VALUES (s.order_id, s.user_id, s.amount, s.updated_at);

Notes: ROW_NUMBER keeps only the latest version per key, so each target row matches at most one source row. Re-running is safe because updates apply only when the source row is newer.
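
For the backfill half of this example, the same statement can be parameterized by ingest_date and replayed day by day; because the merge is idempotent, re-running past days does not double-count. A sketch, where connect_warehouse and the %s placeholder style are assumptions to adapt to your driver:

# Python 3.x — backfill the last 7 days by re-running the idempotent merge per day
# (connect_warehouse and the %s placeholder are assumptions; adapt to your driver)
import datetime

MERGE_SQL = """
MERGE INTO warehouse.orders AS t
USING (
  SELECT order_id, user_id, amount, updated_at
  FROM (
    SELECT order_id, user_id, amount, updated_at,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
    FROM bronze.orders_daily
    WHERE ingest_date = %s
  ) ranked
  WHERE rn = 1
) AS s
ON t.order_id = s.order_id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
  UPDATE SET user_id = s.user_id, amount = s.amount, updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (order_id, user_id, amount, updated_at)
  VALUES (s.order_id, s.user_id, s.amount, s.updated_at)
"""

conn = connect_warehouse()  # assumed helper returning a DB-API connection
today = datetime.date.today()
for days_back in range(7, 0, -1):
    ingest_date = today - datetime.timedelta(days=days_back)
    with conn.cursor() as cur:
        cur.execute(MERGE_SQL, (ingest_date,))
    conn.commit()
    print(f"Backfilled ingest_date={ingest_date}")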

Drills and exercises

  • List three ingestion patterns and when you’d use each.
  • Define a watermark for an orders dataset and justify the column choice.
  • Write a dedupe query for events with potential duplicates.
  • Simulate a backfill of the last 7 days without touching newer data.
  • Sketch a schema evolution plan for adding a nullable column and later making it required.

Common mistakes and debugging tips

Mistake: Using <= watermarks

Using <= on updated_at often re-ingests the last record each run, creating duplicates unless downstream dedupe is perfect. Prefer strict > and keep idempotent merges.
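
The difference is a single comparison. A small illustration with in-memory rows:

# Python 3.x — strict vs. inclusive watermark comparison (illustrative rows)
rows = [
    {"order_id": 1, "updated_at": "2026-01-08T10:00:00Z"},
    {"order_id": 2, "updated_at": "2026-01-08T11:00:00Z"},
]
watermark = "2026-01-08T10:00:00Z"  # value saved by the previous run

strict = [r for r in rows if r["updated_at"] > watermark]      # only order 2: nothing re-ingested
inclusive = [r for r in rows if r["updated_at"] >= watermark]  # orders 1 and 2: boundary row repeats every run
print(len(strict), len(inclusive))  # 1 2

The inclusive comparison re-ingests the boundary row on every run, which is harmless only if downstream dedupe is perfect.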

Mistake: No unique event key

Without a stable key (order_id or event_id), dedupe becomes guesswork. Add or derive a deterministic key at ingestion time.
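
When the source exposes no stable ID, one option is to derive a deterministic surrogate key from a few identifying fields at ingestion time. A sketch using a SHA-256 hash; the chosen fields are an assumption about what uniquely identifies an event:

# Python 3.x — derive a deterministic event key at ingestion time (sketch)
import hashlib, json

def event_key(row):
    # Fields assumed to identify an event uniquely; adjust to your source.
    basis = json.dumps([row["user_id"], row["order_id"], row["updated_at"]])
    return hashlib.sha256(basis.encode("utf-8")).hexdigest()

row = {"user_id": 42, "order_id": 1001, "updated_at": "2026-01-08T10:00:00Z"}
row["event_id"] = event_key(row)
print(row["event_id"][:12])  # identical inputs always produce the same key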

Mistake: Ignoring late data

If your pipeline closes out a time window too early, late arrivals are silently dropped. Keep a grace period (e.g., 24–72 hours) and schedule periodic re-merges of recent partitions.

Mistake: Hard-breaking on new columns

Failing the whole run on unexpected extra columns wastes runs. Prefer permissive reads that warn, then explicitly map only the columns you need and log the drift.
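
A permissive read can be a few lines wrapped around the CSV reading from Example 1: warn on unexpected columns, keep only the expected ones, and log the drift instead of failing. A sketch, using the same expected column list as Example 1:

# Python 3.x — permissive column handling with drift logging (sketch)
import csv, logging

logging.basicConfig(level=logging.INFO)
expected_cols = ["order_id", "user_id", "amount", "updated_at"]

def read_permissive(path):
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        extra = set(reader.fieldnames or []) - set(expected_cols)
        missing = set(expected_cols) - set(reader.fieldnames or [])
        if extra:
            logging.warning("Schema drift: ignoring unexpected columns %s", sorted(extra))
        if missing:
            raise ValueError(f"Missing required columns: {sorted(missing)}")
        for row in reader:
            yield {col: row[col] for col in expected_cols}  # map only the columns we need

Missing required columns still fail fast, since that usually signals real breakage rather than additive drift.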

Debugging tip: Trace one record end-to-end

Pick a single order_id and trace its journey across landing, bronze, silver, and warehouse. Differences reveal where transformations or dedupe failed.

Mini project: Unified orders ingestion

Build a small ingestion stack that pulls:

  • Daily batch CSV of orders.
  • Incremental API of users.
  • Streaming topic of payments.
1. Define contracts and SLAs (latency target, retention, expected daily volumes).
2. Implement batch CSV landing to bronze with schema validation and partitioning.
3. Implement API incremental pull with updated_at watermark and state file.
4. Implement streaming consumer writing hourly JSONL with event_id.
5. Create warehouse tables and MERGE upserts for each source; ensure idempotency.
6. Add observability: record counts, max/min updated_at, and data freshness metrics.
Acceptance criteria
  • Re-runs produce the same warehouse record counts (idempotent).
  • Backfilling a past day does not double-count.
  • Schema change: adding a nullable column does not break the pipeline.

Practical projects

  • File lakehouse: ingest raw CSVs to bronze, compact to parquet in silver, and publish a clean orders table.
  • API to warehouse: schedule incremental pulls of a SaaS dataset and upsert to a fact table.
  • Clickstream stream: consume events, partition by hour, and maintain a 48-hour late-data merge window.

Subskills

  • Source System Discovery: Map owners, SLAs, data contracts, and delivery formats.
  • API Ingestion Patterns: Handle pagination, rate limits, and retries with watermarks.
  • Batch File Ingestion: Land, validate, and partition files safely.
  • Streaming Ingestion Basics: Topics, partitions, offsets, and delivery guarantees.
  • CDC Concepts And Change Capture: Capture inserts, updates, and deletes from DB logs.
  • Incremental Loads And Watermarks: Move only new/changed records reliably.
  • Schema Evolution Handling: Add, widen, and deprecate fields without breakage.
  • Idempotent Loads And Backfills: Safe re-runs, dedupe, and historical rebuilds.

Next steps

  • Practice: implement one pipeline of each pattern (batch, API, streaming, CDC if available).
  • Add metrics and alerts for freshness, volume, and error rates.
  • Take the skill exam below to check your understanding. Progress is saved for logged-in users; everyone can take the exam for free.
