Why this matters
As an Analytics Engineer, your transformations must run reliably, on schedule, and with clear visibility. Integrating dbt with an orchestrator (Airflow, Prefect, Dagster, or dbt Cloud jobs) lets you:
- Schedule daily/hourly dbt builds with retries and alerts.
- Gate deployments on tests so bad data never reaches dashboards.
- Backfill historical periods safely and reproducibly.
- Coordinate dbt with upstream loads (ELT) and downstream tasks (docs, notifications).
Concept explained simply
An orchestrator is a conductor. It decides when to run dbt, with which parameters, and what to do if something fails. dbt is the musician that transforms raw notes (data) into songs (models). You define the music (SQL + config), and the orchestrator ensures it performs on time, in order, and with proper monitoring.
Mental model
- Inputs: warehouse credentials, dbt project, environment (dev/prod), variables and tags.
- Action: run dbt commands (build, run, test, seed, snapshot) via CLI or API.
- Outputs: database tables/views, logs, and artifacts (manifest.json, run_results.json).
- Control: retries, timeouts, concurrency limits, and dependencies between tasks.
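If that control list feels abstract, here is a minimal, orchestrator-agnostic sketch of a retry-plus-timeout wrapper around a dbt command (paths and values are illustrative):

# Sketch: run a dbt command with a timeout and simple retries.
import subprocess
import time

def run_dbt(args: list[str], retries: int = 2, timeout_s: int = 1800) -> None:
    for attempt in range(retries + 1):
        try:
            subprocess.run(["dbt", *args], cwd="/opt/dbt/project",
                           check=True, timeout=timeout_s)
            return
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            if attempt == retries:
                raise
            time.sleep(60)  # back off before retrying

run_dbt(["build", "--select", "tag:core"])

Every orchestrator below implements some version of this loop for you; the sketch just shows what retries and timeouts boil down to.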
Common integration options
- dbt CLI in orchestrator: a container or worker runs dbt build with environment variables and profiles.
- dbt Cloud job trigger: the orchestrator calls a job via API; dbt Cloud handles execution and artifacts.
- Framework-specific integrations: e.g., Dagster's dagster-dbt assets, or Prefect tasks for shell commands.
When to choose CLI vs dbt Cloud
- Choose CLI when you want full control over infrastructure, images, and costs.
- Choose dbt Cloud job triggers when you want managed runs, easy artifacts, and role-based access with less ops work.
Worked examples
Example 1: Airflow DAG runs dbt build for a tag
# airflow/dags/dbt_build_tag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "retries": 2,
}

dag = DAG(
    dag_id="dbt_build_marketing_tag",
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * *",  # 2 AM daily
    catchup=False,
    default_args=default_args,
)

dbt_env = {
    "DBT_PROFILES_DIR": "/opt/airflow/dags/dbt_project/",
    "DBT_TARGET": "prod",
}

build_marketing = BashOperator(
    task_id="dbt_build_marketing",
    bash_command="cd /opt/airflow/dags/dbt_project && dbt build --select tag:marketing",
    env=dbt_env,
    dag=dag,
)
Key points: use environment variables for credentials and profiles; pin the schedule; add retries. Artifacts appear in target/ for later parsing.
Example 2: Prefect flow triggers dbt CLI
# flows/dbt_build.py
import os
import subprocess

from prefect import flow, task

@task(retries=2, retry_delay_seconds=60)
def run_dbt_build(select: str) -> str:
    env = os.environ.copy()
    env["DBT_PROFILES_DIR"] = "/opt/dbt"
    env["DBT_TARGET"] = "prod"
    cmd = ["dbt", "build", "--select", select]
    result = subprocess.run(cmd, cwd="/opt/dbt/project", env=env, capture_output=True, text=True)
    if result.returncode != 0:
        # dbt exits non-zero on build/test failures; raising lets Prefect retry or fail the run
        raise RuntimeError(result.stderr)
    return result.stdout

@flow
def daily_dbt_build():
    log = run_dbt_build("tag:core+ test_type:generic")
    return log

if __name__ == "__main__":
    daily_dbt_build()
Key points: encapsulate command in a retryable task; set working directory; capture logs; pass parameters via function arguments.
Example 3: Dagster with dagster-dbt
# assets/dbt_assets.py
from dagster import Definitions
from dagster_dbt import DbtCliResource, load_assets_from_dbt_project

dbt = DbtCliResource(project_dir="/opt/dbt/project", profiles_dir="/opt/dbt")

dbt_assets = load_assets_from_dbt_project(
    project_dir="/opt/dbt/project",
    profiles_dir="/opt/dbt",
)

defs = Definitions(
    assets=[*dbt_assets],
    resources={"dbt": dbt},
)
Key points: represent dbt models as assets; leverage built-in selection, partitions, and materialization; run with schedules or sensors.
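To attach a schedule, a minimal sketch using Dagster's asset jobs (the job and schedule names are illustrative; register both in your Definitions):

# Sketch: materialize all dbt assets daily at 2 AM.
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

daily_dbt_job = define_asset_job("daily_dbt_job", selection=AssetSelection.all())

daily_dbt_schedule = ScheduleDefinition(
    job=daily_dbt_job,
    cron_schedule="0 2 * * *",
)
# Then pass jobs=[daily_dbt_job], schedules=[daily_dbt_schedule] to Definitions.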
Bonus: Triggering a dbt Cloud job
# Pseudocode HTTP call from any orchestrator task
# Use environment variables for token and job_id
POST https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/
Headers: Authorization: Token {DBT_CLOUD_TOKEN}
Body: { "cause": "orchestrator run", "git_branch": "main", "schema_override": "prod" }
Poll run status until success or failure; then fetch artifacts if needed.
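A sketch of that trigger-and-poll loop in Python, assuming the v2 endpoint above and dbt Cloud's documented run status codes (verify both against your account):

# Sketch: trigger a dbt Cloud job and poll until it finishes.
# Assumes DBT_CLOUD_TOKEN, DBT_CLOUD_ACCOUNT_ID, DBT_CLOUD_JOB_ID are set.
import os
import time

import requests

BASE = "https://cloud.getdbt.com/api/v2"
ACCOUNT_ID = os.environ["DBT_CLOUD_ACCOUNT_ID"]
JOB_ID = os.environ["DBT_CLOUD_JOB_ID"]
HEADERS = {"Authorization": f"Token {os.environ['DBT_CLOUD_TOKEN']}"}

def trigger_and_wait(poll_seconds: int = 30) -> None:
    resp = requests.post(
        f"{BASE}/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
        headers=HEADERS,
        json={"cause": "orchestrator run"},
    )
    resp.raise_for_status()
    run_id = resp.json()["data"]["id"]

    while True:
        run = requests.get(f"{BASE}/accounts/{ACCOUNT_ID}/runs/{run_id}/", headers=HEADERS)
        run.raise_for_status()
        status = run.json()["data"]["status"]
        # dbt Cloud run states: 10 = success, 20 = error, 30 = cancelled
        if status == 10:
            return
        if status in (20, 30):
            raise RuntimeError(f"dbt Cloud run {run_id} ended with status {status}")
        time.sleep(poll_seconds)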
Implementation steps
- Prepare environment: dbt project builds locally; profiles.yml configured; warehouse credentials via secrets.
- Decide execution mode: CLI in your orchestrator workers, or dbt Cloud job trigger.
- Create a minimal run: run dbt build --select state:modified+ or select by tag.
- Add reliability: retries, timeouts, and concurrency limits (one dbt prod run at a time).
- Add quality gates: fail the pipeline if any dbt test with severity=error fails.
- Capture artifacts: save target/run_results.json and target/manifest.json for later auditing.
- Observability: emit structured logs and push metrics (duration, status, model counts).
- Backfills: parameterize date ranges or partition keys; run safely with limits.
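For the backfill step, a minimal sketch of passing a date into dbt as a var, assuming your incremental models read var('run_date') (the variable name is illustrative):

# Sketch: run dbt for one backfill date by passing it as a dbt var.
import json
import subprocess

def dbt_build_for_date(run_date: str) -> None:
    cmd = [
        "dbt", "build",
        "--select", "tag:incremental",
        "--vars", json.dumps({"run_date": run_date}),
    ]
    subprocess.run(cmd, cwd="/opt/dbt/project", check=True)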
Observability and artifacts
- Parse run_results.json to count failures and durations.
- Use manifest.json to list models built and their parents for targeted reruns.
- Upload artifacts to object storage for auditability.
Mini parser snippet for failures (Python)
import json
import sys

with open("target/run_results.json") as f:
    data = json.load(f)

# Models report "success" and tests report "pass"; anything else is a problem.
failures = [r for r in data.get("results", []) if r.get("status") not in ("success", "pass")]
if failures:
    print(f"dbt failures: {len(failures)}")
    sys.exit(1)
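For the manifest side, a similar sketch that lists each model and its parents for targeted reruns (treat the exact keys as something to verify against your dbt version):

# Sketch: list models and their parent nodes from manifest.json.
import json

with open("target/manifest.json") as f:
    manifest = json.load(f)

for unique_id, node in manifest.get("nodes", {}).items():
    if node.get("resource_type") == "model":
        parents = node.get("depends_on", {}).get("nodes", [])
        print(unique_id, "<-", parents)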
Security and secrets
- Never hardcode credentials. Use secret managers or orchestrator secrets.
- Grant least-privilege roles to dbt service accounts.
- Separate dev and prod profiles and schemas to avoid cross-contamination.
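A cheap guardrail that complements separate profiles is a fail-fast environment check before any dbt command runs; a minimal sketch (the required variable names are illustrative):

# Sketch: fail fast if the run environment is misconfigured.
import os

REQUIRED = ["DBT_PROFILES_DIR", "DBT_TARGET", "WAREHOUSE_PASSWORD"]  # names are illustrative

def validate_env(expected_target: str = "prod") -> None:
    missing = [v for v in REQUIRED if not os.environ.get(v)]
    if missing:
        raise RuntimeError(f"missing env vars: {missing}")
    if os.environ["DBT_TARGET"] != expected_target:
        raise RuntimeError(f"unexpected DBT_TARGET: {os.environ['DBT_TARGET']}")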
Common mistakes and how to self-check
- Running in the wrong target: Check logs for DBT_TARGET and schema prefix; enforce via environment validation step.
- No retries: Add at least 1–2 retries for transient warehouse issues; confirm retry policy in the DAG/flow.
- Ignoring test failures: Ensure the orchestrator fails the job if dbt tests fail; parse run_results.json or rely on dbt exit code.
- Concurrent prod runs: Add a concurrency limit or mutex; verify only one prod run is active.
- Missing artifacts: Confirm target/ contains manifest and run_results after each run and that they’re persisted.
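For the concurrent-runs pitfall, Airflow can enforce the mutex at the DAG level; a minimal sketch:

# Sketch: allow at most one active run of the prod dbt DAG at a time.
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="dbt_build_prod",
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * *",
    catchup=False,
    max_active_runs=1,  # mutex: no overlapping prod runs
)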
Exercises
Do these exercises in order.
Exercise 1 — Airflow + dbt CLI (tagged build)
- Create an Airflow DAG that runs dbt build --select tag:finance daily at 3 AM.
- Pass DBT_PROFILES_DIR and DBT_TARGET=prod via env.
- Set 2 retries and a 30-minute timeout.
Need a hint?
- Use BashOperator with env dict.
- Set dagrun_timeout on the DAG or execution_timeout on the task.
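If you want a starting skeleton, a hedged sketch of the task-level timeout (paths are illustrative):

# Sketch: Exercise 1 skeleton with a 30-minute task timeout and retries.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dbt_build_finance",
    start_date=datetime(2024, 1, 1),
    schedule="0 3 * * *",  # 3 AM daily, per the exercise
    catchup=False,
) as dag:
    dbt_build_finance = BashOperator(
        task_id="dbt_build_finance",
        bash_command="cd /opt/airflow/dags/dbt_project && dbt build --select tag:finance",
        env={"DBT_PROFILES_DIR": "/opt/airflow/dags/dbt_project/", "DBT_TARGET": "prod"},
        retries=2,
        execution_timeout=timedelta(minutes=30),
    )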
Exercise 2 — Prefect flow with artifact check
- Write a Prefect flow that runs dbt test and fails if any test fails.
- Parse target/run_results.json to enforce failure.
- Return the names of failing models or tests in the final log.
Need a hint?
- Use subprocess and capture_output.
- Open run_results.json and filter non-success statuses.
Self-check
- [ ] I can run dbt CLI from my orchestrator worker.
- [ ] My pipeline fails when a dbt test fails.
- [ ] Artifacts are saved and accessible for audit.
- [ ] I can parameterize the selection (tags, models, state).
- [ ] I have retries and concurrency controls.
Mini challenge
Build a parameterized job that:
- Accepts a date or partition key.
- Runs dbt build --select tag:incremental with that parameter as a var.
- Backfills a past 7-day window by iterating over dates, with max 2 concurrent runs.
- Stops immediately if any day fails tests.
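A hedged sketch of that backfill loop, assuming the models read var('run_date') as in the earlier vars example, and treating "stops immediately" as "start no new days after a failure":

# Sketch: backfill 7 days with at most 2 concurrent dbt runs.
import json
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta

def build_day(run_date: str) -> str:
    cmd = ["dbt", "build", "--select", "tag:incremental",
           "--vars", json.dumps({"run_date": run_date})]
    subprocess.run(cmd, cwd="/opt/dbt/project", check=True)
    return run_date

days = [str(date.today() - timedelta(days=i)) for i in range(7, 0, -1)]

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {pool.submit(build_day, d): d for d in days}
    for fut in as_completed(futures):
        try:
            print("built", fut.result())
        except Exception:
            # Cancel queued days; in-flight runs still finish (Python 3.9+)
            pool.shutdown(wait=True, cancel_futures=True)
            raise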
Practical projects
- Productionize a daily dbt build with quality gates, artifacts upload, and Slack/email notification on failure.
- Create a targeted rebuild pipeline that runs only models impacted by a changed directory (use state:modified+ selection).
- Implement a safe backfill job for a large incremental model using small batch windows and concurrency limits.
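State-based selection needs the previous production artifacts on disk; a minimal sketch, assuming the prod manifest was downloaded to an illustrative local path:

# Sketch: rebuild only modified models and their descendants.
import subprocess

subprocess.run(
    ["dbt", "build",
     "--select", "state:modified+",
     "--state", "/tmp/prod-artifacts"],  # illustrative path to prior prod artifacts
    cwd="/opt/dbt/project",
    check=True,
)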
Who this is for
- Analytics Engineers integrating dbt into production workflows.
- Data Engineers orchestrating ELT + dbt transformations.
- BI Developers needing reliable, test-gated model refreshes.
Prerequisites
- Basic dbt knowledge (models, tests, profiles).
- Familiarity with one orchestrator (Airflow, Prefect, or Dagster).
- Access to a SQL warehouse and a working dbt project.
Learning path
- Run dbt locally with tests and document artifacts.
- Wrap dbt in your orchestrator with retries and environment variables.
- Add quality gates and artifact parsing.
- Introduce scheduling, alerts, and backfills.
- Optimize with targeted selection, state-based runs, and concurrency controls.
Next steps
- Implement your first production schedule with a small model set.
- Add artifact parsing and a notification step.
- Take the quick test below to confirm understanding.
Quick Test availability
The quick test is available to everyone. Only logged-in users get saved progress and resume where they left off.