Why this matters
Integrating a data quality framework makes data pipelines trustworthy by default. As a Data Platform Engineer, you will:
- Gate pipeline runs with automatic checks so bad data does not silently land in production.
- Quarantine bad records for triage while allowing healthy data to progress.
- Expose dataset health metrics and alerts to stakeholders.
- Codify quality rules once and reuse them across batch, streaming, and warehouse jobs.
- Support data contracts with producers and enforce them programmatically.
Concept explained simply
A data quality framework is a reusable set of checks, tests, and metrics that verify data assumptions automatically where your data flows run (ETL/ELT, dbt, streams). Think of it as guardrails that run on every batch or message.
Mental model: 4 guardrails
- Source contract: Validate schema and key constraints at ingestion.
- Batch/Stream checks: Nulls, uniqueness, ranges, referential integrity, freshness (see the sketch after this list).
- Warehouse tests: Model-level assertions in dbt/SQL before exposure to BI.
- Monitoring: Long-running SLOs (e.g., failure rate < 1%), anomaly detection, alerting.
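For instance, the freshness part of the batch/stream guardrail can be a single comparison of the newest event time against an allowed lag. A minimal pandas sketch, where the column name and threshold are illustrative assumptions rather than part of any specific framework:
import pandas as pd

def is_fresh(df: pd.DataFrame, ts_col: str = "order_ts", max_lag_minutes: int = 120) -> bool:
    # Freshness: the newest record must be no older than the allowed lag.
    newest = pd.to_datetime(df[ts_col], utc=True).max()
    return (pd.Timestamp.now(tz="UTC") - newest) <= pd.Timedelta(minutes=max_lag_minutes)
The same comparison can be expressed as a SQL assertion for the warehouse guardrail.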
Integration patterns and steps
- Define rule taxonomy and severity
  - Error: Stop the publish step and quarantine the failing records.
  - Warn: Continue, but log the metric and alert.
- Choose framework(s)
  - Batch/warehouse: Great Expectations, dbt tests, SQL assertions.
  - Spark/EMR: Deequ-like checks or SQL tests.
  - Streaming: Schema registry + validation + dead-letter queues.
- Wire into orchestration
  - Airflow/Prefect: Short-circuit or branch on failures.
  - dbt: Run tests before exposure steps.
  - Streams: Reject or route bad messages.
- Emit metrics
  - Track failures per rule, failure rate, row counts, and freshness.
- Quarantine and triage
  - Store failing rows with rule names and timestamps (see the sketch after this list).
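A minimal sketch of the metrics-and-quarantine step, assuming pandas, a SQLAlchemy engine, and a quarantine table named orders_quarantine (all illustrative, not a specific framework API):
import pandas as pd

def quarantine_and_report(df: pd.DataFrame, failed_mask: pd.Series, rule_name: str, engine) -> float:
    # Keep failing rows with the rule name and a timestamp so triage has context.
    failed = df[failed_mask].copy()
    failed["rule_name"] = rule_name
    failed["quarantined_at"] = pd.Timestamp.now(tz="UTC")
    if not failed.empty:
        failed.to_sql("orders_quarantine", engine, if_exists="append", index=False)
    # Failure rate per rule is the core metric to publish to your monitoring backend.
    return len(failed) / max(len(df), 1)
Healthy rows (df[~failed_mask]) continue to the publish step, so a warn-level rule does not become an all-or-nothing failure.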
Example: Batch checks with Great Expectations in an Airflow DAG
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest

def extract():
    # Return a pandas DataFrame with columns: order_id, user_id, amount, order_ts.
    ...

def run_quality(**ctx):
    # Pull the extracted DataFrame from XCom; a templated op_args string would only pass its repr.
    df = ctx["ti"].xcom_pull(task_ids="extract")
    context = ge.get_context()
    suite_name = "orders_suite"  # assumes this suite exists in the GE project
    # Build a runtime batch from the in-memory DataFrame.
    batch = RuntimeBatchRequest(
        datasource_name="my_pandas",  # assumes a Pandas datasource with a runtime data connector
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="orders_df",
        runtime_parameters={"batch_data": df},
        batch_identifiers={"default_identifier_name": "run"},
    )
    validator = context.get_validator(batch_request=batch, expectation_suite_name=suite_name)
    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_be_unique("order_id")
    validator.expect_column_values_to_be_between("amount", min_value=0)
    result = validator.validate()
    # Push only JSON-serializable fields to XCom; the full result object is not serializable.
    return {"success": result.success}

def short_circuit_fn(**ctx):
    # Downstream load runs only if the quality task reported success.
    return ctx["ti"].xcom_pull(task_ids="quality")["success"]

def load(**ctx):
    df = ctx["ti"].xcom_pull(task_ids="extract")
    # Load df to the warehouse once checks have passed.
    ...

def quarantine_fn(**ctx):
    # Optional: write failed rows to a quarantine table for triage.
    ...

dag = DAG("orders_with_quality", start_date=datetime(2024, 1, 1), schedule_interval=None, catchup=False)
t_extract = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
t_quality = PythonOperator(task_id="quality", python_callable=run_quality, dag=dag)
t_gate = ShortCircuitOperator(task_id="gate", python_callable=short_circuit_fn, dag=dag)
t_load = PythonOperator(task_id="load", python_callable=load, dag=dag)
t_quarantine = PythonOperator(task_id="quarantine", python_callable=quarantine_fn, dag=dag)
t_extract >> t_quality >> [t_gate, t_quarantine]  # quarantine runs in parallel with the gate
t_gate >> t_load
Example: dbt model tests in schema.yml
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: amount
        tests:
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'paid', 'cancelled']
      - name: user_id
        tests:
          - relationships:
              to: ref('dim_users')
              field: user_id
              config:
                severity: error
Example: Streaming with schema validation and DLQ
# Pseudocode
# - Enforce schema via registry (compatibility rules)
# - Validate message fields; if invalid, route to a dead-letter topic
def process_message(msg):
    if not msg.order_id or msg.amount < 0:
        send_to_dlq({"reason": "INVALID_FIELDS", "message": msg})
        return
    write_to_bronze(msg)
Worked examples
- Gate a batch load
  - Checks: not-null order_id, unique order_id, amount >= 0.
  - Action: If any check fails, skip the load and write failing rows to orders_quarantine with rule names.
- Protect warehouse models with dbt
  - Add schema.yml tests on stg_orders and fact_orders.
  - Run: dbt run --models staging, then dbt test --models staging fact_orders before building exposures.
- Streaming DLQ
  - When the schema breaks or values are invalid, produce to orders_dlq with the reason and partition by rule (see the consumer sketch below).
  - Metric: dq_failure_rate = dlq_messages / total_messages.
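The streaming DLQ pattern can be made concrete with any Kafka client. A minimal sketch using kafka-python, where the broker address, topic names (orders, orders_dlq, orders_bronze), and field checks are illustrative assumptions:
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda d: json.dumps(d).encode("utf-8"),
)
total_messages = 0
dlq_messages = 0
for record in consumer:
    total_messages += 1
    msg = record.value
    if not msg.get("order_id") or msg.get("amount", 0) < 0:
        dlq_messages += 1
        # Key by rule name so the DLQ topic is partitioned by failure reason.
        producer.send(
            "orders_dlq",
            key=b"INVALID_FIELDS",
            value={"reason": "INVALID_FIELDS", "message": msg},
        )
        continue
    producer.send("orders_bronze", msg)
    # Periodically flush dq_failure_rate = dlq_messages / total_messages to your metrics backend.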
Who this is for
- Data Platform Engineers wiring orchestration, storage, and governance.
- Data Engineers building batch/streaming pipelines.
- Analytics Engineers managing dbt models.
Prerequisites
- Comfort with SQL (joins, constraints, window functions).
- Basic Python for pipeline tasks.
- Familiarity with orchestration (e.g., Airflow) and/or dbt.
- Understanding of schemas, partitions, and warehouse tables.
Learning path
- List data assumptions for one dataset (keys, nullability, ranges, freshness).
- Add checks in one place (dbt or GE) and make them fail visibly.
- Introduce gating logic and quarantine target.
- Publish metrics and alerts (rule failure counts, failure rate).
- Scale to more datasets; standardize rule templates and severities.
Exercises
Do these hands-on tasks to anchor the concepts. Then take the quick test below.
- Exercise 1: Add a data quality gate to an Airflow batch pipeline.
- Exercise 2: Write dbt tests for a staging model.
Checklist: did you integrate it right?
- Rules are code-reviewed and stored with the pipeline/model.
- Failures stop publish steps for critical datasets.
- Quarantine preserves failing rows with the rule name and timestamp.
- Metrics show failure counts, failure rate, and freshness.
- Alerts distinguish warn vs error with clear run links or IDs.
- Developers can reproduce failures locally with the same rule set.
Common mistakes and self-checks
- Only testing locally. Self-check: Can you trigger a failure in CI or the scheduler and see it block publish?
- All-or-nothing failures. Self-check: Do you quarantine bad records but keep good ones when safe?
- Rules drift from business reality. Self-check: Are rule owners and severities documented? Is there a review cadence?
- No observability. Self-check: Do you have failure rate and freshness SLOs with alerts?
- Duplicated rules across tools. Self-check: Is there a single source-of-truth for each dataset area (e.g., dbt for warehouse, GE for ingestion)?
Practical projects
- Ingest orders data with an Airflow DAG, validate with GE, quarantine failures, and publish metrics.
- Add dbt tests to a star schema (staging + fact + dims), then create a run step that fails if tests fail.
- Stream user events through a validator that enforces schema and routes invalid messages to a DLQ with reasons.
Mini challenge
You have a model that sometimes drops ~2% of rows due to late-arriving events. Implement a two-tier rule: warn if drop rate is between 0.5% and 1.5%, error if above 1.5%, and log the distribution by source system. Outline where you would place this rule (ingestion vs transform vs exposure) and how you would measure it over time.
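One possible shape for the tiered check itself, using the thresholds from the challenge (the function name and inputs are illustrative; placement and measurement are still up to you):
def evaluate_drop_rate(rows_expected: int, rows_loaded: int) -> str:
    # Two-tier rule: warn between 0.5% and 1.5% dropped, error above 1.5%.
    drop_rate = (rows_expected - rows_loaded) / max(rows_expected, 1)
    if drop_rate > 0.015:
        return "error"  # block publish
    if drop_rate >= 0.005:
        return "warn"   # publish, but alert and log the per-source distribution
    return "ok"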
Next steps
- Add freshness and volume anomaly checks for critical tables.
- Standardize a rule template with severity, owner, and triage steps.
- Expand to more datasets; aim for coverage of your top data products.
- Introduce CI checks so PRs cannot merge if new rules fail on sample data.