Integrating dbt Runs Into Orchestrators

Learn how to integrate dbt runs into orchestrators, with explanations, exercises, and a quick test (for Analytics Engineers).

Published: December 23, 2025 | Updated: December 23, 2025

Why this matters

As an Analytics Engineer, your transformations must run reliably, on schedule, and with clear visibility. Integrating dbt with an orchestrator (Airflow, Prefect, Dagster, or dbt Cloud jobs) lets you:

  • Schedule daily/hourly dbt builds with retries and alerts.
  • Gate deployments on tests so bad data never reaches dashboards.
  • Backfill historical periods safely and reproducibly.
  • Coordinate dbt with upstream loads (ELT) and downstream tasks (docs, notifications).

Concept explained simply

An orchestrator is a conductor. It decides when to run dbt, with which parameters, and what to do if something fails. dbt is the musician that transforms raw notes (data) into songs (models). You define the music (SQL + config), and the orchestrator ensures it is performed on time, in order, and with proper monitoring.

Mental model

  • Inputs: warehouse credentials, dbt project, environment (dev/prod), variables and tags.
  • Action: run dbt commands (build, run, test, seed, snapshot) via CLI or API.
  • Outputs: database tables/views, logs, and artifacts (manifest.json, run_results.json).
  • Control: retries, timeouts, concurrency limits, and dependencies between tasks.
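
The mental model above can be sketched as a small helper that turns the inputs (command, selection, target, variables) into the argument list an orchestrator task would execute. This is a minimal sketch, not a dbt API: the function name build_dbt_command and the ALLOWED_COMMANDS set are assumptions made for illustration.

```python
import json

# Commands named in the mental model; anything else is rejected early.
ALLOWED_COMMANDS = {"build", "run", "test", "seed", "snapshot"}


def build_dbt_command(command, select=None, target=None, variables=None):
    """Return the argv list for one dbt CLI invocation (hypothetical helper)."""
    if command not in ALLOWED_COMMANDS:
        raise ValueError(f"unsupported dbt command: {command}")
    argv = ["dbt", command]
    if select:
        argv += ["--select", select]
    if target:
        argv += ["--target", target]
    if variables:
        # --vars takes a YAML string; JSON is valid YAML.
        argv += ["--vars", json.dumps(variables, sort_keys=True)]
    return argv
```

An orchestrator task could pass the returned list to subprocess.run, which keeps the control concerns (retries, timeouts, concurrency) in the orchestrator and the command construction in one testable place.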

Common integration options

  • dbt CLI in orchestrator: container or worker runs dbt build with environment variables and profiles.
  • dbt Cloud job trigger: orchestrator calls a job via API; dbt Cloud handles execution and artifacts.
  • Framework-specific integrations: e.g., Dagster's dagster-dbt assets, or Prefect tasks for shell commands.

When to choose CLI vs dbt Cloud

  • Choose CLI when you want full control over infrastructure, images, and costs.
  • Choose dbt Cloud job triggers when you want managed runs, easy artifacts, and role-based access with less ops work.

Worked examples

Example 1: Airflow DAG runs dbt build for a tag

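# airflow/dags/dbt_build_tag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "retries": 2,  # retry transient warehouse or network failures
}

dag = DAG(
    dag_id="dbt_build_marketing_tag",
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * *",  # 2 AM daily
    catchup=False,  # do not run missed intervals automatically
    default_args=default_args,
)

# Depending on the dbt version, DBT_TARGET may only take effect if profiles.yml
# reads it (e.g. via env_var), so the command below also passes --target explicitly.
dbt_env = {
    "DBT_PROFILES_DIR": "/opt/airflow/dags/dbt_project/",
    "DBT_TARGET": "prod",
}

build_marketing = BashOperator(
    task_id="dbt_build_marketing",
    bash_command=(
        "cd /opt/airflow/dags/dbt_project "
        "&& dbt build --select tag:marketing --target prod"
    ),
    env=dbt_env,
    append_env=True,  # merge with the worker environment so PATH still resolves dbt
    dag=dag,
)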

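Key points: pass the profile location and target through the environment, and keep credentials in secrets rather than in the DAG file; set an explicit cron schedule; add retries. dbt writes its artifacts to target/ inside the project directory, where a later task can parse them.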

Example 2: Prefect flow triggers dbt CLI

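# flows/dbt_build.py
import os
import subprocess

from prefect import flow, task


@task(retries=2, retry_delay_seconds=60)
def run_dbt_build(select):
    env = os.environ.copy()  # keep PATH so the dbt executable is found
    env["DBT_PROFILES_DIR"] = "/opt/dbt"
    env["DBT_TARGET"] = "prod"
    # Pass the target explicitly as well; DBT_TARGET may only matter if profiles.yml reads it.
    cmd = ["dbt", "build", "--select", select, "--target", "prod"]
    result = subprocess.run(cmd, cwd="/opt/dbt/project", env=env, capture_output=True, text=True)
    if result.returncode != 0:
        # dbt reports most errors on stdout, so include both streams in the exception.
        raise RuntimeError(result.stdout + result.stderr)
    return result.stdout


@flow
def daily_dbt_build():
    # A space-separated selection is a union: tag:core and descendants, plus generic tests.
    log = run_dbt_build("tag:core+ test_type:generic")
    return log


if __name__ == "__main__":
    daily_dbt_build()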

Key points: encapsulate command in a retryable task; set working directory; capture logs; pass parameters via function arguments.

Example 3: Dagster with dagster-dbt

# assets/dbt_assets.py
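from pathlib import Path

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets

DBT_PROJECT_DIR = Path("/opt/dbt/project")

# load_assets_from_dbt_project is deprecated in recent dagster-dbt releases.
# @dbt_assets reads the manifest that `dbt parse` writes to target/, so generate it first.
# The function name project_dbt_assets is illustrative.
@dbt_assets(manifest=DBT_PROJECT_DIR / "target" / "manifest.json")
def project_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Stream dbt events back to Dagster as each model and test finishes.
    yield from dbt.cli(["build"], context=context).stream()

defs = Definitions(
    assets=[project_dbt_assets],
    resources={
        "dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR), profiles_dir="/opt/dbt"),
    },
)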

Key points: represent dbt models as assets; leverage built-in selection, partitions, and materialization; run with schedules or sensors.

Bonus: Triggering a dbt Cloud job

# Pseudocode HTTP call from any orchestrator task
# Read the token, account_id, and job_id from environment variables or secrets
# Replace the host with your account's access URL if it is not on cloud.getdbt.com
POST https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/
Headers: Authorization: Token {DBT_CLOUD_TOKEN}
         Content-Type: application/json
Body: { "cause": "orchestrator run", "git_branch": "main", "schema_override": "prod" }

Poll run status until success or failure; then fetch artifacts if needed.
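
The trigger-and-poll pattern can be sketched in Python using only the standard library. This is a sketch under stated assumptions, not an official client: the helper names (build_trigger_request, fetch_run_status, wait_for_run) are hypothetical, and the numeric status codes (10 success, 20 error, 30 cancelled) are the values documented for the v2 API at the time of writing, so verify them against your account before relying on them.

```python
import json
import time
import urllib.request

BASE_URL = "https://cloud.getdbt.com/api/v2"  # assumption: default multi-tenant host
TERMINAL_STATUSES = {10: "success", 20: "error", 30: "cancelled"}


def build_trigger_request(account_id, job_id, token, cause="orchestrator run"):
    """Build (but do not send) the POST request that triggers a job run."""
    url = f"{BASE_URL}/accounts/{account_id}/jobs/{job_id}/run/"
    body = json.dumps({"cause": cause}).encode("utf-8")
    headers = {"Authorization": f"Token {token}", "Content-Type": "application/json"}
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def fetch_run_status(account_id, run_id, token):
    """Return the numeric status of a run (performs a real HTTP call)."""
    url = f"{BASE_URL}/accounts/{account_id}/runs/{run_id}/"
    req = urllib.request.Request(url, headers={"Authorization": f"Token {token}"})
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.load(resp)["data"]["status"]


def wait_for_run(get_status, poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Poll get_status() until a terminal status appears or the poll budget runs out."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return TERMINAL_STATUSES[status]
        sleep(poll_seconds)
    raise TimeoutError("dbt Cloud run did not finish within the polling budget")
```

In a task, send the request with urllib.request.urlopen, read the run id from data.id in the response, then call wait_for_run(lambda: fetch_run_status(account_id, run_id, token)) and fail the task unless it returns "success". Keeping the status fetcher injectable makes the polling logic testable without network access.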

Implementation steps

  1. Prepare environment: dbt project builds locally; profiles.yml configured; warehouse credentials via secrets.
  2. Decide execution mode: CLI in your orchestrator workers, or dbt Cloud job trigger.
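  3. Create a minimal run: dbt build --select tag:<your_tag>, or dbt build --select state:modified+ --state <path to previous artifacts>. State-based selection needs a manifest from an earlier run to compare against.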
  4. Add reliability: retries, timeouts, and concurrency limits (one dbt prod run at a time).
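  5. Add quality gates: fail the pipeline when any test with severity error fails. dbt build and dbt test exit with a non-zero code in that case, so the orchestrator task fails with them.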
  6. Capture artifacts: save target/run_results.json and target/manifest.json for later auditing.
  7. Observability: emit structured logs and push metrics (duration, status, model counts).
  8. Backfills: parameterize date ranges or partition keys; run safely with limits.
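
Step 8 can be sketched as a loop that runs one dbt build per day and stops at the first failure. This is a minimal sketch under assumptions: the variable name run_date and the backfill helper are hypothetical, and run_command stands for any callable that executes an argv list and returns its exit code (for example, a thin wrapper around subprocess.run).

```python
import json
from datetime import date, timedelta


def date_range(start, end):
    """Yield each date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def backfill(start, end, run_command, select="tag:incremental", var_name="run_date"):
    """Run one dbt build per day; raise on the first non-zero exit code."""
    completed = []
    for day in date_range(start, end):
        argv = [
            "dbt", "build",
            "--select", select,
            "--vars", json.dumps({var_name: day.isoformat()}),
        ]
        if run_command(argv) != 0:
            raise RuntimeError(f"backfill stopped: {day.isoformat()} failed")
        completed.append(day)
    return completed
```

Running the days sequentially is the simplest safe limit. If the orchestrator fans the days out as separate tasks instead, cap them with its own concurrency setting rather than with threads inside one task.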

Observability and artifacts

  • Parse run_results.json to count failures and durations.
  • Use manifest.json to list models built and their parents for targeted reruns.
  • Upload artifacts to object storage for auditability.
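
As an illustration of the manifest bullet, the sketch below reads the parent_map section of manifest.json and returns each model with its direct parents. The helper names load_manifest and model_parents are assumptions for this example. The "model." prefix check relies on dbt's unique_id convention (model.<project>.<name>).

```python
import json


def load_manifest(path="target/manifest.json"):
    """Load manifest.json from the dbt target directory."""
    with open(path) as f:
        return json.load(f)


def model_parents(manifest):
    """Map each model's unique_id to a sorted list of its direct parents."""
    parent_map = manifest.get("parent_map", {})
    return {
        node_id: sorted(parents)
        for node_id, parents in parent_map.items()
        if node_id.startswith("model.")
    }
```

The result can drive a targeted rerun, for example by selecting a failed model together with its parents.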
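Mini parser snippet for failures

import json
import sys

# Models report "success" and tests report "pass", so match failure statuses explicitly.
FAILURE_STATUSES = {"error", "fail"}

with open("target/run_results.json") as f:
    data = json.load(f)

failures = [r for r in data.get("results", []) if r.get("status") in FAILURE_STATUSES]
if failures:
    print(f"dbt failures: {len(failures)}")
    for r in failures:
        print(f"  {r.get('unique_id')}: {r.get('status')}")
    sys.exit(1)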
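
The same file also supports the duration and status metrics mentioned in this section. The sketch below is one possible summary, assuming each entry in results carries status, execution_time, and unique_id; the function name summarize_run_results and the shape of the returned dictionary are illustrative choices.

```python
from collections import Counter


def summarize_run_results(data):
    """Return status counts, total execution time, and the three slowest nodes."""
    results = data.get("results", [])
    counts = Counter(r.get("status", "unknown") for r in results)
    slowest = sorted(results, key=lambda r: r.get("execution_time", 0.0), reverse=True)[:3]
    return {
        "counts": dict(counts),
        "total_execution_seconds": round(
            sum(r.get("execution_time", 0.0) for r in results), 3
        ),
        "slowest": [r.get("unique_id") for r in slowest],
    }
```

Emit the returned dictionary as a structured log line or push its values to your metrics backend after each run.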

Security and secrets

  • Never hardcode credentials. Use secret managers or orchestrator secrets.
  • Grant least-privilege roles to dbt service accounts.
  • Separate dev and prod profiles and schemas to avoid cross-contamination.
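
One way to enforce these points is a validation step that runs before dbt and fails fast when configuration is missing or points at an unexpected target. This is a sketch under assumptions: WAREHOUSE_PASSWORD is a hypothetical variable name standing in for whatever secret your profile reads, and the allowed targets are examples.

```python
import os

# WAREHOUSE_PASSWORD is a hypothetical name; use the variables your profiles.yml reads.
REQUIRED_VARS = ("DBT_PROFILES_DIR", "DBT_TARGET", "WAREHOUSE_PASSWORD")
ALLOWED_TARGETS = {"dev", "prod"}


def validate_environment(env=None):
    """Raise if required variables are missing or the target is unexpected."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        # Report variable names only, never their values.
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    target = env["DBT_TARGET"]
    if target not in ALLOWED_TARGETS:
        raise RuntimeError(f"unexpected DBT_TARGET: {target!r}")
    return target
```

The secret values themselves should still come from a secret manager or the orchestrator's secret store; this check only confirms they were injected.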

Common mistakes and how to self-check

  • Running in the wrong target: Check logs for DBT_TARGET and schema prefix; enforce via environment validation step.
  • No retries: Add at least 1–2 retries for transient warehouse issues; confirm retry policy in the DAG/flow.
  • Ignoring test failures: Ensure the orchestrator fails the job if dbt tests fail; parse run_results.json or rely on dbt exit code.
  • Concurrent prod runs: Add a concurrency limit or mutex; verify only one prod run is active.
  • Missing artifacts: Confirm target/ contains manifest and run_results after each run and that they’re persisted.
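
The missing-artifacts check can be automated with a post-run step that verifies both files exist and copies them to a per-run folder. This is a minimal local sketch: the persist_artifacts helper, the archive_root location, and the run_id naming are assumptions, and a production setup would typically upload to object storage instead of copying to disk.

```python
import shutil
from pathlib import Path

ARTIFACTS = ("manifest.json", "run_results.json")


def persist_artifacts(target_dir, archive_root, run_id):
    """Copy dbt artifacts to archive_root/run_id; raise if any artifact is missing."""
    target = Path(target_dir)
    missing = [name for name in ARTIFACTS if not (target / name).is_file()]
    if missing:
        raise FileNotFoundError("missing dbt artifacts: " + ", ".join(missing))
    destination = Path(archive_root) / run_id
    destination.mkdir(parents=True, exist_ok=True)
    for name in ARTIFACTS:
        shutil.copy2(target / name, destination / name)
    return destination
```

Run it as the task right after dbt, with a run_id taken from the orchestrator (for example, the scheduled run timestamp), so each run's artifacts stay separate.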

Exercises

These mirror the exercises below. Do them in order. The quick test is available to everyone; only logged-in users get saved progress.

Exercise 1 — Airflow + dbt CLI (tagged build)

  • Create an Airflow DAG that runs dbt build --select tag:finance daily at 3 AM.
  • Pass DBT_PROFILES_DIR and DBT_TARGET=prod via env.
  • Set 2 retries and a 30-minute timeout.
Need a hint?
  • Use BashOperator with env dict.
  • Set execution_timeout on the task, or dagrun_timeout on the DAG.

Exercise 2 — Prefect flow with artifact check

  • Write a Prefect flow that runs dbt test and fails if any test fails.
  • Parse target/run_results.json to enforce failure.
  • Return the names of the failing tests in the final log.
Need a hint?
  • Use subprocess and capture_output.
  • Open run_results.json and filter non-success statuses.

Checklist

  • [ ] I can run dbt CLI from my orchestrator worker.
  • [ ] My pipeline fails when a dbt test fails.
  • [ ] Artifacts are saved and accessible for audit.
  • [ ] I can parameterize the selection (tags, models, state).
  • [ ] I have retries and concurrency controls.

Mini challenge

Build a parameterized job that:

  • Accepts a date or partition key.
  • Runs dbt build --select tag:incremental, passing that parameter through --vars.
  • Backfills a past 7-day window by iterating over dates, with max 2 concurrent runs.
  • Stops immediately if any day fails tests.

Practical projects

  • Productionize a daily dbt build with quality gates, artifacts upload, and Slack/email notification on failure.
  • Create a targeted rebuild pipeline that runs only models impacted by a changed directory (use state:modified+ selection).
  • Implement a safe backfill job for a large incremental model using small batch windows and concurrency limits.

Who this is for

  • Analytics Engineers integrating dbt into production workflows.
  • Data Engineers orchestrating ELT + dbt transformations.
  • BI Developers needing reliable, test-gated model refreshes.

Prerequisites

  • Basic dbt knowledge (models, tests, profiles).
  • Familiarity with one orchestrator (Airflow, Prefect, or Dagster).
  • Access to a SQL warehouse and a working dbt project.

Learning path

  1. Run dbt locally with tests and document artifacts.
  2. Wrap dbt in your orchestrator with retries and environment variables.
  3. Add quality gates and artifact parsing.
  4. Introduce scheduling, alerts, and backfills.
  5. Optimize with targeted selection, state-based runs, and concurrency controls.

Next steps

  • Implement your first production schedule with a small model set.
  • Add artifact parsing and a notification step.
  • Take the quick test below to confirm understanding.

Practice Exercises

Instructions

Create an Airflow DAG that runs dbt build --select tag:finance daily at 03:00, with:

  • Environment vars: DBT_PROFILES_DIR, DBT_TARGET=prod
  • 2 retries, 30m execution timeout
  • Artifacts persisted in target/

Name the DAG dbt_build_finance_tag.

Expected Output
Airflow shows a successful run at 03:00 with dbt logs. Artifacts manifest.json and run_results.json appear in target/. Task fails on dbt test failure.

Integrating dbt Runs Into Orchestrators — Quick Test

Test your knowledge with 8 questions. Pass with 70% or higher.
