The Concurrent Corridors: Async/Await and the Event Loop
Timothy's library system needed to fetch book metadata from multiple external APIs—publisher information, reader reviews, similar book recommendations. His synchronous code fetched them one at a time, and the library staff waited... and waited... and waited.
import requests
import time

def fetch_publisher_info(isbn):
    response = requests.get(f"https://api.publisher.com/books/{isbn}")
    return response.json()

def fetch_reviews(isbn):
    response = requests.get(f"https://api.reviews.com/books/{isbn}")
    return response.json()

def fetch_recommendations(isbn):
    response = requests.get(f"https://api.recommendations.com/books/{isbn}")
    return response.json()

start = time.time()
isbn = "9780441013593"

# Fetch one at a time - each waits for the previous
publisher = fetch_publisher_info(isbn)  # Wait 2 seconds
reviews = fetch_reviews(isbn)  # Wait 2 seconds
recommendations = fetch_recommendations(isbn)  # Wait 2 seconds

print(f"Total time: {time.time() - start:.2f} seconds")
# Total time: 6.00 seconds - waited for each in sequence!
Margaret found him staring at loading spinners. "You're waiting when you could be working," she observed. "Come to the Concurrent Corridors—where Python does many things at once."
The Async Solution
She showed him the async version:
import asyncio
import time
import aiohttp

async def fetch_publisher_info(session, isbn):
    async with session.get(f"https://api.publisher.com/books/{isbn}") as response:
        return await response.json()

async def fetch_reviews(session, isbn):
    async with session.get(f"https://api.reviews.com/books/{isbn}") as response:
        return await response.json()

async def fetch_recommendations(session, isbn):
    async with session.get(f"https://api.recommendations.com/books/{isbn}") as response:
        return await response.json()

async def fetch_all_data(isbn):
    async with aiohttp.ClientSession() as session:
        # All three run concurrently!
        results = await asyncio.gather(
            fetch_publisher_info(session, isbn),
            fetch_reviews(session, isbn),
            fetch_recommendations(session, isbn)
        )
        return results

isbn = "9780441013593"
start = time.time()
publisher, reviews, recommendations = asyncio.run(fetch_all_data(isbn))
print(f"Total time: {time.time() - start:.2f} seconds")
# Total time: 2.00 seconds - all three ran at once!
"Async functions don't block," Margaret explained. "When you await a network call, the program doesn't stop—it works on other tasks while waiting for the response. All three API calls happen concurrently."
What Are Coroutines?
Timothy learned the fundamental building block:
import asyncio

# Regular function - executes immediately
def regular_function():
    print("Running")
    return 42

result = regular_function()  # Runs immediately
print(result)  # 42

# Async function - returns a coroutine
async def async_function():
    print("Running")
    return 42

coro = async_function()  # Doesn't run yet!
print(coro)  # <coroutine object async_function at 0x...>
print(type(coro))  # <class 'coroutine'>
coro.close()  # Discard the unused coroutine to avoid a "never awaited" warning

# Must await it or run it
result = asyncio.run(async_function())  # Now it runs
print(result)  # 42
"Async functions return coroutines—suspended computations," Margaret explained. "They don't execute immediately. You must await them or pass them to an event loop."
Tasks vs Coroutines
Margaret showed him the critical distinction:
import asyncio

async def work():
    await asyncio.sleep(1)
    return "done"

async def main():
    # Coroutine - not scheduled yet
    coro = work()
    print(type(coro))  # <class 'coroutine'>
    # Hasn't started running!

    # Task - scheduled on the event loop right away
    task = asyncio.create_task(work())
    print(type(task))  # <class '_asyncio.Task'> in CPython
    # It starts running as soon as main() yields at an await

    # Tasks have extra capabilities
    print(task.done())  # False - not finished yet
    task.cancel()  # Can cancel tasks

    # Must await the coroutine we created
    result = await coro
    print(result)  # done

asyncio.run(main())
"Coroutines are blueprints," Margaret noted. "Tasks are scheduled work. When you call create_task(), the task is scheduled right away and starts running in the background as soon as the current coroutine yields to the event loop. When you call an async function, you just get a coroutine, and it won't run until awaited."
The Event Loop
Margaret revealed the mechanism behind async:
# Simplified event loop concept
import asyncio

async def task1():
    print("Task 1: Starting")
    await asyncio.sleep(1)  # "I'm waiting - do other work"
    print("Task 1: Done")

async def task2():
    print("Task 2: Starting")
    await asyncio.sleep(1)  # "I'm waiting - do other work"
    print("Task 2: Done")

async def main():
    # Schedule both tasks
    await asyncio.gather(task1(), task2())

asyncio.run(main())
# Task 1: Starting
# Task 2: Starting
# (1 second pause - both waiting simultaneously)
# Task 1: Done
# Task 2: Done
# Both tasks ran concurrently!
# Event loop switched between them during waits
"The event loop is Python's scheduler," Margaret explained. "When a coroutine hits await, it yields control back to the loop. The loop switches to another coroutine. When the awaited operation completes, the loop resumes the coroutine. It's cooperative multitasking—coroutines voluntarily yield control."
The Blocking Trap: time.sleep() vs asyncio.sleep()
Timothy made a common mistake:
import asyncio
import time

async def broken_task():
    print("Starting")
    time.sleep(2)  # WRONG! Blocks the entire event loop!
    print("Done")

async def other_task():
    print("Other task running")

async def main():
    await asyncio.gather(
        broken_task(),
        other_task()
    )

asyncio.run(main())
# Starting
# (2 second freeze - nothing else can run!)
# Done
# Other task running  # Only runs after broken_task finishes!
"Never use time.sleep() in async code!" Margaret warned. "It blocks the entire event loop—nothing else can run. Use asyncio.sleep() instead:"
async def correct_task():
    print("Starting")
    await asyncio.sleep(2)  # Yields control - other tasks can run
    print("Done")

async def other_task():
    print("Other task running")

async def main():
    await asyncio.gather(
        correct_task(),
        other_task()
    )

asyncio.run(main())
# Starting
# Other task running  # Runs immediately!
# (2 seconds pass)
# Done
"Any blocking call—time.sleep(), requests.get(), synchronous file I/O—freezes the event loop," she cautioned. "Always use async alternatives or wrap blocking code with run_in_executor()."
Understanding the GIL and Why Async Works
Timothy was puzzled. "Python has the Global Interpreter Lock—only one thread runs at a time. How does async help?"
Margaret smiled. "Excellent question. The GIL is why threading doesn't help CPU-bound tasks:"
import threading
import time

def cpu_work():
    # CPU-intensive calculation
    total = sum(i * i for i in range(10_000_000))
    return total

start = time.time()
# Two threads don't run in parallel due to GIL
t1 = threading.Thread(target=cpu_work)
t2 = threading.Thread(target=cpu_work)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Time: {time.time() - start:.2f}s")
# About as slow as calling cpu_work() twice in a row - no parallelism due to GIL!
"But async doesn't rely on threads at all," she explained. "The event loop is single-threaded, so the GIL never gets in its way. It uses non-blocking I/O: a coroutine starts a network call, awaits, and the loop runs another coroutine until the operating system reports that the data is ready. The GIL matters for threads instead. Blocking I/O calls release it while they wait, which is why threading also helps with I/O-bound work."
# Why async works for I/O:
# 1. Coroutine starts a network request on a non-blocking socket
# 2. Coroutine awaits, handing control back to the event loop
# 3. Event loop switches to another coroutine
# 4. That coroutine runs (single thread!)
# 5. When the OS reports the I/O is ready, the loop switches back
# This is why:
# - Async: Perfect for I/O-bound (waiting releases control)
# - Threading: I/O-bound with blocking libraries (GIL released during I/O)
# - Multiprocessing: CPU-bound (separate processes, separate GILs)
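The threading line in that summary can be checked with a small timing sketch. Here `time.sleep()` stands in for a blocking network call (an assumption for the demo; it releases the GIL while waiting, as blocking socket reads do), and `blocking_wait` and `timed_threads` are hypothetical helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_wait(seconds):
    time.sleep(seconds)  # stand-in for a blocking I/O call; the GIL is released here
    return seconds

def timed_threads(n, seconds):
    """Run n blocking waits in n threads and report the wall-clock time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        results = list(pool.map(blocking_wait, [seconds] * n))
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = timed_threads(3, 0.2)
    print(f"{len(results)} waits finished in {elapsed:.2f}s")
```

Three 0.2-second waits should finish in roughly 0.2 seconds rather than 0.6, because the threads spend their time waiting instead of competing for the interpreter.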
Await: The Suspension Point
Timothy learned what await actually did:
async def fetch_data():
    print("Starting fetch")
    # await suspends this coroutine
    # Returns control to event loop
    # Loop can run other coroutines
    data = await slow_network_call()
    # Resumes here when slow_network_call completes
    print("Fetch complete")
    return data

# await can only be used inside async functions
# await marks suspension points where the event loop can switch tasks
"The await keyword suspends the coroutine," Margaret noted. "It's like saying 'I'm waiting for something—do other work and come back to me when ready.' This is how async enables concurrency."
Concurrent Patterns
Margaret showed him the essential patterns:
import asyncio

async def fetch_book(book_id: int):
    await asyncio.sleep(1)  # Simulate network delay
    return f"Book {book_id}"

async def concurrent_gather():
    """All at once with gather - waits for all"""
    books = await asyncio.gather(
        fetch_book(1),
        fetch_book(2),
        fetch_book(3)
    )
    print(books)  # ['Book 1', 'Book 2', 'Book 3'] - all at once!

async def concurrent_tasks():
    """Background tasks with create_task"""
    # Tasks are scheduled as soon as they are created
    task1 = asyncio.create_task(fetch_book(1))
    task2 = asyncio.create_task(fetch_book(2))
    task3 = asyncio.create_task(fetch_book(3))
    # Do other work while tasks run
    print("Doing other work...")
    # Collect results
    books = await asyncio.gather(task1, task2, task3)
    print(books)

async def process_as_completed():
    """Process results as they arrive"""
    tasks = [fetch_book(i) for i in range(1, 4)]
    for coro in asyncio.as_completed(tasks):
        book = await coro
        print(f"Got {book}")  # Process immediately when ready

asyncio.run(concurrent_gather())
asyncio.run(concurrent_tasks())
asyncio.run(process_as_completed())
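A related pattern, sketched here as a possible addition to the three above, is "first result wins": start several tasks, take whichever finishes first, and cancel the rest. It uses `asyncio.wait()` with `return_when=asyncio.FIRST_COMPLETED`. The mirror names and delays are invented for the illustration.

```python
import asyncio

async def fetch_from_mirror(name, delay):
    await asyncio.sleep(delay)  # simulated network latency
    return f"response from {name}"

async def first_response():
    tasks = [
        asyncio.create_task(fetch_from_mirror("fast", 0.05)),
        asyncio.create_task(fetch_from_mirror("slow", 1.0)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # the slower request is no longer needed
    # Let the cancelled tasks finish unwinding before returning
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()

if __name__ == "__main__":
    print(asyncio.run(first_response()))
```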
Task Cancellation and Timeouts
Margaret showed him how to handle tasks that take too long:
import asyncio

async def long_operation():
    try:
        print("Starting long operation...")
        await asyncio.sleep(10)
        return "completed"
    except asyncio.CancelledError:
        print("Operation was cancelled!")
        # Perform cleanup here
        raise  # Re-raise to signal cancellation

async def timeout_example():
    """Using wait_for for timeouts"""
    try:
        result = await asyncio.wait_for(
            long_operation(),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out after 2 seconds")

async def manual_cancellation():
    """Manually cancelling tasks"""
    task = asyncio.create_task(long_operation())
    # Do some work
    await asyncio.sleep(1)
    # Cancel the task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

async def timeout_group():
    """Timeout for multiple operations"""
    try:
        results = await asyncio.wait_for(
            asyncio.gather(
                long_operation(),
                long_operation(),
                long_operation()
            ),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("One or more operations timed out")

# Python 3.11+ context manager style
async def modern_timeout():
    try:
        async with asyncio.timeout(2.0):
            result = await long_operation()
    except TimeoutError:
        print("Timed out!")
"Timeouts prevent your program from hanging indefinitely," Margaret explained. "And cancellation lets you stop work that's no longer needed—like when a user cancels a request."
Producer/Consumer with asyncio.Queue
She demonstrated a common async pattern:
import asyncio

async def producer(queue: asyncio.Queue, n: int):
    """Produces items and adds them to the queue"""
    for i in range(n):
        await asyncio.sleep(0.1)  # Simulate work
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced {item}")
    # Signal completion
    await queue.put(None)

async def consumer(queue: asyncio.Queue):
    """Consumes items from the queue"""
    while True:
        item = await queue.get()
        if item is None:  # Sentinel value - we're done
            queue.task_done()  # Count the sentinel too, or queue.join() never returns
            break
        print(f"Consumed {item}")
        await asyncio.sleep(0.2)  # Simulate processing
        queue.task_done()

async def producer_consumer_example():
    queue = asyncio.Queue()
    # Run producer and consumer concurrently
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue)
    )
    # Wait for all items to be processed
    await queue.join()

asyncio.run(producer_consumer_example())
"Queues let producers and consumers work at different rates," Margaret noted. "The queue buffers items between them—perfect for processing pipelines."
Error Handling in Async
Timothy learned how to handle failures gracefully:
import asyncio

async def might_fail(n: int):
    await asyncio.sleep(0.1)
    if n == 2:
        raise ValueError(f"Task {n} failed!")
    return f"Task {n} succeeded"

async def gather_with_exceptions():
    """Collect errors without stopping other tasks"""
    results = await asyncio.gather(
        might_fail(1),
        might_fail(2),  # This will fail
        might_fail(3),
        return_exceptions=True  # Don't raise, return exceptions
    )
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i}: {result}")

asyncio.run(gather_with_exceptions())
# Task 1: Task 1 succeeded
# Task 2 failed: Task 2 failed!
# Task 3: Task 3 succeeded
"With return_exceptions=True, failures don't stop other tasks," she explained. "You get exceptions in the results list instead of an immediate raise."
Async Context Managers
Margaret showed him async resource management:
import asyncio
import aiohttp

class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.1)  # Simulate connection
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.1)  # Simulate cleanup

    async def query(self, sql):
        await asyncio.sleep(0.1)
        return f"Results for: {sql}"

async def use_async_context_manager():
    # async with for resources that need async setup/cleanup
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM books")
        print(result)
    # Automatically cleaned up

# Real-world example with aiohttp
async def fetch_with_session():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            data = await response.json()
            return data

asyncio.run(use_async_context_manager())
"Async context managers use async with," Margaret noted. "They can await during setup and teardown—perfect for network connections, database sessions, and file handles."
Async Iterators
She demonstrated async iteration:
import asyncio

class AsyncBookReader:
    def __init__(self, books):
        self.books = books
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.books):
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # Simulate async fetch
        book = self.books[self.index]
        self.index += 1
        return book

async def read_books():
    reader = AsyncBookReader(['Book A', 'Book B', 'Book C'])
    # async for iterates over async iterator
    async for book in reader:
        print(f"Reading {book}")

asyncio.run(read_books())
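An async generator is often a shorter way to get the same behavior: an `async def` that contains `yield` implements `__aiter__` and `__anext__` automatically. The sketch below mirrors the class above; `book_stream` and `collect_titles` are names invented for the example.

```python
import asyncio

async def book_stream(books):
    for book in books:
        await asyncio.sleep(0.01)  # simulate async fetch
        yield book

async def collect_titles():
    # An async comprehension consumes the generator with async for
    return [book async for book in book_stream(["Book A", "Book B", "Book C"])]

if __name__ == "__main__":
    print(asyncio.run(collect_titles()))
    # ['Book A', 'Book B', 'Book C']
```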
The Event Loop Already Running Gotcha
Timothy encountered a confusing error in Jupyter:
# In Jupyter notebook or IPython:
async def my_function():
    return "result"

# This fails with RuntimeError!
# result = asyncio.run(my_function())
# RuntimeError: asyncio.run() cannot be called from a running event loop

# Jupyter already has an event loop running
# Just await directly:
result = await my_function()  # Works in Jupyter!

# In regular Python scripts, use asyncio.run():
if __name__ == "__main__":
    result = asyncio.run(my_function())  # Works in scripts!
"Jupyter and IPython already run an event loop," Margaret explained. "You can't nest asyncio.run() calls. In notebooks, just await directly. In scripts, asyncio.run() is your entry point."
When Async Doesn't Help
Margaret warned him about async limitations:
import asyncio
import time

async def cpu_intensive():
    """This won't benefit from async"""
    # Heavy computation - holds the GIL
    result = sum(i * i for i in range(10_000_000))
    return result

async def main():
    start = time.time()
    # These run sequentially, not concurrently!
    results = await asyncio.gather(
        cpu_intensive(),
        cpu_intensive(),
        cpu_intensive()
    )
    print(f"Time: {time.time() - start:.2f}s")
    # Still takes 3x time - no concurrency benefit!

asyncio.run(main())
"Async helps when you're waiting—network, disk, database," she explained. "For CPU-intensive work, use multiprocessing to bypass the GIL with separate processes."
Async vs Threading vs Multiprocessing
Timothy learned when to use each:
# Use ASYNC when:
# - I/O-bound: network requests, database queries, file operations
# - Many concurrent operations (hundreds or thousands)
# - Using async libraries (aiohttp, asyncpg, aiofiles)
# - Example: Web server handling many requests
# Use THREADING when:
# - I/O-bound with blocking libraries (requests, standard file I/O)
# - Moderate concurrency (tens of operations)
# - Libraries don't support async
# - Example: Scraping with requests library
# Use MULTIPROCESSING when:
# - CPU-bound: calculations, data processing, image manipulation
# - Need true parallelism
# - Work can be split into independent chunks
# - Example: Processing large datasets, scientific computing
Real-World Example: API Client with Rate Limiting
Margaret showed him a production-ready async API client:
import asyncio
import aiohttp
from typing import List

class BookAPIClient:
    def __init__(self, base_url: str, max_concurrent: int = 10):
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch_book(self, isbn: str) -> dict:
        """Fetch single book, limited by the semaphore"""
        async with self.semaphore:  # Limit concurrent requests
            async with self.session.get(
                f"{self.base_url}/books/{isbn}",
                timeout=aiohttp.ClientTimeout(total=5)
            ) as response:
                response.raise_for_status()
                return await response.json()

    async def fetch_many_books(self, isbns: List[str]) -> List[dict]:
        """Fetch multiple books concurrently"""
        tasks = [self.fetch_book(isbn) for isbn in isbns]
        return await asyncio.gather(
            *tasks,
            return_exceptions=True  # Don't stop on failures
        )

    async def search_books(self, query: str) -> List[dict]:
        """Search for books"""
        async with self.session.get(
            f"{self.base_url}/search",
            params={"q": query}
        ) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def main():
    isbns = [
        "9780441013593",
        "9780345391803",
        "9780765326355",
        "9780441569595",
        "9780765326362"
    ]
    async with BookAPIClient("https://api.library.com") as client:
        # Fetch all books concurrently (max 10 at once)
        books = await client.fetch_many_books(isbns)
        # Process results
        for isbn, result in zip(isbns, books):
            if isinstance(result, Exception):
                print(f"Failed to fetch {isbn}: {result}")
            else:
                print(f"Got book: {result['title']}")

asyncio.run(main())
"The semaphore caps how many requests are in flight at once," Margaret explained. "It limits concurrency rather than requests per second, but it still keeps the client from overwhelming the API server while achieving high throughput. The context manager ensures cleanup even if errors occur."
Running Blocking Code in Async
She showed him how to integrate blocking libraries:
import asyncio
import requests  # Synchronous library

async def fetch_with_requests(url: str):
    """Use blocking library without freezing event loop"""
    # Inside a coroutine, get_running_loop() returns the loop that is running it
    loop = asyncio.get_running_loop()
    # Run blocking code in thread pool
    response = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        requests.get,
        url
    )
    return response.json()

async def main():
    # Mix async and sync libraries safely
    results = await asyncio.gather(
        fetch_with_requests('https://api.example.com/1'),
        fetch_with_requests('https://api.example.com/2'),
        fetch_with_requests('https://api.example.com/3')
    )
    print(results)

asyncio.run(main())
"When you must use blocking libraries, wrap calls with run_in_executor()," she advised. "The event loop runs them in threads, keeping itself responsive."
The Takeaway
Timothy stood in the Concurrent Corridors, watching the event loop orchestrate hundreds of coroutines.
Async/await transforms waiting into working: When coroutines wait for I/O, the event loop switches to other work—achieving high concurrency in a single thread.
Coroutines are suspended computations: Async functions return coroutines that don't run until awaited or passed to the event loop.
The event loop is the scheduler: Switches between coroutines at await points—cooperative multitasking.
await is the suspension point: Where coroutines yield control, letting other work proceed.
Tasks vs coroutines: Tasks are scheduled immediately; coroutines need explicit awaiting.
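asyncio.sleep() vs time.sleep(): Use the async version; a blocking call stalls every coroutine on the loop.
The GIL isn't a problem for I/O: The event loop runs in a single thread and switches coroutines while I/O is pending, so it never contends for the GIL; in threaded code, blocking I/O releases the GIL while it waits.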
asyncio.gather() runs concurrently: All start at once, wait for all to complete.
asyncio.create_task() schedules background work: Task runs while you do other things.
Task cancellation prevents hangs: Use timeouts and manual cancellation for long-running operations.
asyncio.Queue for producer/consumer: Buffer work between coroutines running at different rates.
async with for async resources: Context managers that can await during setup/teardown.
async for iterates asynchronously: Process items from async sources (APIs, databases).
Jupyter has a running loop: Can't use asyncio.run(), just await directly.
Async doesn't help CPU-bound tasks: Use multiprocessing for computation-heavy work.
Semaphores limit concurrency: Prevent overwhelming external services with too many requests.
return_exceptions=True in gather: Continue despite errors, collect exceptions with results.
run_in_executor() for blocking code: Wrap synchronous libraries to avoid freezing the event loop.
asyncio.run() is the entry point: Creates loop, runs coroutine, cleans up—use it to start async programs in scripts.
Python's async/await Pattern
Timothy had discovered Python's approach to concurrency—the async/await pattern that transformed waiting time into productive time.
The Concurrent Corridors revealed that most programs spend their lives waiting—for network responses, database queries, file operations—and async let Python juggle hundreds or thousands of such operations simultaneously in a single thread.
He learned that async wasn't magic parallelism but cooperative multitasking where coroutines politely yielded control at await points, letting the event loop orchestrate the dance.
He understood when async shone (I/O-bound) and when it failed (CPU-bound), knew the critical difference between asyncio.sleep() and time.sleep(), and mastered patterns for timeouts, cancellation, and concurrency limiting.
Finally, Timothy understood why the GIL didn't prevent async concurrency, and recognized that modern Python's async ecosystem transformed slow, sequential programs into fast, concurrent systems—as long as you were waiting for something external, not crunching numbers internally.
Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.