The Async Iterator Part 2: Streaming Data and Real-World Patterns
The next morning, Timothy arrived at the library early, eager to apply what he'd learned about async iteration. He'd spent the evening refactoring his log analyzer, and now he wanted to tackle something more ambitious: building a real-time dashboard that streamed library statistics.
"Margaret, I've got async iteration working," Timothy said, pulling up his screen. "But now I'm trying to do something practical, and I keep running into problems."
Margaret looked over with interest. "Show me what you're working on."
Timothy pulled up his code:
import asyncio
async def get_checkouts():
"""Get recent checkouts"""
checkouts = []
async for record in read_checkout_log():
checkouts.append(record)
return checkouts
async def get_returns():
"""Get recent returns"""
returns = []
async for record in read_return_log():
returns.append(record)
return returns
async def get_active_users():
"""Get active users"""
users = []
async for user in read_active_sessions():
users.append(user)
return users
"This works," Timothy said, "but it feels clunky. I'm writing the same pattern over and over—create a list, iterate, append. There must be a better way."
Margaret smiled. "There is. Let me introduce you to async comprehensions."
Async Comprehensions: Syntactic Sugar for Async Iteration
"Remember list comprehensions?" Margaret asked, typing:
# Regular list comprehension
numbers = [1, 2, 3, 4, 5]
squares = [n ** 2 for n in numbers]
print(squares) # [1, 4, 9, 16, 25]
"Of course," Timothy said. "They're more elegant than building lists with loops."
"Async comprehensions are the same concept, but for async iterators," Margaret explained. She refactored his code:
import asyncio
async def read_checkout_log():
"""Simulate reading checkout records"""
for i in range(5):
await asyncio.sleep(0.1)
yield {"id": i, "book": f"Book_{i}", "user": f"User_{i}"}
async def read_return_log():
"""Simulate reading return records"""
for i in range(3):
await asyncio.sleep(0.1)
yield {"id": i, "book": f"Book_{i}"}
# OLD WAY: Manual iteration
async def get_checkouts_old():
checkouts = []
async for record in read_checkout_log():
checkouts.append(record)
return checkouts
# NEW WAY: Async list comprehension
async def get_checkouts_new():
return [record async for record in read_checkout_log()]
async def demo():
print("Old way:")
result1 = await get_checkouts_old()
print(f" Got {len(result1)} checkouts")
print("\nNew way:")
result2 = await get_checkouts_new()
print(f" Got {len(result2)} checkouts")
asyncio.run(demo())
Output:
Old way:
Got 5 checkouts
New way:
Got 5 checkouts
"Just add async before the for!" Timothy said. "That's much cleaner."
"Right. The syntax is [expr async for item in async_iterator]," Margaret said. "You can also use filtering:"
import asyncio
async def read_checkout_log():
"""Generate checkout records"""
records = [
{"id": 1, "book": "Python Guide", "user": "Alice", "overdue": False},
{"id": 2, "book": "Async Patterns", "user": "Bob", "overdue": True},
{"id": 3, "book": "Data Structures", "user": "Carol", "overdue": False},
{"id": 4, "book": "Algorithms", "user": "Dave", "overdue": True},
]
for record in records:
await asyncio.sleep(0.1)
yield record
async def get_overdue_books():
"""Get only overdue books using filtering"""
return [
record['book']
async for record in read_checkout_log()
if record['overdue']
]
async def demo():
overdue = await get_overdue_books()
print(f"Overdue books: {overdue}")
asyncio.run(demo())
Output:
Overdue books: ['Async Patterns', 'Algorithms']
"You can filter right in the comprehension!" Timothy said.
All Three Comprehension Types
Margaret showed him the complete picture:
import asyncio
async def generate_records():
"""Generate sample records"""
for i in range(5):
await asyncio.sleep(0.05)
yield {"id": i, "value": i * 10, "category": "even" if i % 2 == 0 else "odd"}
async def demo_all_comprehensions():
# Async LIST comprehension
values_list = [r['value'] async for r in generate_records()]
print(f"List: {values_list}")
# Async SET comprehension
categories_set = {r['category'] async for r in generate_records()}
print(f"Set: {categories_set}")
# Async DICT comprehension
id_to_value = {r['id']: r['value'] async for r in generate_records()}
print(f"Dict: {id_to_value}")
asyncio.run(demo_all_comprehensions())
Output:
List: [0, 10, 20, 30, 40]
Set: {'even', 'odd'}
Dict: {0: 0, 1: 10, 2: 20, 3: 30, 4: 40}
"Just like regular comprehensions," Margaret explained, "you have list, set, and dict versions. The syntax is always async for inside the comprehension."
She wrote out the patterns:
Async Comprehension Syntax:
List: [expr async for item in async_iter]
Set: {expr async for item in async_iter}
Dict: {key: val async for item in async_iter}
With filtering:
[expr async for item in async_iter if condition]
Regular comprehensions: for item in iter
Async comprehensions: async for item in async_iter
^^^^^ just add async!
When NOT to Use Async Comprehensions
"One warning," Margaret said. "Async comprehensions build the entire result in memory. If you're processing a huge stream of data, this can be a problem."
She demonstrated:
import asyncio
async def huge_dataset():
"""Simulate a massive dataset"""
for i in range(1_000_000):
await asyncio.sleep(0) # Yield control
yield i
# BAD: Builds entire list in memory
async def process_all_at_once():
print("Loading everything into memory...")
# This creates a list with 1 million items!
all_data = [x async for x in huge_dataset()]
print(f"Loaded {len(all_data)} items")
return sum(all_data)
# GOOD: Process one item at a time
async def process_streaming():
print("Processing as a stream...")
total = 0
count = 0
async for x in huge_dataset():
total += x
count += 1
print(f"Processed {count} items")
return total
# Both get the same result, but streaming uses constant memory
"The comprehension forces you to wait for all data before processing," Margaret explained. "Sometimes you want that—when you need the complete dataset. But for large streams, iterate directly with async for."
Timothy nodded. "So comprehensions are for collecting finite datasets, but direct iteration is for processing streams."
"Exactly. Think about it like regular Python: you wouldn't load a 10GB file into a list comprehension. Same principle applies here."
Real Async File I/O with aiofiles
"Let's move to something more practical," Margaret said. "In Part 1, I mentioned that Python's built-in open() blocks the event loop. Let me show you the real solution."
She opened a terminal and typed:
pip install aiofiles
Then she wrote:
import asyncio
import aiofiles
async def read_log_async():
"""Demonstrates async file I/O"""
print("Setting up demo file...")
# Create a sample file (for demo purposes)
async with aiofiles.open("sample.log", "w") as f:
await f.write("Line 1\nLine 2\nLine 3\n")
print("Reading with async I/O...")
# Read asynchronously - doesn't block event loop!
async with aiofiles.open("sample.log", "r") as f:
async for line in f:
print(f" {line.strip()}")
async def other_task():
"""Runs concurrently"""
for i in range(3):
print(f"[Background task {i}]")
await asyncio.sleep(0.1)
async def demo():
# Run file reading alongside other work
await asyncio.gather(
read_log_async(),
other_task()
)
asyncio.run(demo())
Output:
Setting up demo file...
Reading with async I/O...
[Background task 0]
Line 1
Line 2
[Background task 1]
Line 3
[Background task 2]
"See how the background task runs during file reading?" Margaret pointed out. "With regular open(), the entire event loop would block on the file I/O, and the background task couldn't run until reading was complete."
"The key differences," she continued, typing out a comparison:
Regular File I/O vs Async File I/O:
Regular (blocks event loop):
with open(path, 'r') as f:
for line in f:
process(line)
Async (cooperates with event loop):
async with aiofiles.open(path, 'r') as f:
async for line in f:
await process(line)
Differences:
1. async with instead of with
2. aiofiles.open() instead of open()
3. async for instead of for
4. Can await inside the loop
Async Context Managers + Async Iteration
Timothy studied the code. "I see async with and async for together. How does that work?"
"Great observation," Margaret said. "Let me explain async context managers first, then show how they combine with async iteration."
She typed:
import asyncio
class DatabaseConnection:
"""Simulated async database connection"""
async def __aenter__(self):
"""Called when entering 'async with' block"""
print(" Opening database connection...")
await asyncio.sleep(0.1) # Simulate connection time
self.connected = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Called when exiting 'async with' block"""
print(" Closing database connection...")
await asyncio.sleep(0.1) # Simulate cleanup
self.connected = False
async def query(self, sql):
"""Execute a query"""
if not self.connected:
raise RuntimeError("Not connected!")
await asyncio.sleep(0.05)
# Simulate returning results
return [f"Result {i}" for i in range(3)]
async def demo():
# Async context manager ensures cleanup
async with DatabaseConnection() as db:
print("Inside async with block")
results = await db.query("SELECT * FROM books")
print(f"Got {len(results)} results")
print("Outside async with block - connection closed")
asyncio.run(demo())
Output:
Opening database connection...
Inside async with block
Got 3 results
Closing database connection...
Outside async with block - connection closed
"An async context manager uses async with and must define __aenter__ and __aexit__ as coroutines," Margaret explained. "It's like a regular context manager, but async."
She drew a diagram:
Regular Context Manager:
with resource():
use()
# Cleanup happens here
Async Context Manager:
async with resource():
await use()
# Async cleanup happens here
Both guarantee cleanup, but async version can await during setup/teardown
Combining Async Context Managers with Async Iteration
"Now here's where it gets powerful," Margaret said. "You can combine async with and async for for safe streaming."
import asyncio
class DatabaseCursor:
"""Async database cursor with iteration"""
def __init__(self):
self.position = 0
self.closed = False
async def __aenter__(self):
"""Set up cursor"""
print(" Opening cursor...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Clean up cursor"""
print(" Closing cursor...")
self.closed = True
await asyncio.sleep(0.1)
def __aiter__(self):
"""Return self as async iterator"""
return self
async def __anext__(self):
"""Fetch next row"""
if self.closed:
raise RuntimeError("Cursor is closed!")
if self.position >= 5:
raise StopAsyncIteration
await asyncio.sleep(0.05) # Simulate fetch time
row = {"id": self.position, "title": f"Book {self.position}"}
self.position += 1
return row
async def fetch_books():
"""Fetch books using cursor"""
# Both context manager AND iterator!
async with DatabaseCursor() as cursor:
print("Fetching books:")
async for book in cursor:
print(f" {book['title']}")
print("Cursor automatically closed")
asyncio.run(fetch_books())
Output:
Opening cursor...
Fetching books:
Book 0
Book 1
Book 2
Book 3
Book 4
Closing cursor...
Cursor automatically closed
"This pattern is incredibly common with databases," Margaret said. "The cursor is both a context manager (for resource cleanup) and an async iterator (for fetching rows)."
Timothy was impressed. "So async with ensures the cursor gets closed, and async for streams the results?"
"Exactly. Even if an error occurs during iteration, __aexit__ will be called to clean up the cursor."
Error Handling in Async Iteration
"Speaking of errors," Timothy said, "what happens if an async iterator fails partway through?"
"Excellent question," Margaret said. "Let me show you."
import asyncio
async def flaky_data_source():
"""Iterator that fails after a few items"""
for i in range(5):
await asyncio.sleep(0.1)
if i == 3:
raise ValueError(f"Database error at record {i}")
yield {"id": i, "data": f"Record {i}"}
async def process_with_error_handling():
"""Handle errors during iteration"""
print("Processing with error handling:")
processed = 0
try:
async for record in flaky_data_source():
print(f" Processing: {record['data']}")
processed += 1
except ValueError as e:
print(f" ERROR: {e}")
print(f" Successfully processed {processed} records before error")
print(f"Finished (processed {processed} records)")
asyncio.run(process_with_error_handling())
Output:
Processing with error handling:
Processing: Record 0
Processing: Record 1
Processing: Record 2
ERROR: Database error at record 3
Successfully processed 3 records before error
Finished (processed 3 records)
"The async for loop stops when an exception is raised," Margaret explained. "You handle it the same way as regular iteration—with try/except."
She showed a more robust pattern:
import asyncio
async def retry_data_source():
"""Iterator with retry logic"""
for i in range(5):
await asyncio.sleep(0.1)
yield {"id": i, "data": f"Record {i}"}
async def process_with_recovery():
"""Process with automatic recovery"""
print("Processing with recovery:")
max_retries = 3
retry_count = 0
processed = 0
while retry_count < max_retries:
try:
async for record in retry_data_source():
print(f" Processing: {record['data']}")
processed += 1
# Success - break out of retry loop
break
except Exception as e:
retry_count += 1
print(f" ERROR: {e}")
if retry_count < max_retries:
print(f" Retrying... (attempt {retry_count + 1}/{max_retries})")
await asyncio.sleep(1) # Backoff
else:
print(f" Max retries reached. Processed {processed} records.")
raise
asyncio.run(process_with_recovery())
"This pattern is common for network requests or database queries that might fail transiently," Margaret said.
Cleanup on Error
"One critical point," Margaret added. "When an exception occurs in an async for loop, Python automatically calls aclose() on the generator."
import asyncio
async def generator_with_cleanup():
"""Generator that needs cleanup"""
print(" [Generator] Starting")
try:
for i in range(5):
await asyncio.sleep(0.1)
if i == 2:
raise ValueError("Simulated error")
yield i
finally:
print(" [Generator] Cleanup running (finally block)")
async def demo_cleanup_on_error():
print("Demo: Cleanup on error")
try:
async for value in generator_with_cleanup():
print(f" Got value: {value}")
except ValueError as e:
print(f" Caught error: {e}")
print("Finished")
asyncio.run(demo_cleanup_on_error())
Output:
Demo: Cleanup on error
[Generator] Starting
Got value: 0
Got value: 1
[Generator] Cleanup running (finally block)
Caught error: Simulated error
Finished
"See how the finally block runs even though we raised an error?" Margaret pointed out. "Python ensures cleanup happens. This is why finally blocks in generators are perfect for closing files, releasing locks, or cleaning up resources."
"One important detail," Margaret added. "If your cleanup code needs to await anything—like closing a network connection or flushing a buffer—your generator must be an async def. The finally block can contain await statements because the whole generator is a coroutine."
She showed an example:
import asyncio
async def acquire_async_resource():
"""Simulated async resource acquisition"""
await asyncio.sleep(0.1)
return {"name": "DatabaseConnection", "close": lambda: asyncio.sleep(0.1)}
async def generator_with_async_cleanup():
"""Cleanup that requires await"""
print(" Acquiring resource")
resource = await acquire_async_resource()
try:
yield "data"
finally:
# This await works because we're in async def
print(" Closing resource...")
await asyncio.sleep(0.1)
print(" Resource closed")
async def demo():
async for item in generator_with_async_cleanup():
print(f" Got: {item}")
asyncio.run(demo())
Output:
Acquiring resource
Got: data
Closing resource...
Resource closed
"If this were a regular generator (def instead of async def), you couldn't use await in the finally block," Margaret explained. "The async def is what allows the cleanup to be asynchronous."
Streaming API Pagination
"Let's tackle a real-world problem," Margaret said. "Imagine our library has an API that returns paginated results. You want to iterate over all books, but the API only gives you 10 at a time."
import asyncio
async def fetch_page(page_number, page_size=10):
"""Simulate fetching a page from an API"""
await asyncio.sleep(0.2) # Simulate network delay
# Simulate total of 35 books
total_books = 35
start = page_number * page_size
end = min(start + page_size, total_books)
if start >= total_books:
return []
return [
{"id": i, "title": f"Book {i}"}
for i in range(start, end)
]
async def paginated_books():
"""Async generator that handles pagination"""
page = 0
while True:
books = await fetch_page(page)
if not books:
break # No more pages
for book in books:
yield book
page += 1
async def demo_pagination():
print("Fetching all books via pagination:")
count = 0
async for book in paginated_books():
count += 1
print(f" {book['title']}")
print(f"\nTotal books fetched: {count}")
asyncio.run(demo_pagination())
Output:
Fetching all books via pagination:
Book 0
Book 1
Book 2
...
Book 33
Book 34
Total books fetched: 35
"The consumer doesn't know about pagination," Margaret explained. "They just use async for and get all the books. The generator handles the complexity of fetching multiple pages."
Timothy grinned. "That's way cleaner than managing page numbers in the calling code."
"Right. This is a common pattern for REST APIs, database queries with cursors, or any paginated data source."
Buffering and Prefetching
"One more advanced pattern," Margaret said. "Sometimes you want to fetch data ahead of time to minimize waiting."
import asyncio
from collections import deque
async def slow_data_source():
"""Simulates a slow data source"""
for i in range(10):
await asyncio.sleep(0.5) # Slow!
yield i
async def buffered_iterator(source, buffer_size=3):
"""Pre-fetch items into a buffer"""
buffer = deque()
iterator = source.__aiter__()
# Pre-fill buffer
print(f"Pre-filling buffer with {buffer_size} items...")
for _ in range(buffer_size):
try:
item = await iterator.__anext__()
buffer.append(item)
except StopAsyncIteration:
break
# Yield from buffer and refill
while buffer:
yield buffer.popleft()
# Try to refill
try:
item = await iterator.__anext__()
buffer.append(item)
except StopAsyncIteration:
pass
async def demo_buffering():
print("Without buffering (slow):")
start = asyncio.get_event_loop().time()
count = 0
async for item in slow_data_source():
count += 1
if count >= 3:
break
elapsed = asyncio.get_event_loop().time() - start
print(f" Got 3 items in {elapsed:.1f}s\n")
print("With buffering (faster):")
start = asyncio.get_event_loop().time()
count = 0
async for item in buffered_iterator(slow_data_source(), buffer_size=3):
count += 1
if count >= 3:
break
elapsed = asyncio.get_event_loop().time() - start
print(f" Got 3 items in {elapsed:.1f}s")
asyncio.run(demo_buffering())
"Buffering can significantly improve throughput when processing slow streams," Margaret explained. "You're essentially pipelining the fetching and processing."
"This demonstrates the buffering concept," Margaret added. "In production, you might use libraries like aiostream that provide pre-built buffering utilities. But understanding how to build it yourself helps you know what's happening under the hood."
Testing Async Iterators
As they wrapped up, Timothy asked, "How do I test these things? My usual testing approaches don't work with async code."
"Good question," Margaret said. "Let me show you pytest with async support."
# test_async_iterators.py
import pytest
import asyncio
async def sample_generator():
"""Generator to test"""
for i in range(3):
await asyncio.sleep(0.01)
yield i * 2
@pytest.mark.asyncio
async def test_async_generator():
"""Test async generator output"""
results = []
async for value in sample_generator():
results.append(value)
assert results == [0, 2, 4]
@pytest.mark.asyncio
async def test_async_comprehension():
"""Test async comprehension"""
results = [v async for v in sample_generator()]
assert results == [0, 2, 4]
@pytest.mark.asyncio
async def test_error_handling():
"""Test error handling in async iteration"""
async def failing_generator():
yield 1
raise ValueError("Test error")
with pytest.raises(ValueError):
async for _ in failing_generator():
pass
"Install pytest-asyncio and mark your tests with @pytest.mark.asyncio," Margaret said. "Then write tests like normal async functions."
The Pattern Library
Margaret summarized the patterns they'd covered on a whiteboard:
Async Iterator Patterns:
1. Async Comprehensions
[x async for x in source() if condition]
- Clean syntax for collecting results
- Loads everything into memory
- Use for finite datasets
2. Direct Iteration
async for x in source():
await process(x)
- Constant memory usage
- Use for large streams
- Process one item at a time
3. Context Manager + Iterator
async with resource() as r:
async for item in r:
await process(item)
- Guaranteed cleanup
- Common with databases, files, connections
4. Pagination Pattern
async def paginated():
page = 0
while True:
batch = await fetch(page)
if not batch: break
for item in batch:
yield item
page += 1
- Hides pagination complexity
- Consumer sees flat stream
5. Buffering/Prefetching
- Pre-fetch items to minimize waiting
- Improves throughput for slow sources
- More complex but can be worth it
6. Error Handling
try:
async for item in source():
await process(item)
except Exception:
# Handle or retry
- Same as regular iteration
- Cleanup happens automatically
The Takeaway
Timothy closed his laptop, now equipped with practical patterns for real-world async iteration.
Async comprehensions provide clean syntax: Just add async before for in list/set/dict comprehensions.
Comprehensions load everything into memory: Use direct async for for large streams instead.
aiofiles provides real async file I/O: Python's built-in open() blocks the event loop.
async with + async for is a powerful combination: Context managers ensure cleanup, iterators stream data.
Async context managers use aenter and aexit: Both must be coroutines.
Error handling works the same as regular iteration: Use try/except around async for loops.
Python calls aclose() automatically on errors: Generator cleanup happens even when iteration fails.
finally blocks can contain await in async generators: Because the entire generator is a coroutine.
Pagination pattern hides API complexity: Consumers see a flat stream, generator handles pages.
Buffering can improve throughput: Pre-fetch items to minimize waiting between items.
Libraries like aiostream provide pre-built patterns: But building your own helps you understand the mechanics.
pytest-asyncio enables async test cases: Mark tests with @pytest.mark.asyncio.
finally blocks ensure cleanup: Use them for releasing resources in generators.
Choose the right pattern for your use case: Comprehensions for collecting, direct iteration for streaming, buffering for performance.
Margaret and Timothy had covered the essential patterns for production async iteration. Timothy's dashboard was now streaming library statistics efficiently, his file processing no longer blocked the event loop, and he understood how to handle errors gracefully in async code.
The library was quiet in the afternoon. As Timothy packed up, he realized that async iteration wasn't just about syntax—it was about choosing the right pattern for each situation, understanding when to collect versus stream, and ensuring resources were properly managed even when things went wrong.
Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.
.jpeg)

Comments
Post a Comment