1"""
2Asynchronous Programming with asyncio
3
4Demonstrates:
5- async/await syntax
6- asyncio.create_task
7- asyncio.gather
8- Concurrent execution
9- Async generators
10- asyncio.Queue
11- Error handling in async code
12"""
13
14import asyncio
15import time
16from typing import List, AsyncIterator
17
18
19def section(title: str) -> None:
20 """Print a section header."""
21 print("\n" + "=" * 60)
22 print(f" {title}")
23 print("=" * 60)
24
25
26# =============================================================================
27# Basic async/await
28# =============================================================================
29
30section("Basic async/await")
31
32
33async def simple_coroutine(name: str, delay: float) -> str:
34 """Simple async function."""
35 print(f" {name}: Starting (delay={delay}s)")
36 await asyncio.sleep(delay)
37 print(f" {name}: Finished")
38 return f"Result from {name}"
39
40
41async def basic_example():
42 """Run basic async function."""
43 result = await simple_coroutine("Task-A", 0.1)
44 print(f" Got result: {result}")
45
46
47asyncio.run(basic_example())
48
49
50# =============================================================================
51# Concurrent Execution with create_task
52# =============================================================================
53
54section("Concurrent Execution with create_task")
55
56
57async def fetch_data(task_id: int, delay: float) -> dict:
58 """Simulate fetching data from API."""
59 print(f" Task-{task_id}: Starting fetch (delay={delay:.2f}s)")
60 await asyncio.sleep(delay)
61 print(f" Task-{task_id}: Fetch complete")
62 return {"id": task_id, "data": f"Data-{task_id}"}
63
64
65async def concurrent_tasks():
66 """Run multiple tasks concurrently."""
67 start = time.perf_counter()
68
69 # Create tasks
70 task1 = asyncio.create_task(fetch_data(1, 0.2))
71 task2 = asyncio.create_task(fetch_data(2, 0.15))
72 task3 = asyncio.create_task(fetch_data(3, 0.1))
73
74 # Wait for all tasks
75 result1 = await task1
76 result2 = await task2
77 result3 = await task3
78
79 elapsed = time.perf_counter() - start
80
81 print(f"\n Results: {[result1, result2, result3]}")
82 print(f" Total time: {elapsed:.2f}s (concurrent)")
83 print(f" Sequential would take: {0.2 + 0.15 + 0.1:.2f}s")
84
85
86asyncio.run(concurrent_tasks())
87
88
89# =============================================================================
90# asyncio.gather
91# =============================================================================
92
93section("asyncio.gather - Run Multiple Coroutines")
94
95
96async def download_file(file_id: int) -> str:
97 """Simulate file download."""
98 delay = 0.1 + (file_id % 3) * 0.05
99 print(f" Downloading file-{file_id}...")
100 await asyncio.sleep(delay)
101 return f"file-{file_id}.dat"
102
103
104async def gather_example():
105 """Use gather to run coroutines concurrently."""
106 start = time.perf_counter()
107
108 # gather runs all coroutines concurrently
109 results = await asyncio.gather(
110 download_file(1),
111 download_file(2),
112 download_file(3),
113 download_file(4),
114 download_file(5)
115 )
116
117 elapsed = time.perf_counter() - start
118
119 print(f"\n Downloaded: {results}")
120 print(f" Total time: {elapsed:.2f}s")
121
122
123asyncio.run(gather_example())
124
125
126# =============================================================================
127# Error Handling
128# =============================================================================
129
130section("Error Handling in Async Code")
131
132
133async def risky_operation(task_id: int) -> str:
134 """Operation that might fail."""
135 await asyncio.sleep(0.05)
136 if task_id == 2:
137 raise ValueError(f"Task-{task_id} failed!")
138 return f"Success-{task_id}"
139
140
141async def error_handling_example():
142 """Handle errors in async code."""
143 print("Without error handling:")
144 try:
145 results = await asyncio.gather(
146 risky_operation(1),
147 risky_operation(2), # This will fail
148 risky_operation(3)
149 )
150 except ValueError as e:
151 print(f" Caught exception: {e}")
152
153 print("\nWith return_exceptions=True:")
154 results = await asyncio.gather(
155 risky_operation(1),
156 risky_operation(2), # This will fail
157 risky_operation(3),
158 return_exceptions=True
159 )
160
161 for i, result in enumerate(results, 1):
162 if isinstance(result, Exception):
163 print(f" Task-{i}: ERROR - {result}")
164 else:
165 print(f" Task-{i}: {result}")
166
167
168asyncio.run(error_handling_example())
169
170
171# =============================================================================
172# Async Generators
173# =============================================================================
174
175section("Async Generators")
176
177
178async def async_range(n: int) -> AsyncIterator[int]:
179 """Async generator yielding numbers."""
180 for i in range(n):
181 print(f" Yielding {i}")
182 await asyncio.sleep(0.05)
183 yield i
184
185
186async def fetch_pages(num_pages: int) -> AsyncIterator[dict]:
187 """Simulate fetching pages from API."""
188 for page in range(1, num_pages + 1):
189 await asyncio.sleep(0.1)
190 yield {
191 "page": page,
192 "data": [f"item-{page}-{i}" for i in range(3)]
193 }
194
195
196async def async_generator_example():
197 """Use async generators."""
198 print("Async range:")
199 async for i in async_range(5):
200 print(f" Received: {i}")
201
202 print("\nAsync page fetcher:")
203 async for page_data in fetch_pages(3):
204 print(f" Page {page_data['page']}: {page_data['data']}")
205
206
207asyncio.run(async_generator_example())
208
209
210# =============================================================================
211# asyncio.Queue - Producer/Consumer
212# =============================================================================
213
214section("asyncio.Queue - Producer/Consumer Pattern")
215
216
217async def producer(queue: asyncio.Queue, producer_id: int, num_items: int):
218 """Produce items and put them in queue."""
219 for i in range(num_items):
220 item = f"P{producer_id}-Item{i}"
221 await asyncio.sleep(0.05)
222 await queue.put(item)
223 print(f" Producer-{producer_id}: produced {item}")
224
225 await queue.put(None) # Sentinel value
226
227
228async def consumer(queue: asyncio.Queue, consumer_id: int):
229 """Consume items from queue."""
230 while True:
231 item = await queue.get()
232
233 if item is None:
234 queue.task_done()
235 print(f" Consumer-{consumer_id}: received sentinel, stopping")
236 break
237
238 print(f" Consumer-{consumer_id}: processing {item}")
239 await asyncio.sleep(0.08)
240 queue.task_done()
241
242
243async def producer_consumer_example():
244 """Producer-consumer with asyncio.Queue."""
245 queue = asyncio.Queue(maxsize=5)
246
247 # Create producers and consumers
248 producers = [
249 asyncio.create_task(producer(queue, i, 3))
250 for i in range(2)
251 ]
252
253 consumers = [
254 asyncio.create_task(consumer(queue, i))
255 for i in range(2)
256 ]
257
258 # Wait for producers to finish
259 await asyncio.gather(*producers)
260
261 # Wait for queue to be processed
262 await queue.join()
263
264 # Cancel consumers (they're waiting on empty queue)
265 for c in consumers:
266 c.cancel()
267
268
269asyncio.run(producer_consumer_example())
270
271
272# =============================================================================
273# Timeouts
274# =============================================================================
275
276section("Timeouts with asyncio.wait_for")
277
278
279async def slow_operation():
280 """Slow operation that might timeout."""
281 print(" Starting slow operation...")
282 await asyncio.sleep(2.0)
283 return "Operation complete"
284
285
286async def timeout_example():
287 """Demonstrate timeout handling."""
288 try:
289 result = await asyncio.wait_for(slow_operation(), timeout=0.5)
290 print(f" Result: {result}")
291 except asyncio.TimeoutError:
292 print(" Operation timed out after 0.5s")
293
294
295asyncio.run(timeout_example())
296
297
298# =============================================================================
299# Concurrent HTTP Requests (Simulated)
300# =============================================================================
301
302section("Simulated Concurrent HTTP Requests")
303
304
305async def http_get(url: str) -> dict:
306 """Simulate HTTP GET request."""
307 # Simulate network latency
308 delay = 0.1 + hash(url) % 10 / 100
309 await asyncio.sleep(delay)
310
311 return {
312 "url": url,
313 "status": 200,
314 "data": f"Content from {url}",
315 "time": delay
316 }
317
318
319async def fetch_all_urls(urls: List[str]) -> List[dict]:
320 """Fetch all URLs concurrently."""
321 tasks = [http_get(url) for url in urls]
322 return await asyncio.gather(*tasks)
323
324
325async def http_example():
326 """Fetch multiple URLs concurrently."""
327 urls = [
328 "https://example.com/api/users",
329 "https://example.com/api/posts",
330 "https://example.com/api/comments",
331 "https://example.com/api/photos",
332 ]
333
334 start = time.perf_counter()
335 results = await fetch_all_urls(urls)
336 elapsed = time.perf_counter() - start
337
338 print(" Fetch results:")
339 for result in results:
340 print(f" {result['url']}: {result['status']} ({result['time']:.3f}s)")
341
342 print(f"\n Total time: {elapsed:.2f}s (concurrent)")
343 print(f" Sequential would take: {sum(r['time'] for r in results):.2f}s")
344
345
346asyncio.run(http_example())
347
348
349# =============================================================================
350# Running Synchronous Code
351# =============================================================================
352
353section("Running Synchronous Code with run_in_executor")
354
355
356def blocking_io_operation(n: int) -> str:
357 """Blocking I/O operation (sync function)."""
358 print(f" Blocking operation {n} starting...")
359 time.sleep(0.1)
360 return f"Result-{n}"
361
362
363async def run_blocking_code():
364 """Run blocking code in executor."""
365 loop = asyncio.get_event_loop()
366
367 # Run blocking calls in thread pool
368 results = await asyncio.gather(
369 loop.run_in_executor(None, blocking_io_operation, 1),
370 loop.run_in_executor(None, blocking_io_operation, 2),
371 loop.run_in_executor(None, blocking_io_operation, 3)
372 )
373
374 print(f" Results: {results}")
375
376
377asyncio.run(run_blocking_code())
378
379
380# =============================================================================
381# Summary
382# =============================================================================
383
384section("Summary")
385
386print("""
387Asyncio patterns covered:
3881. async/await - define and await coroutines
3892. asyncio.create_task - schedule concurrent execution
3903. asyncio.gather - run multiple coroutines, collect results
3914. Error handling - try/except and return_exceptions
3925. Async generators - async for and yield
3936. asyncio.Queue - producer/consumer pattern
3947. Timeouts - asyncio.wait_for
3958. run_in_executor - run blocking code without blocking event loop
396
397Benefits of asyncio:
398- Efficient I/O-bound concurrency
399- Single-threaded (no GIL issues)
400- Clean async/await syntax
401- Better resource utilization than threads for I/O
402
403Use asyncio for:
404- Network requests (HTTP, websockets)
405- Database queries
406- File I/O
407- Any I/O-bound operations
408
409Don't use for CPU-bound tasks (use multiprocessing instead).
410""")