05_async_await.py

Download
python 411 lines 10.9 KB
  1"""
Asynchronous Programming with asyncio

Demonstrates:
- async/await syntax
- asyncio.create_task
- asyncio.gather
- Concurrent execution
- Async generators
- asyncio.Queue
- Error handling in async code
- Timeouts with asyncio.wait_for
- Running blocking code with run_in_executor
 12"""
 13
 14import asyncio
 15import time
 16from typing import List, AsyncIterator
 17
 18
 19def section(title: str) -> None:
 20    """Print a section header."""
 21    print("\n" + "=" * 60)
 22    print(f"  {title}")
 23    print("=" * 60)
 24
 25
 26# =============================================================================
 27# Basic async/await
 28# =============================================================================
 29
 30section("Basic async/await")
 31
 32
 33async def simple_coroutine(name: str, delay: float) -> str:
 34    """Simple async function."""
 35    print(f"  {name}: Starting (delay={delay}s)")
 36    await asyncio.sleep(delay)
 37    print(f"  {name}: Finished")
 38    return f"Result from {name}"
 39
 40
 41async def basic_example():
 42    """Run basic async function."""
 43    result = await simple_coroutine("Task-A", 0.1)
 44    print(f"  Got result: {result}")
 45
 46
 47asyncio.run(basic_example())
 48
 49
 50# =============================================================================
 51# Concurrent Execution with create_task
 52# =============================================================================
 53
 54section("Concurrent Execution with create_task")
 55
 56
 57async def fetch_data(task_id: int, delay: float) -> dict:
 58    """Simulate fetching data from API."""
 59    print(f"  Task-{task_id}: Starting fetch (delay={delay:.2f}s)")
 60    await asyncio.sleep(delay)
 61    print(f"  Task-{task_id}: Fetch complete")
 62    return {"id": task_id, "data": f"Data-{task_id}"}
 63
 64
 65async def concurrent_tasks():
 66    """Run multiple tasks concurrently."""
 67    start = time.perf_counter()
 68
 69    # Create tasks
 70    task1 = asyncio.create_task(fetch_data(1, 0.2))
 71    task2 = asyncio.create_task(fetch_data(2, 0.15))
 72    task3 = asyncio.create_task(fetch_data(3, 0.1))
 73
 74    # Wait for all tasks
 75    result1 = await task1
 76    result2 = await task2
 77    result3 = await task3
 78
 79    elapsed = time.perf_counter() - start
 80
 81    print(f"\n  Results: {[result1, result2, result3]}")
 82    print(f"  Total time: {elapsed:.2f}s (concurrent)")
 83    print(f"  Sequential would take: {0.2 + 0.15 + 0.1:.2f}s")
 84
 85
 86asyncio.run(concurrent_tasks())
 87
 88
 89# =============================================================================
 90# asyncio.gather
 91# =============================================================================
 92
 93section("asyncio.gather - Run Multiple Coroutines")
 94
 95
 96async def download_file(file_id: int) -> str:
 97    """Simulate file download."""
 98    delay = 0.1 + (file_id % 3) * 0.05
 99    print(f"  Downloading file-{file_id}...")
100    await asyncio.sleep(delay)
101    return f"file-{file_id}.dat"
102
103
async def gather_example():
    """Download files 1 through 5 with one ``asyncio.gather`` call.

    Prints the list of returned file names and the elapsed wall-clock time.
    """
    began = time.perf_counter()

    # One coroutine per file id; gather runs them concurrently and returns
    # their results in the order the coroutines were passed in.
    downloads = (download_file(file_id) for file_id in range(1, 6))
    results = await asyncio.gather(*downloads)

    elapsed = time.perf_counter() - began

    print(f"\n  Downloaded: {results}")
    print(f"  Total time: {elapsed:.2f}s")
121
122
123asyncio.run(gather_example())
124
125
126# =============================================================================
127# Error Handling
128# =============================================================================
129
130section("Error Handling in Async Code")
131
132
async def risky_operation(task_id: int) -> str:
    """Sleep briefly, then succeed unless ``task_id`` is 2.

    Returns:
        ``"Success-<task_id>"`` for every id other than 2.

    Raises:
        ValueError: when ``task_id`` equals 2.
    """
    await asyncio.sleep(0.05)
    if task_id != 2:
        return f"Success-{task_id}"
    raise ValueError(f"Task-{task_id} failed!")
139
140
async def error_handling_example():
    """Show two ways of dealing with a failing coroutine inside ``gather``.

    First, ``gather`` is called with its default behaviour: the ``ValueError``
    raised by ``risky_operation(2)`` propagates out of the ``await`` and is
    caught here. Second, ``gather`` is called with ``return_exceptions=True``
    so the exception is delivered as an ordinary entry in the result list
    and each entry is reported individually.
    """
    print("Without error handling:")
    try:
        # The result is deliberately not bound: the await raises before a
        # result list is produced. gather does not cancel the remaining
        # awaitables when one of them fails; they keep running.
        await asyncio.gather(
            risky_operation(1),
            risky_operation(2),  # This will fail
            risky_operation(3),
        )
    except ValueError as e:
        print(f"  Caught exception: {e}")

    print("\nWith return_exceptions=True:")
    results = await asyncio.gather(
        risky_operation(1),
        risky_operation(2),  # This will fail
        risky_operation(3),
        return_exceptions=True,
    )

    for i, result in enumerate(results, 1):
        # BaseException rather than Exception: gather can also hand back
        # asyncio.CancelledError, which does not derive from Exception.
        if isinstance(result, BaseException):
            print(f"  Task-{i}: ERROR - {result}")
        else:
            print(f"  Task-{i}: {result}")
166
167
168asyncio.run(error_handling_example())
169
170
171# =============================================================================
172# Async Generators
173# =============================================================================
174
175section("Async Generators")
176
177
async def async_range(n: int) -> AsyncIterator[int]:
    """Asynchronously yield the integers of ``range(n)``.

    Before each value is yielded, a "Yielding" line is printed and the
    generator sleeps for 0.05 seconds.
    """
    for value in range(n):
        announcement = f"  Yielding {value}"
        print(announcement)
        await asyncio.sleep(0.05)
        yield value
184
185
async def fetch_pages(num_pages: int) -> AsyncIterator[dict]:
    """Asynchronously yield one dict per page, numbered from 1.

    Each page takes 0.1 seconds to "fetch". A yielded dict has the keys
    ``"page"`` (the page number) and ``"data"`` (three item labels of the
    form ``"item-<page>-<index>"``).
    """
    for page_no in range(1, num_pages + 1):
        await asyncio.sleep(0.1)
        items = [f"item-{page_no}-{idx}" for idx in range(3)]
        yield {"page": page_no, "data": items}
194
195
async def async_generator_example():
    """Consume both async generators with ``async for`` and print each item."""
    print("Async range:")
    async for value in async_range(5):
        print(f"  Received: {value}")

    print("\nAsync page fetcher:")
    async for chunk in fetch_pages(3):
        page_no, items = chunk["page"], chunk["data"]
        print(f"  Page {page_no}: {items}")
205
206
207asyncio.run(async_generator_example())
208
209
210# =============================================================================
211# asyncio.Queue - Producer/Consumer
212# =============================================================================
213
214section("asyncio.Queue - Producer/Consumer Pattern")
215
216
async def producer(queue: asyncio.Queue, producer_id: int, num_items: int):
    """Put ``num_items`` labelled items on ``queue``, then a ``None`` sentinel.

    Items are strings of the form ``"P<producer_id>-Item<index>"``. The
    producer sleeps 0.05 seconds before each put and prints a line after it.
    The trailing ``None`` is how this producer signals that it is finished.
    """
    for seq in range(num_items):
        await asyncio.sleep(0.05)
        payload = f"P{producer_id}-Item{seq}"
        await queue.put(payload)
        print(f"  Producer-{producer_id}: produced {payload}")

    # End-of-stream marker understood by the consumer side.
    await queue.put(None)
226
227
async def consumer(queue: asyncio.Queue, consumer_id: int):
    """Take items off ``queue`` until a ``None`` sentinel arrives.

    Every retrieved entry, sentinel included, is acknowledged with
    ``queue.task_done()``. Regular items are "processed" by sleeping for
    0.08 seconds; the sentinel makes the consumer print a stop message
    and return.
    """
    while True:
        item = await queue.get()

        if item is not None:
            print(f"  Consumer-{consumer_id}: processing {item}")
            await asyncio.sleep(0.08)
            queue.task_done()
            continue

        # Sentinel: acknowledge it so queue.join() can complete, then stop.
        queue.task_done()
        print(f"  Consumer-{consumer_id}: received sentinel, stopping")
        return
241
242
async def producer_consumer_example():
    """Run two producers and two consumers over a bounded ``asyncio.Queue``.

    The queue holds at most five entries, so a producer waits in ``put``
    whenever the consumers fall behind. The function returns once all
    producers have finished, every queued entry has been acknowledged via
    ``task_done()``, and all consumer tasks have ended.
    """
    queue = asyncio.Queue(maxsize=5)

    # Create producers and consumers
    producers = [
        asyncio.create_task(producer(queue, i, 3))
        for i in range(2)
    ]

    consumers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(2)
    ]

    # Wait for producers to finish
    await asyncio.gather(*producers)

    # Wait for queue to be processed
    await queue.join()

    # A consumer normally returns on its own after taking a sentinel, in
    # which case cancel() is a no-op. Cancelling covers any consumer still
    # blocked in queue.get() because it never received one.
    for c in consumers:
        c.cancel()

    # Await the consumers so each cancellation is actually delivered before
    # returning; return_exceptions=True keeps CancelledError from propagating.
    await asyncio.gather(*consumers, return_exceptions=True)
267
268
269asyncio.run(producer_consumer_example())
270
271
272# =============================================================================
273# Timeouts
274# =============================================================================
275
276section("Timeouts with asyncio.wait_for")
277
278
async def slow_operation():
    """Print a start message, sleep for two seconds, and return a status string."""
    print("  Starting slow operation...")
    # asyncio.sleep hands back its ``result`` argument once the delay is over.
    return await asyncio.sleep(2.0, "Operation complete")
284
285
async def timeout_example():
    """Run ``slow_operation`` under a 0.5 second limit and report the outcome.

    Prints the result if the operation finishes in time, otherwise a
    timeout message.
    """
    try:
        outcome = await asyncio.wait_for(slow_operation(), timeout=0.5)
    except asyncio.TimeoutError:
        print("  Operation timed out after 0.5s")
    else:
        print(f"  Result: {outcome}")
293
294
295asyncio.run(timeout_example())
296
297
298# =============================================================================
299# Concurrent HTTP Requests (Simulated)
300# =============================================================================
301
302section("Simulated Concurrent HTTP Requests")
303
304
async def http_get(url: str) -> dict:
    """Pretend to issue an HTTP GET and return a canned response dict.

    The simulated latency is 0.1s plus ``hash(url) % 10`` hundredths of a
    second. Python salts ``str`` hashes per process unless PYTHONHASHSEED
    is fixed, so the latency for a given URL can differ between runs.

    Returns:
        A dict with the keys ``"url"``, ``"status"`` (always 200),
        ``"data"`` and ``"time"`` (the simulated latency in seconds).
    """
    jitter = hash(url) % 10 / 100
    latency = 0.1 + jitter
    await asyncio.sleep(latency)

    return dict(
        url=url,
        status=200,
        data=f"Content from {url}",
        time=latency,
    )
317
318
async def fetch_all_urls(urls: List[str]) -> List[dict]:
    """Call ``http_get`` for every URL concurrently.

    Returns:
        The responses, in the same order as ``urls``.
    """
    # One coroutine per URL; gather awaits them all together.
    return await asyncio.gather(*map(http_get, urls))
323
324
async def http_example():
    """Fetch four example URLs concurrently and print a timing report.

    For each response the URL, status and simulated latency are printed,
    followed by the measured wall-clock time and the sum of the individual
    latencies (what a sequential run would have cost).
    """
    urls = [
        "https://example.com/api/users",
        "https://example.com/api/posts",
        "https://example.com/api/comments",
        "https://example.com/api/photos",
    ]

    began = time.perf_counter()
    responses = await fetch_all_urls(urls)
    elapsed = time.perf_counter() - began

    print("  Fetch results:")
    for resp in responses:
        print(f"    {resp['url']}: {resp['status']} ({resp['time']:.3f}s)")

    sequential = sum(resp["time"] for resp in responses)
    print(f"\n  Total time: {elapsed:.2f}s (concurrent)")
    print(f"  Sequential would take: {sequential:.2f}s")
344
345
346asyncio.run(http_example())
347
348
349# =============================================================================
350# Running Synchronous Code
351# =============================================================================
352
353section("Running Synchronous Code with run_in_executor")
354
355
def blocking_io_operation(n: int) -> str:
    """Synchronously block for 0.1 seconds and return ``"Result-<n>"``.

    This is a plain function using ``time.sleep``; it prints a start
    message before blocking.
    """
    print(f"  Blocking operation {n} starting...")
    time.sleep(0.1)
    outcome = f"Result-{n}"
    return outcome
361
362
async def run_blocking_code():
    """Run three blocking calls through the loop's default executor.

    Each ``blocking_io_operation`` call is submitted with
    ``loop.run_in_executor(None, ...)``, so the blocking sleeps happen off
    the event loop. The collected results are printed in submission order.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; unlike get_event_loop() it never creates a loop implicitly.
    loop = asyncio.get_running_loop()

    # Passing None as the executor selects the loop's default executor.
    results = await asyncio.gather(
        *(
            loop.run_in_executor(None, blocking_io_operation, n)
            for n in (1, 2, 3)
        )
    )

    print(f"  Results: {results}")
375
376
377asyncio.run(run_blocking_code())
378
379
380# =============================================================================
381# Summary
382# =============================================================================
383
384section("Summary")
385
# Closing recap of the demo, printed verbatim to stdout.
# NOTE(review): the "File I/O" bullet may mislead readers. asyncio provides
# no asynchronous file API, so ordinary file reads/writes block the event
# loop unless they are moved to an executor. Consider qualifying that bullet.
print("""
Asyncio patterns covered:
1. async/await - define and await coroutines
2. asyncio.create_task - schedule concurrent execution
3. asyncio.gather - run multiple coroutines, collect results
4. Error handling - try/except and return_exceptions
5. Async generators - async for and yield
6. asyncio.Queue - producer/consumer pattern
7. Timeouts - asyncio.wait_for
8. run_in_executor - run blocking code without blocking event loop

Benefits of asyncio:
- Efficient I/O-bound concurrency
- Single-threaded (no GIL issues)
- Clean async/await syntax
- Better resource utilization than threads for I/O

Use asyncio for:
- Network requests (HTTP, websockets)
- Database queries
- File I/O
- Any I/O-bound operations

Don't use for CPU-bound tasks (use multiprocessing instead).
""")