
Data Quality Framework Integration

Learn Data Quality Framework Integration for free with explanations, exercises, and a quick test (for Data Platform Engineers).

Published: January 11, 2026 | Updated: January 11, 2026

Why this matters

Integrating a data quality framework makes data pipelines trustworthy by default. As a Data Platform Engineer, you will:

  • Gate pipeline runs with automatic checks so bad data does not silently land in production.
  • Quarantine bad records for triage while allowing healthy data to progress.
  • Expose dataset health metrics and alerts to stakeholders.
  • Codify quality rules once and reuse them across batch, streaming, and warehouse jobs.
  • Support data contracts with producers and enforce them programmatically.


Concept explained simply

A data quality framework is a reusable set of checks, tests, and metrics that verify data assumptions automatically where your data flows run (ETL/ELT, dbt, streams). Think of it as guardrails that run on every batch or message.

Mental model: 4 guardrails
  • Source contract: Validate schema and key constraints at ingestion.
  • Batch/Stream checks: Nulls, uniqueness, ranges, referential integrity, freshness.
  • Warehouse tests: Model-level assertions in dbt/SQL before exposure to BI.
  • Monitoring: Ongoing SLOs (e.g., failure rate below 1%), anomaly detection, and alerting. A minimal freshness-check sketch follows this list.
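
To make the batch-check and monitoring guardrails concrete, here is a minimal freshness check in plain Python. This is a sketch, not a framework API: it assumes a pandas DataFrame with an order_ts column, and the 60- and 180-minute thresholds are illustrative placeholders for a real freshness SLO.
import pandas as pd

# Illustrative thresholds; real values come from the dataset's freshness SLO.
WARN_AFTER_MINUTES = 60
ERROR_AFTER_MINUTES = 180

def check_freshness(df: pd.DataFrame, ts_column: str = "order_ts") -> str:
    """Return 'pass', 'warn', or 'error' based on the age of the newest row."""
    latest = pd.to_datetime(df[ts_column], utc=True).max()
    if pd.isna(latest):
        return "error"  # no parseable timestamps at all
    age_minutes = (pd.Timestamp.now(tz="UTC") - latest).total_seconds() / 60.0
    if age_minutes > ERROR_AFTER_MINUTES:
        return "error"
    if age_minutes > WARN_AFTER_MINUTES:
        return "warn"
    return "pass"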

Integration patterns and steps

  1. Define rule taxonomy and severity
    • Error: Stop publish and quarantine.
    • Warn: Continue but log metric and alert.
  2. Choose framework(s)
    • Batch/warehouse: Great Expectations, dbt tests, SQL assertions.
    • Spark/EMR: Deequ-like checks or SQL tests.
    • Streaming: Schema registry + validation + dead-letter queues.
  3. Wire into orchestration
    • Airflow/Prefect: Short-circuit or branch on failures.
    • dbt: Run tests before exposure steps.
    • Streams: Reject/route bad messages.
  4. Emit metrics
    • Track failures per rule, failure rate, row counts, freshness.
  5. Quarantine and triage
    • Store failing rows with rule names and timestamps. A rules-as-code sketch of steps 1, 4, and 5 follows this list.
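
Before committing to a specific framework, steps 1, 4, and 5 can be sketched in plain Python. This is a minimal sketch, not a library API: the Rule class, RULES list, and evaluate_rules are illustrative names, and the checks assume the daily orders columns (order_id, amount) used in the examples below. Keeping rules declarative like this makes it easier to reuse the same definitions across the Airflow, dbt, and streaming examples that follow.
from dataclasses import dataclass
from typing import Callable

import pandas as pd

@dataclass
class Rule:
    name: str
    severity: str                                    # "error" blocks publish, "warn" only alerts
    passes: Callable[[pd.DataFrame], pd.Series]      # boolean Series, True = row passes

# Rules for the daily orders extract used throughout this topic.
RULES = [
    Rule("order_id_not_null", "error", lambda df: df["order_id"].notna()),
    Rule("order_id_unique", "error", lambda df: ~df["order_id"].duplicated(keep=False)),
    Rule("amount_non_negative", "warn", lambda df: df["amount"] >= 0),
]

def evaluate_rules(df: pd.DataFrame, rules=RULES) -> dict:
    """Apply each rule, collect failing rows per rule, and compute run-level metrics."""
    failed_frames = []
    has_error = False
    for rule in rules:
        failing = df[~rule.passes(df)].copy()
        if failing.empty:
            continue
        failing["rule_name"] = rule.name
        failing["severity"] = rule.severity
        failing["checked_at"] = pd.Timestamp.now(tz="UTC")
        failed_frames.append(failing)
        has_error = has_error or rule.severity == "error"
    quarantine = pd.concat(failed_frames) if failed_frames else df.head(0)
    failed_rows = quarantine.index.nunique()         # assumes a unique default index on df
    return {
        "publish_ok": not has_error,                 # gate the load/publish step on this
        "quarantine": quarantine,                    # write this to an orders_quarantine table
        "metrics": {
            "total_rows": len(df),
            "failed_rows": failed_rows,
            "dq_failure_rate": failed_rows / len(df) if len(df) else 0.0,
        },
    }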
Example: Batch checks with Great Expectations in an Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from datetime import datetime
import great_expectations as ge

def extract():
    # Return a pandas DataFrame with columns: order_id, user_id, amount, order_ts.
    # Note: returning a DataFrame relies on XCom being able to serialize it
    # (e.g., a custom XCom backend); in practice, pass a file or staging-table
    # reference between tasks instead of the data itself.
    ...

def run_quality(**ctx):
    # Pull the extracted DataFrame from XCom (a templated op_args string would not work here).
    df = ctx["ti"].xcom_pull(task_ids="extract")
    context = ge.get_context()
    suite_name = "orders_suite"
    # Build a validator over the in-memory DataFrame. This assumes the GE project
    # already has a pandas datasource named "my_pandas" with a runtime data
    # connector, and an expectation suite named "orders_suite".
    from great_expectations.core.batch import RuntimeBatchRequest
    batch = RuntimeBatchRequest(
        datasource_name="my_pandas",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="orders_df",
        runtime_parameters={"batch_data": df},
        batch_identifiers={"default_identifier_name": "run"},
    )
    validator = context.get_validator(batch_request=batch, expectation_suite_name=suite_name)
    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_be_unique("order_id")
    validator.expect_column_values_to_be_between("amount", min_value=0)
    result = validator.validate()
    # Return only JSON-serializable data; the full result object does not belong in XCom.
    return {"success": result.success, "statistics": result.statistics}

def short_circuit_fn(**ctx):
    # Downstream tasks (the load) are skipped when this returns False.
    return ctx["ti"].xcom_pull(task_ids="quality")["success"]

def load(**ctx):
    df = ctx["ti"].xcom_pull(task_ids="extract")
    # Load df to the warehouse; this only runs when the gate passed.
    ...

def quarantine_fn(**ctx):
    # Optional: inspect the quality result and write failed rows with rule names
    # to a quarantine table for triage (no-op when every check passed).
    ...

dag = DAG("orders_with_quality", start_date=datetime(2024, 1, 1), schedule_interval=None, catchup=False)

t_extract = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
t_quality = PythonOperator(task_id="quality", python_callable=run_quality, dag=dag)
t_gate = ShortCircuitOperator(task_id="gate", python_callable=short_circuit_fn, dag=dag)
t_load = PythonOperator(task_id="load", python_callable=load, dag=dag)
t_quarantine = PythonOperator(task_id="quarantine", python_callable=quarantine_fn, dag=dag)

t_extract >> t_quality >> [t_gate, t_quarantine]
t_gate >> t_load
Example: dbt model tests in schema.yml
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: amount
        tests:
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'paid', 'cancelled']
      - name: user_id
        tests:
          - relationships:
              to: ref('dim_users')
              field: user_id
              config:
                severity: error
Example: Streaming with schema validation and DLQ
# Pseudocode
# - Enforce schema via registry (compatibility rules)
# - Validate message fields; if invalid, route to a dead-letter topic

def process_message(msg):
    if not msg.order_id or msg.amount < 0:
        send_to_dlq({"reason": "INVALID_FIELDS", "message": msg})
        return
    write_to_bronze(msg)
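
The pseudocode above can be fleshed out without choosing a specific broker client. In this sketch, the in-memory lists and the OrderEvent dataclass are stand-ins for a real producer and schema, and the counters feed the dq_failure_rate metric described in the worked examples below.
from dataclasses import dataclass
from typing import Optional

@dataclass
class OrderEvent:
    order_id: Optional[str]
    user_id: Optional[str]
    amount: Optional[float]

# In-memory stand-ins for a DLQ topic and a bronze table.
dlq_messages = []
bronze_messages = []
counters = {"total": 0, "dlq": 0}

def send_to_dlq(payload: dict) -> None:
    dlq_messages.append(payload)            # in production: produce to an orders_dlq topic

def write_to_bronze(event: OrderEvent) -> None:
    bronze_messages.append(event.__dict__)  # in production: append to the bronze layer

def process_message(event: OrderEvent) -> None:
    counters["total"] += 1
    reasons = []
    if event.order_id is None:
        reasons.append("MISSING_ORDER_ID")
    if event.amount is None or event.amount < 0:
        reasons.append("INVALID_AMOUNT")
    if reasons:
        counters["dlq"] += 1
        send_to_dlq({"reasons": reasons, "message": event.__dict__})
        return
    write_to_bronze(event)

def dq_failure_rate() -> float:
    # Share of messages routed to the dead-letter queue for this run or window.
    return counters["dlq"] / counters["total"] if counters["total"] else 0.0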

Worked examples

  1. Gate a batch load
    • Checks: not null order_id, unique order_id, amount >= 0.
    • Action: If any check fails, skip load and write failing rows to orders_quarantine with rule names.
  2. Protect warehouse models with dbt
    • Add schema.yml tests on stg_orders and fact_orders.
    • Run: dbt run --select staging, then dbt test --select staging fact_orders before exposures.
  3. Streaming DLQ
    • When schema breaks or values are invalid, produce to orders_dlq with reason and partition by rule.
    • Metric: dq_failure_rate = dlq_messages / total_messages.

Who this is for

  • Data Platform Engineers wiring orchestration, storage, and governance.
  • Data Engineers building batch/streaming pipelines.
  • Analytics Engineers managing dbt models.

Prerequisites

  • Comfort with SQL (joins, constraints, window functions).
  • Basic Python for pipeline tasks.
  • Familiarity with orchestration (e.g., Airflow) and/or dbt.
  • Understanding of schemas, partitions, and warehouse tables.

Learning path

  1. List data assumptions for one dataset (keys, nullability, ranges, freshness).
  2. Add checks in one place (dbt or GE) and make them fail visibly.
  3. Introduce gating logic and quarantine target.
  4. Publish metrics and alerts (rule failure counts, failure rate).
  5. Scale to more datasets; standardize rule templates and severities.

Exercises

Do these hands-on tasks to anchor the concepts. Then take the quick test below.

  • Exercise 1: Add a data quality gate to an Airflow batch pipeline.
  • Exercise 2: Write dbt tests for a staging model.

Checklist: did you integrate it right?

  • Rules are code-reviewed and stored with the pipeline/model.
  • Failures stop publish steps for critical datasets.
  • Quarantine preserves failing rows with the rule name and timestamp.
  • Metrics show failure counts, failure rate, and freshness.
  • Alerts distinguish warn vs error with clear run links or IDs.
  • Developers can reproduce failures locally with the same rule set.

Common mistakes and self-checks

  • Only testing locally. Self-check: Can you trigger a failure in CI or the scheduler and see it block publish?
  • All-or-nothing failures. Self-check: Do you quarantine bad records but keep good ones when safe?
  • Rules drift from business reality. Self-check: Are rule owners and severities documented? Is there a review cadence?
  • No observability. Self-check: Do you have failure rate and freshness SLOs with alerts?
  • Duplicated rules across tools. Self-check: Is there a single source of truth for each dataset area (e.g., dbt for the warehouse, GE for ingestion)?

Practical projects

  • Ingest orders data with an Airflow DAG, validate with GE, quarantine failures, and publish metrics.
  • Add dbt tests to a star schema (staging + fact + dims), then create a run step that fails if tests fail.
  • Stream user events through a validator that enforces schema and routes invalid messages to a DLQ with reasons.

Mini challenge

You have a model that sometimes drops ~2% of rows due to late-arriving events. Implement a two-tier rule: warn if drop rate is between 0.5% and 1.5%, error if above 1.5%, and log the distribution by source system. Outline where you would place this rule (ingestion vs transform vs exposure) and how you would measure it over time.
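
If you want a starting point for the threshold part of the challenge, here is a minimal sketch of the two-tier classification and the per-source breakdown. The expected-versus-loaded comparison, the order_id join key, and the source_system column are assumptions; where to place the rule and how to track it over time is still yours to outline.
import pandas as pd

WARN_AT, ERROR_ABOVE = 0.005, 0.015   # 0.5% and 1.5% drop-rate thresholds from the challenge

def classify_drop_rate(expected_rows: int, loaded_rows: int) -> str:
    """Return 'pass', 'warn', or 'error' for the observed drop rate."""
    if expected_rows == 0:
        return "error"                # nothing expected usually signals an upstream problem
    drop_rate = (expected_rows - loaded_rows) / expected_rows
    if drop_rate > ERROR_ABOVE:
        return "error"
    if drop_rate >= WARN_AT:
        return "warn"
    return "pass"

def drop_distribution_by_source(expected: pd.DataFrame, loaded: pd.DataFrame) -> pd.Series:
    """Count rows missing from the load per source_system, for logging and alert context."""
    missing = expected[~expected["order_id"].isin(loaded["order_id"])]
    return missing.groupby("source_system").size()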

Next steps

  • Add freshness and volume anomaly checks for critical tables.
  • Standardize a rule template with severity, owner, and triage steps.
  • Expand to more datasets; aim for coverage of your top data products.
  • Introduce CI checks so PRs cannot merge if new rules fail on sample data (a pytest-style sketch follows this list).
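
For the CI item above, one lightweight approach is a pytest-style check over a small, checked-in sample file. The sample path and columns below are assumptions; in practice you would import the shared rule set (for example, the evaluate_rules sketch earlier in this topic) instead of duplicating checks.
import pandas as pd

# Illustrative CI gate: the test suite (and therefore the PR) fails when
# error-severity checks fail on a small, checked-in sample of the dataset.
SAMPLE_PATH = "tests/data/orders_sample.csv"   # assumed sample location

def test_orders_sample_passes_error_rules():
    df = pd.read_csv(SAMPLE_PATH, parse_dates=["order_ts"])
    assert df["order_id"].notna().all(), "order_id contains nulls in the sample"
    assert df["order_id"].is_unique, "order_id is not unique in the sample"
    assert (df["amount"] >= 0).all(), "sample contains negative amount values"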

Practice Exercises

2 exercises to complete

Instructions

You have a daily orders extract with columns: order_id, user_id, amount, order_ts. Integrate a data quality framework in an Airflow DAG so that:

  • Checks: not null order_id, unique order_id, amount >= 0.
  • On failure: skip the publish/load task and write failing rows with rule names to a quarantine table.
  • On success: load to the warehouse table dim/fact as usual.

Implement gating with a short-circuit or branching pattern. Expose a run-level metric dq_failure_rate = failed_rows / total_rows.

Expected Output
A DAG run that skips the load task when checks fail, writes a quarantine dataset containing failed rows and rule names, and records dq_failure_rate for the run.

Data Quality Framework Integration — Quick Test

Test your knowledge with 8 questions. Pass with 70% or higher.

