# 08_concurrency_mvcc.py
"""
Concurrency Control: MVCC and 2PL

Demonstrates concurrency control mechanisms:
- Multi-Version Concurrency Control (MVCC): timestamp-based
- Two-Phase Locking (2PL): lock-based
- Read/write conflicts and serializability
- Deadlock detection (via lock-wait timeouts) and prevention strategies

Theory:
- MVCC: Each transaction sees a consistent snapshot via versioning
  - Readers don't block writers, writers don't block readers
  - Uses transaction IDs/timestamps to determine visibility
- 2PL: Transactions acquire locks before accessing data
  - Growing phase: acquire locks
  - Shrinking phase: release locks
  - Strict 2PL: hold all locks until commit
- Conflicts: WW (write-write), WR (write-read), RW (read-write)

Note: This is a simplified simulation for educational purposes.
Production databases implement these with much more sophistication.
"""
 23
 24import threading
 25import time
 26import random
 27from typing import Dict, Set, Optional, List, Tuple
 28from enum import Enum
 29from dataclasses import dataclass
 30
 31
 32class LockType(Enum):
 33    SHARED = "S"      # Read lock
 34    EXCLUSIVE = "X"   # Write lock
 35
 36
 37@dataclass
 38class Version:
 39    """A version of a data item in MVCC."""
 40    value: int
 41    txn_id: int       # Transaction that created this version
 42    timestamp: float
 43    committed: bool = False
 44
 45
 46class MVCCDatabase:
 47    """Simplified Multi-Version Concurrency Control implementation."""
 48
 49    def __init__(self):
 50        self.data: Dict[str, List[Version]] = {}
 51        self.current_txn_id = 0
 52        self.lock = threading.Lock()
 53
 54    def begin_transaction(self) -> int:
 55        """Start a new transaction and return its ID."""
 56        with self.lock:
 57            self.current_txn_id += 1
 58            return self.current_txn_id
 59
 60    def read(self, key: str, txn_id: int) -> Optional[int]:
 61        """Read value for given key as of transaction's snapshot."""
 62        with self.lock:
 63            if key not in self.data:
 64                return None
 65
 66            # Find latest committed version visible to this transaction
 67            # (created by transaction with ID <= txn_id)
 68            visible_versions = [
 69                v for v in self.data[key]
 70                if v.committed and v.txn_id <= txn_id
 71            ]
 72
 73            if not visible_versions:
 74                return None
 75
 76            # Return latest visible version
 77            latest = max(visible_versions, key=lambda v: v.txn_id)
 78            return latest.value
 79
 80    def write(self, key: str, value: int, txn_id: int):
 81        """Create new version of data item."""
 82        with self.lock:
 83            if key not in self.data:
 84                self.data[key] = []
 85
 86            # Create new version (uncommitted)
 87            version = Version(
 88                value=value,
 89                txn_id=txn_id,
 90                timestamp=time.time(),
 91                committed=False
 92            )
 93            self.data[key].append(version)
 94
 95    def commit(self, txn_id: int):
 96        """Commit transaction, making its versions visible."""
 97        with self.lock:
 98            for versions in self.data.values():
 99                for version in versions:
100                    if version.txn_id == txn_id:
101                        version.committed = True
102
103    def rollback(self, txn_id: int):
104        """Rollback transaction, removing its versions."""
105        with self.lock:
106            for key in self.data:
107                self.data[key] = [v for v in self.data[key] if v.txn_id != txn_id]
108
109    def print_versions(self, key: str):
110        """Print all versions of a key."""
111        with self.lock:
112            if key not in self.data:
113                print(f"  {key}: (no versions)")
114                return
115
116            print(f"  {key}:")
117            for v in sorted(self.data[key], key=lambda x: x.txn_id):
118                status = "committed" if v.committed else "uncommitted"
119                print(f"    T{v.txn_id}: {v.value} ({status})")
120
121
class TwoPhaseLockingDatabase:
    """Simplified strict Two-Phase Locking implementation.

    Reads take SHARED locks and writes take EXCLUSIVE locks. A transaction's
    locks are released all at once, by commit() or release_locks(). A lock
    request that conflicts with another transaction blocks until that
    transaction releases its locks or until the request's timeout expires.
    The timeout is the only deadlock breaker here.
    """

    def __init__(self):
        self.data: Dict[str, int] = {}
        self.locks: Dict[str, Set[Tuple[int, LockType]]] = {}  # key -> {(txn_id, lock_type)}
        self.lock = threading.Lock()
        # Wait-for graph: txn_id -> IDs of the transactions it is currently
        # blocked behind. Entries exist only while that txn is waiting and are
        # refreshed every time it re-checks its request.
        self.wait_graph: Dict[int, Set[int]] = {}
        # Shares self.lock. Notified on every release so blocked requests
        # re-check immediately instead of polling with sleep().
        self._locks_released = threading.Condition(self.lock)

    def _blocking_txns(self, key: str, txn_id: int, lock_type: LockType) -> Set[int]:
        """Return other transactions whose locks on key conflict with this request.

        SHARED conflicts only with another transaction's EXCLUSIVE lock;
        EXCLUSIVE conflicts with any lock held by another transaction.
        Caller must hold self.lock.
        """
        return {
            holder
            for holder, held in self.locks.get(key, ())
            if holder != txn_id
            and (lock_type == LockType.EXCLUSIVE or held == LockType.EXCLUSIVE)
        }

    def acquire_lock(self, key: str, txn_id: int, lock_type: LockType, timeout: float = 2.0) -> bool:
        """Acquire a lock_type lock on key for txn_id, waiting up to timeout seconds.

        A transaction's own locks never conflict with its new request. Asking
        for EXCLUSIVE while holding SHARED is therefore an upgrade, which
        succeeds once no other transaction holds any lock on key.

        Returns:
            True once the lock is granted. False if it could not be granted
            before the timeout expired (a potential deadlock).
        """
        # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
        deadline = time.monotonic() + timeout
        with self._locks_released:
            while True:
                blockers = self._blocking_txns(key, txn_id, lock_type)
                if not blockers:
                    self.locks.setdefault(key, set()).add((txn_id, lock_type))
                    self.wait_graph.pop(txn_id, None)
                    return True

                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    self.wait_graph.pop(txn_id, None)
                    return False  # Timeout (potential deadlock)

                self.wait_graph[txn_id] = blockers
                # Releases self.lock while sleeping; woken by release_locks().
                self._locks_released.wait(remaining)

    def release_locks(self, txn_id: int):
        """Release every lock held by txn_id and wake all waiting requests."""
        with self._locks_released:
            for key in list(self.locks):
                still_held = {(tid, lt) for tid, lt in self.locks[key] if tid != txn_id}
                if still_held:
                    self.locks[key] = still_held
                else:
                    del self.locks[key]  # keep the lock table from growing forever
            self.wait_graph.pop(txn_id, None)
            self._locks_released.notify_all()

    def read(self, key: str, txn_id: int) -> Optional[int]:
        """Read key under a SHARED lock; raise if the lock times out."""
        if not self.acquire_lock(key, txn_id, LockType.SHARED):
            raise Exception(f"T{txn_id}: Cannot acquire shared lock on {key} (deadlock?)")

        with self.lock:
            return self.data.get(key)

    def write(self, key: str, value: int, txn_id: int):
        """Write key in place under an EXCLUSIVE lock; raise if the lock times out."""
        if not self.acquire_lock(key, txn_id, LockType.EXCLUSIVE):
            raise Exception(f"T{txn_id}: Cannot acquire exclusive lock on {key} (deadlock?)")

        with self.lock:
            self.data[key] = value

    def commit(self, txn_id: int):
        """Commit transaction and release locks (strict 2PL)."""
        self.release_locks(txn_id)


def demonstrate_mvcc():
    """Walk through a snapshot-isolation example on MVCCDatabase.

    A reader transaction reads x. A writer then updates x and commits, and the
    reader reads x again. The printed trace shows what each read returned,
    followed by the version list for x and a read by a transaction that
    begins after both commits.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    print(heavy_rule)
    print("MULTI-VERSION CONCURRENCY CONTROL (MVCC)")
    print(heavy_rule)
    print()

    store = MVCCDatabase()

    # Seed x and y in a single committed transaction.
    seeder = store.begin_transaction()
    for key, value in (("x", 100), ("y", 200)):
        store.write(key, value, seeder)
    store.commit(seeder)

    print("Initial state:")
    print(light_rule)
    for key in ("x", "y"):
        store.print_versions(key)

    print("\n\nConcurrent transactions:")
    print(light_rule)

    # The reader takes its first look at x.
    reader = store.begin_transaction()
    print(f"T{reader}: BEGIN")
    first_read = store.read("x", reader)
    print(f"T{reader}: READ x = {first_read}")

    # The writer replaces x and commits while the reader is still open.
    writer = store.begin_transaction()
    print(f"T{writer}: BEGIN")
    store.write("x", 150, writer)
    print(f"T{writer}: WRITE x = 150")
    store.commit(writer)
    print(f"T{writer}: COMMIT")

    # The reader looks at x again before it commits.
    second_read = store.read("x", reader)
    print(f"T{reader}: READ x = {second_read} (snapshot isolation - sees old version)")
    store.commit(reader)
    print(f"T{reader}: COMMIT")

    print("\n\nVersions after both commits:")
    print(light_rule)
    store.print_versions("x")

    # A transaction that begins only after both commits.
    late_reader = store.begin_transaction()
    late_value = store.read("x", late_reader)
    print(f"\nT{late_reader}: READ x = {late_value} (sees latest committed version)")
    store.commit(late_reader)

    print("\n✓ MVCC allows:")
    for point in (
        "Readers don't block writers",
        "Writers don't block readers",
        "Each transaction sees consistent snapshot",
    ):
        print(f"  - {point}")
    print()


def demonstrate_2pl():
    """Demonstrate Two-Phase Locking on TwoPhaseLockingDatabase.

    Part 1: one transaction reads x (SHARED lock), writes x + 10 (upgrading
    to an EXCLUSIVE lock), and commits.

    Part 2: two threads withdraw from the same balance. Each takes the
    EXCLUSIVE lock *before* reading, like SELECT ... FOR UPDATE. The
    later transaction therefore waits for the earlier one to commit and then
    reads the updated balance.
    """
    print("=" * 60)
    print("TWO-PHASE LOCKING (2PL)")
    print("=" * 60)
    print()

    db = TwoPhaseLockingDatabase()

    # Initialize data
    db.data["x"] = 100
    db.data["y"] = 200

    print("Initial state: x=100, y=200")
    print("-" * 60)

    # Transaction 1: x = x + 10
    print("\nT1: Increment x by 10")
    txn1 = 1
    try:
        x = db.read("x", txn1)
        print(f"T1: READ x = {x} (acquired S lock)")
        time.sleep(0.1)
        db.write("x", x + 10, txn1)
        print(f"T1: WRITE x = {x + 10} (upgraded to X lock)")
        db.commit(txn1)
        print("T1: COMMIT (released locks)")
    except Exception as e:
        print(f"T1: ERROR - {e}")
        db.release_locks(txn1)

    print(f"\nFinal x = {db.data['x']}")

    # Demonstrate lock conflict
    print("\n\n" + "=" * 60)
    print("LOCK CONFLICTS")
    print("=" * 60)
    print()

    db.data["balance"] = 1000

    def transfer_out(amount: int, txn_id: int, delay: float):
        """Withdraw amount from balance inside transaction txn_id, then commit."""
        try:
            print(f"T{txn_id}: Request X lock on balance")
            # Lock exclusively up front. Reading first would only take a SHARED
            # lock. Both transactions would then hold S, and each would block
            # the other's upgrade to X: a conversion deadlock that only the
            # lock timeout can break.
            if not db.acquire_lock("balance", txn_id, LockType.EXCLUSIVE):
                raise Exception(
                    f"T{txn_id}: Cannot acquire exclusive lock on balance (deadlock?)"
                )
            balance = db.read("balance", txn_id)
            print(f"T{txn_id}: READ balance = {balance}")
            time.sleep(delay)
            db.write("balance", balance - amount, txn_id)
            print(f"T{txn_id}: WRITE balance = {balance - amount}")
            db.commit(txn_id)
            print(f"T{txn_id}: COMMIT")
        except Exception as e:
            print(f"T{txn_id}: ABORTED - {e}")
            db.release_locks(txn_id)

    # Concurrent conflicting transactions
    print("Two transactions trying to modify balance concurrently:")
    print("-" * 60)

    t1 = threading.Thread(target=transfer_out, args=(100, 2, 0.5))
    t2 = threading.Thread(target=transfer_out, args=(200, 3, 0.3))

    t1.start()
    time.sleep(0.1)  # T3 starts slightly after T2
    t2.start()

    t1.join()
    t2.join()

    print(f"\nFinal balance = {db.data['balance']}")
    print("✓ 2PL ensures serializability through locking")
    print()


def demonstrate_deadlock():
    """Demonstrate a two-transaction deadlock and its resolution by lock timeout.

    T1 increments x and then y; T2 increments y and then x. Each increment
    reads (SHARED lock) and then writes (upgrade to EXCLUSIVE). After its
    first step, each transaction therefore holds an exclusive lock the other
    one needs next. Reads alone would not work: shared locks are compatible
    with each other and would never block.

    The circular wait lasts until one lock request in TwoPhaseLockingDatabase
    times out. That transaction aborts and releases its locks, and the other
    one can then finish.
    """
    print("=" * 60)
    print("DEADLOCK SCENARIO")
    print("=" * 60)
    print()

    db = TwoPhaseLockingDatabase()
    db.data["x"] = 100
    db.data["y"] = 200

    print("Initial: x=100, y=200")
    print("-" * 60)
    print("\nScenario: T1 and T2 both need locks on x and y")
    print("  T1: Lock x, then y")
    print("  T2: Lock y, then x")
    print("  → Circular wait → Deadlock")
    print()

    deadlock_detected = threading.Event()

    def lock_and_increment(txn_id: int, key: str, delta: int):
        """Add delta to key inside txn_id, ending up with an EXCLUSIVE lock on key."""
        print(f"T{txn_id}: Request lock on {key}")
        value = db.read(key, txn_id)          # SHARED lock
        db.write(key, value + delta, txn_id)  # upgrade to EXCLUSIVE
        print(f"T{txn_id}: Acquired lock on {key}")

    def run_transaction(txn_id: int, first_key: str, second_key: str,
                        delta: int, start_delay: float):
        """Lock first_key, pause, then lock second_key; abort on lock timeout."""
        try:
            if start_delay:
                time.sleep(start_delay)  # start slightly after the other txn
            lock_and_increment(txn_id, first_key, delta)
            time.sleep(0.2)  # hold the first lock while the other txn takes its own
            lock_and_increment(txn_id, second_key, delta)
            db.commit(txn_id)
            print(f"T{txn_id}: COMMIT")
        except Exception as e:
            print(f"T{txn_id}: DEADLOCK DETECTED - {e}")
            deadlock_detected.set()
            db.release_locks(txn_id)

    t1 = threading.Thread(target=run_transaction, args=(1, "x", "y", 10, 0.0))
    t2 = threading.Thread(target=run_transaction, args=(2, "y", "x", 20, 0.1))

    t1.start()
    t2.start()
    t1.join()
    t2.join()

    if deadlock_detected.is_set():
        print("\n✓ Deadlock detected via timeout")
        print("\nDeadlock prevention strategies:")
        print("  1. Lock ordering: always acquire locks in same order")
        print("  2. Timeout: abort transaction if lock wait too long")
        print("  3. Wait-die/Wound-wait: timestamp-based schemes")
    print()


if __name__ == "__main__":
    print("""
╔══════════════════════════════════════════════════════════════╗
║          CONCURRENCY CONTROL: MVCC AND 2PL                   ║
║  Multi-Version Concurrency Control, Two-Phase Locking        ║
╚══════════════════════════════════════════════════════════════╝
""")

    # Run each demonstration in turn.
    for demo in (demonstrate_mvcc, demonstrate_2pl, demonstrate_deadlock):
        demo()

    # Closing comparison table; "" entries produce the blank separator lines.
    summary_lines = [
        "=" * 60,
        "SUMMARY",
        "=" * 60,
        "MVCC (Multi-Version Concurrency Control):",
        "  ✓ Readers don't block writers",
        "  ✓ Snapshot isolation",
        "  ✗ More storage (multiple versions)",
        "",
        "2PL (Two-Phase Locking):",
        "  ✓ Guarantees serializability",
        "  ✗ Readers block writers (and vice versa)",
        "  ✗ Deadlock possible",
        "",
        "Modern databases often use hybrid approaches",
        "  (e.g., MVCC with 2PL for writes)",
        "=" * 60,
    ]
    print("\n".join(summary_lines))