Why this matters
As an ETL Developer, your pipelines live or die by how reliably they bring data in. Most real-world sources are APIs and files (CSV/JSON/XML/Parquet) stored in cloud buckets, SFTP, or internal shares. You’ll deal with auth, pagination, rate limits, large files, schema drift, and retries. Mastering these lets you build trustworthy, repeatable ingestions.
- Daily tasks: pulling CRM data via REST, syncing product catalogs as CSV from S3, consuming logs as NDJSON, or reading partner data from SFTP.
- Outcomes: consistent schedules, no duplicates, recoverable failures, clear lineage, and predictable costs.
Concept explained simply
Think of a pipeline as a delivery dock. APIs and files are the trucks. You must verify identity (auth), obey rules (rate limits), unload in parts (pagination/chunking), and check what arrived (validation and checksums) before shelving it in your warehouse (staging tables) with the right labels (schema).
Mental model
- Contract: endpoint or file path + format + schema + limits + change tracking (config sketch after this list).
- Transport: HTTP(S)/SFTP + retries + backoff + timeouts.
- Unpack: parse JSON/CSV/XML, infer/validate types, handle encoding/compression.
- Control: incremental strategy (page, cursor, timestamp), idempotency, dedup.
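One way to make the contract concrete is a small config object that travels with the pipeline. A minimal sketch; the field names and values below are illustrative, reusing the orders source from the worked examples:

CONTRACT = {
    "source": "https://api.example.com/orders",   # endpoint or file path
    "format": "json",                              # csv / json / xml / parquet
    "schema": {"order_id": "string", "amount": "decimal(18,2)", "updated_at": "timestamp"},
    "limits": {"requests_per_min": 60, "page_size": 200},
    "change_tracking": {"type": "watermark", "column": "updated_at"},
}

Keeping this next to the code makes reviews easier: anyone can see what the source promises and what the pipeline assumes.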
Key building blocks
- Auth: API key, Basic, OAuth 2.0 (client credentials), signed requests.
- Pagination: page/limit, offset, cursor/next token, time-sliced windows.
- Rate limits: max requests per minute; use sleep/backoff and respect headers.
- File handling: CSV/TSV, JSON/NDJSON, XML, Parquet; gzip/zip; UTF-8 vs others.
- Incremental: watermarks (updated_at), cursors (next_page_token), change flags.
- Reliability: retries with jitter, idempotent upserts, checkpoints, dead-letter handling (backoff sketch after this list).
- Validation: row counts, schema checks, nullability, referential checks, checksums.
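The reliability pieces recur in every ingestion, so a tiny helper is worth spelling out. A sketch of exponential backoff with full jitter; the fetch callable passed in is whatever zero-argument function does the real request:

import random
import time

def backoff_sleep(attempt: int, base: float = 1.0, cap: float = 60.0) -> None:
    # Exponential backoff with full jitter: sleep a random time up to base * 2**attempt, capped
    time.sleep(random.uniform(0, min(cap, base * (2 ** attempt))))

def get_with_retries(fetch, max_attempts: int = 5):
    # fetch is any zero-argument callable that raises on transient failure
    for attempt in range(max_attempts):
        try:
            return fetch()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # give up after the final attempt
            backoff_sleep(attempt)

Example 1 below gets the same behavior from urllib3's Retry; this helper is just the building block written out by hand.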
Worked examples
Example 1 — Paginated REST API to staging
Scenario: Pull /orders from a REST API using an API key, with page+limit pagination and a last_edited watermark.
Approach:
- Auth: Add header X-Api-Key: YOUR_KEY.
- Incremental: Keep last_edited watermark. Start with a safe default (e.g., 30 days back) if none.
- Pagination: GET /orders?limit=200&page=N&updated_since=wm.
- Rate limits: 60 req/min; add sleep and exponential backoff on 429.
- Write: Stage to orders_raw (append-only) with load_id and extract_ts.
import os, time, datetime
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

API = "https://api.example.com/orders"
KEY = os.getenv("API_KEY")  # never hardcode the key
# Default watermark: 30 days back if none has been persisted yet
wm = os.getenv("WATERMARK", (datetime.datetime.utcnow() - datetime.timedelta(days=30)).isoformat() + "Z")

s = requests.Session()
# Retry 429/5xx with exponential backoff; Retry-After headers are honored by default
retries = Retry(total=5, backoff_factor=1.5, status_forcelist=[429, 500, 502, 503, 504])
s.mount("https://", HTTPAdapter(max_retries=retries))

page = 1
all_rows = []
while True:
    r = s.get(API, headers={"X-Api-Key": KEY},
              params={"limit": 200, "page": page, "updated_since": wm}, timeout=30)
    r.raise_for_status()
    rows = r.json().get("items", [])
    if not rows:
        break
    all_rows.extend(rows)
    page += 1
    time.sleep(1)  # stay under the 60 req/min limit

# Write to staging (pseudo)
# write_jsonl("orders_raw.jsonl", all_rows)
Validation: assert >0 rows on first run; on incrementals, allow 0. Record max(last_edited) as new watermark.
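One way to persist the new watermark, assuming each row carries last_edited as an ISO-8601 string and that state lives in a simple file (both illustrative; a state table or orchestrator variable works the same way):

# Persist the highest last_edited so the next run is incremental
if all_rows:
    new_wm = max(row["last_edited"] for row in all_rows)
    with open("orders_watermark.txt", "w", encoding="utf-8") as f:
        f.write(new_wm)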
Example 2 — Large CSV from object storage with chunking
Scenario: Read a 5 GB gzip CSV from an object store. Convert to Parquet in chunks, enforce schema, deduplicate by natural key (order_id).
Approach:
- Stream read in chunks (e.g., 100k rows) to avoid memory blowups.
- Apply types: order_id string, amount decimal(18,2), updated_at timestamp.
- Compute hash of key fields to help dedup.
- Partition Parquet by updated_date=YYYY-MM-DD.
import pandas as pd
from hashlib import md5

# Stream the gzip CSV in 100k-row chunks; reading s3:// paths requires s3fs
chunks = pd.read_csv("s3://bucket/orders.csv.gz", compression="gzip", chunksize=100_000,
                     dtype={"order_id": "string"}, parse_dates=["updated_at"])

def keyhash(o):
    return md5(o.encode("utf-8")).hexdigest()

for df in chunks:
    df["_key"] = df["order_id"].astype("string").apply(keyhash)
    # Dedups within a chunk; cross-chunk duplicates are resolved later by the idempotent merge
    df = df.drop_duplicates(subset=["_key"], keep="last")
    df["updated_date"] = df["updated_at"].dt.date.astype("string")
    # df.to_parquet("s3://bucket/stage/orders/", partition_cols=["updated_date"], index=False)
Validation: compare input row count vs staged (after dedup), check nullability for critical fields, ensure partitions created.
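Those checks can be written as plain assertions. A sketch, assuming the chunk loop above accumulates the three counters (names are illustrative):

def validate_load(rows_in: int, rows_staged: int, null_order_ids: int) -> None:
    # Dedup can only remove rows, never add them
    assert rows_staged <= rows_in, f"staged {rows_staged} rows but only read {rows_in}"
    # Critical fields must be populated
    assert null_order_ids == 0, f"{null_order_ids} rows missing order_id"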
Example 3 — SFTP XML with watermark
Scenario: Partner drops daily XML files to SFTP. Process only files with modified time > last watermark; transform to JSON Lines.
Approach:
- List directory; filter by mtime > watermark.
- For each file: stream parse XML, map fields, handle missing nodes safely.
- On success: move to /processed; on failure: move to /error with reason.
# Pseudo: list SFTP, filter by mtime, parse element-by-element
# Use a streaming parser to avoid loading the entire XML into memory.
Validation: keep a manifest (file name, size, checksum, rows_out). Verify checksums pre/post move.
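A compact sketch of the pseudo-steps above, assuming paramiko for SFTP and the standard library's streaming XML parser. The host, credentials, directory names, and the <order> record element are all placeholders:

import hashlib, json, os
import paramiko
import xml.etree.ElementTree as ET

WATERMARK = float(os.getenv("SFTP_WATERMARK", "0"))  # epoch seconds of last processed mtime

transport = paramiko.Transport(("sftp.partner.example.com", 22))
transport.connect(username=os.getenv("SFTP_USER"), password=os.getenv("SFTP_PASS"))
sftp = paramiko.SFTPClient.from_transport(transport)

manifest = []
for attr in sftp.listdir_attr("/incoming"):
    if attr.st_mtime <= WATERMARK:
        continue  # already processed on a previous run
    remote = f"/incoming/{attr.filename}"
    local = f"/tmp/{attr.filename}"
    try:
        sftp.get(remote, local)
        with open(local, "rb") as f:
            checksum = hashlib.md5(f.read()).hexdigest()
        rows_out = 0
        with open(local + ".jsonl", "w", encoding="utf-8") as out:
            # iterparse streams the XML instead of loading it all into memory
            for _, elem in ET.iterparse(local, events=("end",)):
                if elem.tag == "order":  # assumed record element name
                    out.write(json.dumps({c.tag: c.text for c in elem}) + "\n")
                    rows_out += 1
                    elem.clear()  # free memory for processed elements
        manifest.append({"file": attr.filename, "size": attr.st_size,
                         "checksum": checksum, "rows_out": rows_out})
        sftp.rename(remote, f"/processed/{attr.filename}")
    except Exception as exc:
        sftp.rename(remote, f"/error/{attr.filename}")  # quarantine; log the reason
        print(f"failed {attr.filename}: {exc}")

The manifest list here is exactly the record the validation step calls for: one entry per file with size, checksum, and rows written.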
Security and reliability basics
- Never hardcode secrets. Use environment variables or your platform's secure secret store.
- Set timeouts on all network calls; default retries with exponential backoff and jitter.
- Make loads idempotent: upsert by key or use merge-on-read patterns (MERGE sketch after this list).
- Log what you pulled: source, time window, counts, and highest watermark.
- Handle encodings explicitly (prefer UTF-8) and validate schema on ingest.
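For the idempotency point, a common pattern is a keyed MERGE from the staging table, run by the pipeline after each load. A minimal sketch, assuming a DB-API connection and a warehouse that supports MERGE (exact syntax varies by engine), using the orders columns from the examples above:

MERGE_SQL = """
MERGE INTO orders AS t
USING orders_raw AS s
  ON t.order_id = s.order_id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
  UPDATE SET amount = s.amount, updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (order_id, amount, updated_at) VALUES (s.order_id, s.amount, s.updated_at)
"""

def idempotent_load(conn) -> None:
    # Re-running this after a retry cannot create duplicates:
    # matched keys are updated, new keys are inserted, nothing is appended twice.
    with conn:
        conn.execute(MERGE_SQL)

Because the merge is keyed on order_id, replaying the same staging batch after a failed run leaves the target table unchanged.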
Common mistakes and self-check
- Ignoring rate limits leading to 429s. Self-check: do you read limit headers and back off?
- Full reloads every run. Self-check: is there a watermark or cursor persisted?
- Memory crashes with big files. Self-check: are you chunking/streaming?
- Silent schema drift. Self-check: do you validate types and alert on new/removed columns? (Drift-check sketch after this list.)
- Duplicates on retries. Self-check: is ingestion idempotent (keys/merges/hashes)?
- Timezone mistakes. Self-check: normalize to UTC and store source timezone separately.
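For the schema-drift check in particular, a small guard at ingest time goes a long way. A sketch, assuming pandas and an illustrative expected-column map:

import pandas as pd

EXPECTED_COLUMNS = {"order_id": "string", "amount": "float64", "updated_at": "datetime64[ns]"}

def check_schema(df: pd.DataFrame) -> None:
    added = set(df.columns) - set(EXPECTED_COLUMNS)
    removed = set(EXPECTED_COLUMNS) - set(df.columns)
    if added or removed:
        # Fail loudly (or alert) instead of silently loading a drifted schema
        raise ValueError(f"Schema drift: added={sorted(added)}, removed={sorted(removed)}")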
Who this is for
ETL Developers and data engineers who need to reliably ingest data from external APIs and file-based sources into staging and warehouse layers.
Prerequisites
- Basic scripting (e.g., Python) or ETL tool familiarity.
- Comfort with HTTP concepts (headers, status codes) and file formats.
- SQL for staging and upserts/merges.
Learning path
- Start: Practice connecting to a simple API with pagination and a small CSV.
- Next: Add retries, backoff, and proper watermarking.
- Then: Tackle large files (chunking), compression, and schema validation.
- Finally: Build an end-to-end incremental pipeline with idempotent merges.
Practical projects
- Customer data sync: Incrementally pull users from a REST API and upsert into a warehouse table with a daily schedule.
- Sales CSV lake: Convert large daily CSV drops to partitioned Parquet with schema checks and a manifest.
- Partner SFTP integration: Process XML to JSONL with checksum verification and a robust error quarantine.
Exercises
Complete these mini tasks. You can do them without special tooling—designs and pseudo-configs are fine.
Exercise 1 — Design an API ingestion spec (ex1)
Goal: Write a concise ingestion spec for a hypothetical Books API:
- Endpoint: /books
- Auth: header X-Api-Key
- Pagination: page+limit (max 200)
- Incremental: updated_since (ISO-8601)
- Rate limit: 100 req/min; returns 429 with Retry-After
Include: auth method, base URL, parameters, pagination loop, rate limit handling, watermark persistence, write target (staging), validation checks, and idempotent merge strategy.
- [ ] Auth header defined; key not hardcoded
- [ ] Watermark default and update logic
- [ ] Pagination loop with stop condition
- [ ] Backoff on 429 and 5xx
- [ ] Staging + merge keys
Exercise 2 — File ingestion mapping (ex2)
Goal: Create a mapping for a CSV file customers_2025-01-01.csv.gz with columns:
customer_id,name,email,signup_ts,is_active,segment,updated_at
Define types, primary key, nullability, partitioning, and a dedup key/hash. Show validation rules.
- [ ] Types and nullability defined
- [ ] Primary key and dedup logic
- [ ] Partition column chosen
- [ ] Basic validation (row count, required fields)
Mini challenge
You get 0 rows on an incremental API run but yesterday had 10k. In 5 lines, write a plan to diagnose without breaking SLAs. Consider watermarks, filters, and source-side changes.
Next steps
Take the quick test below to check your understanding.