Why this matters for Machine Learning Engineers
Async and concurrency help you move more data, finish experiments sooner, and keep production inference responsive. As a Machine Learning Engineer, you will often need to:
- Ingest datasets from multiple APIs or cloud buckets at once without blocking.
- Prefetch and stream batches to GPUs while the model trains.
- Run parallel preprocessing on large corpora (images, text, logs).
- Serve inference to many users concurrently, with timeouts and backpressure.
- Automate many small experiments or evaluations in parallel.
Who this is for
- Beginner–intermediate Python users building ML data pipelines, training loops, or model services.
- Engineers who want to speed up IO-bound tasks (downloads, file reads) and scale CPU-bound preprocessing.
Prerequisites
- Comfortable with Python functions, modules, and basic packages.
- Know how to run scripts from the command line.
- Basic understanding of ML workflows (data loading, preprocessing, training, inference).
Concept explained simply
Concurrency lets a program handle multiple tasks by efficiently switching between them. Parallelism runs tasks at the exact same time on different CPU cores.
- Asyncio: Single-threaded concurrency for IO-bound work. You write async def functions and await non-blocking operations (like network or disk IO).
- Threads: Good for IO-bound tasks that use blocking libraries. Python threads share memory. The GIL limits speedups on CPU-bound work, but IO can still scale well.
- Processes: True parallel computation for CPU-bound tasks (feature extraction, image transforms). Each process has its own Python interpreter and memory.
A quick mental model
Imagine a restaurant:
- Asyncio: One skilled chef managing multiple dishes by switching while others cook in the oven (waiting on IO).
- Threads: Multiple assistants doing separate phone orders (blocking IO calls).
- Processes: Multiple chefs in separate kitchens cooking heavy dishes in parallel (CPU-bound work).
Deep dive: GIL and when it matters
The Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time. For CPU-bound code, threads therefore won't speed up compute-heavy operations. For IO-bound code, the GIL is released while a thread waits on IO, so other threads can make progress in the meantime.
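To see this yourself, here is a minimal timing sketch (the busy_work function, worker count, and input sizes are illustrative assumptions): run the same pure-Python CPU-bound function under a thread pool and a process pool and compare wall-clock time. Threads should give little or no speedup here, while processes can use multiple cores.
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def busy_work(n):
    # Pure-Python CPU loop; it holds the GIL, so threads cannot run it in parallel.
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls, inputs):
    start = time.perf_counter()
    with executor_cls(max_workers=4) as ex:
        list(ex.map(busy_work, inputs))
    return time.perf_counter() - start

if __name__ == "__main__":  # guard required for ProcessPoolExecutor on Windows/macOS
    inputs = [2_000_000] * 4  # illustrative workload size
    print("threads:  ", round(timed(ThreadPoolExecutor, inputs), 2), "s")
    print("processes:", round(timed(ProcessPoolExecutor, inputs), 2), "s")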
Key building blocks
- Asyncio: async def, await, asyncio.run, asyncio.create_task, asyncio.gather, asyncio.as_completed, asyncio.Semaphore, asyncio.wait_for, cancellation.
- Threads: threading.Thread, concurrent.futures.ThreadPoolExecutor, Lock, Queue. Great when a library is blocking (e.g., file, network, DB clients).
- Processes: multiprocessing, concurrent.futures.ProcessPoolExecutor. Use for CPU-bound work; remember the if __name__ == "__main__" guard (especially on Windows/macOS).
- Patterns: Producer–consumer queues, pipeline stages, batching, rate limiting, backpressure, timeouts, graceful shutdown. A small producer–consumer sketch follows below.
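As a concrete illustration of the producer–consumer pattern, here is a minimal sketch using asyncio.Queue (the item count, sleep time, and queue size are illustrative assumptions): a bounded queue provides backpressure, and a None sentinel gives a graceful shutdown.
import asyncio

async def producer(queue, n_items):
    for i in range(n_items):
        await queue.put(i)      # waits when the queue is full -> backpressure
    await queue.put(None)       # sentinel tells the consumer to stop

async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is None:        # graceful shutdown on sentinel
            break
        await asyncio.sleep(0.01)  # simulate IO-bound processing
        results.append(item * 2)

async def main():
    queue = asyncio.Queue(maxsize=8)  # bounded queue caps memory between stages
    results = []
    await asyncio.gather(producer(queue, 50), consumer(queue, results))
    print("Processed:", len(results))

if __name__ == "__main__":
    asyncio.run(main())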
Worked examples
Example 1 — Asyncio prefetch for IO-bound tasks
import asyncio, random

async def fetch_item(i, sem):
    async with sem:  # rate limit
        await asyncio.sleep(random.uniform(0.05, 0.2))  # simulate network
        return f"item-{i}"

async def main():
    sem = asyncio.Semaphore(10)
    tasks = [asyncio.create_task(fetch_item(i, sem)) for i in range(30)]
    try:
        results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=2.0)
        print("Fetched:", len(results))
    except asyncio.TimeoutError:
        for t in tasks:
            t.cancel()
        print("Timed out; cancelled remaining tasks")

if __name__ == "__main__":
    asyncio.run(main())
Why it helps: Keep your pipeline busy while waiting on network/disk.
Example 2 — Threads for file-like IO work
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random

def read_log(i):  # blocking IO simulation
    time.sleep(random.uniform(0.05, 0.15))
    return f"log-{i}" * random.randint(1, 4)

def main():
    total = 0
    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = [ex.submit(read_log, i) for i in range(100)]
        for f in as_completed(futures):
            total += len(f.result())
    print("Processed:", len(futures), "Total chars:", total)

if __name__ == "__main__":
    main()
Why it helps: Works with libraries that aren’t async-friendly.
Example 3 — Processes for CPU-bound work
from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
    if n < 2: return False
    if n % 2 == 0: return n == 2
    r = int(n ** 0.5)
    for k in range(3, r + 1, 2):
        if n % k == 0:
            return False
    return True

def count_primes(limit):
    return sum(1 for i in range(limit) if is_prime(i))

def main():
    limits = [50_000, 60_000, 70_000, 80_000]
    with ProcessPoolExecutor() as ex:
        counts = list(ex.map(count_primes, limits))
    print(list(zip(limits, counts)))

if __name__ == "__main__":
    main()
Why it helps: Speed up heavy preprocessing by using multiple cores.
How to choose: decision checklist
- If it waits on network/disk: prefer asyncio. If the library is blocking and not async: use threads.
- If it burns CPU: use processes (or native extensions that release the GIL).
- Need both? Use asyncio for IO stages and offload CPU steps with a ProcessPool (a sketch follows this checklist).
- Apply rate limits, timeouts, and cancellation to stay reliable under load.
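A minimal sketch of combining the two, assuming an illustrative cpu_transform function and simulated downloads: the event loop handles the IO-bound fetches, and loop.run_in_executor offloads the CPU-bound step to a ProcessPoolExecutor so it never blocks the loop.
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_transform(x):
    # CPU-heavy step; runs in a separate process, so the event loop stays responsive.
    return sum(i * i for i in range(x))

async def fetch(i):
    await asyncio.sleep(0.05)  # simulate an IO-bound download
    return 100_000 + i

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        inputs = await asyncio.gather(*(fetch(i) for i in range(4)))
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, cpu_transform, x) for x in inputs)
        )
    print(results)

if __name__ == "__main__":  # guard required for process pools on Windows/macOS
    asyncio.run(main())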
Exercises
- Async batch fetch with rate limit
  - Write an asyncio program that fetches 50 simulated items concurrently.
  - Use asyncio.Semaphore(10) to limit concurrency.
  - Ensure results are returned in the same order as inputs.
  - Add a global timeout of 3 seconds.
- Threaded IO pipeline
  - Use ThreadPoolExecutor(max_workers=8) to simulate downloading 30 files (sleep) and computing a short hash (e.g., length or hashlib).
  - Print the count of completed items and the first 5 hashes.
- Parallel CPU preprocessing
  - Use ProcessPoolExecutor to run a CPU-heavy function across 4 inputs.
  - Return results in the same order as inputs and print a list of tuples (input, output).
- Self-check checklist:
  - No blocking time.sleep inside async def (use asyncio.sleep).
  - Threaded code avoids shared mutable state or uses locks.
  - Process code guarded by if __name__ == "__main__".
  - Time-limited tasks cancel cleanly without hanging.
Common mistakes and how to self-check
- Blocking the event loop: Using time.sleep or CPU-heavy loops inside async def. Fix: await asyncio.sleep, or offload CPU work to a process pool.
- Assuming threads speed up CPU code: The GIL prevents true parallelism for pure-Python CPU tasks. Use processes.
- Forgetting timeouts and cancellation: Leads to stuck pipelines. Use asyncio.wait_for and cancel pending tasks.
- Race conditions in threads: Shared data without locks/queues. Use Lock or design around message passing.
- Huge object transfer between processes: Serialization overhead kills speed. Send IDs/paths, not large arrays, and load data inside the worker process (see the sketch below).
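A minimal sketch of that last point, assuming illustrative file paths and a toy preprocessing step: submit cheap-to-pickle path strings to the pool and load the heavy data inside each worker, instead of shipping large arrays between processes.
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

def preprocess(path_str):
    # Heavy loading/transforming happens inside the worker process;
    # only the small result travels back across the process boundary.
    data = Path(path_str).read_bytes()
    return path_str, len(data)  # stand-in for a real feature or statistic

def main():
    # Send lightweight path strings, not the file contents themselves.
    paths = [str(p) for p in Path(".").iterdir() if p.is_file()]
    with ProcessPoolExecutor() as ex:
        for path, size in ex.map(preprocess, paths):
            print(path, size)

if __name__ == "__main__":
    main()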
Practical projects
- Async dataset ingestor: concurrently fetch metadata for 5,000 items with rate limiting and retries; write to a local file.
- Threaded log aggregator: read many small log files concurrently, extract metrics, and write rolling summaries.
- Multiprocess image preprocessor: resize and normalize images across cores; measure speedup vs single process.
Learning path
- Async and Concurrency Basics (this page)
- Advanced asyncio patterns: queues, backpressure, structured concurrency
- Multiprocessing and shared memory for numeric data
- Distributed compute (Spark/Ray) for large-scale pipelines
- Production inference: async servers, batching, circuit breakers
Next steps
- Refactor a data-loading step in your current project to use asyncio or threads and measure latency.
- Move a CPU-heavy transform to processes; compare throughput.
- Add timeouts and cancellation handling throughout your pipeline.
Mini challenge
Build a two-stage pipeline: stage 1 asynchronously simulates downloading 100 samples; stage 2 uses a process pool to apply a CPU-heavy transform. Limit stage 1 to 10 concurrent downloads, add a global timeout, and ensure results are emitted in input order. Print total items processed and total time.