# 14_concurrency.py
  1"""
  2Python Concurrency: Threading and Multiprocessing
  3
  4Demonstrates:
  5- threading basics
  6- Thread synchronization (Lock, RLock)
  7- ThreadPoolExecutor
  8- multiprocessing.Pool
  9- Queue for producer-consumer
 10- GIL limitations
 11- When to use threads vs processes
 12"""
 13
 14import threading
 15import multiprocessing
 16import time
 17import queue
 18from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
 19from typing import List
 20
 21
 22def section(title: str) -> None:
 23    """Print a section header."""
 24    print("\n" + "=" * 60)
 25    print(f"  {title}")
 26    print("=" * 60)
 27
 28
 29# =============================================================================
 30# Basic Threading
 31# =============================================================================
 32
 33section("Basic Threading")
 34
 35
 36def worker(name: str, delay: float):
 37    """Simple worker function."""
 38    print(f"  {name}: Starting (delay={delay}s)")
 39    time.sleep(delay)
 40    print(f"  {name}: Finished")
 41
 42
 43# Create and start threads
 44threads = []
 45for i in range(3):
 46    t = threading.Thread(target=worker, args=(f"Thread-{i}", 0.1 * (i + 1)))
 47    threads.append(t)
 48    t.start()
 49
 50# Wait for all threads to complete
 51for t in threads:
 52    t.join()
 53
 54print("All threads completed")
 55
 56
 57# =============================================================================
 58# Thread with Return Value
 59# =============================================================================
 60
 61section("Thread with Return Value")
 62
 63
 64def compute_square(n: int) -> int:
 65    """Compute square of number."""
 66    time.sleep(0.05)
 67    return n * n
 68
 69
 70class ThreadWithReturnValue(threading.Thread):
 71    """Thread that stores return value."""
 72
 73    def __init__(self, target, args=()):
 74        super().__init__(target=target, args=args)
 75        self.result = None
 76
 77    def run(self):
 78        self.result = self._target(*self._args)
 79
 80
 81threads = []
 82for i in range(5):
 83    t = ThreadWithReturnValue(target=compute_square, args=(i,))
 84    t.start()
 85    threads.append(t)
 86
 87results = []
 88for t in threads:
 89    t.join()
 90    results.append(t.result)
 91
 92print(f"Results: {results}")
 93
 94
 95# =============================================================================
 96# Thread Synchronization - Lock
 97# =============================================================================
 98
 99section("Thread Synchronization - Lock")
100
101counter = 0
102counter_lock = threading.Lock()
103
104
105def increment_with_lock(name: str, iterations: int):
106    """Increment counter with lock."""
107    global counter
108    for _ in range(iterations):
109        with counter_lock:  # Acquire lock
110            counter += 1
111
112
113# Without lock (race condition)
114counter = 0
115threads = [
116    threading.Thread(target=lambda: increment_with_lock("T1", 1000)),
117    threading.Thread(target=lambda: increment_with_lock("T2", 1000)),
118]
119for t in threads:
120    t.start()
121for t in threads:
122    t.join()
123
124print(f"Counter with lock: {counter} (expected: 2000)")
125
126
# =============================================================================
# ThreadPoolExecutor
# =============================================================================


def download_file(file_id: int) -> dict:
    """Simulate downloading one file.

    Returns:
        dict with the file's `id` and a fake `size` (1 KiB per id unit).
    """
    time.sleep(0.1)  # stands in for network latency; releases the GIL
    return {"id": file_id, "size": file_id * 1024}


# Guarded so module import (including multiprocessing spawn re-import)
# does not re-run the demo.
if __name__ == "__main__":
    section("ThreadPoolExecutor")

    start = time.perf_counter()

    with ThreadPoolExecutor(max_workers=4) as executor:
        # submit() returns a Future per task
        futures = [executor.submit(download_file, i) for i in range(10)]

        # result() blocks until that future's task completes
        results = [future.result() for future in futures]

    elapsed = time.perf_counter() - start

    print(f"Downloaded {len(results)} files in {elapsed:.2f}s")
    print(f"Results: {results[:3]}...")

    # map() keeps input order and hides the Future objects
    section("ThreadPoolExecutor with map()")

    with ThreadPoolExecutor(max_workers=4) as executor:
        file_ids = range(10)
        results = list(executor.map(download_file, file_ids))

    print(f"Downloaded {len(results)} files using map()")
164
165
# =============================================================================
# Producer-Consumer with Queue
# =============================================================================

# Module-level, thread-safe queues shared by producer and consumers.
task_queue = queue.Queue()
result_queue = queue.Queue()


def producer(num_items: int) -> None:
    """Put `num_items` sequential integers on `task_queue`."""
    for i in range(num_items):
        task_queue.put(i)
        print(f"  Producer: Added {i}")
        time.sleep(0.02)
    print("  Producer: Done")


def consumer(worker_id: int) -> None:
    """Drain `task_queue`, doubling items into `result_queue`.

    Exits when no new item arrives within 0.5 s (queue.Empty), which is
    how the demo signals "producer finished".
    """
    while True:
        try:
            item = task_queue.get(timeout=0.5)
            print(f"  Consumer-{worker_id}: Processing {item}")
            result = item * 2
            result_queue.put(result)
            time.sleep(0.05)
            task_queue.task_done()  # pairs with task_queue.join() below
        except queue.Empty:
            break


# Guarded so module import (including multiprocessing spawn re-import)
# does not re-run the demo.
if __name__ == "__main__":
    section("Producer-Consumer with Queue")

    # One producer feeding three competing consumers.
    producer_thread = threading.Thread(target=producer, args=(10,))
    producer_thread.start()

    consumer_threads = []
    for i in range(3):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()
        consumer_threads.append(t)

    # Wait until everything is produced AND every item got task_done().
    producer_thread.join()
    task_queue.join()

    for t in consumer_threads:
        t.join()

    print(f"\nResults collected: {result_queue.qsize()} items")
218
219
# =============================================================================
# Multiprocessing - CPU-Bound Tasks
# =============================================================================


def cpu_intensive(n: int) -> int:
    """Sum of squares 0..n-1 via an explicit loop.

    Deliberately NOT vectorized or closed-form: the point is to burn CPU
    while holding the GIL, so the thread-vs-process comparison is fair.
    """
    total = 0
    for i in range(n):
        total += i * i
    return total


# The __main__ guard is REQUIRED here: with the spawn start method
# (Windows, macOS default) each Pool worker re-imports this module, and
# unguarded Pool creation at import time raises RuntimeError / recurses.
if __name__ == "__main__":
    section("Multiprocessing - CPU-Bound Tasks")

    # Sequential baseline
    start = time.perf_counter()
    results_seq = [cpu_intensive(1000000) for _ in range(4)]
    time_seq = time.perf_counter() - start

    # Same work fanned out across 4 processes (no GIL contention)
    start = time.perf_counter()
    with multiprocessing.Pool(processes=4) as pool:
        results_mp = pool.map(cpu_intensive, [1000000] * 4)
    time_mp = time.perf_counter() - start

    print(f"Sequential: {time_seq:.2f}s")
    print(f"Multiprocessing: {time_mp:.2f}s")
    print(f"Speedup: {time_seq / time_mp:.2f}x")
249
250
# =============================================================================
# ProcessPoolExecutor
# =============================================================================


def fibonacci(n: int) -> int:
    """Return the n-th Fibonacci number (F(0)=0, F(1)=1), iteratively.

    Large n makes this CPU-bound via big-int arithmetic, which is why it
    is used to exercise the process pool.
    """
    if n <= 1:
        return n
    a, b = 0, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return b


# The __main__ guard is REQUIRED: spawn-based workers re-import this
# module, and an unguarded executor at import time breaks on those
# platforms.
if __name__ == "__main__":
    section("ProcessPoolExecutor")

    numbers = [100000, 200000, 300000, 400000]

    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(fibonacci, numbers))
    elapsed = time.perf_counter() - start

    print(f"Computed {len(results)} Fibonacci numbers in {elapsed:.2f}s")
    print(f"Last 10 digits: {[str(r)[-10:] for r in results]}")
278
279
# =============================================================================
# as_completed - Process Results as They Finish
# =============================================================================


def slow_task(task_id: int) -> dict:
    """Sleep for a duration derived from task_id, then report it.

    Duration cycles 0.4s, 0.3s, 0.2s, 0.1s as task_id % 4 goes 0..3,
    so later-submitted tasks can finish earlier.
    """
    duration = 0.1 * (4 - task_id % 4)  # 0.1s to 0.4s
    time.sleep(duration)
    return {"id": task_id, "duration": duration}


# Guarded so module import (including multiprocessing spawn re-import)
# does not re-run the demo.
if __name__ == "__main__":
    section("as_completed - Process Results as They Finish")

    with ThreadPoolExecutor(max_workers=4) as executor:
        # Map each future back to its task id
        futures = {executor.submit(slow_task, i): i for i in range(8)}

        print("Processing results as they complete:")
        # as_completed yields futures in completion order, not submit order
        for future in as_completed(futures):
            result = future.result()
            print(f"  Task {result['id']} completed (took {result['duration']:.2f}s)")
301
302
# =============================================================================
# Thread-Local Storage
# =============================================================================

# Each thread sees its own independent attribute namespace on this object.
thread_local = threading.local()


def use_thread_local(name: str) -> None:
    """Write then read a value in thread-local storage.

    The sleep gives other threads time to set their own `data`,
    demonstrating that the values do not clobber each other.
    """
    thread_local.data = f"{name}-data"

    time.sleep(0.1)

    # Still sees this thread's value, regardless of what others wrote
    print(f"  {name}: {thread_local.data}")


# Guarded so module import (including multiprocessing spawn re-import)
# does not re-run the demo.
if __name__ == "__main__":
    section("Thread-Local Storage")

    threads = [
        threading.Thread(target=use_thread_local, args=(f"Thread-{i}",))
        for i in range(3)
    ]

    for t in threads:
        t.start()
    for t in threads:
        t.join()
332
333
# =============================================================================
# Daemon Threads
# =============================================================================


def daemon_worker() -> None:
    """Long-running background task used to demonstrate daemon threads."""
    print("  Daemon: Starting")
    time.sleep(5)  # Long-running task
    # Never printed if the interpreter exits first: daemon threads are
    # killed abruptly at interpreter shutdown, with no cleanup.
    print("  Daemon: Finished")


# Guarded so module import (including multiprocessing spawn re-import)
# does not re-run the demo.
if __name__ == "__main__":
    section("Daemon Threads")

    # daemon=True: this thread will not keep the process alive
    daemon = threading.Thread(target=daemon_worker, daemon=True)
    daemon.start()

    print("Main: Started daemon thread")
    time.sleep(0.1)
    # NOTE: in the full script, execution continues to later sections;
    # the daemon only dies when the whole process finally exits.
    print("Main: Exiting (daemon will be killed)")
354
355
356# =============================================================================
357# GIL Demonstration
358# =============================================================================
359
360section("GIL - Global Interpreter Lock")
361
362print("""
363Python's Global Interpreter Lock (GIL):
364- Only one thread executes Python bytecode at a time
365- Protects Python object memory
366- Prevents true parallel execution of Python code
367
368Impact:
369- CPU-bound tasks: Threading provides NO speedup (GIL limitation)
370  → Use multiprocessing for CPU-bound tasks
371- I/O-bound tasks: Threading DOES provide speedup (releases GIL during I/O)
372  → Use threading for I/O-bound tasks (network, disk)
373
374Example speedup comparison:
375""")
376
377
378def io_bound_task():
379    """I/O-bound - releases GIL during sleep."""
380    time.sleep(0.1)
381    return "done"
382
383
384def cpu_bound_task():
385    """CPU-bound - holds GIL."""
386    return sum(i * i for i in range(100000))
387
388
389# I/O-bound with threads (good speedup)
390start = time.perf_counter()
391with ThreadPoolExecutor(max_workers=4) as executor:
392    list(executor.map(lambda _: io_bound_task(), range(4)))
393time_io_threads = time.perf_counter() - start
394
395# CPU-bound with threads (no speedup due to GIL)
396start = time.perf_counter()
397with ThreadPoolExecutor(max_workers=4) as executor:
398    list(executor.map(lambda _: cpu_bound_task(), range(4)))
399time_cpu_threads = time.perf_counter() - start
400
401# CPU-bound with processes (good speedup, no GIL)
402start = time.perf_counter()
403with ProcessPoolExecutor(max_workers=4) as executor:
404    list(executor.map(lambda _: cpu_bound_task(), range(4)))
405time_cpu_processes = time.perf_counter() - start
406
407print(f"I/O-bound with threads: {time_io_threads:.3f}s (good)")
408print(f"CPU-bound with threads: {time_cpu_threads:.3f}s (no speedup)")
409print(f"CPU-bound with processes: {time_cpu_processes:.3f}s (good speedup)")
410
411
# =============================================================================
# Summary
# =============================================================================

# Guarded so module import (including multiprocessing spawn re-import)
# does not reprint the summary in every worker process.
if __name__ == "__main__":
    section("Summary")

    print("""
Threading:
- Use for I/O-bound tasks (network, disk, database)
- Lightweight (shared memory)
- GIL prevents true parallelism for CPU-bound tasks
- threading.Thread - basic threading
- ThreadPoolExecutor - high-level interface
- Lock/RLock - synchronization
- Queue - thread-safe communication

Multiprocessing:
- Use for CPU-bound tasks (computation, data processing)
- Separate Python interpreter per process (no GIL)
- Higher overhead (separate memory)
- multiprocessing.Pool - pool of worker processes
- ProcessPoolExecutor - high-level interface

Guidelines:
┌─────────────────────┬──────────────┬──────────────────┐
│ Task Type           │ Use          │ Reason           │
├─────────────────────┼──────────────┼──────────────────┤
│ I/O-bound          │ Threading    │ GIL released     │
│ CPU-bound          │ Multiprocess │ No GIL           │
│ Mixed              │ Both         │ Combine          │
│ Simple concurrency │ asyncio      │ Single-threaded  │
└─────────────────────┴──────────────┴──────────────────┘

ThreadPoolExecutor vs ProcessPoolExecutor:
- Same interface (concurrent.futures)
- Easy to switch between them
- Use ProcessPoolExecutor for CPU-intensive work
- Use ThreadPoolExecutor for I/O-intensive work

Synchronization primitives:
- Lock - mutual exclusion
- RLock - reentrant lock
- Semaphore - limit concurrent access
- Event - thread signaling
- Condition - wait for condition
- Queue - thread-safe FIFO

Best practices:
1. Avoid shared state when possible
2. Use locks to protect shared state
3. Prefer ProcessPoolExecutor for CPU-bound
4. Prefer ThreadPoolExecutor for I/O-bound
5. Consider asyncio for simple I/O concurrency
6. Profile before optimizing
""")