threading_demo.py

Download
python 474 lines 13.4 KB
  1"""
  2Threading Demonstration
  3
Covers:
1. Basic threading
2. Thread pools
3. Producer-Consumer pattern with Queue (a thread-safe data structure)
4. Locks and synchronization
5. Race conditions (demonstration and fix)
6. Semaphores (limited resources)
 10"""
 11
 12import threading
 13import queue
 14import time
 15import random
 16from typing import List
 17from concurrent.futures import ThreadPoolExecutor, as_completed
 18
 19
 20# =============================================================================
 21# 1. BASIC THREADING
 22# =============================================================================
 23
 24print("=" * 70)
 25print("1. BASIC THREADING")
 26print("=" * 70)
 27
 28
 29def worker(name: str, work_time: float):
 30    """Simple worker function"""
 31    print(f"[{name}] Starting work...")
 32    time.sleep(work_time)
 33    print(f"[{name}] Finished work!")
 34    return f"{name} completed"
 35
 36
 37def demonstrate_basic_threading():
 38    """Basic threading example"""
 39    print("\n[BASIC THREADING]")
 40    print("-" * 50)
 41
 42    # Create threads
 43    threads = []
 44    for i in range(3):
 45        thread = threading.Thread(
 46            target=worker,
 47            args=(f"Worker-{i}", random.uniform(0.1, 0.5))
 48        )
 49        threads.append(thread)
 50        thread.start()
 51
 52    # Wait for all threads to complete
 53    for thread in threads:
 54        thread.join()
 55
 56    print("All threads completed!")
 57
 58
 59# =============================================================================
 60# 2. THREAD POOL
 61# =============================================================================
 62
 63print("\n" + "=" * 70)
 64print("2. THREAD POOL")
 65print("=" * 70)
 66
 67
 68def process_task(task_id: int) -> dict:
 69    """Simulate processing a task"""
 70    print(f"Processing task {task_id} on thread {threading.current_thread().name}")
 71    time.sleep(random.uniform(0.1, 0.5))
 72    return {"task_id": task_id, "result": task_id * 2}
 73
 74
 75def demonstrate_thread_pool():
 76    """Thread pool executor example"""
 77    print("\n[THREAD POOL]")
 78    print("-" * 50)
 79
 80    tasks = range(10)
 81
 82    # Use ThreadPoolExecutor for better resource management
 83    with ThreadPoolExecutor(max_workers=3) as executor:
 84        # Submit all tasks
 85        futures = [executor.submit(process_task, task_id) for task_id in tasks]
 86
 87        # Process results as they complete
 88        results = []
 89        for future in as_completed(futures):
 90            result = future.result()
 91            results.append(result)
 92            print(f"Task {result['task_id']} completed: {result['result']}")
 93
 94    print(f"\nAll {len(results)} tasks completed!")
 95
 96
 97# =============================================================================
 98# 3. PRODUCER-CONSUMER PATTERN
 99# =============================================================================
100
# Section banner, printed at module load time (top-level statement).
print("\n" + "=" * 70, "3. PRODUCER-CONSUMER PATTERN", "=" * 70, sep="\n")
104
105
class ProducerConsumer:
    """Producer-consumer demo built on a bounded ``queue.Queue``.

    Producers put string items on a shared queue; consumers take them off
    and acknowledge each one with ``task_done()`` so that ``run`` can use
    ``Queue.join()`` to wait until every produced item has been handled.
    A ``threading.Event`` is used to tell the worker loops to stop.

    Args:
        max_queue_size: Capacity of the shared queue; ``put`` blocks when
            the queue is full.
        produce_delay: ``(low, high)`` bounds, in seconds, of the random
            pause a producer takes after each item.
        consume_delay: ``(low, high)`` bounds, in seconds, of the random
            simulated processing time per consumed item.
        poll_timeout: Seconds a consumer waits for an item before it
            re-checks the stop event.
    """

    def __init__(
        self,
        max_queue_size: int = 5,
        produce_delay: tuple = (0.1, 0.3),
        consume_delay: tuple = (0.2, 0.4),
        poll_timeout: float = 0.5,
    ):
        self.queue = queue.Queue(maxsize=max_queue_size)
        self.stop_event = threading.Event()
        self.produce_delay = produce_delay
        self.consume_delay = consume_delay
        self.poll_timeout = poll_timeout

    def producer(self, name: str, num_items: int):
        """Producer thread function.

        Puts up to ``num_items`` items named ``"<name>-Item-<i>"`` on the
        queue, pausing randomly after each one. Stops early if the stop
        event is set before the next item is created.

        Args:
            name: Label used in the log lines and in the item names.
            num_items: Maximum number of items to produce.
        """
        for i in range(num_items):
            if self.stop_event.is_set():
                break

            item = f"{name}-Item-{i}"
            print(f"[{name}] Producing {item}...")
            self.queue.put(item)  # Blocks if queue is full
            # qsize() is a snapshot; other threads may change it right away.
            print(f"[{name}] Produced {item} (Queue size: {self.queue.qsize()})")
            time.sleep(random.uniform(*self.produce_delay))

        print(f"[{name}] Finished producing")

    def consumer(self, name: str):
        """Consumer thread function.

        Repeatedly takes an item from the queue, simulates processing it,
        and marks it done. Loops until the stop event is set.

        Args:
            name: Label used in the log lines.
        """
        while not self.stop_event.is_set():
            try:
                # Wait with a timeout so the stop event is re-checked
                # regularly even when the queue stays empty.
                item = self.queue.get(timeout=self.poll_timeout)
            except queue.Empty:
                continue

            try:
                print(f"[{name}] Consuming {item}...")
                time.sleep(random.uniform(*self.consume_delay))  # Simulate processing
            finally:
                # Always acknowledge the item: a missing task_done() would
                # leave queue.join() in run() blocked forever.
                self.queue.task_done()
            print(f"[{name}] Consumed {item}")

        print(f"[{name}] Finished consuming")

    def run(self, num_producers: int = 2, num_consumers: int = 3, items_per_producer: int = 5):
        """Run the producer-consumer demo and block until it is finished.

        Starts the producer and consumer threads, waits for the producers
        to finish and for the queue to be fully processed, then signals the
        consumers to stop and waits for them. May be called repeatedly on
        the same instance.

        Args:
            num_producers: Number of producer threads to start.
            num_consumers: Number of consumer threads to start.
            items_per_producer: Number of items each producer creates.

        Raises:
            ValueError: If items would be produced but there is no consumer
                to process them (the call could never return).
        """
        will_produce = num_producers > 0 and items_per_producer > 0
        if will_produce and num_consumers < 1:
            raise ValueError(
                "at least one consumer is required to process produced items"
            )

        # A previous run leaves the event set; without clearing it the
        # producers and consumers of this run would exit immediately.
        self.stop_event.clear()

        producers = []
        for i in range(num_producers):
            thread = threading.Thread(
                target=self.producer,
                args=(f"Producer-{i}", items_per_producer),
            )
            producers.append(thread)
            thread.start()

        consumers = []
        for i in range(num_consumers):
            thread = threading.Thread(
                target=self.consumer,
                args=(f"Consumer-{i}",),
            )
            consumers.append(thread)
            thread.start()

        # Wait for all producers to finish
        for thread in producers:
            thread.join()

        # Wait until every queued item has been acknowledged via task_done()
        self.queue.join()

        # Signal consumers to stop
        self.stop_event.set()

        # Wait for all consumers to finish
        for thread in consumers:
            thread.join()
178
179
180# =============================================================================
181# 4. LOCKS AND SYNCHRONIZATION
182# =============================================================================
183
# Section banner, printed at module load time (top-level statement).
print("\n" + "=" * 70, "4. LOCKS AND SYNCHRONIZATION", "=" * 70, sep="\n")
187
188
class BankAccount:
    """Bank account whose balance is only touched while holding a lock."""

    def __init__(self, initial_balance: float = 0):
        self._lock = threading.Lock()
        self.balance = initial_balance

    def deposit(self, amount: float, name: str = ""):
        """Add ``amount`` to the balance and log the new balance.

        Args:
            amount: Value added to the balance.
            name: Label shown in the log line.
        """
        with self._lock:
            before = self.balance
            time.sleep(0.001)  # Artificial delay between the read and the write
            self.balance = before + amount
            print(f"[{name}] Deposited ${amount:.2f}, New balance: ${self.balance:.2f}")

    def withdraw(self, amount: float, name: str = "") -> bool:
        """Remove ``amount`` from the balance if enough money is available.

        Args:
            amount: Value to take out of the balance.
            name: Label shown in the log line.

        Returns:
            True if the withdrawal happened, False if funds were insufficient.
        """
        with self._lock:
            # Guard clause: refuse unless the balance covers the amount.
            if not (self.balance >= amount):
                print(f"[{name}] Insufficient funds for ${amount:.2f}")
                return False

            before = self.balance
            time.sleep(0.001)  # Artificial delay between the read and the write
            self.balance = before - amount
            print(f"[{name}] Withdrew ${amount:.2f}, New balance: ${self.balance:.2f}")
            return True

    def get_balance(self) -> float:
        """Return the current balance, read while holding the lock."""
        with self._lock:
            return self.balance
221
222
def demonstrate_locks():
    """Let three threads make random deposits and withdrawals on one account.

    The account starts at 1000.0. Each thread performs three transactions
    of a random amount between 10 and 50, then the final balance is printed.
    """
    print("\n[LOCKS]")
    print("-" * 50)

    account = BankAccount(1000.0)

    def make_transactions(name: str):
        """Do three random transactions, pausing briefly after each."""
        for _ in range(3):
            # Coin flip first, then the amount - same draw order every round.
            operation = account.deposit if random.random() > 0.5 else account.withdraw
            operation(random.uniform(10, 50), name)
            time.sleep(0.01)

    # Start all transaction threads, then wait for each of them.
    tellers = []
    for index in range(3):
        teller = threading.Thread(target=make_transactions, args=(f"Thread-{index}",))
        teller.start()
        tellers.append(teller)

    for teller in tellers:
        teller.join()

    print(f"\nFinal balance: ${account.get_balance():.2f}")
250
251
252# =============================================================================
253# 5. RACE CONDITIONS (DEMONSTRATION AND FIX)
254# =============================================================================
255
# Section banner, printed at module load time (top-level statement).
print("\n" + "=" * 70, "5. RACE CONDITIONS", "=" * 70, sep="\n")
259
260
class CounterWithRaceCondition:
    """Counter whose increment is intentionally left unsynchronized."""

    def __init__(self):
        self.count = 0

    def increment(self):
        """Add one to the counter without any locking (NOT thread-safe!).

        The value is read, the thread sleeps briefly, and only then is the
        new value written back. Another thread can update ``count`` during
        the sleep, and that update is then overwritten.
        """
        stale = self.count
        time.sleep(0.0001)  # Widens the window between read and write
        self.count = stale + 1
272
273
class ThreadSafeCounter:
    """Counter whose read-modify-write is guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.count = 0

    def increment(self):
        """Add one to the counter while holding the lock.

        The read, the short sleep and the write all happen inside the
        locked region, so only one thread at a time runs this sequence.
        """
        with self._lock:
            seen = self.count
            time.sleep(0.0001)
            self.count = seen + 1
287
288
def demonstrate_race_condition():
    """Compare an unsynchronized counter with a lock-protected one.

    Ten threads each call ``increment()`` 100 times, first on a
    ``CounterWithRaceCondition`` and then on a ``ThreadSafeCounter``. After
    each round the expected total, the actual total and their difference
    are printed.
    """
    print("\n[RACE CONDITION DEMO]")
    print("-" * 50)

    num_threads = 10
    increments_per_thread = 100
    expected = num_threads * increments_per_thread

    def bump(counter, times):
        """Call ``counter.increment()`` ``times`` times."""
        for _ in range(times):
            counter.increment()

    def hammer(counter):
        """Increment ``counter`` from ``num_threads`` threads and wait."""
        crew = []
        for _ in range(num_threads):
            member = threading.Thread(target=bump, args=(counter, increments_per_thread))
            crew.append(member)
            member.start()
        for member in crew:
            member.join()

    def report(counter):
        """Print the expected total, the actual total and the shortfall."""
        print(f"Expected: {expected}")
        print(f"Actual: {counter.count}")
        print(f"Lost updates: {expected - counter.count}")

    # BAD: counter without synchronization
    print("WITHOUT LOCK (Race Condition):")
    unsafe_counter = CounterWithRaceCondition()
    hammer(unsafe_counter)
    report(unsafe_counter)

    # GOOD: counter protected by a lock
    print("\nWITH LOCK (Thread-Safe):")
    safe_counter = ThreadSafeCounter()
    hammer(safe_counter)
    report(safe_counter)
339
340
341# =============================================================================
342# 6. SEMAPHORE (LIMITED RESOURCES)
343# =============================================================================
344
# Section banner, printed at module load time (top-level statement).
print("\n" + "=" * 70, "6. SEMAPHORE", "=" * 70, sep="\n")
348
349
class ResourcePool:
    """Pool that lets at most ``max_resources`` callers work at once."""

    def __init__(self, max_resources: int):
        self.max_resources = max_resources
        self.semaphore = threading.Semaphore(max_resources)

    def use_resource(self, name: str, duration: float):
        """Hold one resource slot for ``duration`` seconds.

        Blocks until the semaphore can be acquired. The three progress
        messages are printed before acquiring, after acquiring, and just
        before the slot is given back.

        Args:
            name: Label shown in the progress messages.
            duration: Seconds to sleep while holding the slot.
        """
        tag = f"[{name}]"
        print(f"{tag} Waiting for resource...")
        with self.semaphore:  # Slot is held for the whole block
            print(f"{tag} Acquired resource, using for {duration:.2f}s")
            time.sleep(duration)
            print(f"{tag} Released resource")
364
365
def demonstrate_semaphore():
    """Have five threads compete for a pool that admits two at a time.

    Each thread holds its resource for a random 0.2-0.5 seconds.
    """
    print("\n[SEMAPHORE DEMO]")
    print("-" * 50)

    # Two slots only, so three of the five threads must wait at first.
    pool = ResourcePool(max_resources=2)

    def hold_resource(name: str):
        pool.use_resource(name, random.uniform(0.2, 0.5))

    contenders = []
    for index in range(5):
        contender = threading.Thread(target=hold_resource, args=(f"Worker-{index}",))
        contender.start()
        contenders.append(contender)

    for contender in contenders:
        contender.join()
386
387
388# =============================================================================
389# DEMONSTRATION
390# =============================================================================
391
def print_summary():
    """Print the closing recap: a banner followed by the summary text."""
    rule = "=" * 70
    print("\n" + rule)
    print("THREADING SUMMARY")
    print(rule)

    # The text starts and ends with a newline, so the printed summary is
    # surrounded by blank lines.
    summary = """
1. BASIC THREADING
   ✓ threading.Thread for simple parallelism
   ✓ .start() to begin execution
   ✓ .join() to wait for completion

2. THREAD POOL
   ✓ ThreadPoolExecutor for better resource management
   ✓ Automatic thread reuse
   ✓ Context manager ensures cleanup
   ✓ as_completed() for results as they finish

3. PRODUCER-CONSUMER
   ✓ queue.Queue for thread-safe communication
   ✓ Automatic blocking when full/empty
   ✓ task_done() and join() for synchronization

4. LOCKS
   ✓ threading.Lock for mutual exclusion
   ✓ 'with' statement for automatic acquire/release
   ✓ Prevents race conditions

5. RACE CONDITIONS
   ✓ Occur when threads access shared data without synchronization
   ✓ Results in lost updates
   ✓ Fixed with locks

6. SEMAPHORE
   ✓ Limits concurrent access to resources
   ✓ Like a lock but allows N concurrent users
   ✓ Good for resource pools (DB connections, etc.)

WHEN TO USE THREADING:
  ✓ I/O-bound operations (network, file I/O)
  ✓ Concurrent API calls
  ✓ GUI applications (keep UI responsive)

WHEN NOT TO USE THREADING:
  ✗ CPU-bound operations (use multiprocessing instead)
  ✗ GIL limits Python threading for CPU work

BEST PRACTICES:
  • Use ThreadPoolExecutor instead of raw threads
  • Always synchronize shared data access
  • Prefer queue.Queue for thread communication
  • Use 'with' statement for locks
  • Avoid global state when possible
  • Be careful with thread-unsafe libraries
"""
    print(summary)
446
447
if __name__ == "__main__":
    # Script entry point: run every demonstration in order with a half-second
    # pause between consecutive demos, then print the summary.
    separator = "=" * 70
    print("THREADING DEMONSTRATIONS")
    print(separator)

    # These demos are each followed by a pause.
    for demo in (demonstrate_basic_threading, demonstrate_thread_pool):
        demo()
        time.sleep(0.5)

    # The producer-consumer demo has no wrapper function, so its banner and
    # setup live here.
    print("\n" + separator)
    print("PRODUCER-CONSUMER PATTERN")
    print(separator)
    ProducerConsumer(max_queue_size=3).run(
        num_producers=2, num_consumers=3, items_per_producer=5
    )

    # The remaining demos are each preceded by a pause.
    for demo in (demonstrate_locks, demonstrate_race_condition, demonstrate_semaphore):
        time.sleep(0.5)
        demo()

    print_summary()