
Async And Concurrency Basics

Learn Async And Concurrency Basics for free with explanations, exercises, and a quick test (for Machine Learning Engineers).

Published: January 1, 2026 | Updated: January 1, 2026

Why this matters for Machine Learning Engineers

Async and concurrency help you move more data, finish experiments sooner, and keep production inference responsive. As a Machine Learning Engineer, you will often need to:

  • Ingest datasets from multiple APIs or cloud buckets at once without blocking.
  • Prefetch and stream batches to GPUs while the model trains.
  • Run parallel preprocessing on large corpora (images, text, logs).
  • Serve inference to many users concurrently, with timeouts and backpressure.
  • Automate many small experiments or evaluations in parallel.

Who this is for

  • Beginner–intermediate Python users building ML data pipelines, training loops, or model services.
  • Engineers who want to speed up IO-bound tasks (downloads, file reads) and scale CPU-bound preprocessing.

Prerequisites

  • Comfortable with Python functions, modules, and basic packages.
  • Know how to run scripts from the command line.
  • Basic understanding of ML workflows (data loading, preprocessing, training, inference).

Concept explained simply

Concurrency lets a program handle multiple tasks by efficiently switching between them. Parallelism runs tasks at the exact same time on different CPU cores.

  • Asyncio: Single-threaded concurrency for IO-bound work. You write async def functions and await non-blocking operations (like network or disk IO).
  • Threads: Good for IO-bound tasks that use blocking libraries. Python threads share memory. The GIL limits speedups on CPU-bound work but IO can still scale well.
  • Processes: True parallel computation for CPU-bound tasks (feature extraction, image transforms). Each process has its own Python interpreter and memory.

A quick mental model

Imagine a restaurant:

  • Asyncio: One skilled chef managing multiple dishes by switching while others cook in the oven (waiting on IO).
  • Threads: Multiple assistants doing separate phone orders (blocking IO calls).
  • Processes: Multiple chefs in separate kitchens cooking heavy dishes in parallel (CPU-bound work).

Deep dive: GIL and when it matters

The Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time. For CPU-bound code, threads won't speed up compute-heavy operations. For IO-bound code, threads can still run while other threads wait on IO.
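
You can check this on your own machine with a small timing sketch (a minimal, illustrative benchmark; busy and timed are hypothetical helpers, and exact numbers vary by hardware and Python version). On a standard CPython build, four threads take roughly as long as running the same CPU-bound work sequentially:

import time
from concurrent.futures import ThreadPoolExecutor

def busy(n):
    # Pure-Python CPU work; the GIL is held while this runs.
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(label, fn):
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.2f}s")

def main():
    n = 2_000_000
    timed("sequential x4", lambda: [busy(n) for _ in range(4)])
    with ThreadPoolExecutor(max_workers=4) as ex:
        timed("threaded x4", lambda: list(ex.map(busy, [n] * 4)))

if __name__ == "__main__":
    main()

Swap ThreadPoolExecutor for ProcessPoolExecutor and the same workload does scale with your cores, which is exactly the distinction the decision checklist below relies on.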

Key building blocks

  • Asyncio: async def, await, asyncio.run, asyncio.create_task, asyncio.gather, asyncio.as_completed, asyncio.Semaphore, asyncio.wait_for, cancellation.
  • Threads: threading.Thread, concurrent.futures.ThreadPoolExecutor, Lock, Queue. Great when a library is blocking (e.g., file, network, DB clients).
  • Processes: multiprocessing, concurrent.futures.ProcessPoolExecutor. Use for CPU-bound work; remember if __name__ == "__main__" guard (especially on Windows/macOS).
  • Patterns: Producer–consumer queues, pipeline stages, batching, rate limiting, backpressure, timeouts, graceful shutdown (a minimal producer–consumer sketch follows this list).
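
To make the producer–consumer pattern concrete, here is a minimal sketch built on a bounded asyncio.Queue (the maxsize of 8 and the None sentinel are illustrative choices, not the only way to signal shutdown). The bounded queue is what supplies backpressure: the producer pauses whenever the consumer falls behind.

import asyncio

async def producer(queue, n):
    # Bounded queue: put() waits when the queue is full (backpressure).
    for i in range(n):
        await queue.put(i)
    await queue.put(None)  # sentinel: tells the consumer to stop

async def consumer(queue):
    processed = 0
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.01)  # simulate IO-bound handling
        processed += 1
    return processed

async def main():
    queue = asyncio.Queue(maxsize=8)
    prod = asyncio.create_task(producer(queue, 50))
    cons = asyncio.create_task(consumer(queue))
    await prod
    print("Consumed:", await cons)

if __name__ == "__main__":
    asyncio.run(main())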

Worked examples

Example 1 — Asyncio prefetch for IO-bound tasks

import asyncio, random

async def fetch_item(i, sem):
    async with sem:  # limit the number of concurrent fetches
        await asyncio.sleep(random.uniform(0.05, 0.2))  # simulate network
        return f"item-{i}"

async def main():
    sem = asyncio.Semaphore(10)
    tasks = [asyncio.create_task(fetch_item(i, sem)) for i in range(30)]
    try:
        results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=2.0)
        print("Fetched:", len(results))
    except asyncio.TimeoutError:
        for t in tasks:
            t.cancel()
        print("Timed out; cancelled remaining tasks")

if __name__ == "__main__":
    asyncio.run(main())

Why it helps: Keep your pipeline busy while waiting on network/disk.

Example 2 — Threads for file-like IO work

from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random

def read_log(i):  # blocking IO simulation
    time.sleep(random.uniform(0.05, 0.15))
    return f"log-{i}" * random.randint(1, 4)

def main():
    total = 0
    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = [ex.submit(read_log, i) for i in range(100)]
        for f in as_completed(futures):
            total += len(f.result())
    print("Processed:", len(futures), "Total chars:", total)

if __name__ == "__main__":
    main()

Why it helps: Works with libraries that aren’t async-friendly.

Example 3 — Processes for CPU-bound work

from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    r = int(n ** 0.5)
    for k in range(3, r + 1, 2):
        if n % k == 0:
            return False
    return True

def count_primes(limit):
    return sum(1 for i in range(limit) if is_prime(i))

def main():
    limits = [50_000, 60_000, 70_000, 80_000]
    with ProcessPoolExecutor() as ex:
        counts = list(ex.map(count_primes, limits))
    print(list(zip(limits, counts)))

if __name__ == "__main__":
    main()

Why it helps: Speed up heavy preprocessing by using multiple cores.

How to choose: decision checklist

  • If it waits on network/disk: prefer asyncio. If the library is blocking and not async: use threads.
  • If it burns CPU: use processes (or native extensions that release the GIL).
  • Need both? Use asyncio for IO stages, and offload CPU steps with a ProcessPool (see the sketch after this checklist).
  • Apply rate limits, timeouts, and cancellation to stay reliable under load.
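
Here is a minimal sketch of that hybrid shape, assuming fetch stands in for an IO-bound step and cpu_transform for a CPU-bound one: asyncio drives the IO, and loop.run_in_executor hands compute to a ProcessPoolExecutor.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_transform(x):
    # CPU-bound step; defined at module level so it can be pickled.
    return sum(i * i for i in range(x))

async def fetch(i):
    await asyncio.sleep(0.05)  # simulate network IO
    return 10_000 + i

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        inputs = await asyncio.gather(*(fetch(i) for i in range(8)))
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, cpu_transform, x) for x in inputs)
        )
    print("Transformed:", len(results))

if __name__ == "__main__":
    asyncio.run(main())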

Exercises

  1. Async batch fetch with rate limit
    • Write an asyncio program that fetches 50 simulated items concurrently.
    • Use asyncio.Semaphore(10) to limit concurrency.
    • Ensure results are returned in the same order as inputs.
    • Add a global timeout of 3 seconds.
  2. Threaded IO pipeline
    • Use ThreadPoolExecutor(max_workers=8) to simulate downloading 30 files (sleep) and computing a short hash (e.g., length or hashlib).
    • Print the count of completed items and the first 5 hashes.
  3. Parallel CPU preprocessing
    • Use ProcessPoolExecutor to run a CPU-heavy function across 4 inputs.
    • Return results in the same order as inputs and print a list of tuples (input, output).
  • Self-check checklist:
    • No blocking time.sleep inside async def (use asyncio.sleep).
    • Threaded code avoids shared mutable state or uses locks.
    • Process code guarded by if __name__ == "__main__".
    • Time-limited tasks cancel cleanly without hanging.

Common mistakes and how to self-check

  • Blocking the event loop: Using time.sleep or CPU-heavy loops inside async def. Fix: await asyncio.sleep or offload CPU to a process pool.
  • Assuming threads speed up CPU code: GIL prevents true parallelism for pure Python CPU tasks. Use processes.
  • Forgetting timeouts and cancellation: Leads to stuck pipelines. Use asyncio.wait_for and cancel pending tasks.
  • Race conditions in threads: Shared data without locks/queues. Use Lock or design message-passing (a minimal sketch follows this list).
  • Huge object transfer between processes: Serialization overhead kills speed. Send IDs/paths, not large arrays; load inside the process.
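
As a concrete illustration of the race-condition bullet above, here is a minimal sketch of a shared counter guarded by threading.Lock. The counter += 1 statement is not atomic in Python (it compiles to separate load, add, and store steps), so without the lock the final total can come up short:

import threading
from concurrent.futures import ThreadPoolExecutor

counter = 0
lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        with lock:  # protects the read-modify-write on counter
            counter += 1

def main():
    with ThreadPoolExecutor(max_workers=4) as ex:
        for _ in range(4):
            ex.submit(add_many, 100_000)
    print("Counter:", counter)  # 400000 with the lock; may come up short without it

if __name__ == "__main__":
    main()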

Practical projects

  • Async dataset ingestor: concurrently fetch metadata for 5,000 items with rate limiting and retries; write to a local file.
  • Threaded log aggregator: read many small log files concurrently, extract metrics, and write rolling summaries.
  • Multiprocess image preprocessor: resize and normalize images across cores; measure speedup vs single process.

Learning path

  • Async and Concurrency Basics (this page)
  • Advanced asyncio patterns: queues, backpressure, structured concurrency
  • Multiprocessing and shared memory for numeric data
  • Distributed compute (Spark/Ray) for large-scale pipelines
  • Production inference: async servers, batching, circuit breakers

Next steps

  • Refactor a data-loading step in your current project to use asyncio or threads and measure latency.
  • Move a CPU-heavy transform to processes; compare throughput.
  • Add timeouts and cancellation handling throughout your pipeline.

Mini challenge

Build a two-stage pipeline: stage 1 asynchronously simulates downloading 100 samples; stage 2 uses a process pool to apply a CPU-heavy transform. Limit stage 1 to 10 concurrent downloads, add a global timeout, and ensure results are emitted in input order. Print total items processed and total time.

Practice Exercises

Instructions

Write an asyncio script that fetches 50 simulated items concurrently using a semaphore of 10. Ensure the final results list is in the same order as the input IDs. Add a 3s global timeout; if it triggers, cancel remaining tasks cleanly.

  • Create async function fetch(i, sem) that awaits asyncio.sleep with a small random delay and returns i.
  • Create tasks with asyncio.create_task and combine using asyncio.gather.
  • Wrap gather in asyncio.wait_for and handle asyncio.TimeoutError by cancelling pending tasks.
Expected Output
Fetched: 50

Async And Concurrency Basics — Quick Test

Test your knowledge with 10 questions. Pass with 70% or higher.
