1"""
2Python Concurrency: Threading and Multiprocessing
3
4Demonstrates:
5- threading basics
6- Thread synchronization (Lock, RLock)
7- ThreadPoolExecutor
8- multiprocessing.Pool
9- Queue for producer-consumer
10- GIL limitations
11- When to use threads vs processes
12"""
13
14import threading
15import multiprocessing
16import time
17import queue
18from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
19from typing import List
20
21
def section(title: str) -> None:
    """Emit a banner separating one demo section from the next."""
    bar = "=" * 60
    print(f"\n{bar}\n {title}\n{bar}")
27
28
# =============================================================================
# Basic Threading
# =============================================================================

# Demo: start several threading.Thread workers and wait for each via join().
section("Basic Threading")
34
35
def worker(name: str, delay: float):
    """Sleep for *delay* seconds, logging start and finish to stdout."""
    start_msg = f" {name}: Starting (delay={delay}s)"
    print(start_msg)
    time.sleep(delay)
    print(f" {name}: Finished")
41
42
# Spin up three workers with increasing delays (0.1s, 0.2s, 0.3s).
threads = []
for idx in range(3):
    th = threading.Thread(target=worker, args=(f"Thread-{idx}", 0.1 * (idx + 1)))
    threads.append(th)
    th.start()

# Block until every worker has finished.
for th in threads:
    th.join()

print("All threads completed")
55
56
# =============================================================================
# Thread with Return Value
# =============================================================================

# threading.Thread.run() discards the target's return value; the subclass
# below demonstrates a common workaround for capturing it.
section("Thread with Return Value")
62
63
def compute_square(n: int) -> int:
    """Simulate a small computation: return n squared after a short pause."""
    time.sleep(0.05)
    return n ** 2
68
69
class ThreadWithReturnValue(threading.Thread):
    """Thread that stores its target's return value.

    After ``join()``, whatever *target* returned is available as
    ``self.result`` (stays None if *target* is None or never runs).
    The ``kwargs`` parameter is new but defaulted, so existing callers
    are unaffected.
    """

    def __init__(self, target, args=(), kwargs=None):
        super().__init__(target=target, args=args, kwargs=kwargs or {})
        self.result = None  # filled in by run() once target returns

    def run(self):
        # Mirrors threading.Thread.run, but keeps the return value.
        # Fixes vs. original: forwards kwargs, tolerates target=None,
        # and releases the target/args references afterwards exactly
        # like the base class does (avoids keeping them alive).
        try:
            if self._target is not None:
                self.result = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs
79
80
# Launch five computation threads, then harvest each thread's result.
threads = []
for value in range(5):
    th = ThreadWithReturnValue(target=compute_square, args=(value,))
    th.start()
    threads.append(th)

results = []
for th in threads:
    th.join()  # result is only guaranteed valid after join()
    results.append(th.result)

print(f"Results: {results}")
93
94
# =============================================================================
# Thread Synchronization - Lock
# =============================================================================

section("Thread Synchronization - Lock")

# Shared mutable state: `counter` is read-modify-written by several threads,
# so every update must happen while holding `counter_lock`.
counter = 0
counter_lock = threading.Lock()
103
104
def increment_with_lock(name: str, iterations: int):
    """Add *iterations* to the shared counter, one locked step at a time.

    The *name* argument is unused; kept for signature compatibility.
    """
    global counter
    done = 0
    while done < iterations:
        with counter_lock:  # serialize the read-modify-write
            counter += 1
        done += 1
111
112
# Two threads hammer the shared counter; the lock keeps the final value
# exact. (The original comment said "Without lock" - both threads DO use
# the lock, which is why the result is the expected 2000.)
counter = 0
threads = [
    threading.Thread(target=increment_with_lock, args=(label, 1000))
    for label in ("T1", "T2")
]
for th in threads:
    th.start()
for th in threads:
    th.join()

print(f"Counter with lock: {counter} (expected: 2000)")
125
126
# =============================================================================
# ThreadPoolExecutor
# =============================================================================

# High-level thread pool from concurrent.futures.
section("ThreadPoolExecutor")
132
133
def download_file(file_id: int) -> dict:
    """Pretend to download one file; returns fake metadata for it."""
    time.sleep(0.1)
    size_bytes = file_id * 1024
    return {"id": file_id, "size": size_bytes}
138
139
# Fan out ten fake downloads across four worker threads via submit().
start = time.perf_counter()

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for file_id in range(10):
        futures.append(executor.submit(download_file, file_id))

    # future.result() blocks until that particular download finishes;
    # collecting in submit order keeps results ordered by file_id.
    results = [fut.result() for fut in futures]

elapsed = time.perf_counter() - start

print(f"Downloaded {len(results)} files in {elapsed:.2f}s")
print(f"Results: {results[:3]}...")
154
155
# map() preserves input order and hides the Future objects entirely.
section("ThreadPoolExecutor with map()")

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(download_file, range(10)))

print(f"Downloaded {len(results)} files using map()")
164
165
# =============================================================================
# Producer-Consumer with Queue
# =============================================================================

section("Producer-Consumer with Queue")

# queue.Queue is thread-safe; no explicit locking needed around put()/get().
task_queue = queue.Queue()    # work items waiting to be processed
result_queue = queue.Queue()  # processed results
174
175
def producer(num_items: int):
    """Feed *num_items* sequential integers into the shared task queue."""
    for item in range(num_items):
        task_queue.put(item)
        print(f" Producer: Added {item}")
        time.sleep(0.02)  # simulate production latency
    print(" Producer: Done")
183
184
def consumer(worker_id: int):
    """Drain the shared task queue until it stays empty for 0.5s.

    Each item is doubled and pushed onto result_queue. The 0.5s get()
    timeout is the shutdown signal: once the producer stops, the queue
    eventually stays empty and the consumer exits.
    """
    while True:
        # Keep the try narrow: only get() is expected to raise Empty.
        try:
            item = task_queue.get(timeout=0.5)
        except queue.Empty:
            break  # no work for a while -> assume production finished
        # FIX: ack in a finally block. In the original, an exception
        # between get() and task_done() would leave the item unacked
        # and task_queue.join() would block forever.
        try:
            print(f" Consumer-{worker_id}: Processing {item}")
            result = item * 2
            result_queue.put(result)
            time.sleep(0.05)
        finally:
            task_queue.task_done()
197
198
# One producer feeds the queue while three consumers drain it.
producer_thread = threading.Thread(target=producer, args=(10,))
producer_thread.start()

consumer_threads = [
    threading.Thread(target=consumer, args=(cid,)) for cid in range(3)
]
for th in consumer_threads:
    th.start()

# Wait until everything produced has also been fully processed.
producer_thread.join()
task_queue.join()  # blocks until task_done() was called for every item

for th in consumer_threads:
    th.join()

print(f"\nResults collected: {result_queue.qsize()} items")
218
219
# =============================================================================
# Multiprocessing - CPU-Bound Tasks
# =============================================================================

# Separate processes sidestep the GIL, so CPU work can run truly in parallel.
section("Multiprocessing - CPU-Bound Tasks")
225
226
def cpu_intensive(n: int) -> int:
    """Sum of squares 0..n-1 - pure-Python busy work for benchmarking."""
    return sum(i * i for i in range(n))
233
234
# Sequential baseline: four runs back to back on one core.
start = time.perf_counter()
results_seq = [cpu_intensive(1000000) for _ in range(4)]
time_seq = time.perf_counter() - start

# Multiprocessing: the same four runs spread over four worker processes.
# NOTE(review): on spawn-based platforms (Windows/macOS) creating a Pool at
# module top level requires an `if __name__ == "__main__":` guard — this
# script relies on fork-style start; confirm before running elsewhere.
start = time.perf_counter()
with multiprocessing.Pool(processes=4) as pool:
    results_mp = pool.map(cpu_intensive, [1000000] * 4)
time_mp = time.perf_counter() - start

print(f"Sequential: {time_seq:.2f}s")
print(f"Multiprocessing: {time_mp:.2f}s")
print(f"Speedup: {time_seq / time_mp:.2f}x")
249
250
# =============================================================================
# ProcessPoolExecutor
# =============================================================================

# Same concurrent.futures interface as ThreadPoolExecutor, backed by processes.
section("ProcessPoolExecutor")
256
257
def fibonacci(n: int) -> int:
    """Return the n-th Fibonacci number (iterative, CPU-bound)."""
    if n <= 1:
        return n
    prev, curr = 0, 1
    for _ in range(n - 1):
        prev, curr = curr, prev + curr
    return curr
266
267
numbers = [100000, 200000, 300000, 400000]

# Using ProcessPoolExecutor
# NOTE: fibonacci is a module-level function, so it pickles cleanly for the
# worker processes (a lambda or nested function here would fail to pickle).
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fibonacci, numbers))
elapsed = time.perf_counter() - start

print(f"Computed {len(results)} Fibonacci numbers in {elapsed:.2f}s")
print(f"Last 10 digits: {[str(r)[-10:] for r in results]}")
278
279
# =============================================================================
# as_completed - Process Results as They Finish
# =============================================================================

# as_completed() yields futures in completion order rather than submit order.
section("as_completed - Process Results as They Finish")
285
286
def slow_task(task_id: int) -> dict:
    """Sleep a task-dependent amount (0.1s-0.4s) and report it."""
    # Later task ids (mod 4) finish sooner, so completion order differs
    # from submission order - exactly what as_completed demonstrates.
    duration = 0.1 * (4 - task_id % 4)
    time.sleep(duration)
    return {"id": task_id, "duration": duration}
292
293
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(slow_task, task_id) for task_id in range(8)]

    print("Processing results as they complete:")
    # Iterate in finish order; fast tasks are reported first.
    for fut in as_completed(futures):
        info = fut.result()
        print(f" Task {info['id']} completed (took {info['duration']:.2f}s)")
301
302
# =============================================================================
# Thread-Local Storage
# =============================================================================

section("Thread-Local Storage")

# Each thread that sets attributes on this object gets its own private copy.
thread_local = threading.local()
310
311
def use_thread_local(name: str):
    """Store a value in this thread's private storage and read it back."""
    thread_local.data = f"{name}-data"  # visible only to the current thread
    # Pause so the other threads set *their* copies in the meantime;
    # each still reads back its own value afterwards.
    time.sleep(0.1)
    print(f" {name}: {thread_local.data}")
321
322
# Three threads, each writing and reading its own thread-local slot.
threads = []
for i in range(3):
    th = threading.Thread(target=use_thread_local, args=(f"Thread-{i}",))
    threads.append(th)

for th in threads:
    th.start()
for th in threads:
    th.join()
332
333
# =============================================================================
# Daemon Threads
# =============================================================================

# Daemon threads are killed abruptly when the main thread exits.
section("Daemon Threads")
339
340
def daemon_worker():
    """Long-running background task; the interpreter does not wait for it."""
    print(" Daemon: Starting")
    time.sleep(5)  # deliberately longer than the main thread lives
    # Never reached in this demo - the process exits first.
    print(" Daemon: Finished")
346
347
# daemon=True: the interpreter will NOT wait for this thread at shutdown.
daemon = threading.Thread(target=daemon_worker, daemon=True)
daemon.start()

print("Main: Started daemon thread")
time.sleep(0.1)  # give the daemon a moment to print its start message
print("Main: Exiting (daemon will be killed)")
354
355
# =============================================================================
# GIL Demonstration
# =============================================================================

section("GIL - Global Interpreter Lock")

# Explanatory text only; the timed comparison that backs it up follows below.
print("""
Python's Global Interpreter Lock (GIL):
- Only one thread executes Python bytecode at a time
- Protects Python object memory
- Prevents true parallel execution of Python code

Impact:
- CPU-bound tasks: Threading provides NO speedup (GIL limitation)
 → Use multiprocessing for CPU-bound tasks
- I/O-bound tasks: Threading DOES provide speedup (releases GIL during I/O)
 → Use threading for I/O-bound tasks (network, disk)

Example speedup comparison:
""")
376
377
def io_bound_task():
    """I/O-bound work: sleep releases the GIL, so threads overlap."""
    time.sleep(0.1)
    return "done"


def cpu_bound_task():
    """CPU-bound work: pure bytecode that holds the GIL throughout."""
    return sum(i * i for i in range(100000))


# I/O-bound with threads (good speedup: the sleeps overlap)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    io_futures = [executor.submit(io_bound_task) for _ in range(4)]
    for fut in io_futures:
        fut.result()
time_io_threads = time.perf_counter() - start

# CPU-bound with threads (no speedup: the GIL serializes them)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    cpu_futures = [executor.submit(cpu_bound_task) for _ in range(4)]
    for fut in cpu_futures:
        fut.result()
time_cpu_threads = time.perf_counter() - start

# CPU-bound with processes (true parallelism, no shared GIL).
# BUG FIX: the original passed a lambda to ProcessPoolExecutor.map().
# Arguments and callables sent to worker processes must be picklable,
# and lambdas are not - that code raised at runtime. submit() with the
# module-level cpu_bound_task pickles by reference and works.
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
    proc_futures = [executor.submit(cpu_bound_task) for _ in range(4)]
    for fut in proc_futures:
        fut.result()
time_cpu_processes = time.perf_counter() - start

print(f"I/O-bound with threads: {time_io_threads:.3f}s (good)")
print(f"CPU-bound with threads: {time_cpu_threads:.3f}s (no speedup)")
print(f"CPU-bound with processes: {time_cpu_processes:.3f}s (good speedup)")
410
411
# =============================================================================
# Summary
# =============================================================================

section("Summary")

# Cheat-sheet recap of everything demonstrated above (plain text only).
print("""
Threading:
- Use for I/O-bound tasks (network, disk, database)
- Lightweight (shared memory)
- GIL prevents true parallelism for CPU-bound tasks
- threading.Thread - basic threading
- ThreadPoolExecutor - high-level interface
- Lock/RLock - synchronization
- Queue - thread-safe communication

Multiprocessing:
- Use for CPU-bound tasks (computation, data processing)
- Separate Python interpreter per process (no GIL)
- Higher overhead (separate memory)
- multiprocessing.Pool - pool of worker processes
- ProcessPoolExecutor - high-level interface

Guidelines:
┌─────────────────────┬──────────────┬──────────────────┐
│ Task Type │ Use │ Reason │
├─────────────────────┼──────────────┼──────────────────┤
│ I/O-bound │ Threading │ GIL released │
│ CPU-bound │ Multiprocess │ No GIL │
│ Mixed │ Both │ Combine │
│ Simple concurrency │ asyncio │ Single-threaded │
└─────────────────────┴──────────────┴──────────────────┘

ThreadPoolExecutor vs ProcessPoolExecutor:
- Same interface (concurrent.futures)
- Easy to switch between them
- Use ProcessPoolExecutor for CPU-intensive work
- Use ThreadPoolExecutor for I/O-intensive work

Synchronization primitives:
- Lock - mutual exclusion
- RLock - reentrant lock
- Semaphore - limit concurrent access
- Event - thread signaling
- Condition - wait for condition
- Queue - thread-safe FIFO

Best practices:
1. Avoid shared state when possible
2. Use locks to protect shared state
3. Prefer ProcessPoolExecutor for CPU-bound
4. Prefer ThreadPoolExecutor for I/O-bound
5. Consider asyncio for simple I/O concurrency
6. Profile before optimizing
""")