Why Data Ingestion matters for Data Engineers
Data ingestion is how raw data enters your platform: files, APIs, streams, and databases. As a Data Engineer, you design reliable pipelines that bring data in continuously, accurately, and safely, so analysts and ML systems can trust it. Strong ingestion skills unlock dependable dashboards, reproducible ML features, and auditable decision-making. In day-to-day work, that means you:
- Turn business questions into data sourcing plans.
- Ingest from APIs, files, message queues, and databases.
- Handle late data, schema changes, and reprocessing without creating duplicates.
- Meet SLAs for freshness and availability.
What you’ll be able to do
- Choose the right ingestion pattern (batch vs streaming vs CDC).
- Implement incremental loads with watermarks and idempotency.
- Design backfills and reprocessing plans safely.
- Handle schema evolution without breaking downstream jobs.
Who this is for
- Aspiring and junior Data Engineers building their first pipelines.
- Analytics Engineers and ML Engineers who need production-grade ingestion.
- Software Engineers moving into data platforms.
Prerequisites
- Comfort with basic SQL (SELECT, JOIN, GROUP BY).
- Intro Python scripting (requests, file I/O) or similar language skills.
- Familiarity with cloud storage concepts (e.g., buckets, object paths) and message queues.
Learning path
Milestone checklist
- Have a written ingestion spec with SLAs and data contract.
- One batch pipeline with incremental loads and dedupe.
- One streaming or CDC feed with at-least-once semantics.
- Schema changes tested (add column, type widening).
- Backfill runbook documented.
Worked examples
Example 1 — Batch ingest CSV to object storage with validation
Goal: Load a daily CSV drop into a data lake folder, validate columns, and write date-partitioned files.
# Python 3.x
import csv, datetime
from pathlib import Path

# Simulate landing directory and validated output
LANDING = Path("./landing")
BRONZE = Path("./bronze/orders")
BRONZE.mkdir(parents=True, exist_ok=True)

expected_cols = ["order_id", "user_id", "amount", "updated_at"]

def validate_row(row):
    # basic checks: exact column set and a numeric amount
    assert set(row.keys()) == set(expected_cols)
    float(row["amount"])  # numeric check

# pick today's file
today = datetime.date.today().strftime("%Y-%m-%d")
source_file = LANDING / f"orders_{today}.csv"
partition_dir = BRONZE / f"ingest_date={today}"
partition_dir.mkdir(parents=True, exist_ok=True)

with open(source_file, newline="") as f, open(partition_dir / "part-000.csv", "w", newline="") as out:
    reader = csv.DictReader(f)
    writer = csv.DictWriter(out, fieldnames=expected_cols)
    writer.writeheader()
    count = 0
    for row in reader:
        validate_row(row)
        writer.writerow(row)
        count += 1

print(f"Wrote {count} rows to {partition_dir}")
Notes: Partition by ingest_date for reproducibility. Keep the original file in landing for audits.
Example 2 — API incremental ingestion with pagination and watermarks
Goal: Pull only new/changed records since last run, paginated, with safe re-runs.
import requests, json, pathlib

STATE = pathlib.Path("./state_api.json")
OUT = pathlib.Path("./bronze/api_orders")  # example output location for the fetched increment
OUT.mkdir(parents=True, exist_ok=True)

state = {"last_updated_at": "1970-01-01T00:00:00Z"}
if STATE.exists():
    state = json.loads(STATE.read_text())

params = {"updated_since": state["last_updated_at"], "limit": 100}
url = "https://example.api/orders"

all_rows = []
while True:
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    data = r.json()
    rows = data["results"]
    if not rows:
        break
    all_rows.extend(rows)
    # use server-provided next token if available
    if data.get("next_cursor"):
        params["cursor"] = data["next_cursor"]
    else:
        break

# Write out, then move the watermark forward to the max updated_at seen
(OUT / "increment.jsonl").write_text("\n".join(json.dumps(row) for row in all_rows))

max_updated = state["last_updated_at"]
for row in all_rows:
    if row["updated_at"] > max_updated:
        max_updated = row["updated_at"]

# Important: move the watermark only AFTER a successful write,
# and query with strictly greater-than (>) next run while keeping idempotent dedupe downstream.
STATE.write_text(json.dumps({"last_updated_at": max_updated}))
Notes: Use strict ordering on updated_at. Downstream, dedupe by natural key and latest updated_at.
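For the downstream dedupe the note describes, a minimal Python sketch (assuming order_id is the natural key and updated_at is consistently formatted ISO-8601 text, as in the example above, with made-up rows) could look like this:
# Collapse rows to the latest version per order_id.
all_rows = [
    {"order_id": "o1", "amount": 10.0, "updated_at": "2024-05-01T10:00:00Z"},
    {"order_id": "o1", "amount": 12.5, "updated_at": "2024-05-01T11:30:00Z"},  # later version wins
    {"order_id": "o2", "amount": 7.0, "updated_at": "2024-05-01T09:15:00Z"},
]
latest = {}
for row in all_rows:
    key = row["order_id"]
    if key not in latest or row["updated_at"] > latest[key]["updated_at"]:
        latest[key] = row
print(list(latest.values()))  # one row per order_id, carrying the latest updated_at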
Example 3 — Streaming: consume Kafka and write rolling files
Goal: Ingest from a Kafka topic with at-least-once semantics and compact small files hourly.
from kafka import KafkaConsumer
import json, datetime, pathlib

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='orders_ingestors',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

out_dir = pathlib.Path('./stream/bronze')
out_dir.mkdir(parents=True, exist_ok=True)

current_bucket = None
file = None
try:
    for msg in consumer:
        now = datetime.datetime.utcnow().strftime('%Y%m%d%H')
        if current_bucket != now:
            if file:
                file.close()
            current_bucket = now
            file = open(out_dir / f'orders_{now}.jsonl', 'a')
        file.write(json.dumps(msg.value) + "\n")
        # commit periodically for at-least-once; flush first so data is on disk before the offset moves
        if msg.offset % 1000 == 0:
            file.flush()
            consumer.commit()
finally:
    if file:
        file.close()
Notes: This is at-least-once. Downstream dedupe using an event_id to guarantee idempotency.
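As a sketch of that downstream dedupe (assuming each payload carries a stable event_id field, which the example topic does not guarantee, and with hypothetical file paths), an hourly file could be compacted like this:
import json, pathlib

hour_file = pathlib.Path('./stream/bronze/orders_2024050112.jsonl')   # hypothetical hour bucket from Example 3
dedup_file = pathlib.Path('./stream/silver/orders_2024050112.jsonl')  # hypothetical deduped output
dedup_file.parent.mkdir(parents=True, exist_ok=True)

seen = set()
with open(hour_file) as f, open(dedup_file, 'w') as out:
    for line in f:
        event = json.loads(line)
        key = event["event_id"]  # assumes a stable event_id exists in each payload
        if key in seen:
            continue
        seen.add(key)
        out.write(line)
print(f"Kept {len(seen)} unique events")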
Example 4 — CDC connector config (MySQL to Kafka)
Goal: Capture inserts/updates/deletes via binlog CDC.
{
  "name": "mysql-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "cdc_user",
    "database.password": "*****",
    "database.server.id": "184054",
    "database.server.name": "src_mysql",
    "table.include.list": "shop.orders,shop.order_items",
    "include.schema.changes": "false",
    "tombstones.on.delete": "false",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "max.batch.size": "2048"
  }
}
Notes: Use an unwrap transform to emit the latest row state per change event. Plan downstream merges that handle deletes (soft delete flag or separate topic).
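As a hedged sketch of such a merge, the snippet below applies unwrapped change events to an in-memory table keyed by order_id and records deletes as a soft-delete flag. The __deleted field is an assumption (for example, Debezium's ExtractNewRecordState transform can add it when configured to rewrite deletes), so adapt the field names to what your connector actually emits:
# Apply unwrapped CDC events in order, keeping the latest state per key.
# Field names (order_id, updated_at, __deleted) are illustrative assumptions.
events = [
    {"order_id": "o1", "amount": 10.0, "updated_at": "2024-05-01T10:00:00Z", "__deleted": "false"},
    {"order_id": "o1", "amount": 12.5, "updated_at": "2024-05-01T11:00:00Z", "__deleted": "false"},
    {"order_id": "o1", "amount": 12.5, "updated_at": "2024-05-01T12:00:00Z", "__deleted": "true"},  # delete event
]

table = {}  # order_id -> latest row state
for ev in events:
    key = ev["order_id"]
    current = table.get(key)
    if current is None or ev["updated_at"] >= current["updated_at"]:
        row = {k: v for k, v in ev.items() if k != "__deleted"}
        row["is_deleted"] = ev.get("__deleted") == "true"  # soft delete instead of dropping the row
        table[key] = row

print(table)  # o1 survives as a soft-deleted row with its last known values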
Example 5 — Idempotent upsert with SQL MERGE and backfill
Goal: Upsert bronze rows into a warehouse table without duplicates; safe to re-run.
MERGE INTO warehouse.orders AS t
USING (
    SELECT order_id, user_id, amount, updated_at,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
    FROM bronze.orders_daily
    WHERE ingest_date = CURRENT_DATE
) AS s
ON t.order_id = s.order_id
WHEN MATCHED AND s.rn = 1 AND s.updated_at > t.updated_at THEN
    UPDATE SET user_id = s.user_id, amount = s.amount, updated_at = s.updated_at
WHEN NOT MATCHED AND s.rn = 1 THEN
    INSERT (order_id, user_id, amount, updated_at)
    VALUES (s.order_id, s.user_id, s.amount, s.updated_at);
Notes: ROW_NUMBER() picks the latest version per key. Re-running is safe because updates only apply when the source row is newer.
Drills and exercises
- List three ingestion patterns and when you’d use each.
- Define a watermark for an orders dataset and justify the column choice.
- Write a dedupe query for events with potential duplicates.
- Simulate a backfill of the last 7 days without touching newer data.
- Sketch a schema evolution plan for adding a nullable column and later making it required.
Common mistakes and debugging tips
Mistake: Using <= watermarks
Using <= on updated_at often re-ingests the last record each run, creating duplicates unless downstream dedupe is perfect. Prefer strict > and keep idempotent merges.
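A minimal illustration of the difference, using made-up rows and an ISO-8601 string watermark:
rows = [
    {"order_id": "o1", "updated_at": "2024-05-01T10:00:00Z"},
    {"order_id": "o2", "updated_at": "2024-05-01T11:00:00Z"},  # the last row of the previous run
]
watermark = "2024-05-01T11:00:00Z"

inclusive = [r for r in rows if r["updated_at"] >= watermark]  # re-ingests o2 every run
strict = [r for r in rows if r["updated_at"] > watermark]      # picks up only genuinely new rows

print(len(inclusive), len(strict))  # 1 vs 0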
Mistake: No unique event key
Without a stable key (order_id or event_id), dedupe becomes guesswork. Add or derive a deterministic key at ingestion time.
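One way to derive a stable key at ingestion time is to hash the fields that identify an event; the field names below are hypothetical and should match whatever uniquely identifies a record in your source:
import hashlib, json

def derive_event_key(record, id_fields=("source", "order_id", "updated_at")):
    # Deterministic key: the same identifying fields always hash to the same value.
    material = json.dumps({f: record.get(f) for f in id_fields}, sort_keys=True)
    return hashlib.sha256(material.encode("utf-8")).hexdigest()

event = {"source": "shop_api", "order_id": "o1", "updated_at": "2024-05-01T10:00:00Z", "amount": 10.0}
print(derive_event_key(event))  # stable across re-runs, usable for dedupe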
Mistake: Ignoring late data
If your pipeline closes data too quickly, late arrivals are dropped. Keep a grace period (e.g., 24–72 hours) and schedule periodic re-merges.
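With the ingest_date partitions from Example 1 and the idempotent MERGE from Example 5, a 72-hour grace period just means re-merging the last few partitions on every run; a minimal sketch:
import datetime

GRACE_DAYS = 3  # roughly a 72-hour grace period for late arrivals
today = datetime.date.today()
partitions_to_remerge = [
    (today - datetime.timedelta(days=d)).isoformat() for d in range(GRACE_DAYS + 1)
]
for ingest_date in partitions_to_remerge:
    # Re-run the idempotent MERGE from Example 5 with WHERE ingest_date = '<ingest_date>'.
    print(f"Re-merging partition ingest_date={ingest_date}")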
Mistake: Hard-breaking on new columns
Failing on extra columns wastes runs. Prefer permissive reads with warnings, then explicitly map what you need and log drift.
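A small sketch of a permissive CSV read that warns about drift instead of failing (column names follow the orders example; the file path is a hypothetical drop):
import csv, logging

expected_cols = ["order_id", "user_id", "amount", "updated_at"]

with open("./landing/orders_2024-05-01.csv", newline="") as f:
    reader = csv.DictReader(f)
    found = set(reader.fieldnames or [])
    extra = found - set(expected_cols)
    missing = set(expected_cols) - found
    if extra:
        logging.warning("Ignoring unexpected columns: %s", sorted(extra))  # log drift, keep running
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")   # still fail on true breakage
    for row in reader:
        record = {c: row[c] for c in expected_cols}  # explicitly map only what we need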
Debugging tip: Trace one record end-to-end
Pick a single order_id and trace its journey across landing, bronze, silver, and warehouse. Differences reveal where transformations or dedupe failed.
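A simple way to start that trace on the file layers from Example 1 (the order_id is a placeholder):
import csv, pathlib

target = "o1"  # hypothetical order_id to trace
layers = {
    "landing": pathlib.Path("./landing").glob("orders_*.csv"),
    "bronze": pathlib.Path("./bronze/orders").rglob("*.csv"),
}
for layer, paths in layers.items():
    count = 0
    for path in paths:
        with open(path, newline="") as f:
            count += sum(1 for r in csv.DictReader(f) if r["order_id"] == target)
    print(f"{layer}: {count} occurrence(s) of {target}")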
Mini project: Unified orders ingestion
Build a small ingestion stack that pulls:
- Daily batch CSV of orders.
- Incremental API of users.
- Streaming topic of payments.
Acceptance criteria
- Re-runs produce the same warehouse record counts (idempotent).
- Backfilling a past day does not double-count.
- Schema change: adding a nullable column does not break the pipeline.
Practical projects
- File lakehouse: ingest raw CSVs to bronze, compact to parquet in silver, and publish a clean orders table.
- API to warehouse: schedule incremental pulls of a SaaS dataset and upsert to a fact table.
- Clickstream stream: consume events, partition by hour, and maintain a 48-hour late-data merge window.
Subskills
- Source System Discovery: Map owners, SLAs, data contracts, and delivery formats.
- API Ingestion Patterns: Handle pagination, rate limits, and retries with watermarks.
- Batch File Ingestion: Land, validate, and partition files safely.
- Streaming Ingestion Basics: Topics, partitions, offsets, and delivery guarantees.
- CDC Concepts And Change Capture: Capture inserts, updates, and deletes from DB logs.
- Incremental Loads And Watermarks: Move only new/changed records reliably.
- Schema Evolution Handling: Add, widen, and deprecate fields without breakage.
- Idempotent Loads And Backfills: Safe re-runs, dedupe, and historical rebuilds.
Next steps
- Practice: implement one pipeline of each pattern (batch, API, streaming, CDC if available).
- Add metrics and alerts for freshness, volume, and error rates.
- Take the skill exam below to check your understanding.