
Workflow Templates And Libraries

Learn Workflow Templates And Libraries for free with explanations, exercises, and a quick test (for Data Platform Engineers).

Published: January 11, 2026 | Updated: January 11, 2026

Why this matters

As a Data Platform Engineer, you support dozens or hundreds of pipelines. Workflow templates and libraries let you ship new pipelines fast, keep them consistent, and reduce incidents. Common tasks include:

  • Onboarding a new data source using a standard extract-load-transform pattern.
  • Rolling out a change (e.g., retry policy or alerting) to many workflows at once.
  • Enforcing naming conventions, tagging, and observability across the platform.
  • Parameterizing schedules, resources, and destinations per environment (dev/stage/prod).

Concept explained simply

A workflow template is a blueprint for a pipeline. You feed it parameters (tables, schedule, retries, connections) and it produces a ready-to-run workflow with the right tasks and wiring. A workflow library is the shared code that implements these templates and reusable tasks.

Mental model

Think of a template as a recipe and the parameters as ingredients. The library is your cookbook. When someone requests a new pipeline, you pick a recipe, insert ingredients, and the kitchen (orchestrator) prepares the dish consistently every time.

When to use a template vs. custom code
  • Template: repeatable patterns (ingest N tables, run quality checks, backfill).
  • Custom code: unique, one-off tasks where templating would add complexity.

Core building blocks

  • Template types: ingestion (ELT), transformation batches, backfills, quality-check jobs, sensor/trigger flows.
  • Parameters and config: table list, schedule, owner, retries, connections, resource sizes, tags, SLAs.
  • Rendering: simple string formatting or Jinja-like templating; configs in YAML/JSON for clarity.
  • Library structure: clear modules for operators/tasks, utilities, validators, and template factories.
  • Validation: JSON Schema or custom checks to validate input configurations before rendering.
  • Versioning: semantic versioning (MAJOR.MINOR.PATCH) for templates and task libraries.
  • Observability baked in: standard logging, metrics, alerts, and lineage annotations added by default.

Minimal parameter contract to standardize
  • name: workflow identifier
  • schedule: cron or interval
  • retries: count and delay
  • owner/team: for alerts
  • resources/tags: execution class, cost center, domain
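
A minimal sketch of enforcing this contract, using plain Python (a JSON Schema validator would work equally well); the field names mirror the list above and the specific rules are illustrative:

# Minimal config validation against the parameter contract
import re

REQUIRED_FIELDS = {"name", "schedule", "retries", "owner", "tags"}
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9_]*$")

def validate_config(cfg: dict) -> list:
    """Return a list of human-readable problems; an empty list means the config is valid."""
    problems = []
    missing = sorted(REQUIRED_FIELDS - cfg.keys())
    if missing:
        problems.append(f"missing required fields: {missing}")
    if "name" in cfg and not SNAKE_CASE.match(str(cfg["name"])):
        problems.append(f"name {cfg['name']!r} is not snake_case")
    if "schedule" in cfg and len(str(cfg["schedule"]).split()) != 5:
        problems.append(f"schedule {cfg['schedule']!r} is not a 5-field cron expression")
    if "retries" in cfg and not isinstance(cfg["retries"], int):
        problems.append("retries must be an integer")
    return problems

print(validate_config({"name": "ingest_sales", "schedule": "0 2 * * *",
                       "retries": 2, "owner": "data-platform", "tags": []}))  # []
print(validate_config({"name": "IngestSales", "schedule": "hourly"}))  # three problems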

Worked examples

Example 1: Airflow-style DAG factory (Python)


This pattern creates many similar DAGs from a single factory function. It enforces naming, retries, and observability.

# Pseudo-Airflow code for illustration
from datetime import timedelta

def create_ingest_dag(cfg):
    """
    cfg example:
    {
      "name": "ingest_sales",
      "schedule": "0 2 * * *",
      "tables": ["customers", "orders"],
      "retries": 2,
      "retry_delay_minutes": 5,
      "tags": ["domain:sales", "template:ingest"],
      "conn": {"source": "sales_api", "warehouse": "wh_conn"}
    }
    """
    dag_id = f"{cfg['name']}"
    default_args = {
        "retries": cfg.get("retries", 1),
        "retry_delay": timedelta(minutes=cfg.get("retry_delay_minutes", 5)),
        "owner": cfg.get("owner", "data-platform")
    }
    # dag = DAG(dag_id, schedule=cfg["schedule"], default_args=default_args, tags=cfg["tags"])  # real Airflow
    dag = {"dag_id": dag_id, "schedule": cfg["schedule"], "tasks": [], "tags": cfg["tags"]}  # pseudo

    for table in cfg["tables"]:
        extract_task_id = f"extract_{table}"
        load_task_id = f"load_{table}"
        # In real Airflow: BashOperator/PythonOperator with templated params
        dag["tasks"].append({"task_id": extract_task_id, "uses": "ExtractOperator", "conn": cfg["conn"]["source"]})
        dag["tasks"].append({"task_id": load_task_id, "uses": "LoadOperator", "conn": cfg["conn"]["warehouse"], "upstream": extract_task_id})

    return dag

cfg = {
  "name": "ingest_sales",
  "schedule": "0 2 * * *",
  "tables": ["customers", "orders"],
  "retries": 2,
  "retry_delay_minutes": 5,
  "tags": ["domain:sales", "template:ingest"],
  "conn": {"source": "sales_api", "warehouse": "wh_conn"}
}

print(create_ingest_dag(cfg))

Result: a consistent DAG where each table has extract then load, with standard retries and tags.

Example 2: Prefect-style flow template with YAML config

# Pseudo-Prefect example
from typing import Dict

# Reusable tasks
def extract(table: str, source_conn: str):
    print(f"Extracting {table} from {source_conn}")

def load(table: str, warehouse_conn: str):
    print(f"Loading {table} into {warehouse_conn}")

# Template (factory)
def build_ingest_flow(cfg: Dict):
    flow_name = cfg["name"]
    tables = cfg["tables"]
    source = cfg["conn"]["source"]
    wh = cfg["conn"]["warehouse"]

    def run():  # represents a Prefect flow
        for t in tables:
            extract(t, source)
            load(t, wh)
    return {"name": flow_name, "run": run, "tags": cfg.get("tags", [])}

# YAML-like config
cfg = {
  "name": "ingest_marketing",
  "tables": ["campaigns", "clicks"],
  "conn": {"source": "ga_api", "warehouse": "wh_conn"},
  "tags": ["domain:marketing", "template:ingest"]
}

flow = build_ingest_flow(cfg)
flow["run"]()

The template returns a flow object with a run function and metadata. In a real setup, you would apply the @flow and @task decorators, but the pattern is the same.

Example 3: Dagster-style reusable ops and job factory

# Pseudo-Dagster example focusing on template composition

def ingest_job_factory(cfg):
    ops = []
    for t in cfg["tables"]:
        ops.append({"op_name": f"extract_{t}", "impl": "extract_op", "params": {"table": t}})
        ops.append({"op_name": f"load_{t}", "impl": "load_op", "after": f"extract_{t}"})
    return {"job_name": cfg["name"], "ops": ops, "tags": cfg.get("tags", [])}

print(ingest_job_factory({
  "name": "ingest_finance",
  "tables": ["invoices", "payments"],
  "tags": ["domain:finance"]
}))

Factories keep the job topology consistent across domains and make rollouts easier.
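
A small sketch of why this helps with rollouts: if the library exposes shared defaults (the names below are illustrative), a single change to those defaults reaches every workflow built by the factories on the next deploy.

# Library-level defaults shared by every template
PLATFORM_DEFAULTS = {"retries": 2, "retry_delay_minutes": 5, "alert_channel": "#data-alerts"}

def with_platform_defaults(cfg: dict) -> dict:
    """Overlay a per-workflow config on top of the shared platform defaults."""
    merged = dict(PLATFORM_DEFAULTS)
    merged.update(cfg)
    return merged

# Bumping PLATFORM_DEFAULTS["retries"] in one commit changes every workflow
# that does not explicitly override it.
print(with_platform_defaults({"name": "ingest_finance", "tables": ["invoices"]}))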

Designing your template library

Step 1 — Define the contract

Decide on required inputs (name, schedule, tables, connections) and validation rules. Keep names snake_case and deterministic.

Step 2 — Start with the smallest useful template

Begin with an ingest template that does extract → load, plus standard retries, tags, and alerts. Avoid premature complexity.

Step 3 — Add config validation

Fail fast if required fields are missing or names break conventions.
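
A fail-fast hook might look like the sketch below: check the config before any rendering happens and refuse to build the workflow otherwise (the required field names and the choice of ValueError are illustrative).

REQUIRED = ("name", "schedule", "tables", "conn")

def render_or_fail(cfg: dict, factory):
    """Check required fields, then hand the config to the template factory."""
    missing = [field for field in REQUIRED if field not in cfg]
    if missing:
        raise ValueError(f"config for {cfg.get('name', '<unnamed>')!r} is missing {missing}")
    return factory(cfg)

# Usage with the factory from Example 1:
# dag = render_or_fail(cfg, create_ingest_dag)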

Step 4 — Package and version

Publish your library as a versioned artifact. Use semantic versioning. Document breaking changes.

Step 5 — Bake in observability

Standardize logs, metrics, lineage tags, and alert routing. Make it impossible to create a blind workflow.
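
One way to make observability unavoidable is to add it in the factory rather than in each workflow. The sketch below (logger name and tag format are illustrative) stamps every rendered task with an owner tag and emits a standard log line.

import logging

logger = logging.getLogger("workflow_templates")  # shared, library-level logger

def instrument_task(task: dict, cfg: dict) -> dict:
    """Return a copy of a rendered task with standard tags and a log line attached."""
    out = dict(task)
    out["tags"] = sorted(set(cfg.get("tags", [])) | {f"owner:{cfg.get('owner', 'data-platform')}"})
    logger.info("rendered task %s for workflow %s", out.get("task_id"), cfg.get("name"))
    return out

# In the factory: dag["tasks"] = [instrument_task(t, cfg) for t in dag["tasks"]]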

Governance and versioning

  • Semantic versioning: MAJOR for breaking changes, MINOR for new features, PATCH for fixes.
  • Deprecation policy: allow both old and new params for one MINOR release before removal.
  • Compatibility tests: run a suite of "golden" configs through the template on each change (a sample test follows this list).
  • Security: keep secrets out of configs; reference secret managers or connections by name.
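
A golden-config compatibility test can be as small as the sketch below: render a known-good config with the factory from Example 1 and assert the parts of the output that downstream teams rely on (the exact assertions are illustrative).

def test_ingest_template_is_backwards_compatible():
    golden_cfg = {
        "name": "ingest_sales",
        "schedule": "0 2 * * *",
        "tables": ["customers", "orders"],
        "retries": 2,
        "retry_delay_minutes": 5,
        "tags": ["domain:sales", "template:ingest"],
        "conn": {"source": "sales_api", "warehouse": "wh_conn"},
    }
    dag = create_ingest_dag(golden_cfg)  # factory from Example 1
    task_ids = [t["task_id"] for t in dag["tasks"]]
    # The contract consumers depend on: one extract and one load per table, in order.
    assert task_ids == ["extract_customers", "load_customers", "extract_orders", "load_orders"]
    assert dag["schedule"] == "0 2 * * *"
    assert "template:ingest" in dag["tags"]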

Exercises

Complete the exercise below. You can take the Quick Test after practicing.

Exercise 1 — Build a minimal workflow template (platform-agnostic)

Goal: Write a function that takes a configuration and returns a rendered workflow object with two tasks per table: extract_{table} then load_{table}. Include schedule, retries, and tags.

Instructions

1) Use this configuration:

cfg = {
  "name": "ingest_demo",
  "schedule": "0 3 * * *",
  "tables": ["customers", "orders"],
  "retries": 2,
  "tags": ["domain:demo", "template:ingest"]
}

2) Implement build_workflow(cfg) returning a dict:

{
  "name": str,
  "schedule": str,
  "retries": int,
  "tags": list[str],
  "tasks": [
    {"id": "extract_customers", "after": None},
    {"id": "load_customers", "after": "extract_customers"},
    ...
  ]
}

3) Enforce snake_case names and the extract → load ordering for each table.

Checklist

  • Returns a structured dict with required keys.
  • Task IDs follow extract_{table} and load_{table} naming.
  • load tasks depend on their corresponding extract tasks.
  • Schedule, retries, and tags copied from cfg.

Common mistakes and how to self-check

Over-templating simple workflows

Signal: Many optional params and complex branching for rare cases. Fix: Split into two templates or keep unique cases custom.

Inconsistent parameter names

Signal: "tableList" in one place, "tables" in another. Fix: Write a schema and validate configs before rendering.

Hardcoding environment details

Signal: Dev/prod endpoints baked into code. Fix: Move to config; keep code environment-agnostic.

No versioning

Signal: Pipelines break after library updates. Fix: Use semantic versions and changelogs; pin versions per environment.

Secrets in configs

Signal: API keys in YAML. Fix: Reference secret names; load at runtime via the orchestrator's secret store.
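
A minimal sketch of this pattern, using environment variables as a stand-in secret store (a real platform would resolve the name through the orchestrator's connection or secret manager):

import os

def resolve_secret(secret_name: str) -> str:
    """Look up a secret by name at runtime; the config only ever holds the name."""
    value = os.environ.get(secret_name)
    if value is None:
        raise KeyError(f"secret {secret_name!r} is not available at runtime")
    return value

# The config stores a reference, never the credential itself:
cfg = {"conn": {"source": "sales_api", "api_key_secret": "SALES_API_KEY"}}
# api_key = resolve_secret(cfg["conn"]["api_key_secret"])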

Practical projects

  • Build an ingestion template that supports N tables, retries, alerts, and tags. Package it as a small library.
  • Create a configuration validator that rejects bad schedules, table names, or missing connections.
  • Add a quality-check add-on: automatically insert row-count checks after load.
  • Write golden tests: render sample configs and assert the task graph and metadata.

Learning path

  1. Understand orchestrator basics: tasks, dependencies, schedules, retries.
  2. Learn configuration-driven design (YAML/JSON) and validation.
  3. Build a minimal template and package it.
  4. Add observability and quality checks to the template.
  5. Introduce governance: versioning, deprecation, documentation.

Who this is for

  • Data Platform Engineers standardizing pipelines across domains.
  • Data Engineers maintaining many similar workflows.
  • Teams migrating to a new orchestrator and seeking consistency.

Prerequisites

  • Comfortable with Python basics.
  • Familiarity with YAML/JSON configuration files.
  • Basic Git workflow and code reviews.
  • Understanding of scheduling concepts (cron, intervals) and retries.

Next steps

  • Add a backfill template that accepts a date range and parallelizes by partition (a partitioning sketch follows this list).
  • Introduce a sensor/trigger template for event-driven pipelines.
  • Document your parameter schema and share usage examples with your team.
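
As a starting point for the backfill template, here is a sketch that expands a date range into daily partition keys, each of which could become one parallel extract/load pair (the daily granularity and parameter names are illustrative):

from datetime import date, timedelta

def daily_partitions(start: date, end: date) -> list:
    """Expand an inclusive date range into one partition key per day."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days + 1)]

print(daily_partitions(date(2026, 1, 1), date(2026, 1, 3)))
# ['2026-01-01', '2026-01-02', '2026-01-03']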

Mini challenge

Extend the ingestion template to support three storage backends (e.g., object storage A, B, C) using a single parameter backend_type. Ensure the correct connector task is inserted based on the backend_type. Keep the parameter contract backwards compatible.

Quick Test

Take the quick test to check your understanding.

Practice Exercises

1 exercise to complete (Exercise 1 above)

Expected Output
{ "name": "ingest_demo", "schedule": "0 3 * * *", "retries": 2, "tags": ["domain:demo", "template:ingest"], "tasks": [ {"id": "extract_customers", "after": null}, {"id": "load_customers", "after": "extract_customers"}, {"id": "extract_orders", "after": null}, {"id": "load_orders", "after": "extract_orders"} ] }

Workflow Templates And Libraries — Quick Test

Test your knowledge with 9 questions. Pass with 70% or higher.

