Async Patterns for Production Python
Structuring concurrent Python applications with asyncio — task groups, graceful shutdown, and error boundaries.
Beyond async/await
Writing async def is the easy part. Building reliable concurrent systems requires understanding task lifecycle management, cancellation semantics, and error propagation.
Task Groups
Python 3.11 introduced TaskGroup, the structured concurrency primitive that should replace asyncio.gather in production code.
| 1 | async def process_batch(items: list[Item]) -> list[Result]: |
| 2 | async with asyncio.TaskGroup() as tg: |
| 3 | tasks = [tg.create_task(process(item)) for item in items] |
| 4 | return [task.result() for task in tasks] |
The key difference: if any task fails, all other tasks in the group are cancelled. With asyncio.gather, you get partial results and potentially leaked tasks.
Graceful Shutdown
Production services must handle SIGTERM gracefully. This means draining in-flight requests, closing database connections, and flushing buffers.
| 1 | class GracefulService: |
| 2 | def __init__(self): |
| 3 | self.shutdown_event = asyncio.Event() |
| 4 | |
| 5 | async def run(self): |
| 6 | loop = asyncio.get_event_loop() |
| 7 | loop.add_signal_handler(signal.SIGTERM, self.shutdown_event.set) |
| 8 | |
| 9 | async with asyncio.TaskGroup() as tg: |
| 10 | tg.create_task(self.serve()) |
| 11 | tg.create_task(self.wait_for_shutdown()) |
| 12 | |
| 13 | async def wait_for_shutdown(self): |
| 14 | await self.shutdown_event.wait() |
| 15 | await self.drain_connections() |
| 16 | raise SystemExit(0) |
Semaphores for Rate Limiting
When calling external APIs concurrently, semaphores prevent overwhelming downstream services.
| 1 | class RateLimitedClient: |
| 2 | def __init__(self, max_concurrent: int = 10): |
| 3 | self.semaphore = asyncio.Semaphore(max_concurrent) |
| 4 | |
| 5 | async def fetch(self, url: str) -> Response: |
| 6 | async with self.semaphore: |
| 7 | return await self.client.get(url) |
Key Takeaways
- Prefer
TaskGroupoverasyncio.gatherfor structured error handling - Always implement graceful shutdown in long-running services
- Use semaphores to bound concurrency against external resources
- Design cancellation-aware code from the start