"""
Concurrency Control: MVCC and 2PL

Demonstrates concurrency control mechanisms:
- Multi-Version Concurrency Control (MVCC): timestamp-based
- Two-Phase Locking (2PL): lock-based
- Read/write conflicts and serialization
- Deadlock detection (via lock-wait timeout) and an overview of
  prevention strategies

Theory:
- MVCC: Each transaction sees a consistent snapshot via versioning
  - Readers don't block writers, writers don't block readers
  - Uses transaction IDs/timestamps to determine visibility
- 2PL: Transactions acquire locks before accessing data
  - Growing phase: acquire locks
  - Shrinking phase: release locks
  - Strict 2PL: hold all locks until commit
- Conflicts: WW (write-write), WR (write-read), RW (read-write)

Note: This is a simplified simulation for educational purposes.
Production databases implement these with much more sophistication.
"""
23
24import threading
25import time
26import random
27from typing import Dict, Set, Optional, List, Tuple
28from enum import Enum
29from dataclasses import dataclass
30
31
class LockType(Enum):
    """Lock modes a transaction can request on a data item."""

    # Read lock
    SHARED = "S"

    # Write lock
    EXCLUSIVE = "X"
35
36
@dataclass
class Version:
    """One stored version of a data item under MVCC."""

    # Payload held by this version.
    value: int

    # ID of the transaction that created this version.
    txn_id: int

    # Creation time of the version, supplied by whoever builds it.
    timestamp: float

    # Whether the creating transaction has committed; new versions start
    # out uncommitted.
    committed: bool = False
44
45
class MVCCDatabase:
    """Simplified Multi-Version Concurrency Control (MVCC) store.

    Each key maps to a list of ``Version`` objects kept in the order they
    were written. Transaction IDs are handed out sequentially and are the
    only thing used to decide visibility: a transaction sees versions
    committed by transactions with a smaller ID, plus its own writes.

    Note that visibility is decided purely by ID, so a version committed
    late by an older transaction becomes visible to newer transactions that
    are already running. Every public method serializes on ``self.lock``.
    """

    def __init__(self):
        self.data: Dict[str, List[Version]] = {}
        self.current_txn_id = 0
        self.lock = threading.Lock()

    def begin_transaction(self) -> int:
        """Start a new transaction and return its (monotonically increasing) ID."""
        with self.lock:
            self.current_txn_id += 1
            return self.current_txn_id

    def read(self, key: str, txn_id: int) -> Optional[int]:
        """Return the value of ``key`` as seen by transaction ``txn_id``.

        A version is visible when it was written by ``txn_id`` itself
        (committed or not, so a transaction reads its own writes) or when it
        was committed by a transaction with a smaller ID. Among the visible
        versions the one with the highest transaction ID wins; if that
        transaction wrote the key several times, its most recent write wins.

        Returns:
            The visible value, or ``None`` when the key is unknown or no
            version is visible to this transaction.
        """
        with self.lock:
            latest = None
            for version in self.data.get(key, ()):
                own_write = version.txn_id == txn_id
                committed_earlier = version.committed and version.txn_id < txn_id
                if not (own_write or committed_earlier):
                    continue
                # ">=" rather than ">": versions are stored in write order, so
                # on equal transaction IDs the later (newer) write must win.
                if latest is None or version.txn_id >= latest.txn_id:
                    latest = version
            return None if latest is None else latest.value

    def write(self, key: str, value: int, txn_id: int):
        """Append a new, uncommitted version of ``key`` owned by ``txn_id``."""
        with self.lock:
            version = Version(
                value=value,
                txn_id=txn_id,
                timestamp=time.time(),
                committed=False,
            )
            self.data.setdefault(key, []).append(version)

    def commit(self, txn_id: int):
        """Commit ``txn_id`` by flagging every version it wrote as committed."""
        with self.lock:
            for versions in self.data.values():
                for version in versions:
                    if version.txn_id == txn_id:
                        version.committed = True

    def rollback(self, txn_id: int):
        """Roll back ``txn_id`` by discarding every version it wrote."""
        with self.lock:
            for key, versions in self.data.items():
                # Re-binding an existing key does not resize the dict, so this
                # is safe while iterating over it.
                self.data[key] = [v for v in versions if v.txn_id != txn_id]

    def print_versions(self, key: str):
        """Print all stored versions of ``key``, ordered by transaction ID."""
        with self.lock:
            if key not in self.data:
                print(f" {key}: (no versions)")
                return

            print(f" {key}:")
            for v in sorted(self.data[key], key=lambda x: x.txn_id):
                status = "committed" if v.committed else "uncommitted"
                print(f" T{v.txn_id}: {v.value} ({status})")
120
121
122class TwoPhaseLockingDatabase:
123 """Simplified Two-Phase Locking implementation."""
124
125 def __init__(self):
126 self.data: Dict[str, int] = {}
127 self.locks: Dict[str, Set[Tuple[int, LockType]]] = {} # key -> {(txn_id, lock_type)}
128 self.lock = threading.Lock()
129 self.wait_graph: Dict[int, Set[int]] = {} # For deadlock detection
130
131 def acquire_lock(self, key: str, txn_id: int, lock_type: LockType, timeout: float = 2.0) -> bool:
132 """Acquire lock on key for transaction."""
133 start_time = time.time()
134
135 while True:
136 with self.lock:
137 if key not in self.locks:
138 self.locks[key] = set()
139
140 current_locks = self.locks[key]
141
142 # Check if lock can be granted
143 can_grant = True
144 if lock_type == LockType.SHARED:
145 # Shared lock: OK if no exclusive locks by other txns
146 for tid, lt in current_locks:
147 if tid != txn_id and lt == LockType.EXCLUSIVE:
148 can_grant = False
149 break
150 else: # EXCLUSIVE
151 # Exclusive lock: OK if no locks by other txns
152 for tid, lt in current_locks:
153 if tid != txn_id:
154 can_grant = False
155 break
156
157 if can_grant:
158 self.locks[key].add((txn_id, lock_type))
159 return True
160
161 # Check timeout
162 if time.time() - start_time > timeout:
163 return False # Timeout (potential deadlock)
164
165 time.sleep(0.01) # Wait before retry
166
167 def release_locks(self, txn_id: int):
168 """Release all locks held by transaction."""
169 with self.lock:
170 for key in self.locks:
171 self.locks[key] = {(tid, lt) for tid, lt in self.locks[key] if tid != txn_id}
172
173 def read(self, key: str, txn_id: int) -> Optional[int]:
174 """Read value with shared lock."""
175 if not self.acquire_lock(key, txn_id, LockType.SHARED):
176 raise Exception(f"T{txn_id}: Cannot acquire shared lock on {key} (deadlock?)")
177
178 with self.lock:
179 return self.data.get(key)
180
181 def write(self, key: str, value: int, txn_id: int):
182 """Write value with exclusive lock."""
183 if not self.acquire_lock(key, txn_id, LockType.EXCLUSIVE):
184 raise Exception(f"T{txn_id}: Cannot acquire exclusive lock on {key} (deadlock?)")
185
186 with self.lock:
187 self.data[key] = value
188
189 def commit(self, txn_id: int):
190 """Commit transaction and release locks (strict 2PL)."""
191 self.release_locks(txn_id)
192
193
def demonstrate_mvcc():
    """Walk through an MVCC scenario and print what each transaction sees.

    A reader takes its first read, a second transaction overwrites and
    commits the same key, and the reader then reads again and still sees
    the old value. A transaction started afterwards sees the new value.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    for line in (heavy_rule, "MULTI-VERSION CONCURRENCY CONTROL (MVCC)", heavy_rule):
        print(line)
    print()

    db = MVCCDatabase()

    # Seed the store through a first, immediately committed transaction.
    seed_txn = db.begin_transaction()
    for key, initial in (("x", 100), ("y", 200)):
        db.write(key, initial, seed_txn)
    db.commit(seed_txn)

    print("Initial state:")
    print(light_rule)
    for key in ("x", "y"):
        db.print_versions(key)

    print("\n\nConcurrent transactions:")
    print(light_rule)

    # Reader: takes its first look at x before the writer exists.
    reader = db.begin_transaction()
    print(f"T{reader}: BEGIN")
    first_read = db.read("x", reader)
    print(f"T{reader}: READ x = {first_read}")

    # Writer: overwrites x and commits while the reader is still open.
    writer = db.begin_transaction()
    print(f"T{writer}: BEGIN")
    db.write("x", 150, writer)
    print(f"T{writer}: WRITE x = 150")
    db.commit(writer)
    print(f"T{writer}: COMMIT")

    # The reader's second look must match its first one.
    second_read = db.read("x", reader)
    print(f"T{reader}: READ x = {second_read} (snapshot isolation - sees old version)")
    db.commit(reader)
    print(f"T{reader}: COMMIT")

    print("\n\nVersions after both commits:")
    print(light_rule)
    db.print_versions("x")

    # A transaction started after the writer committed sees the new value.
    late_reader = db.begin_transaction()
    late_value = db.read("x", late_reader)
    print(f"\nT{late_reader}: READ x = {late_value} (sees latest committed version)")
    db.commit(late_reader)

    for line in (
        "\n✓ MVCC allows:",
        " - Readers don't block writers",
        " - Writers don't block readers",
        " - Each transaction sees consistent snapshot",
    ):
        print(line)
    print()
253
254
def demonstrate_2pl():
    """Demonstrate Two-Phase Locking.

    Part one runs a single read-modify-write transaction (shared lock
    upgraded to exclusive). Part two runs two concurrent transfers against
    the same balance; each takes the exclusive lock before reading, so the
    second transfer waits for the first to commit and then works on the
    updated balance.
    """
    print("=" * 60)
    print("TWO-PHASE LOCKING (2PL)")
    print("=" * 60)
    print()

    db = TwoPhaseLockingDatabase()

    # Initialize data
    db.data["x"] = 100
    db.data["y"] = 200

    print("Initial state: x=100, y=200")
    print("-" * 60)

    # Transaction 1: x = x + 10
    print("\nT1: Increment x by 10")
    txn1 = 1
    try:
        x = db.read("x", txn1)
        print(f"T1: READ x = {x} (acquired S lock)")
        time.sleep(0.1)
        db.write("x", x + 10, txn1)
        print(f"T1: WRITE x = {x + 10} (upgraded to X lock)")
        db.commit(txn1)
        print("T1: COMMIT (released locks)")
    except Exception as e:  # the database signals lock timeouts with an exception
        print(f"T1: ERROR - {e}")
        db.release_locks(txn1)

    print(f"\nFinal x = {db.data['x']}")

    # Demonstrate lock conflict
    print("\n\n" + "=" * 60)
    print("LOCK CONFLICTS")
    print("=" * 60)
    print()

    db.data["balance"] = 1000

    def transfer_out(amount: int, txn_id: int, delay: float):
        """Withdraw ``amount`` from the balance inside transaction ``txn_id``.

        ``delay`` is the pause in seconds between the read and the write.
        """
        try:
            print(f"T{txn_id}: Request X lock on balance")
            # Take the exclusive lock before reading. If both transfers first
            # took a shared lock (as a plain read would), each would block the
            # other's upgrade to exclusive and one of them would time out.
            if not db.acquire_lock("balance", txn_id, LockType.EXCLUSIVE):
                raise TimeoutError(
                    f"T{txn_id}: Cannot acquire exclusive lock on balance (deadlock?)"
                )
            balance = db.read("balance", txn_id)
            print(f"T{txn_id}: READ balance = {balance}")
            time.sleep(delay)
            db.write("balance", balance - amount, txn_id)
            print(f"T{txn_id}: WRITE balance = {balance - amount}")
            db.commit(txn_id)
            print(f"T{txn_id}: COMMIT")
        except Exception as e:  # the database signals lock timeouts with an exception
            print(f"T{txn_id}: ABORTED - {e}")
            db.release_locks(txn_id)

    # Concurrent conflicting transactions
    print("Two transactions trying to modify balance concurrently:")
    print("-" * 60)

    t1 = threading.Thread(target=transfer_out, args=(100, 2, 0.5))
    t2 = threading.Thread(target=transfer_out, args=(200, 3, 0.3))

    t1.start()
    time.sleep(0.1)  # T2 starts slightly after T1
    t2.start()

    t1.join()
    t2.join()

    print(f"\nFinal balance = {db.data['balance']}")
    print("✓ 2PL ensures serializability through locking")
    print()
328
329
def demonstrate_deadlock():
    """Demonstrate a deadlock between two transactions and its detection.

    T1 locks x then y; T2 locks y then x. Both use *exclusive* locks:
    shared locks are compatible with each other, so with shared locks the
    two transactions would never wait on one another and no deadlock would
    occur. The circular wait is broken when one lock request times out and
    that transaction aborts, releasing its locks.
    """
    print("=" * 60)
    print("DEADLOCK SCENARIO")
    print("=" * 60)
    print()

    db = TwoPhaseLockingDatabase()
    db.data["x"] = 100
    db.data["y"] = 200

    print("Initial: x=100, y=200")
    print("-" * 60)
    print("\nScenario: T1 and T2 both need locks on x and y")
    print(" T1: Lock x, then y")
    print(" T2: Lock y, then x")
    print(" → Circular wait → Deadlock")
    print()

    # Set by whichever transaction gives up waiting for a lock.
    deadlock_detected = threading.Event()

    def run_transaction(txn_id: int, first_key: str, second_key: str, start_delay: float):
        """Lock ``first_key`` then ``second_key`` exclusively, then commit."""
        try:
            time.sleep(start_delay)
            for step, key in enumerate((first_key, second_key)):
                if step:
                    # Hold the first lock long enough for the other
                    # transaction to grab its own first lock.
                    time.sleep(0.2)
                print(f"T{txn_id}: Request lock on {key}")
                if not db.acquire_lock(key, txn_id, LockType.EXCLUSIVE):
                    raise TimeoutError(
                        f"T{txn_id}: Cannot acquire exclusive lock on {key} (deadlock?)"
                    )
                print(f"T{txn_id}: Acquired lock on {key}")
            db.commit(txn_id)
            print(f"T{txn_id}: COMMIT")
        except TimeoutError as e:
            print(f"T{txn_id}: DEADLOCK DETECTED - {e}")
            deadlock_detected.set()
            # Abort: dropping our locks lets the other transaction proceed.
            db.release_locks(txn_id)

    t1 = threading.Thread(target=run_transaction, args=(1, "x", "y", 0.0))
    # T2 starts slightly after T1 and takes the locks in the opposite order.
    t2 = threading.Thread(target=run_transaction, args=(2, "y", "x", 0.1))

    t1.start()
    t2.start()
    t1.join()
    t2.join()

    if deadlock_detected.is_set():
        print("\n✓ Deadlock detected via timeout")
    print("\nDeadlock prevention strategies:")
    print(" 1. Lock ordering: always acquire locks in same order")
    print(" 2. Timeout: abort transaction if lock wait too long")
    print(" 3. Wait-die/Wound-wait: timestamp-based schemes")
    print()
401
402
if __name__ == "__main__":
    # Title banner, printed with a blank line before and after the box.
    banner_rows = (
        "",
        "╔══════════════════════════════════════════════════════════════╗",
        "║ CONCURRENCY CONTROL: MVCC AND 2PL ║",
        "║ Multi-Version Concurrency Control, Two-Phase Locking ║",
        "╚══════════════════════════════════════════════════════════════╝",
        "",
    )
    print("\n".join(banner_rows))

    # Run the three demonstrations in order.
    for demo in (demonstrate_mvcc, demonstrate_2pl, demonstrate_deadlock):
        demo()

    # Closing summary: one entry per output line ("" yields a blank line).
    rule = "=" * 60
    summary_lines = (
        rule,
        "SUMMARY",
        rule,
        "MVCC (Multi-Version Concurrency Control):",
        " ✓ Readers don't block writers",
        " ✓ Snapshot isolation",
        " ✗ More storage (multiple versions)",
        "",
        "2PL (Two-Phase Locking):",
        " ✓ Guarantees serializability",
        " ✗ Readers block writers (and vice versa)",
        " ✗ Deadlock possible",
        "",
        "Modern databases often use hybrid approaches",
        " (e.g., MVCC with 2PL for writes)",
        rule,
    )
    for line in summary_lines:
        print(line)