Why this matters
Incremental loads let you move only the new or changed data, not the entire dataset. This saves time, money, and reduces risk. Watermarks keep track of progress so your jobs are fast, resume safely after failures, and avoid duplicate records.
- Daily jobs: Move only rows changed since yesterday.
- Streaming and CDC: Resume from the last processed offset/sequence.
- File lakes: Pick up only new or late-arriving files.
Who this is for
- ETL/ELT developers setting up reliable batch or near-real-time pipelines.
- Data engineers maintaining data lakes/warehouses.
- Analysts/engineers optimizing refresh times for dashboards.
Prerequisites
- Comfort with SQL (SELECT, JOIN, GROUP BY, MERGE/UPSERT).
- Basic ETL/ELT concepts: staging, target schemas, idempotency.
- Familiarity with timestamps, sequences, or offsets in data sources.
Concept explained simply
A watermark is a marker of how far you have processed. After each load, you store the highest processed point (like the latest updated_at timestamp, a max ID, or a CDC offset). Next time, you only read data greater than this stored value. If a job fails, you still know the last confirmed point and can resume safely.
Mental model
Think of data as a conveyor belt. The watermark is a sticky note that says "we inspected up to here." During the next run, you start just after that sticky note, inspect a new section, and move the sticky note forward to the furthest confirmed position.
Key terms
- High-water mark (HWM): The maximum processed value (timestamp, sequence, offset).
- Low-water mark: The minimum value in a processing window (used less commonly).
- Idempotency: Re-running a job produces the same final result.
- Late-arriving data: Records that belong to earlier periods but arrive after the initial load.
- Lookback window: A small overlap period to capture late arrivals safely.
Design choices
Choosing a watermark field
- Timestamps (e.g., updated_at): Easy and common; ensure timezone consistency (normalize to UTC).
- Monotonic IDs/sequences: Reliable if strictly increasing and gap-free for order.
- CDC offsets or log positions: Best for change streams; track per-partition offsets.
- File systems: Use folder date, file modified time, or a manifest index.
Where to store watermarks
- Control table (preferred): e.g., watermark(state_key, value, updated_on).
- Metadata store: Same database as target warehouse or a durable key-value store.
- Store per source + per partition if needed (e.g., Kafka partitions).
Handling late data and duplicates
- Add a small lookback window (e.g., 5–60 minutes) when reading new data.
- Use deduplication keys (natural key + last_update) in staging before MERGE.
- Design MERGE/UPSERT to be idempotent.
Timezones and clock skew
- Convert all times to UTC at ingestion.
- If sources have skew, rely on server-time or CDC sequence instead of client-time.
Deletes
- Soft deletes: propagate a deleted flag and update targets via MERGE.
- Hard deletes: require CDC with tombstones or periodic anti-join reconciliation.
Worked examples
Example 1: Database table using updated_at
- Control table stores last_hwm = 2025-03-01T00:00:00Z.
- Extract WHERE updated_at >= (last_hwm - lookback 10m).
- Stage deduplicated rows by (id, updated_at) choosing the latest.
- MERGE into target by business key; insert new, update changed.
- Set new_hwm = MAX(updated_at) that actually loaded; commit both MERGE and watermark in one transaction when possible.
-- Read delta with lookback
SELECT *
FROM src.orders
WHERE updated_at >= TIMESTAMP '2025-02-28 23:50:00+00';
-- Update watermark after success
UPDATE ctl.watermark
SET value = '2025-03-01T10:15:00Z'
WHERE state_key = 'orders.updated_at';Example 2: Object storage files by partition date
- Watermark holds last processed partition_date: 2025-02-15.
- Discover files in partitions >= 2025-02-15 - 1 day lookback.
- Load new files; dedup by file name or internal record key to avoid repeats.
- Advance watermark to the max loaded partition_date and persist a processed manifest.
# Pseudocode
candidates = list_partitions(base='s3://lake/sales/', from=wm_date - 1d)
for p in sorted(candidates):
for f in list_files(p):
if not processed_manifest.contains(f):
load_file_to_stage(f)
processed_manifest.add(f)
wm_date = max_partition_loadedExample 3: Kafka CDC with per-partition offsets
- Watermark table stores offsets per partition: topic=customers_cdc, partition=0..N.
- On start, seek each partition to stored offset.
- Process events; for upserts, keep latest by primary key and commit in micro-batches.
- After each micro-batch, atomically persist new offsets for each partition.
// Offsets table
state_key: 'customers_cdc:p{partition}', value: last_committed_offset
// Processing loop
for batch in read_stream():
apply_merge(batch)
commit_offsets(batch.max_offsets)Patterns and practical snippets
Idempotent MERGE template (SQL)
MERGE INTO dim_orders AS tgt
USING (SELECT * FROM stage_orders) AS src
ON tgt.order_id = src.order_id
WHEN MATCHED AND (tgt.hash != src.hash OR tgt.is_deleted != src.is_deleted) THEN
UPDATE SET amount = src.amount,
status = src.status,
is_deleted = src.is_deleted,
updated_at = src.updated_at,
hash = src.hash
WHEN NOT MATCHED THEN
INSERT (order_id, amount, status, is_deleted, created_at, updated_at, hash)
VALUES (src.order_id, src.amount, src.status, src.is_deleted, src.created_at, src.updated_at, src.hash);
Watermark control table
CREATE TABLE IF NOT EXISTS ctl_watermark (
state_key VARCHAR PRIMARY KEY,
value_str VARCHAR NOT NULL,
updated_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Examples:
-- 'orders.updated_at' -> '2025-03-01T10:15:00Z'
-- 'kafka.customers_cdc.p0' -> '123456789'Late-arriving lookback
-- For timestamp-based HWM, overlap by lookback_minutes
WITH params AS (
SELECT (SELECT value_str FROM ctl_watermark WHERE state_key = 'orders.updated_at') AS last_hwm
)
SELECT *
FROM src.orders o, params p
WHERE o.updated_at >= (CAST(p.last_hwm AS TIMESTAMP) - INTERVAL '10 minutes');Build-right checklist
- [ ] Choose a reliable watermark field or offset.
- [ ] Normalize timestamps to UTC.
- [ ] Add a lookback window appropriate for source latency.
- [ ] Deduplicate in staging before MERGE.
- [ ] Make MERGE idempotent and key-driven.
- [ ] Store and update watermarks atomically with load success.
- [ ] Track per-partition offsets for streaming.
- [ ] Monitor for staleness and late-arrival rates.
Exercises
Do these hands-on tasks. The same exercises are available below with solution reveals. Quick Test is available to everyone; only logged-in users get saved progress.
- Exercise 1 — Database incremental MERGE with watermark
Design an incremental load for table orders using updated_at as the watermark. Create: (a) control table DDL, (b) SELECT to read delta with a 10-minute lookback, (c) idempotent MERGE, and (d) statement to advance the watermark to the max processed updated_at. Include a short note on transaction boundaries. - Exercise 2 — File-based watermark and manifest
Given daily partitions in a data lake (sales/partition_date=YYYY-MM-DD/...), outline pseudocode to: (a) read last processed partition from a control table, (b) discover new partitions with a 1-day lookback, (c) load only unprocessed files using a manifest, (d) update the watermark.
Common mistakes and self-check
- Mistake: Using local time instead of UTC. Self-check: Do your queries convert to UTC before comparison?
- Mistake: No lookback; missing late data. Self-check: Can a record arriving a few minutes late still be captured?
- Mistake: Advancing watermark before commit. Self-check: Are MERGE and watermark update in the same atomic unit?
- Mistake: Dedup after MERGE. Self-check: Do you deduplicate in staging by natural key + last_update?
- Mistake: Single global watermark for multi-partition streams. Self-check: Do you track offsets per partition?
- Mistake: Ignoring deletes. Self-check: Do you propagate deleted flags or handle tombstones?
Practical projects
- Implement an orders incremental pipeline (DB to warehouse) with a 15-minute lookback and idempotent MERGE. Add monitoring on rows inserted/updated and watermark value.
- Build a file-lake loader that reads partitioned sales data, keeps a processed-file manifest, and updates a partition watermark.
- Create a CDC consumer that tracks per-partition offsets and performs upsert + soft-delete handling into a dimension table.
Learning path
- Before this: Data modeling basics and staging patterns.
- Now: Incremental loads and watermarks (this page).
- Next: Slowly Changing Dimensions (SCD) and change propagation; CDC ingestion patterns; Retry and backoff strategies.
Next steps
- Complete the two exercises and verify with the solutions.
- Take the Quick Test to confirm understanding (progress saved for logged-in users).
- Apply the pattern in a small project with real timings and monitoring.
Mini challenge
Your source has updated_at in local time and sometimes arrives late by 20 minutes. Propose exact SQL (or pseudocode) to normalize to UTC, apply a 30-minute lookback, deduplicate, MERGE, and update the watermark. State how you ensure idempotency and atomicity.