# async_demo.py
  1"""
  2Asyncio Demonstration
  3
  4Covers:
  51. Basic async/await
  62. Concurrent task execution
  73. Gathering results
  84. Semaphores for rate limiting
  95. Error handling in async code
 106. Async context managers
 117. Practical examples (HTTP-like requests, data processing)
 12"""
 13
import asyncio
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
 20
 21
 22# =============================================================================
 23# 1. BASIC ASYNC/AWAIT
 24# =============================================================================
 25
 26print("=" * 70)
 27print("1. BASIC ASYNC/AWAIT")
 28print("=" * 70)
 29
 30
 31async def simple_coroutine(name: str, delay: float) -> str:
 32    """Simple async function"""
 33    print(f"[{name}] Starting (delay: {delay:.2f}s)...")
 34    await asyncio.sleep(delay)  # Non-blocking sleep
 35    print(f"[{name}] Finished!")
 36    return f"{name} completed"
 37
 38
 39async def demonstrate_basic_async():
 40    """Basic async/await demonstration"""
 41    print("\n[BASIC ASYNC]")
 42    print("-" * 50)
 43
 44    # Sequential execution (still async, but one at a time)
 45    print("Sequential:")
 46    start = time.time()
 47    result1 = await simple_coroutine("Task-1", 0.3)
 48    result2 = await simple_coroutine("Task-2", 0.2)
 49    print(f"Results: {result1}, {result2}")
 50    print(f"Time: {time.time() - start:.2f}s\n")
 51
 52    # Concurrent execution
 53    print("Concurrent:")
 54    start = time.time()
 55    results = await asyncio.gather(
 56        simple_coroutine("Task-A", 0.3),
 57        simple_coroutine("Task-B", 0.2),
 58        simple_coroutine("Task-C", 0.25)
 59    )
 60    print(f"Results: {results}")
 61    print(f"Time: {time.time() - start:.2f}s (much faster!)")
 62
 63
 64# =============================================================================
 65# 2. SIMULATED HTTP REQUESTS
 66# =============================================================================
 67
 68print("\n" + "=" * 70)
 69print("2. SIMULATED HTTP REQUESTS")
 70print("=" * 70)
 71
 72
class ResponseStatus(Enum):
    """HTTP response status"""
    SUCCESS = 200    # request completed normally
    NOT_FOUND = 404  # resource missing (declared but not produced by fetch_url)
    ERROR = 500      # simulated server-side failure
 78
 79
@dataclass
class Response:
    """Simulated HTTP response"""
    url: str                # the URL that was requested
    status: ResponseStatus  # SUCCESS or ERROR as produced by fetch_url
    data: Any               # payload dict (content on success, error message on failure)
    duration: float         # measured wall-clock fetch time in seconds
 87
 88
 89async def fetch_url(url: str, delay: float = None) -> Response:
 90    """
 91    Simulate fetching a URL.
 92    In real code, use aiohttp library.
 93    """
 94    if delay is None:
 95        delay = random.uniform(0.1, 0.5)
 96
 97    print(f"Fetching {url}...")
 98    start = time.time()
 99
100    await asyncio.sleep(delay)  # Simulate network delay
101
102    # Simulate occasional errors
103    if random.random() < 0.1:
104        status = ResponseStatus.ERROR
105        data = {"error": "Server error"}
106    else:
107        status = ResponseStatus.SUCCESS
108        data = {"url": url, "content": f"Content from {url}"}
109
110    duration = time.time() - start
111    print(f"Fetched {url} in {duration:.2f}s")
112
113    return Response(url, status, data, duration)
114
115
async def demonstrate_concurrent_requests():
    """Fire several simulated requests at once and compare with sequential."""
    print("\n[CONCURRENT REQUESTS]")
    print("-" * 50)

    endpoints = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
        "https://api.example.com/photos",
        "https://api.example.com/todos",
    ]

    # Launch every fetch at the same time and wait for all to finish.
    start = time.time()
    responses = await asyncio.gather(*(fetch_url(u) for u in endpoints))
    total_time = time.time() - start

    # Partition the results by outcome.
    successful, failed = [], []
    for response in responses:
        if response.status == ResponseStatus.SUCCESS:
            successful.append(response)
        elif response.status == ResponseStatus.ERROR:
            failed.append(response)

    print(f"\nTotal time: {total_time:.2f}s")
    print(f"Successful: {len(successful)}")
    print(f"Failed: {len(failed)}")

    # What the same work would have cost one request at a time.
    total_delay = sum(r.duration for r in responses)
    print(f"Sequential would take: {total_delay:.2f}s")
    print(f"Speedup: {total_delay / total_time:.2f}x")
146
147
# =============================================================================
# 3. RATE LIMITING WITH SEMAPHORE
# =============================================================================

print("\n" + "=" * 70, "3. RATE LIMITING WITH SEMAPHORE", "=" * 70, sep="\n")
155
156
async def rate_limited_fetch(url: str, semaphore: asyncio.Semaphore) -> Response:
    """Fetch *url*, but only while holding a slot in *semaphore*."""
    # The semaphore caps how many fetches run at once; the slot is
    # released automatically when the async-with block exits.
    async with semaphore:
        return await fetch_url(url)
161
162
async def demonstrate_rate_limiting():
    """Cap concurrency at three in-flight requests using a semaphore."""
    print("\n[RATE LIMITING]")
    print("-" * 50)

    urls = [f"https://api.example.com/item/{i}" for i in range(10)]

    # A semaphore with 3 slots means at most 3 fetches run concurrently.
    semaphore = asyncio.Semaphore(3)

    print(f"Fetching {len(urls)} URLs with max 3 concurrent requests...")
    start = time.time()

    responses = await asyncio.gather(
        *(rate_limited_fetch(url, semaphore) for url in urls)
    )

    total_time = time.time() - start
    print(f"\nCompleted in {total_time:.2f}s")
    print(f"Average per request: {total_time / len(urls):.2f}s")
182
183
# =============================================================================
# 4. ERROR HANDLING
# =============================================================================

print("\n" + "=" * 70, "4. ERROR HANDLING", "=" * 70, sep="\n")
191
192
async def risky_operation(task_id: int) -> dict:
    """Simulated unit of work that raises ValueError ~30% of the time.

    On success, returns a record with the task id and its doubled value.
    """
    await asyncio.sleep(random.uniform(0.1, 0.3))

    # Roll the dice: 30% chance this task blows up.
    if random.random() < 0.3:
        raise ValueError(f"Task {task_id} failed!")

    return dict(task_id=task_id, result=task_id * 2)
202
203
async def demonstrate_error_handling():
    """Show two ways to survive failures when running tasks in bulk."""
    print("\n[ERROR HANDLING]")
    print("-" * 50)

    # Approach 1: let gather() hand exceptions back as ordinary results
    # instead of aborting the whole batch on the first failure.
    print("Approach 1: gather with return_exceptions")
    outcomes = await asyncio.gather(
        *(risky_operation(i) for i in range(5)), return_exceptions=True
    )

    for index, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            print(f"Task {index}: Error - {outcome}")
        else:
            print(f"Task {index}: Success - {outcome}")

    # Approach 2: wrap each coroutine so it catches its own exception.
    print("\nApproach 2: Individual try/except wrappers")

    async def safe_operation(task_id: int) -> dict:
        # Convert a raised exception into an error record.
        try:
            return await risky_operation(task_id)
        except Exception as e:
            return {"task_id": task_id, "error": str(e)}

    outcomes = await asyncio.gather(*(safe_operation(i) for i in range(5)))

    for outcome in outcomes:
        if "error" in outcome:
            print(f"Task {outcome['task_id']}: Error - {outcome['error']}")
        else:
            print(f"Task {outcome['task_id']}: Success - {outcome['result']}")
237
238
# =============================================================================
# 5. ASYNC CONTEXT MANAGERS
# =============================================================================

print("\n" + "=" * 70, "5. ASYNC CONTEXT MANAGERS", "=" * 70, sep="\n")
246
247
class AsyncDatabaseConnection:
    """Simulated async database connection, usable via ``async with``.

    Tracks a single ``connected`` flag; queries are only legal while it
    is True (i.e. inside the async-with block).
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connected = False  # flipped by __aenter__/__aexit__

    async def query(self, sql: str) -> List[dict]:
        """Run *sql* against the fake database and return canned rows."""
        if not self.connected:
            raise RuntimeError("Not connected")

        print(f"Executing: {sql}")
        await asyncio.sleep(0.1)  # Simulate query execution
        return [{"id": 1, "name": "Result"}]

    async def __aenter__(self):
        """Open the (pretend) connection when the with-block starts."""
        print(f"Connecting to {self.connection_string}...")
        await asyncio.sleep(0.1)  # Simulate connection delay
        self.connected = True
        print("Connected!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connection on block exit, even after an exception."""
        print("Disconnecting...")
        await asyncio.sleep(0.05)  # Simulate disconnection delay
        self.connected = False
        print("Disconnected!")
        return False  # never suppress exceptions from the with-body
279
280
async def demonstrate_async_context_manager():
    """Use the simulated DB connection through ``async with``."""
    print("\n[ASYNC CONTEXT MANAGER]")
    print("-" * 50)

    # __aenter__ connects, __aexit__ disconnects -- even on errors.
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        rows = await db.query("SELECT * FROM users")
        print(f"Query results: {rows}")
290
291
# =============================================================================
# 6. PRACTICAL EXAMPLE: DATA PROCESSING PIPELINE
# =============================================================================

print("\n" + "=" * 70, "6. PRACTICAL EXAMPLE: DATA PROCESSING PIPELINE", "=" * 70, sep="\n")
299
300
async def fetch_data(source_id: int) -> List[dict]:
    """Pretend to pull 5-10 random records from source *source_id*."""
    await asyncio.sleep(random.uniform(0.1, 0.3))  # simulated network wait
    record_count = random.randint(5, 10)
    return [
        {"source": source_id, "value": random.randint(1, 100)}
        for _ in range(record_count)
    ]
308
309
async def process_item(item: dict) -> dict:
    """Return a copy of *item* marked processed, with its value doubled."""
    await asyncio.sleep(0.01)  # Simulate processing
    enriched = dict(item)  # never mutate the caller's dict
    enriched["processed"] = True
    enriched["doubled"] = item["value"] * 2
    return enriched
318
319
async def save_results(results: List[dict]) -> None:
    """Pretend to persist *results*; reports how many items were written."""
    await asyncio.sleep(0.1)  # Simulate save delay
    saved_count = len(results)
    print(f"Saved {saved_count} items to database")
324
325
async def data_processing_pipeline():
    """Fetch -> process -> save, each stage running work concurrently.

    Returns the list of processed items.
    """
    print("\n[DATA PROCESSING PIPELINE]")
    print("-" * 50)

    # Step 1: pull from all five sources at once.
    print("Step 1: Fetching data from sources...")
    batches = await asyncio.gather(*(fetch_data(sid) for sid in range(1, 6)))

    # Merge the per-source batches into one flat list.
    all_items = [record for batch in batches for record in batch]
    print(f"Fetched {len(all_items)} items")

    # Step 2: transform every item, at most 10 at a time.
    print("\nStep 2: Processing items...")
    limiter = asyncio.Semaphore(10)

    async def process_with_limit(record):
        # Hold a semaphore slot for the duration of one item's processing.
        async with limiter:
            return await process_item(record)

    processed = await asyncio.gather(
        *(process_with_limit(record) for record in all_items)
    )
    print(f"Processed {len(processed)} items")

    # Step 3: persist everything in one batch.
    print("\nStep 3: Saving results...")
    await save_results(processed)

    print("\nPipeline completed!")
    return processed
360
361
# =============================================================================
# 7. TASK MANAGEMENT
# =============================================================================

print("\n" + "=" * 70, "7. TASK MANAGEMENT", "=" * 70, sep="\n")
369
370
async def demonstrate_task_management():
    """Create, await, gather, and cancel asyncio Tasks."""
    print("\n[TASK MANAGEMENT]")
    print("-" * 50)

    async def background_task(name: str, duration: float):
        # A task that sleeps for *duration* and reports its own name.
        print(f"[{name}] Started")
        await asyncio.sleep(duration)
        print(f"[{name}] Completed")
        return f"{name} result"

    # create_task() schedules the coroutines immediately, in the background.
    task1 = asyncio.create_task(background_task("Task-1", 0.3))
    task2 = asyncio.create_task(background_task("Task-2", 0.2))
    task3 = asyncio.create_task(background_task("Task-3", 0.4))

    # A task can be awaited individually...
    print("Waiting for Task-2...")
    result = await task2
    print(f"Task-2 result: {result}")

    # ...or collected together with gather().
    print("\nWaiting for all tasks...")
    remaining_results = await asyncio.gather(task1, task3)
    print(f"All results: {remaining_results}")

    # Cancelling a running task raises CancelledError inside it.
    print("\nTask cancellation example:")
    long_task = asyncio.create_task(background_task("Long-Task", 5.0))
    await asyncio.sleep(0.1)  # give it a moment to start
    long_task.cancel()

    try:
        await long_task
    except asyncio.CancelledError:
        print("Long-Task was cancelled")
408
409
410# =============================================================================
411# DEMONSTRATION
412# =============================================================================
413
async def main():
    """Run every demonstration in order, then print the summary."""
    print("ASYNCIO DEMONSTRATIONS")
    print("=" * 70)

    # Each demo is awaited before the next starts, so output stays grouped.
    demos = (
        demonstrate_basic_async,
        demonstrate_concurrent_requests,
        demonstrate_rate_limiting,
        demonstrate_error_handling,
        demonstrate_async_context_manager,
        data_processing_pipeline,
        demonstrate_task_management,
    )
    for demo in demos:
        await demo()

    print_summary()
428
429
def print_summary():
    """Print a static cheat-sheet recapping every concept demonstrated above."""
    print("\n" + "=" * 70)
    print("ASYNCIO SUMMARY")
    print("=" * 70)

    # One triple-quoted literal keeps the whole summary in a single print.
    print("""
1. BASIC ASYNC/AWAIT
   ✓ async def creates coroutine function
   ✓ await suspends execution
   ✓ Enables concurrency without threads

2. CONCURRENT EXECUTION
   ✓ asyncio.gather() runs multiple coroutines
   ✓ Much faster than sequential
   ✓ Great for I/O-bound operations

3. RATE LIMITING
   ✓ asyncio.Semaphore limits concurrency
   ✓ Prevents overwhelming resources
   ✓ async with for automatic release

4. ERROR HANDLING
   ✓ return_exceptions=True in gather
   ✓ Individual try/except in coroutines
   ✓ Handle errors without stopping all tasks

5. ASYNC CONTEXT MANAGERS
   ✓ async with for async resource management
   ✓ __aenter__ and __aexit__ methods
   ✓ Guaranteed cleanup

6. TASK MANAGEMENT
   ✓ asyncio.create_task() for background tasks
   ✓ Can wait for specific tasks
   ✓ Can cancel tasks

WHEN TO USE ASYNCIO:
  ✓ I/O-bound operations (HTTP, database, files)
  ✓ Many concurrent operations
  ✓ Network services
  ✓ Web scraping
  ✓ API integrations

ASYNC vs THREADING:
  Asyncio:
    ✓ Single-threaded (no race conditions!)
    ✓ Explicit concurrency points (await)
    ✓ Better for I/O-bound with many tasks
    ✓ Lower overhead

  Threading:
    ✓ True parallelism (on multiple cores)
    ✓ Better for blocking I/O (no async support)
    ✓ Implicit concurrency (harder to reason about)
    ✓ Need locks for shared data

BEST PRACTICES:
  • Use aiohttp for real HTTP requests
  • Always await async functions
  • Use semaphores for rate limiting
  • Handle errors appropriately
  • Use async context managers
  • Don't mix blocking and async code
  • Use asyncio.run() as entry point
  • Be careful with shared mutable state

COMMON PITFALLS:
  ✗ Forgetting await (coroutine never runs!)
  ✗ Using blocking I/O in async code
  ✗ Not handling exceptions
  ✗ Creating too many concurrent tasks
""")
502
503
504if __name__ == "__main__":
505    # Run the async main function
506    asyncio.run(main())