The Concurrent Corridors: Async/Await and the Event Loop

 

The Concurrent Corridors: Async/Await and the Event Loop





Timothy's library system needed to fetch book metadata from multiple external APIs—publisher information, reader reviews, similar book recommendations. His synchronous code fetched them one at a time, and the library staff waited... and waited... and waited.

import requests
import time

def fetch_publisher_info(isbn):
    response = requests.get(f"https://api.publisher.com/books/{isbn}")
    return response.json()

def fetch_reviews(isbn):
    response = requests.get(f"https://api.reviews.com/books/{isbn}")
    return response.json()

def fetch_recommendations(isbn):
    response = requests.get(f"https://api.recommendations.com/books/{isbn}")
    return response.json()

start = time.time()
isbn = "9780441013593"

# Fetch one at a time - each waits for the previous
publisher = fetch_publisher_info(isbn)    # Wait 2 seconds
reviews = fetch_reviews(isbn)             # Wait 2 seconds
recommendations = fetch_recommendations(isbn)  # Wait 2 seconds

print(f"Total time: {time.time() - start:.2f} seconds")
# Total time: 6.00 seconds - waited for each in sequence!

Margaret found him staring at loading spinners. "You're waiting when you could be working," she observed. "Come to the Concurrent Corridors—where Python does many things at once."

The Async Solution

She showed him the async version:

import asyncio
import aiohttp

async def fetch_publisher_info(session, isbn):
    async with session.get(f"https://api.publisher.com/books/{isbn}") as response:
        return await response.json()

async def fetch_reviews(session, isbn):
    async with session.get(f"https://api.reviews.com/books/{isbn}") as response:
        return await response.json()

async def fetch_recommendations(session, isbn):
    async with session.get(f"https://api.recommendations.com/books/{isbn}") as response:
        return await response.json()

async def fetch_all_data(isbn):
    async with aiohttp.ClientSession() as session:
        # All three run concurrently!
        results = await asyncio.gather(
            fetch_publisher_info(session, isbn),
            fetch_reviews(session, isbn),
            fetch_recommendations(session, isbn)
        )
        return results

start = time.time()
publisher, reviews, recommendations = asyncio.run(fetch_all_data(isbn))
print(f"Total time: {time.time() - start:.2f} seconds")
# Total time: 2.00 seconds - all three ran at once!

"Async functions don't block," Margaret explained. "When you await a network call, the program doesn't stop—it works on other tasks while waiting for the response. All three API calls happen concurrently."

What Are Coroutines?

Timothy learned the fundamental building block:

# Regular function - executes immediately
def regular_function():
    print("Running")
    return 42

result = regular_function()  # Runs immediately
print(result)  # 42

# Async function - returns a coroutine
async def async_function():
    print("Running")
    return 42

coro = async_function()  # Doesn't run yet!
print(coro)  # <coroutine object async_function>
print(type(coro))  # <class 'coroutine'>

# Must await it or run it
result = asyncio.run(async_function())  # Now it runs
print(result)  # 42

"Async functions return coroutines—suspended computations," Margaret explained. "They don't execute immediately. You must await them or pass them to an event loop."

Tasks vs Coroutines

Margaret showed him the critical distinction:

import asyncio

async def work():
    await asyncio.sleep(1)
    return "done"

async def main():
    # Coroutine - not scheduled yet
    coro = work()
    print(type(coro))  # <class 'coroutine'>
    # Hasn't started running!

    # Task - scheduled immediately on the event loop
    task = asyncio.create_task(work())
    print(type(task))  # <class 'Task'>
    # Already running in the background!

    # Tasks have extra capabilities
    print(task.done())  # False - still running
    task.cancel()       # Can cancel tasks

    # Must await the coroutine we created
    result = await coro
    print(result)

asyncio.run(main())

"Coroutines are blueprints," Margaret noted. "Tasks are scheduled work. When you create_task(), the task starts running immediately in the background. When you call an async function, you just get a coroutine—it won't run until awaited."

The Event Loop

Margaret revealed the mechanism behind async:

# Simplified event loop concept
async def task1():
    print("Task 1: Starting")
    await asyncio.sleep(1)  # "I'm waiting - do other work"
    print("Task 1: Done")

async def task2():
    print("Task 2: Starting")
    await asyncio.sleep(1)  # "I'm waiting - do other work"
    print("Task 2: Done")

async def main():
    # Schedule both tasks
    await asyncio.gather(task1(), task2())

asyncio.run(main())
# Task 1: Starting
# Task 2: Starting
# (1 second pause - both waiting simultaneously)
# Task 1: Done
# Task 2: Done

# Both tasks ran concurrently!
# Event loop switched between them during waits

"The event loop is Python's scheduler," Margaret explained. "When a coroutine hits await, it yields control back to the loop. The loop switches to another coroutine. When the awaited operation completes, the loop resumes the coroutine. It's cooperative multitasking—coroutines voluntarily yield control."

The Blocking Trap: time.sleep() vs asyncio.sleep()

Timothy made a common mistake:

import asyncio
import time

async def broken_task():
    print("Starting")
    time.sleep(2)  # WRONG! Blocks the entire event loop!
    print("Done")

async def other_task():
    print("Other task running")

async def main():
    await asyncio.gather(
        broken_task(),
        other_task()
    )

asyncio.run(main())
# Starting
# (2 second freeze - nothing else can run!)
# Done
# Other task running  # Only runs after broken_task finishes!

"Never use time.sleep() in async code!" Margaret warned. "It blocks the entire event loop—nothing else can run. Use asyncio.sleep() instead:"

async def correct_task():
    print("Starting")
    await asyncio.sleep(2)  # Yields control - other tasks can run
    print("Done")

async def other_task():
    print("Other task running")

async def main():
    await asyncio.gather(
        correct_task(),
        other_task()
    )

asyncio.run(main())
# Starting
# Other task running  # Runs immediately!
# (2 seconds pass)
# Done

"Any blocking call—time.sleep()requests.get(), synchronous file I/O—freezes the event loop," she cautioned. "Always use async alternatives or wrap blocking code with run_in_executor()."

Understanding the GIL and Why Async Works

Timothy was puzzled. "Python has the Global Interpreter Lock—only one thread runs at a time. How does async help?"

Margaret smiled. "Excellent question. The GIL is why threading doesn't help CPU-bound tasks:"

import threading
import time

def cpu_work():
    # CPU-intensive calculation
    total = sum(i * i for i in range(10_000_000))
    return total

start = time.time()

# Two threads don't run in parallel due to GIL
t1 = threading.Thread(target=cpu_work)
t2 = threading.Thread(target=cpu_work)

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Time: {time.time() - start:.2f}s")
# Takes ~2x time - no parallelism due to GIL!

"But async works for I/O because during I/O operations—network calls, disk reads—Python releases the GIL," she explained. "The event loop is single-threaded, but while one coroutine waits for I/O, the loop switches to another coroutine. The key is that I/O waits don't hold the GIL."

# Why async works for I/O:
# 1. Coroutine makes network request
# 2. Python releases GIL during I/O wait
# 3. Event loop switches to another coroutine
# 4. That coroutine runs (single thread!)
# 5. When I/O completes, loop switches back

# This is why:
# - Async: Perfect for I/O-bound (waiting releases control)
# - Threading: I/O-bound with blocking libraries (GIL released during I/O)
# - Multiprocessing: CPU-bound (separate processes, separate GILs)

Await: The Suspension Point

Timothy learned what await actually did:

async def fetch_data():
    print("Starting fetch")

    # await suspends this coroutine
    # Returns control to event loop
    # Loop can run other coroutines
    data = await slow_network_call()

    # Resumes here when slow_network_call completes
    print("Fetch complete")
    return data

# await can only be used inside async functions
# await marks suspension points where the event loop can switch tasks

"The await keyword suspends the coroutine," Margaret noted. "It's like saying 'I'm waiting for something—do other work and come back to me when ready.' This is how async enables concurrency."

Concurrent Patterns

Margaret showed him the essential patterns:

import asyncio

async def fetch_book(book_id: int):
    await asyncio.sleep(1)  # Simulate network delay
    return f"Book {book_id}"

async def concurrent_gather():
    """All at once with gather - waits for all"""
    books = await asyncio.gather(
        fetch_book(1),
        fetch_book(2),
        fetch_book(3)
    )
    print(books)  # ['Book 1', 'Book 2', 'Book 3'] - all at once!

async def concurrent_tasks():
    """Background tasks with create_task"""
    # Tasks start running immediately
    task1 = asyncio.create_task(fetch_book(1))
    task2 = asyncio.create_task(fetch_book(2))
    task3 = asyncio.create_task(fetch_book(3))

    # Do other work while tasks run
    print("Doing other work...")

    # Collect results
    books = await asyncio.gather(task1, task2, task3)
    print(books)

async def process_as_completed():
    """Process results as they arrive"""
    tasks = [fetch_book(i) for i in range(1, 4)]

    for coro in asyncio.as_completed(tasks):
        book = await coro
        print(f"Got {book}")  # Process immediately when ready

asyncio.run(concurrent_gather())
asyncio.run(concurrent_tasks())
asyncio.run(process_as_completed())

Task Cancellation and Timeouts

Margaret showed him how to handle tasks that take too long:

import asyncio

async def long_operation():
    try:
        print("Starting long operation...")
        await asyncio.sleep(10)
        return "completed"
    except asyncio.CancelledError:
        print("Operation was cancelled!")
        # Perform cleanup here
        raise  # Re-raise to signal cancellation

async def timeout_example():
    """Using wait_for for timeouts"""
    try:
        result = await asyncio.wait_for(
            long_operation(),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out after 2 seconds")

async def manual_cancellation():
    """Manually cancelling tasks"""
    task = asyncio.create_task(long_operation())

    # Do some work
    await asyncio.sleep(1)

    # Cancel the task
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

async def timeout_group():
    """Timeout for multiple operations"""
    try:
        results = await asyncio.wait_for(
            asyncio.gather(
                long_operation(),
                long_operation(),
                long_operation()
            ),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("One or more operations timed out")

# Python 3.11+ context manager style
async def modern_timeout():
    try:
        async with asyncio.timeout(2.0):
            result = await long_operation()
    except TimeoutError:
        print("Timed out!")

"Timeouts prevent your program from hanging indefinitely," Margaret explained. "And cancellation lets you stop work that's no longer needed—like when a user cancels a request."

Producer/Consumer with asyncio.Queue

She demonstrated a common async pattern:

import asyncio

async def producer(queue: asyncio.Queue, n: int):
    """Produces items and adds them to the queue"""
    for i in range(n):
        await asyncio.sleep(0.1)  # Simulate work
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced {item}")

    # Signal completion
    await queue.put(None)

async def consumer(queue: asyncio.Queue):
    """Consumes items from the queue"""
    while True:
        item = await queue.get()

        if item is None:  # Sentinel value - we're done
            break

        print(f"Consumed {item}")
        await asyncio.sleep(0.2)  # Simulate processing
        queue.task_done()

async def producer_consumer_example():
    queue = asyncio.Queue()

    # Run producer and consumer concurrently
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue)
    )

    # Wait for all items to be processed
    await queue.join()

asyncio.run(producer_consumer_example())

"Queues let producers and consumers work at different rates," Margaret noted. "The queue buffers items between them—perfect for processing pipelines."

Error Handling in Async

Timothy learned how to handle failures gracefully:

import asyncio

async def might_fail(n: int):
    await asyncio.sleep(0.1)
    if n == 2:
        raise ValueError(f"Task {n} failed!")
    return f"Task {n} succeeded"

async def gather_with_exceptions():
    """Collect errors without stopping other tasks"""
    results = await asyncio.gather(
        might_fail(1),
        might_fail(2),  # This will fail
        might_fail(3),
        return_exceptions=True  # Don't raise, return exceptions
    )

    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i}: {result}")

asyncio.run(gather_with_exceptions())
# Task 1: Task 1 succeeded
# Task 2 failed: Task 2 failed!
# Task 3: Task 3 succeeded

"With return_exceptions=True, failures don't stop other tasks," she explained. "You get exceptions in the results list instead of an immediate raise."

Async Context Managers

Margaret showed him async resource management:

import asyncio
import aiohttp

class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.1)  # Simulate connection
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.1)  # Simulate cleanup

    async def query(self, sql):
        await asyncio.sleep(0.1)
        return f"Results for: {sql}"

async def use_async_context_manager():
    # async with for resources that need async setup/cleanup
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM books")
        print(result)
    # Automatically cleaned up

# Real-world example with aiohttp
async def fetch_with_session():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            data = await response.json()
            return data

asyncio.run(use_async_context_manager())

"Async context managers use async with," Margaret noted. "They can await during setup and teardown—perfect for network connections, database sessions, and file handles."

Async Iterators

She demonstrated async iteration:

import asyncio

class AsyncBookReader:
    def __init__(self, books):
        self.books = books
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.books):
            raise StopAsyncIteration

        await asyncio.sleep(0.1)  # Simulate async fetch
        book = self.books[self.index]
        self.index += 1
        return book

async def read_books():
    reader = AsyncBookReader(['Book A', 'Book B', 'Book C'])

    # async for iterates over async iterator
    async for book in reader:
        print(f"Reading {book}")

asyncio.run(read_books())

The Event Loop Already Running Gotcha

Timothy encountered a confusing error in Jupyter:

# In Jupyter notebook or IPython:
async def my_function():
    return "result"

# This fails with RuntimeError!
# result = asyncio.run(my_function())
# RuntimeError: asyncio.run() cannot be called from a running event loop

# Jupyter already has an event loop running
# Just await directly:
result = await my_function()  # Works in Jupyter!

# In regular Python scripts, use asyncio.run():
if __name__ == "__main__":
    result = asyncio.run(my_function())  # Works in scripts!

"Jupyter and IPython already run an event loop," Margaret explained. "You can't nest asyncio.run() calls. In notebooks, just await directly. In scripts, asyncio.run() is your entry point."

When Async Doesn't Help

Margaret warned him about async limitations:

import asyncio
import time

async def cpu_intensive():
    """This won't benefit from async"""
    # Heavy computation - holds the GIL
    result = sum(i * i for i in range(10_000_000))
    return result

async def main():
    start = time.time()

    # These run sequentially, not concurrently!
    results = await asyncio.gather(
        cpu_intensive(),
        cpu_intensive(),
        cpu_intensive()
    )

    print(f"Time: {time.time() - start:.2f}s")
    # Still takes 3x time - no concurrency benefit!

asyncio.run(main())

"Async helps when you're waiting—network, disk, database," she explained. "For CPU-intensive work, use multiprocessing to bypass the GIL with separate processes."

Async vs Threading vs Multiprocessing

Timothy learned when to use each:

# Use ASYNC when:
# - I/O-bound: network requests, database queries, file operations
# - Many concurrent operations (hundreds or thousands)
# - Using async libraries (aiohttp, asyncpg, aiofiles)
# - Example: Web server handling many requests

# Use THREADING when:
# - I/O-bound with blocking libraries (requests, standard file I/O)
# - Moderate concurrency (tens of operations)
# - Libraries don't support async
# - Example: Scraping with requests library

# Use MULTIPROCESSING when:
# - CPU-bound: calculations, data processing, image manipulation
# - Need true parallelism
# - Work can be split into independent chunks
# - Example: Processing large datasets, scientific computing

Real-World Example: API Client with Rate Limiting

Margaret showed him a production-ready async API client:

import asyncio
import aiohttp
from typing import List

class BookAPIClient:
    def __init__(self, base_url: str, max_concurrent: int = 10):
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch_book(self, isbn: str) -> dict:
        """Fetch single book with rate limiting"""
        async with self.semaphore:  # Limit concurrent requests
            async with self.session.get(
                f"{self.base_url}/books/{isbn}",
                timeout=aiohttp.ClientTimeout(total=5)
            ) as response:
                response.raise_for_status()
                return await response.json()

    async def fetch_many_books(self, isbns: List[str]) -> List[dict]:
        """Fetch multiple books concurrently"""
        tasks = [self.fetch_book(isbn) for isbn in isbns]
        return await asyncio.gather(
            *tasks,
            return_exceptions=True  # Don't stop on failures
        )

    async def search_books(self, query: str) -> List[dict]:
        """Search for books"""
        async with self.session.get(
            f"{self.base_url}/search",
            params={"q": query}
        ) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def main():
    isbns = [
        "9780441013593",
        "9780345391803",
        "9780765326355",
        "9780441569595",
        "9780765326362"
    ]

    async with BookAPIClient("https://api.library.com") as client:
        # Fetch all books concurrently (max 10 at once)
        books = await client.fetch_many_books(isbns)

        # Process results
        for isbn, result in zip(isbns, books):
            if isinstance(result, Exception):
                print(f"Failed to fetch {isbn}: {result}")
            else:
                print(f"Got book: {result['title']}")

asyncio.run(main())

"The semaphore limits concurrent requests," Margaret explained. "This prevents overwhelming the API server while still achieving high throughput. The context manager ensures cleanup even if errors occur."

Running Blocking Code in Async

She showed him how to integrate blocking libraries:

import asyncio
import requests  # Synchronous library

async def fetch_with_requests(url: str):
    """Use blocking library without freezing event loop"""
    loop = asyncio.get_event_loop()

    # Run blocking code in thread pool
    response = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        requests.get,
        url
    )

    return response.json()

async def main():
    # Mix async and sync libraries safely
    results = await asyncio.gather(
        fetch_with_requests('https://api.example.com/1'),
        fetch_with_requests('https://api.example.com/2'),
        fetch_with_requests('https://api.example.com/3')
    )
    print(results)

asyncio.run(main())

"When you must use blocking libraries, wrap calls with run_in_executor()," she advised. "The event loop runs them in threads, keeping itself responsive."


The Takeaway

Timothy stood in the Concurrent Corridors, watching the event loop orchestrate hundreds of coroutines.

Async/await transforms waiting into working: When coroutines wait for I/O, the event loop switches to other work—achieving high concurrency in a single thread.

Coroutines are suspended computations: Async functions return coroutines that don't run until awaited or passed to the event loop.

The event loop is the scheduler: Switches between coroutines at await points—cooperative multitasking.

await is the suspension point: Where coroutines yield control, letting other work proceed.

Tasks vs coroutines: Tasks are scheduled immediately; coroutines need explicit awaiting.

asyncio.sleep() vs time.sleep(): Always use async versions or break concurrency entirely.

The GIL isn't a problem for I/O: Python releases the GIL during I/O waits, letting the event loop switch coroutines.

asyncio.gather() runs concurrently: All start at once, wait for all to complete.

asyncio.create_task() schedules background work: Task runs while you do other things.

Task cancellation prevents hangs: Use timeouts and manual cancellation for long-running operations.

asyncio.Queue for producer/consumer: Buffer work between coroutines running at different rates.

async with for async resources: Context managers that can await during setup/teardown.

async for iterates asynchronously: Process items from async sources (APIs, databases).

Jupyter has a running loop: Can't use asyncio.run(), just await directly.

Async doesn't help CPU-bound tasks: Use multiprocessing for computation-heavy work.

Semaphores limit concurrency: Prevent overwhelming external services with too many requests.

return_exceptions=True in gather: Continue despite errors, collect exceptions with results.

run_in_executor() for blocking code: Wrap synchronous libraries to avoid freezing the event loop.

asyncio.run() is the entry point: Creates loop, runs coroutine, cleans up—use it to start async programs in scripts.

Python's async/await Pattern

Timothy had discovered Python's approach to concurrency—the async/await pattern that transformed waiting time into productive time.

The Concurrent Corridors revealed that most programs spend their lives waiting—for network responses, database queries, file operations—and async let Python juggle hundreds or thousands of such operations simultaneously in a single thread.

He learned that async wasn't magic parallelism but cooperative multitasking where coroutines politely yielded control at await points, letting the event loop orchestrate the dance.

He understood when async shined (I/O-bound) and when it failed (CPU-bound), knew the critical difference between asyncio.sleep() and time.sleep(), and mastered patterns for timeouts, cancellation, and rate limiting,

Finally, Timothy understood why the GIL didn't prevent async concurrency, and recognized that modern Python's async ecosystem transformed slow, sequential programs into fast, concurrent systems—as long as you were waiting for something external, not crunching numbers internally.


Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.

Comments

Popular posts from this blog

The New ChatGPT Reason Feature: What It Is and Why You Should Use It

Raspberry Pi Connect vs. RealVNC: A Comprehensive Comparison

Insight: The Great Minimal OS Showdown—DietPi vs Raspberry Pi OS Lite