Why this matters
As a Data Platform Engineer, you support dozens or hundreds of pipelines. Workflow templates and libraries let you ship new pipelines fast, keep them consistent, and reduce incidents. Common tasks include:
- Onboarding a new data source using a standard extract-load-transform pattern.
- Rolling out a change (e.g., retry policy or alerting) to many workflows at once.
- Enforcing naming conventions, tagging, and observability across the platform.
- Parameterizing schedules, resources, and destinations per environment (dev/stage/prod).
Concept explained simply
A workflow template is a blueprint for a pipeline. You feed it parameters (tables, schedule, retries, connections) and it produces a ready-to-run workflow with the right tasks and wiring. A workflow library is the shared code that implements these templates and reusable tasks.
Mental model
Think of a template as a recipe and the parameters as ingredients. The library is your cookbook. When someone requests a new pipeline, you pick a recipe, insert ingredients, and the kitchen (orchestrator) prepares the dish consistently every time.
When to use a template vs. custom code
- Template: repeatable patterns (ingest N tables, run quality checks, backfill).
- Custom code: unique, one-off tasks where templating would add complexity.
Core building blocks
- Template types: ingestion (ELT), transformation batches, backfills, quality-check jobs, sensor/trigger flows.
- Parameters and config: table list, schedule, owner, retries, connections, resource sizes, tags, SLAs.
- Rendering: simple string formatting or Jinja-like templating; configs in YAML/JSON for clarity (see the rendering sketch after this list).
- Library structure: clear modules for operators/tasks, utilities, validators, and template factories.
- Validation: JSON Schema or custom checks to validate input configurations before rendering.
- Versioning: semantic versioning (MAJOR.MINOR.PATCH) for templates and task libraries.
- Observability baked in: standard logging, metrics, alerts, and lineage annotations added by default.
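As a small illustration of the "Rendering" building block, here is a minimal sketch that loads a JSON config and renders task IDs with plain string formatting. Nothing here is tied to a specific orchestrator, and the field names simply mirror the examples later in this lesson.
import json

# A JSON config (a YAML file would work the same way via a YAML loader).
raw = '{"name": "ingest_sales", "tables": ["customers", "orders"]}'
cfg = json.loads(raw)

# Render task IDs with simple string formatting; a Jinja-like engine is only
# needed once templates carry more complex logic.
task_ids = [f"{step}_{table}" for table in cfg["tables"] for step in ("extract", "load")]
print(task_ids)  # ['extract_customers', 'load_customers', 'extract_orders', 'load_orders']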
Minimal parameter contract to standardize
- name: workflow identifier
- schedule: cron or interval
- retries: count and delay
- owner/team: for alerts
- resources/tags: execution class, cost center, domain
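For concreteness, a config that satisfies this contract might look like the following. It is written as a "YAML-like" Python dict in the style of the examples below; any field beyond the contract above (such as resources) is illustrative.
# YAML-like config expressed as a Python dict, matching the minimal contract above.
cfg = {
    "name": "ingest_sales",                          # workflow identifier (snake_case)
    "schedule": "0 2 * * *",                         # cron expression
    "retries": 2,                                     # retry count
    "retry_delay_minutes": 5,                         # retry delay
    "owner": "data-platform",                         # team that receives alerts
    "tags": ["domain:sales", "cost_center:finops"],   # domain, cost center, etc. (illustrative)
    "resources": {"execution_class": "small"},        # illustrative resource hint
}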
Worked examples
Example 1: Airflow-style DAG factory (Python)
This pattern creates many similar DAGs from a single factory function. It enforces naming, retries, and observability.
# Pseudo-Airflow code for illustration
from datetime import timedelta


def create_ingest_dag(cfg):
    """
    cfg example:
    {
        "name": "ingest_sales",
        "schedule": "0 2 * * *",
        "tables": ["customers", "orders"],
        "retries": 2,
        "retry_delay_minutes": 5,
        "tags": ["domain:sales", "template:ingest"],
        "conn": {"source": "sales_api", "warehouse": "wh_conn"}
    }
    """
    dag_id = cfg["name"]
    default_args = {
        "retries": cfg.get("retries", 1),
        "retry_delay": timedelta(minutes=cfg.get("retry_delay_minutes", 5)),
        "owner": cfg.get("owner", "data-platform"),
    }
    # dag = DAG(dag_id, schedule=cfg["schedule"], default_args=default_args, tags=cfg["tags"])  # real Airflow
    dag = {"dag_id": dag_id, "schedule": cfg["schedule"], "tasks": [], "tags": cfg["tags"]}  # pseudo
    for table in cfg["tables"]:
        extract_task_id = f"extract_{table}"
        load_task_id = f"load_{table}"
        # In real Airflow: BashOperator/PythonOperator with templated params
        dag["tasks"].append({"task_id": extract_task_id, "uses": "ExtractOperator", "conn": cfg["conn"]["source"]})
        dag["tasks"].append({"task_id": load_task_id, "uses": "LoadOperator", "conn": cfg["conn"]["warehouse"], "upstream": extract_task_id})
    return dag


cfg = {
    "name": "ingest_sales",
    "schedule": "0 2 * * *",
    "tables": ["customers", "orders"],
    "retries": 2,
    "retry_delay_minutes": 5,
    "tags": ["domain:sales", "template:ingest"],
    "conn": {"source": "sales_api", "warehouse": "wh_conn"}
}
print(create_ingest_dag(cfg))
Result: a consistent DAG in which each table gets an extract task followed by a load task, with standard retries and tags.
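The same factory scales to many pipelines by looping over a list of configs; in real Airflow the common pattern is to register each generated DAG in the module's globals() so the scheduler discovers it. A minimal sketch reusing create_ingest_dag from above (the configs themselves are illustrative):
# Sketch: one config list, many DAGs (pseudo, matching the factory above).
configs = [
    {"name": "ingest_sales", "schedule": "0 2 * * *", "tables": ["customers", "orders"],
     "tags": ["domain:sales"], "conn": {"source": "sales_api", "warehouse": "wh_conn"}},
    {"name": "ingest_billing", "schedule": "0 4 * * *", "tables": ["invoices"],
     "tags": ["domain:billing"], "conn": {"source": "billing_db", "warehouse": "wh_conn"}},
]

for cfg in configs:
    dag = create_ingest_dag(cfg)
    # In real Airflow: globals()[dag.dag_id] = dag  # so the scheduler picks up each DAG
    print(dag["dag_id"], [t["task_id"] for t in dag["tasks"]])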
Example 2: Prefect-style flow template with YAML config
# Pseudo-Prefect example
from typing import Dict


# Reusable tasks
def extract(table: str, source_conn: str):
    print(f"Extracting {table} from {source_conn}")


def load(table: str, warehouse_conn: str):
    print(f"Loading {table} into {warehouse_conn}")


# Template (factory)
def build_ingest_flow(cfg: Dict):
    flow_name = cfg["name"]
    tables = cfg["tables"]
    source = cfg["conn"]["source"]
    wh = cfg["conn"]["warehouse"]

    def run():  # represents a Prefect flow
        for t in tables:
            extract(t, source)
            load(t, wh)

    return {"name": flow_name, "run": run, "tags": cfg.get("tags", [])}


# YAML-like config
cfg = {
    "name": "ingest_marketing",
    "tables": ["campaigns", "clicks"],
    "conn": {"source": "ga_api", "warehouse": "wh_conn"},
    "tags": ["domain:marketing", "template:ingest"]
}
flow = build_ingest_flow(cfg)
flow["run"]()
The template returns a flow object with a run function and metadata. In a real setup you would use Prefect's @flow and @task decorators, as sketched below, but the pattern is the same.
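For reference, here is a minimal sketch of the same factory with real decorators, assuming Prefect 2.x is installed; the task and flow bodies are the same illustrative print statements as above.
from typing import Dict
from prefect import flow, task


@task
def extract(table: str, source_conn: str) -> str:
    print(f"Extracting {table} from {source_conn}")
    return table


@task
def load(table: str, warehouse_conn: str) -> None:
    print(f"Loading {table} into {warehouse_conn}")


def build_ingest_flow(cfg: Dict):
    # Wrap a plain function with the flow decorator so each config yields a named flow.
    def _run():
        for t in cfg["tables"]:
            load(extract(t, cfg["conn"]["source"]), cfg["conn"]["warehouse"])

    return flow(name=cfg["name"])(_run)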
Example 3: Dagster-style reusable ops and job factory
# Pseudo-Dagster example focusing on template composition
def ingest_job_factory(cfg):
    ops = []
    for t in cfg["tables"]:
        ops.append({"op_name": f"extract_{t}", "impl": "extract_op", "params": {"table": t}})
        ops.append({"op_name": f"load_{t}", "impl": "load_op", "after": f"extract_{t}"})
    return {"job_name": cfg["name"], "ops": ops, "tags": cfg.get("tags", [])}


print(ingest_job_factory({
    "name": "ingest_finance",
    "tables": ["invoices", "payments"],
    "tags": ["domain:finance"]
}))
Factories keep the job topology consistent across domains and make rollouts easier.
Designing your template library
Step 1 — Define the contract
Decide on required inputs (name, schedule, tables, connections) and validation rules. Keep names snake_case and deterministic.
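One way to make the contract explicit in code is a small typed config object. The sketch below mirrors the minimal contract listed earlier; the defaults and the connections shape are illustrative.
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class WorkflowConfig:
    name: str                        # snake_case, deterministic workflow identifier
    schedule: str                    # cron expression or interval
    tables: List[str]                # tables to ingest
    connections: Dict[str, str]      # referenced by name, e.g. source and warehouse connections
    retries: int = 1
    retry_delay_minutes: int = 5
    owner: str = "data-platform"     # routing target for alerts
    tags: List[str] = field(default_factory=list)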
Step 2 — Start with the smallest useful template
Begin with an ingest template that does extract → load, plus standard retries, tags, and alerts. Avoid premature complexity.
Step 3 — Add config validation
Fail fast if required fields are missing or names break conventions.
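A minimal fail-fast validator might look like the following sketch; the required fields and snake_case rule follow the contract above, and you would extend the checks to match your own conventions.
import re

REQUIRED_FIELDS = ("name", "schedule", "tables")
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9_]*$")


def validate_config(cfg: dict) -> None:
    """Reject bad configs before any rendering happens."""
    missing = [k for k in REQUIRED_FIELDS if k not in cfg]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    if not SNAKE_CASE.match(cfg["name"]):
        raise ValueError(f"Workflow name must be snake_case: {cfg['name']!r}")
    bad_tables = [t for t in cfg["tables"] if not SNAKE_CASE.match(t)]
    if bad_tables:
        raise ValueError(f"Table names must be snake_case: {bad_tables}")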
Step 4 — Package and version
Publish your library as a versioned artifact. Use semantic versioning. Document breaking changes.
Step 5 — Bake in observability
Standardize logs, metrics, lineage tags, and alert routing. Make it impossible to create a blind workflow.
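One way to bake this in is to run every rendered workflow through a single helper that stamps the standard metadata. The sketch below works on the pseudo workflow dicts from the examples; the tag names, alert channel, and metric keys are illustrative.
def with_observability(workflow: dict, owner: str = "data-platform") -> dict:
    """Stamp standard tags, alert routing, and metrics onto a rendered workflow."""
    workflow.setdefault("tags", [])
    for required_tag in ("template:managed", f"owner:{owner}"):
        if required_tag not in workflow["tags"]:
            workflow["tags"].append(required_tag)
    workflow.setdefault("alerts", {"on_failure": f"#data-alerts-{owner}"})
    workflow.setdefault("metrics", {"emit_task_duration": True})
    return workflow


# Usage: dag = with_observability(create_ingest_dag(cfg), owner="sales-data")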
Governance and versioning
- Semantic versioning: MAJOR for breaking changes, MINOR for new features, PATCH for fixes.
- Deprecation policy: allow both old and new params for one MINOR release before removal.
- Compatibility tests: run a suite of "golden" configs through the template on each change (see the test sketch after this list).
- Security: keep secrets out of configs; reference secret managers or connections by name.
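A golden-config test can be as small as rendering a known config and asserting the task graph. A pytest-style sketch against create_ingest_dag from Example 1:
def test_golden_ingest_config():
    cfg = {
        "name": "ingest_sales",
        "schedule": "0 2 * * *",
        "tables": ["customers", "orders"],
        "retries": 2,
        "retry_delay_minutes": 5,
        "tags": ["domain:sales", "template:ingest"],
        "conn": {"source": "sales_api", "warehouse": "wh_conn"},
    }
    dag = create_ingest_dag(cfg)
    task_ids = [t["task_id"] for t in dag["tasks"]]
    assert task_ids == ["extract_customers", "load_customers", "extract_orders", "load_orders"]
    assert dag["tags"] == ["domain:sales", "template:ingest"]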
Exercises
Complete the exercise below. You can take the Quick Test after practicing.
Exercise 1 — Build a minimal workflow template (platform-agnostic)
Goal: Write a function that takes a configuration and returns a rendered workflow object with two tasks per table: extract_{table} then load_{table}. Include schedule, retries, and tags.
Instructions
1) Use this configuration:
cfg = {
    "name": "ingest_demo",
    "schedule": "0 3 * * *",
    "tables": ["customers", "orders"],
    "retries": 2,
    "tags": ["domain:demo", "template:ingest"]
}
2) Implement build_workflow(cfg) returning a dict:
{
    "name": str,
    "schedule": str,
    "retries": int,
    "tags": list[str],
    "tasks": [
        {"id": "extract_customers", "after": None},
        {"id": "load_customers", "after": "extract_customers"},
        ...
    ]
}
3) Enforce snake_case names and the extract → load ordering for each table.
Checklist
- Returns a structured dict with required keys.
- Task IDs follow extract_{table} and load_{table} naming.
- load tasks depend on their corresponding extract tasks.
- Schedule, retries, and tags copied from cfg.
Common mistakes and how to self-check
Over-templating simple workflows
Signal: Many optional params and complex branching for rare cases. Fix: Split into two templates or keep unique cases custom.
Inconsistent parameter names
Signal: "tableList" in one place, "tables" in another. Fix: Write a schema and validate configs before rendering.
Hardcoding environment details
Signal: Dev/prod endpoints baked into code. Fix: Move to config; keep code environment-agnostic.
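A small sketch of the fix: resolve environment-specific values from config keyed by a deployment environment variable. The connection names and the DEPLOY_ENV variable here are illustrative.
import os

ENV_CONNECTIONS = {
    "dev": {"warehouse": "wh_conn_dev"},
    "stage": {"warehouse": "wh_conn_stage"},
    "prod": {"warehouse": "wh_conn_prod"},
}

env = os.environ.get("DEPLOY_ENV", "dev")           # the code stays environment-agnostic
warehouse_conn = ENV_CONNECTIONS[env]["warehouse"]  # only the config differs per environment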
No versioning
Signal: Pipelines break after library updates. Fix: Use semantic versions and changelogs; pin versions per environment.
Secrets in configs
Signal: API keys in YAML. Fix: Reference secret names; load at runtime via the orchestrator's secret store.
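A sketch of the pattern: the config carries only a secret or connection name, and resolution happens at runtime through your orchestrator's secret backend. The get_secret helper below is a hypothetical stand-in.
cfg = {"conn": {"source": "sales_api"}}   # a connection *name*, never the credential itself


def get_secret(name: str) -> str:
    """Hypothetical stand-in: delegate to the orchestrator's secret store at runtime."""
    raise NotImplementedError("resolve via your secret manager or connection store")


# api_key = get_secret("sales_api_key")   # looked up when the task runs, not stored in YAML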
Practical projects
- Build an ingestion template that supports N tables, retries, alerts, and tags. Package it as a small library.
- Create a configuration validator that rejects bad schedules, table names, or missing connections.
- Add a quality-check add-on: automatically insert row-count checks after load.
- Write golden tests: render sample configs and assert the task graph and metadata.
Learning path
- Understand orchestrator basics: tasks, dependencies, schedules, retries.
- Learn configuration-driven design (YAML/JSON) and validation.
- Build a minimal template and package it.
- Add observability and quality checks to the template.
- Introduce governance: versioning, deprecation, documentation.
Who this is for
- Data Platform Engineers standardizing pipelines across domains.
- Data Engineers maintaining many similar workflows.
- Teams migrating to a new orchestrator and seeking consistency.
Prerequisites
- Comfortable with Python basics.
- Familiarity with YAML/JSON configuration files.
- Basic Git workflow and code reviews.
- Understanding of scheduling concepts (cron, intervals) and retries.
Next steps
- Add a backfill template that accepts a date range and parallelizes by partition.
- Introduce a sensor/trigger template for event-driven pipelines.
- Document your parameter schema and share usage examples with your team.
Mini challenge
Extend the ingestion template to support three storage backends (e.g., object storage A, B, C) using a single parameter backend_type. Ensure the correct connector task is inserted based on the backend_type. Keep the parameter contract backwards compatible.
Quick Test
Take the quick test to check your understanding.