1"""
2Threading Demonstration
3
4Covers:
1. Basic threads (start/join)
2. Thread pools (ThreadPoolExecutor)
3. Producer-Consumer pattern with Queue (a thread-safe data structure)
4. Locks
5. Race conditions (demonstration and fix)
6. Semaphores
10"""
11
12import threading
13import queue
14import time
15import random
16from typing import List
17from concurrent.futures import ThreadPoolExecutor, as_completed
18
19
20# =============================================================================
21# 1. BASIC THREADING
22# =============================================================================
23
# Section banner. Sits at module level, so it is printed when the module is
# loaded rather than when the matching demo function runs.
print("=" * 70, "1. BASIC THREADING", "=" * 70, sep="\n")
27
28
def worker(name: str, work_time: float):
    """Sleep for ``work_time`` seconds, logging the start and the end.

    Args:
        name: Label used to tag the printed lines and the return value.
        work_time: Number of seconds passed to ``time.sleep``.

    Returns:
        The string ``"<name> completed"``.
    """
    tag = f"[{name}]"
    print(f"{tag} Starting work...")
    time.sleep(work_time)
    print(f"{tag} Finished work!")
    return "{} completed".format(name)
35
36
def demonstrate_basic_threading():
    """Start three ``worker`` threads and block until all of them finish."""
    print("\n[BASIC THREADING]")
    print("-" * 50)

    # Build the threads up front; each one gets its own random work time
    # between 0.1 and 0.5 seconds.
    pool = [
        threading.Thread(
            target=worker,
            args=(f"Worker-{index}", random.uniform(0.1, 0.5)),
        )
        for index in range(3)
    ]

    for t in pool:
        t.start()

    # join() returns only once the corresponding thread has ended.
    for t in pool:
        t.join()

    print("All threads completed!")
57
58
59# =============================================================================
60# 2. THREAD POOL
61# =============================================================================
62
# Section banner. Sits at module level, so it is printed when the module is
# loaded rather than when the matching demo function runs.
print("\n" + "=" * 70, "2. THREAD POOL", "=" * 70, sep="\n")
66
67
def process_task(task_id: int) -> dict:
    """Pretend to process one task and return its result record.

    Prints which thread picked the task up, sleeps for a random 0.1-0.5
    seconds, then returns ``{"task_id": task_id, "result": task_id * 2}``.
    """
    thread_name = threading.current_thread().name
    print(f"Processing task {task_id} on thread {thread_name}")

    delay = random.uniform(0.1, 0.5)
    time.sleep(delay)

    return dict(task_id=task_id, result=task_id * 2)
73
74
def demonstrate_thread_pool():
    """Run ten ``process_task`` jobs on a three-worker ``ThreadPoolExecutor``.

    Each result is printed as its future completes, so the printed order
    follows completion rather than submission.
    """
    print("\n[THREAD POOL]")
    print("-" * 50)

    finished = []

    # Leaving the ``with`` block shuts the executor down.
    with ThreadPoolExecutor(max_workers=3) as executor:
        pending = [executor.submit(process_task, n) for n in range(10)]

        for done in as_completed(pending):
            outcome = done.result()
            finished.append(outcome)
            print(f"Task {outcome['task_id']} completed: {outcome['result']}")

    print(f"\nAll {len(finished)} tasks completed!")
95
96
97# =============================================================================
98# 3. PRODUCER-CONSUMER PATTERN
99# =============================================================================
100
# Section banner. Sits at module level, so it is printed when the module is
# loaded rather than when the matching demo runs.
print("\n" + "=" * 70, "3. PRODUCER-CONSUMER PATTERN", "=" * 70, sep="\n")
104
105
class ProducerConsumer:
    """Producer/consumer demo built on a bounded ``queue.Queue``.

    Producers put string items on the shared queue; consumers take them off
    and acknowledge each one with ``task_done()`` until ``stop_event`` is set.
    """

    def __init__(self, max_queue_size: int = 5):
        self.stop_event = threading.Event()
        self.queue = queue.Queue(maxsize=max_queue_size)

    def producer(self, name: str, num_items: int):
        """Put up to ``num_items`` items named ``<name>-Item-<i>`` on the queue.

        The loop ends early if ``stop_event`` is found set at the top of an
        iteration.
        """
        for index in range(num_items):
            if self.stop_event.is_set():
                break

            label = f"{name}-Item-{index}"
            print(f"[{name}] Producing {label}...")
            # put() blocks while the queue is at capacity.
            self.queue.put(label)
            depth = self.queue.qsize()
            print(f"[{name}] Produced {label} (Queue size: {depth})")
            time.sleep(random.uniform(0.1, 0.3))

        print(f"[{name}] Finished producing")

    def consumer(self, name: str):
        """Take items off the queue until ``stop_event`` is set.

        ``get`` uses a 0.5 second timeout so the stop flag is re-checked even
        when the queue stays empty.
        """
        while not self.stop_event.is_set():
            try:
                item = self.queue.get(timeout=0.5)
            except queue.Empty:
                # Nothing arrived in time; loop around and re-check the flag.
                continue

            print(f"[{name}] Consuming {item}...")
            time.sleep(random.uniform(0.2, 0.4))  # Simulate processing
            # Acknowledge the item so queue.join() can eventually return.
            self.queue.task_done()
            print(f"[{name}] Consumed {item}")

        print(f"[{name}] Finished consuming")

    def run(self, num_producers: int = 2, num_consumers: int = 3, items_per_producer: int = 5):
        """Run the full demo and return once every thread has ended.

        Order of operations: start producers, start consumers, wait for the
        producers, wait until every queued item has been acknowledged, set
        ``stop_event``, then wait for the consumers.
        """
        producers = [
            threading.Thread(
                target=self.producer,
                args=(f"Producer-{n}", items_per_producer),
            )
            for n in range(num_producers)
        ]
        for t in producers:
            t.start()

        consumers = [
            threading.Thread(target=self.consumer, args=(f"Consumer-{n}",))
            for n in range(num_consumers)
        ]
        for t in consumers:
            t.start()

        for t in producers:
            t.join()

        # Blocks until task_done() has been called for every item put.
        self.queue.join()

        # Consumers poll this flag, so they exit after their current get().
        self.stop_event.set()

        for t in consumers:
            t.join()
178
179
180# =============================================================================
181# 4. LOCKS AND SYNCHRONIZATION
182# =============================================================================
183
# Section banner. Sits at module level, so it is printed when the module is
# loaded rather than when the matching demo function runs.
print("\n" + "=" * 70, "4. LOCKS AND SYNCHRONIZATION", "=" * 70, sep="\n")
187
188
class BankAccount:
    """Bank account whose balance is guarded by a ``threading.Lock``.

    Every method takes the same lock before touching ``balance``, so the
    read-modify-write steps of concurrent deposits and withdrawals cannot
    interleave.
    """

    def __init__(self, initial_balance: float = 0):
        self.balance = initial_balance
        self._lock = threading.Lock()

    @staticmethod
    def _require_non_negative(amount: float) -> None:
        """Raise ``ValueError`` if ``amount`` is below zero."""
        if amount < 0:
            raise ValueError(f"amount must be non-negative, got {amount!r}")

    def deposit(self, amount: float, name: str = ""):
        """Add ``amount`` to the balance (thread-safe).

        Args:
            amount: Non-negative sum to add.
            name: Label used to tag the printed line.

        Raises:
            ValueError: If ``amount`` is negative. A negative deposit would
                act as a withdrawal that skips the funds check.
        """
        self._require_non_negative(amount)
        with self._lock:
            current = self.balance
            time.sleep(0.001)  # Simulate processing delay
            self.balance = current + amount
            print(f"[{name}] Deposited ${amount:.2f}, New balance: ${self.balance:.2f}")

    def withdraw(self, amount: float, name: str = "") -> bool:
        """Remove ``amount`` from the balance if funds allow (thread-safe).

        Args:
            amount: Non-negative sum to remove.
            name: Label used to tag the printed line.

        Returns:
            ``True`` if the withdrawal happened, ``False`` if the balance was
            smaller than ``amount``.

        Raises:
            ValueError: If ``amount`` is negative. A negative withdrawal
                would increase the balance.
        """
        self._require_non_negative(amount)
        # The funds check and the update share one lock hold, so no other
        # thread can change the balance between them.
        with self._lock:
            if self.balance < amount:
                print(f"[{name}] Insufficient funds for ${amount:.2f}")
                return False

            current = self.balance
            time.sleep(0.001)  # Simulate processing delay
            self.balance = current - amount
            print(f"[{name}] Withdrew ${amount:.2f}, New balance: ${self.balance:.2f}")
            return True

    def get_balance(self) -> float:
        """Return the current balance, read while holding the lock."""
        with self._lock:
            return self.balance
221
222
def demonstrate_locks():
    """Run three threads that each make three random transactions on one account."""
    print("\n[LOCKS]")
    print("-" * 50)

    account = BankAccount(1000.0)

    def make_transactions(name: str):
        """Make three random deposits or withdrawals of 10-50 against ``account``."""
        for _ in range(3):
            # Draw the coin flip first and the amount second.
            if random.random() > 0.5:
                operation = account.deposit
            else:
                operation = account.withdraw
            operation(random.uniform(10, 50), name)
            time.sleep(0.01)

    workers = [
        threading.Thread(target=make_transactions, args=(f"Thread-{n}",))
        for n in range(3)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    print(f"\nFinal balance: ${account.get_balance():.2f}")
250
251
252# =============================================================================
253# 5. RACE CONDITIONS (DEMONSTRATION AND FIX)
254# =============================================================================
255
# Section banner. Sits at module level, so it is printed when the module is
# loaded rather than when the matching demo function runs.
print("\n" + "=" * 70, "5. RACE CONDITIONS", "=" * 70, sep="\n")
259
260
class CounterWithRaceCondition:
    """Counter whose ``increment`` is intentionally left unsynchronized."""

    def __init__(self):
        self.count = 0

    def increment(self):
        """Add one to ``count`` with an unguarded read-modify-write.

        NOT thread-safe: no lock is held between the read and the write.
        """
        snapshot = self.count
        # The pause between read and write widens the window in which
        # another thread can read the same starting value.
        time.sleep(0.0001)
        self.count = snapshot + 1
272
273
class ThreadSafeCounter:
    """Counter whose ``increment`` runs entirely under a ``threading.Lock``."""

    def __init__(self):
        self._lock = threading.Lock()
        self.count = 0

    def increment(self):
        """Add one to ``count`` while holding the lock (thread-safe)."""
        with self._lock:
            # Same read / pause / write sequence as the unsafe counter; the
            # lock keeps other increment() calls out until the write lands.
            snapshot = self.count
            time.sleep(0.0001)
            self.count = snapshot + 1
287
288
def demonstrate_race_condition():
    """Run the same workload against the unsafe and the locked counter.

    Ten threads each call ``increment()`` one hundred times on a counter.
    For each counter the expected total, the actual total and their
    difference are printed.
    """
    print("\n[RACE CONDITION DEMO]")
    print("-" * 50)

    num_threads = 10
    increments_per_thread = 100
    expected = num_threads * increments_per_thread

    def hammer(counter, times):
        """Call ``counter.increment()`` ``times`` times."""
        for _ in range(times):
            counter.increment()

    def run_trial(counter):
        """Drive ``counter`` from all threads, then report the totals."""
        workers = [
            threading.Thread(target=hammer, args=(counter, increments_per_thread))
            for _ in range(num_threads)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()

        print(f"Expected: {expected}")
        print(f"Actual: {counter.count}")
        print(f"Lost updates: {expected - counter.count}")

    # BAD: counter with the race condition
    print("WITHOUT LOCK (Race Condition):")
    run_trial(CounterWithRaceCondition())

    # GOOD: counter protected by a lock
    print("\nWITH LOCK (Thread-Safe):")
    run_trial(ThreadSafeCounter())
339
340
341# =============================================================================
342# 6. SEMAPHORE (LIMITED RESOURCES)
343# =============================================================================
344
# Section banner. Sits at module level, so it is printed when the module is
# loaded rather than when the matching demo function runs.
print("\n" + "=" * 70, "6. SEMAPHORE", "=" * 70, sep="\n")
348
349
class ResourcePool:
    """Pool that lets at most ``max_resources`` callers in at a time.

    Access is gated by a ``threading.Semaphore`` created with
    ``max_resources`` permits.
    """

    def __init__(self, max_resources: int):
        """Create the pool.

        Args:
            max_resources: Number of callers allowed inside ``use_resource``
                at once. Must be at least 1.

        Raises:
            ValueError: If ``max_resources`` is below 1. A zero-permit
                semaphore would make every ``use_resource`` call block
                forever.
        """
        if max_resources < 1:
            raise ValueError(
                f"max_resources must be at least 1, got {max_resources!r}"
            )
        self.semaphore = threading.Semaphore(max_resources)
        self.max_resources = max_resources

    def use_resource(self, name: str, duration: float):
        """Hold one resource for ``duration`` seconds.

        Blocks until a permit is free when all resources are in use.

        Args:
            name: Label used to tag the printed lines.
            duration: Seconds to sleep while holding the resource.
        """
        print(f"[{name}] Waiting for resource...")
        # The ``with`` block takes a permit on entry and returns it on exit,
        # including when the body raises.
        with self.semaphore:
            print(f"[{name}] Acquired resource, using for {duration:.2f}s")
            time.sleep(duration)
        print(f"[{name}] Released resource")
364
365
def demonstrate_semaphore():
    """Start five threads that compete for a two-slot ``ResourcePool``."""
    print("\n[SEMAPHORE DEMO]")
    print("-" * 50)

    # Two slots for five threads, so some of them have to wait.
    pool = ResourcePool(max_resources=2)

    def hold_resource(name: str):
        """Occupy one pool slot for a random 0.2-0.5 seconds."""
        pool.use_resource(name, random.uniform(0.2, 0.5))

    contenders = [
        threading.Thread(target=hold_resource, args=(f"Worker-{n}",))
        for n in range(5)
    ]
    for t in contenders:
        t.start()
    for t in contenders:
        t.join()
386
387
388# =============================================================================
389# DEMONSTRATION
390# =============================================================================
391
def print_summary():
    """Print the closing recap of every topic covered by the demos."""
    rule = "=" * 70
    print("\n" + rule)
    print("THREADING SUMMARY")
    print(rule)

    # The recap is a single literal; it starts and ends with a newline.
    summary = """
1. BASIC THREADING
 ✓ threading.Thread for simple parallelism
 ✓ .start() to begin execution
 ✓ .join() to wait for completion

2. THREAD POOL
 ✓ ThreadPoolExecutor for better resource management
 ✓ Automatic thread reuse
 ✓ Context manager ensures cleanup
 ✓ as_completed() for results as they finish

3. PRODUCER-CONSUMER
 ✓ queue.Queue for thread-safe communication
 ✓ Automatic blocking when full/empty
 ✓ task_done() and join() for synchronization

4. LOCKS
 ✓ threading.Lock for mutual exclusion
 ✓ 'with' statement for automatic acquire/release
 ✓ Prevents race conditions

5. RACE CONDITIONS
 ✓ Occur when threads access shared data without synchronization
 ✓ Results in lost updates
 ✓ Fixed with locks

6. SEMAPHORE
 ✓ Limits concurrent access to resources
 ✓ Like a lock but allows N concurrent users
 ✓ Good for resource pools (DB connections, etc.)

WHEN TO USE THREADING:
 ✓ I/O-bound operations (network, file I/O)
 ✓ Concurrent API calls
 ✓ GUI applications (keep UI responsive)

WHEN NOT TO USE THREADING:
 ✗ CPU-bound operations (use multiprocessing instead)
 ✗ GIL limits Python threading for CPU work

BEST PRACTICES:
 • Use ThreadPoolExecutor instead of raw threads
 • Always synchronize shared data access
 • Prefer queue.Queue for thread communication
 • Use 'with' statement for locks
 • Avoid global state when possible
 • Be careful with thread-unsafe libraries
"""
    print(summary)
446
447
if __name__ == "__main__":
    # Script entry point: run every demo in order, pausing half a second
    # between consecutive demos so their output does not run together.
    rule = "=" * 70
    print("THREADING DEMONSTRATIONS")
    print(rule)

    for demo in (demonstrate_basic_threading, demonstrate_thread_pool):
        demo()
        time.sleep(0.5)

    # The producer-consumer demo is a class rather than a function, so its
    # heading is printed here.
    print("\n" + rule)
    print("PRODUCER-CONSUMER PATTERN")
    print(rule)
    pc = ProducerConsumer(max_queue_size=3)
    pc.run(num_producers=2, num_consumers=3, items_per_producer=5)

    for demo in (demonstrate_locks, demonstrate_race_condition, demonstrate_semaphore):
        time.sleep(0.5)
        demo()

    print_summary()