1"""
2Asyncio Demonstration
3
4Covers:
51. Basic async/await
62. Concurrent task execution
73. Gathering results
84. Semaphores for rate limiting
95. Error handling in async code
106. Async context managers
117. Practical examples (HTTP-like requests, data processing)
12"""
13
import asyncio
import random
import time

from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
19from enum import Enum
20
21
22# =============================================================================
23# 1. BASIC ASYNC/AWAIT
24# =============================================================================
25
# Section banner: printed as a side effect when this module is imported or run.
print("=" * 70)
print("1. BASIC ASYNC/AWAIT")
print("=" * 70)
29
30
async def simple_coroutine(name: str, delay: float) -> str:
    """Pause asynchronously for ``delay`` seconds and return a status message.

    Prints start/finish markers so concurrent interleaving is visible.
    """
    print(f"[{name}] Starting (delay: {delay:.2f}s)...")
    await asyncio.sleep(delay)  # yields to the event loop; other tasks may run
    print(f"[{name}] Finished!")
    return f"{name} completed"
37
38
async def demonstrate_basic_async():
    """Contrast sequential awaits with asyncio.gather() concurrency."""
    print("\n[BASIC ASYNC]")
    print("-" * 50)

    # Awaiting one coroutine at a time: total time is the sum of the delays.
    print("Sequential:")
    t0 = time.time()
    result1 = await simple_coroutine("Task-1", 0.3)
    result2 = await simple_coroutine("Task-2", 0.2)
    print(f"Results: {result1}, {result2}")
    print(f"Time: {time.time() - t0:.2f}s\n")

    # gather() schedules all three at once: total time ~= the longest delay.
    print("Concurrent:")
    t0 = time.time()
    results = await asyncio.gather(
        simple_coroutine("Task-A", 0.3),
        simple_coroutine("Task-B", 0.2),
        simple_coroutine("Task-C", 0.25),
    )
    print(f"Results: {results}")
    print(f"Time: {time.time() - t0:.2f}s (much faster!)")
62
63
64# =============================================================================
65# 2. SIMULATED HTTP REQUESTS
66# =============================================================================
67
# Section banner: printed as a side effect when this module is imported or run.
print("\n" + "=" * 70)
print("2. SIMULATED HTTP REQUESTS")
print("=" * 70)
71
72
class ResponseStatus(Enum):
    """Subset of HTTP status codes used by the simulated responses."""

    SUCCESS = 200    # request succeeded
    NOT_FOUND = 404  # resource missing
    ERROR = 500      # simulated server failure
78
79
@dataclass
class Response:
    """Result of one simulated fetch (see fetch_url)."""

    url: str                # the URL that was requested
    status: ResponseStatus  # outcome code of the simulated request
    data: Any               # payload dict produced by the fake server
    duration: float         # measured wall-clock time of the fetch, in seconds
87
88
async def fetch_url(url: str, delay: Optional[float] = None) -> Response:
    """
    Simulate fetching *url*, taking *delay* seconds (random 0.1-0.5s if None).

    Roughly 10% of calls come back as ERROR to mimic a flaky server.
    In real code, use the aiohttp library instead.

    Args:
        url: address to "fetch" (only echoed into the response payload).
        delay: simulated network latency in seconds; None picks a random one.

    Returns:
        Response carrying the status, payload dict, and measured duration.
    """
    # Fix: the default is None, so the annotation must be Optional[float],
    # not the bare `float` the original declared.
    if delay is None:
        delay = random.uniform(0.1, 0.5)

    print(f"Fetching {url}...")
    start = time.time()

    await asyncio.sleep(delay)  # stand-in for the network round trip

    # Simulate occasional server-side failures (~10% of requests).
    if random.random() < 0.1:
        status = ResponseStatus.ERROR
        data = {"error": "Server error"}
    else:
        status = ResponseStatus.SUCCESS
        data = {"url": url, "content": f"Content from {url}"}

    duration = time.time() - start
    print(f"Fetched {url} in {duration:.2f}s")

    return Response(url, status, data, duration)
114
115
async def demonstrate_concurrent_requests():
    """Fetch several URLs at once and compare against the sequential cost."""
    print("\n[CONCURRENT REQUESTS]")
    print("-" * 50)

    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
        "https://api.example.com/photos",
        "https://api.example.com/todos",
    ]

    # Kick off every fetch at the same time.
    started = time.time()
    responses = await asyncio.gather(*(fetch_url(url) for url in urls))
    total_time = time.time() - started

    # Split the results by outcome.
    successful = [resp for resp in responses if resp.status == ResponseStatus.SUCCESS]
    failed = [resp for resp in responses if resp.status == ResponseStatus.ERROR]

    print(f"\nTotal time: {total_time:.2f}s")
    print(f"Successful: {len(successful)}")
    print(f"Failed: {len(failed)}")

    # Sequential cost would be the sum of the individual durations.
    total_delay = sum(resp.duration for resp in responses)
    print(f"Sequential would take: {total_delay:.2f}s")
    print(f"Speedup: {total_delay / total_time:.2f}x")
146
147
148# =============================================================================
149# 3. RATE LIMITING WITH SEMAPHORE
150# =============================================================================
151
# Section banner: printed as a side effect when this module is imported or run.
print("\n" + "=" * 70)
print("3. RATE LIMITING WITH SEMAPHORE")
print("=" * 70)
155
156
async def rate_limited_fetch(url: str, semaphore: asyncio.Semaphore) -> Response:
    """Fetch *url*, but only while holding a slot in *semaphore*.

    Equivalent to ``async with semaphore:`` — the explicit acquire/release
    pair with try/finally guarantees the slot is returned even on error.
    """
    await semaphore.acquire()
    try:
        return await fetch_url(url)
    finally:
        semaphore.release()
161
162
async def demonstrate_rate_limiting():
    """Run 10 fetches while allowing only 3 in flight at a time."""
    print("\n[RATE LIMITING]")
    print("-" * 50)

    urls = [f"https://api.example.com/item/{i}" for i in range(10)]

    semaphore = asyncio.Semaphore(3)  # at most 3 concurrent holders

    print(f"Fetching {len(urls)} URLs with max 3 concurrent requests...")
    began = time.time()

    responses = await asyncio.gather(
        *(rate_limited_fetch(url, semaphore) for url in urls)
    )

    elapsed = time.time() - began
    print(f"\nCompleted in {elapsed:.2f}s")
    print(f"Average per request: {elapsed / len(urls):.2f}s")
182
183
184# =============================================================================
185# 4. ERROR HANDLING
186# =============================================================================
187
# Section banner: printed as a side effect when this module is imported or run.
print("\n" + "=" * 70)
print("4. ERROR HANDLING")
print("=" * 70)
191
192
async def risky_operation(task_id: int) -> dict:
    """Simulate work that fails roughly 30% of the time.

    Raises:
        ValueError: on the simulated failure path.
    """
    # Random pause first, then the failure roll — this call order matters
    # for reproducibility under a seeded random module.
    pause = random.uniform(0.1, 0.3)
    await asyncio.sleep(pause)

    if random.random() < 0.3:
        raise ValueError(f"Task {task_id} failed!")

    return {"task_id": task_id, "result": task_id * 2}
202
203
async def demonstrate_error_handling():
    """Two patterns for surviving failures among concurrent tasks."""
    print("\n[ERROR HANDLING]")
    print("-" * 50)

    # Pattern 1: let gather() hand exceptions back as ordinary values.
    print("Approach 1: gather with return_exceptions")
    outcomes = await asyncio.gather(
        *(risky_operation(i) for i in range(5)), return_exceptions=True
    )
    for idx, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            print(f"Task {idx}: Error - {outcome}")
        else:
            print(f"Task {idx}: Success - {outcome}")

    # Pattern 2: wrap each coroutine so it can never raise.
    print("\nApproach 2: Individual try/except wrappers")

    async def safe_operation(task_id: int) -> dict:
        """Run risky_operation but convert any exception into an error dict."""
        try:
            return await risky_operation(task_id)
        except Exception as exc:
            return {"task_id": task_id, "error": str(exc)}

    wrapped = await asyncio.gather(*(safe_operation(i) for i in range(5)))
    for outcome in wrapped:
        if "error" in outcome:
            print(f"Task {outcome['task_id']}: Error - {outcome['error']}")
        else:
            print(f"Task {outcome['task_id']}: Success - {outcome['result']}")
237
238
239# =============================================================================
240# 5. ASYNC CONTEXT MANAGERS
241# =============================================================================
242
# Section banner: printed as a side effect when this module is imported or run.
print("\n" + "=" * 70)
print("5. ASYNC CONTEXT MANAGERS")
print("=" * 70)
246
247
class AsyncDatabaseConnection:
    """Toy async database connection usable with ``async with``.

    The connect/disconnect delays are simulated with asyncio.sleep; the
    ``connected`` flag gates query() so misuse fails loudly.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connected = False  # flipped by __aenter__/__aexit__

    async def __aenter__(self):
        """Open the (simulated) connection and return self."""
        print(f"Connecting to {self.connection_string}...")
        await asyncio.sleep(0.1)  # pretend handshake latency
        self.connected = True
        print("Connected!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connection; returns False so exceptions propagate."""
        print("Disconnecting...")
        await asyncio.sleep(0.05)  # pretend teardown latency
        self.connected = False
        print("Disconnected!")
        return False

    async def query(self, sql: str) -> List[dict]:
        """Run *sql* against the fake backend and return canned rows.

        Raises:
            RuntimeError: if called outside the ``async with`` block.
        """
        if not self.connected:
            raise RuntimeError("Not connected")

        print(f"Executing: {sql}")
        await asyncio.sleep(0.1)  # pretend query execution time
        return [{"id": 1, "name": "Result"}]
279
280
async def demonstrate_async_context_manager():
    """Use AsyncDatabaseConnection via ``async with`` for guaranteed cleanup."""
    print("\n[ASYNC CONTEXT MANAGER]")
    print("-" * 50)

    # __aexit__ runs even if the query raises, so the connection always closes.
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        rows = await db.query("SELECT * FROM users")
        print(f"Query results: {rows}")
289 # Connection automatically closed
290
291
292# =============================================================================
293# 6. PRACTICAL EXAMPLE: DATA PROCESSING PIPELINE
294# =============================================================================
295
# Section banner: printed as a side effect when this module is imported or run.
print("\n" + "=" * 70)
print("6. PRACTICAL EXAMPLE: DATA PROCESSING PIPELINE")
print("=" * 70)
299
300
async def fetch_data(source_id: int) -> List[dict]:
    """Simulate pulling 5-10 random records from one data source."""
    await asyncio.sleep(random.uniform(0.1, 0.3))  # simulated fetch latency
    record_count = random.randint(5, 10)
    return [
        {"source": source_id, "value": random.randint(1, 100)}
        for _ in range(record_count)
    ]
308
309
async def process_item(item: dict) -> dict:
    """Return a shallow copy of *item* marked processed, with "doubled" added."""
    await asyncio.sleep(0.01)  # pretend to do real work
    enriched = dict(item)
    enriched["processed"] = True
    enriched["doubled"] = item["value"] * 2
    return enriched
318
319
async def save_results(results: List[dict]) -> None:
    """Pretend to persist *results*; just sleeps and reports the count."""
    await asyncio.sleep(0.1)  # simulated write latency
    print(f"Saved {len(results)} items to database")
324
325
async def data_processing_pipeline():
    """Fetch -> process -> save, each stage using asyncio concurrency.

    Returns:
        The list of processed items.
    """
    print("\n[DATA PROCESSING PIPELINE]")
    print("-" * 50)

    # Stage 1: pull from all five sources at once.
    print("Step 1: Fetching data from sources...")
    fetch_tasks = [fetch_data(sid) for sid in range(1, 6)]
    batches = await asyncio.gather(*fetch_tasks)

    # Flatten the per-source batches into one list.
    all_items = [record for batch in batches for record in batch]
    print(f"Fetched {len(all_items)} items")

    # Stage 2: transform items, never more than 10 in flight.
    print("\nStep 2: Processing items...")
    limiter = asyncio.Semaphore(10)

    async def bounded_process(record):
        """process_item wrapped in the concurrency limiter."""
        async with limiter:
            return await process_item(record)

    processed = await asyncio.gather(*(bounded_process(r) for r in all_items))
    print(f"Processed {len(processed)} items")

    # Stage 3: one batched write.
    print("\nStep 3: Saving results...")
    await save_results(processed)

    print("\nPipeline completed!")
    return processed
360
361
362# =============================================================================
363# 7. TASK MANAGEMENT
364# =============================================================================
365
# Section banner: printed as a side effect when this module is imported or run.
print("\n" + "=" * 70)
print("7. TASK MANAGEMENT")
print("=" * 70)
369
370
async def demonstrate_task_management():
    """Show create_task(), awaiting individual tasks, and cancellation."""
    print("\n[TASK MANAGEMENT]")
    print("-" * 50)

    async def background_task(name: str, duration: float):
        """Sleep for *duration* seconds, then return a result string."""
        print(f"[{name}] Started")
        await asyncio.sleep(duration)
        print(f"[{name}] Completed")
        return f"{name} result"

    # create_task() schedules the coroutine on the running loop immediately.
    task1 = asyncio.create_task(background_task("Task-1", 0.3))
    task2 = asyncio.create_task(background_task("Task-2", 0.2))
    task3 = asyncio.create_task(background_task("Task-3", 0.4))

    # A single task can be awaited while the others keep running.
    print("Waiting for Task-2...")
    print(f"Task-2 result: {await task2}")

    # gather() collects the remaining tasks.
    print("\nWaiting for all tasks...")
    remaining = await asyncio.gather(task1, task3)
    print(f"All results: {remaining}")

    # cancel() raises CancelledError inside the task at its next await point.
    print("\nTask cancellation example:")
    long_task = asyncio.create_task(background_task("Long-Task", 5.0))
    await asyncio.sleep(0.1)
    long_task.cancel()

    try:
        await long_task
    except asyncio.CancelledError:
        print("Long-Task was cancelled")
408
409
410# =============================================================================
411# DEMONSTRATION
412# =============================================================================
413
async def main():
    """Run every demonstration in order, then print the closing summary."""
    print("ASYNCIO DEMONSTRATIONS")
    print("=" * 70)

    # Demos run sequentially; each one manages its own internal concurrency.
    for demo in (
        demonstrate_basic_async,
        demonstrate_concurrent_requests,
        demonstrate_rate_limiting,
        demonstrate_error_handling,
        demonstrate_async_context_manager,
        data_processing_pipeline,
        demonstrate_task_management,
    ):
        await demo()

    print_summary()
428
429
def print_summary():
    """Print a closing cheat-sheet of asyncio takeaways to stdout."""
    print("\n" + "=" * 70)
    print("ASYNCIO SUMMARY")
    print("=" * 70)

    # NOTE(review): the block below is one runtime string literal — its text
    # must stay exactly as authored; the original interior indentation was
    # mangled by extraction and is reconstructed here — verify against output.
    print("""
1. BASIC ASYNC/AWAIT
   ✓ async def creates coroutine function
   ✓ await suspends execution
   ✓ Enables concurrency without threads

2. CONCURRENT EXECUTION
   ✓ asyncio.gather() runs multiple coroutines
   ✓ Much faster than sequential
   ✓ Great for I/O-bound operations

3. RATE LIMITING
   ✓ asyncio.Semaphore limits concurrency
   ✓ Prevents overwhelming resources
   ✓ async with for automatic release

4. ERROR HANDLING
   ✓ return_exceptions=True in gather
   ✓ Individual try/except in coroutines
   ✓ Handle errors without stopping all tasks

5. ASYNC CONTEXT MANAGERS
   ✓ async with for async resource management
   ✓ __aenter__ and __aexit__ methods
   ✓ Guaranteed cleanup

6. TASK MANAGEMENT
   ✓ asyncio.create_task() for background tasks
   ✓ Can wait for specific tasks
   ✓ Can cancel tasks

WHEN TO USE ASYNCIO:
   ✓ I/O-bound operations (HTTP, database, files)
   ✓ Many concurrent operations
   ✓ Network services
   ✓ Web scraping
   ✓ API integrations

ASYNC vs THREADING:
  Asyncio:
    ✓ Single-threaded (no race conditions!)
    ✓ Explicit concurrency points (await)
    ✓ Better for I/O-bound with many tasks
    ✓ Lower overhead

  Threading:
    ✓ True parallelism (on multiple cores)
    ✓ Better for blocking I/O (no async support)
    ✓ Implicit concurrency (harder to reason about)
    ✓ Need locks for shared data

BEST PRACTICES:
   • Use aiohttp for real HTTP requests
   • Always await async functions
   • Use semaphores for rate limiting
   • Handle errors appropriately
   • Use async context managers
   • Don't mix blocking and async code
   • Use asyncio.run() as entry point
   • Be careful with shared mutable state

COMMON PITFALLS:
   ✗ Forgetting await (coroutine never runs!)
   ✗ Using blocking I/O in async code
   ✗ Not handling exceptions
   ✗ Creating too many concurrent tasks
""")
502
503
# Script entry point: asyncio.run() creates the event loop, drives main()
# to completion, and closes the loop.
if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())