"""
Normalization and Normal Forms

Demonstrates database normalization process:
- 1NF (First Normal Form): Atomic values, no repeating groups
- 2NF (Second Normal Form): 1NF + no partial dependencies
- 3NF (Third Normal Form): 2NF + no transitive dependencies
- BCNF (Boyce-Codd Normal Form): 3NF + every determinant is a superkey
- Lossless-join decomposition
- Dependency-preserving decomposition

Theory:
- Normalization eliminates redundancy and update anomalies
- Higher normal forms reduce redundancy but may require more joins
- BCNF is stricter than 3NF (every non-trivial FD X → Y requires X to be a superkey)
- 3NF preserves dependencies; BCNF may not
- Decomposition must be lossless and preferably dependency-preserving
"""
19
import sqlite3
from contextlib import closing
from itertools import combinations
from typing import Set, List, Tuple
23
24
class Relation:
    """A named relation schema: its attribute set plus its functional dependencies.

    Attributes:
        name: Label used when the relation is displayed.
        attributes: Attribute names of the relation.
        fds: Functional dependencies as (determinant, dependent) pairs of
            attribute sets.
    """

    def __init__(self, name: str, attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]):
        # The arguments are stored as given; no copies are made.
        self.name, self.attributes, self.fds = name, attributes, fds

    def __repr__(self):
        # Sort the attributes so the rendering does not depend on set order.
        column_list = ", ".join(sorted(self.attributes))
        return "{}({})".format(self.name, column_list)
35
36
def demonstrate_1nf():
    """Print a walkthrough of First Normal Form (1NF).

    Lists the 1NF requirements, then shows two violations (a non-atomic
    column and a repeating group) together with the schema that fixes each.
    The fix for the first violation is also created and populated in an
    in-memory SQLite database.
    """
    print("=" * 60)
    print("FIRST NORMAL FORM (1NF)")
    print("=" * 60)
    print("Requirements:")
    print(" 1. Each attribute contains atomic (indivisible) values")
    print(" 2. No repeating groups")
    print(" 3. Each row is unique (has a primary key)")
    print()

    # closing() releases the connection even if one of the statements raises.
    with closing(sqlite3.connect(':memory:')) as conn:
        cursor = conn.cursor()

        # Violation: Non-atomic values
        print("VIOLATION 1: Non-atomic values")
        print("-" * 60)
        print("Employee(emp_id, name, phones)")
        print("Data:")
        print(" (1, 'Alice', '555-1234, 555-5678') ← phones is non-atomic")
        print(" (2, 'Bob', '555-9999')")

        print("\n✓ Fix: Separate table for phones")
        print("Employee(emp_id, name)")
        print("Phone(emp_id, phone_number)")
        cursor.execute("CREATE TABLE Employee (emp_id INTEGER PRIMARY KEY, name TEXT)")
        cursor.execute("CREATE TABLE Phone (emp_id INTEGER, phone_number TEXT, PRIMARY KEY(emp_id, phone_number))")
        cursor.execute("INSERT INTO Employee VALUES (1, 'Alice'), (2, 'Bob')")
        cursor.execute("INSERT INTO Phone VALUES (1, '555-1234'), (1, '555-5678'), (2, '555-9999')")

        # Violation: Repeating groups
        print("\n\nVIOLATION 2: Repeating groups")
        print("-" * 60)
        print("Student(student_id, name, course1, course2, course3)")
        print("Data:")
        print(" (1, 'Alice', 'CS101', 'MATH101', NULL) ← fixed number of courses")

        print("\n✓ Fix: Separate table for enrollments")
        print("Student(student_id, name)")
        print("Enrollment(student_id, course_id)")

    print()
80
81
def compute_closure(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]) -> Set[str]:
    """Return the closure of ``attributes`` under the functional dependencies ``fds``.

    Args:
        attributes: Starting attribute set; it is copied, not modified.
        fds: (determinant, dependent) pairs of attribute sets.

    Returns:
        A new set holding every attribute reachable from ``attributes`` by
        repeatedly applying the dependencies.
    """
    reached = set(attributes)
    while True:
        size_before_pass = len(reached)
        for determinant, dependents in fds:
            # A dependency fires once its whole left-hand side has been reached.
            if reached.issuperset(determinant):
                reached.update(dependents)
        # A full pass that added nothing means the fixed point is reached.
        if len(reached) == size_before_pass:
            return reached
95
96
def find_candidate_keys(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]) -> List[Set[str]]:
    """Find all candidate keys of a relation.

    Args:
        attributes: All attributes of the relation.
        fds: (determinant, dependent) pairs of attribute sets.

    Returns:
        The candidate keys, smallest first. Keys of equal size appear in a
        fixed order derived from the sorted attribute names, so the result
        (and in particular ``keys[0]``) is the same on every run.
    """
    # Attributes that never appear on RHS must be in every key
    rhs_attrs = set()
    for _, rhs in fds:
        rhs_attrs.update(rhs)
    must_include = attributes - rhs_attrs

    if compute_closure(must_include, fds) == attributes:
        return [must_include]

    keys = []
    # Sorted rather than raw set order: string hashing is randomized per
    # process, which would otherwise reorder the returned keys between runs.
    other_attrs = sorted(attributes - must_include)

    for size in range(1, len(other_attrs) + 1):
        for combo in combinations(other_attrs, size):
            candidate = must_include | set(combo)
            # Sizes ascend, so a non-minimal superkey always contains a key
            # that was found earlier; skip it without computing a closure.
            if any(key <= candidate for key in keys):
                continue
            if compute_closure(candidate, fds) == attributes:
                keys.append(candidate)

    return keys if keys else [must_include]
126
127
def is_2nf(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]], keys: List[Set[str]]) -> Tuple[bool, str]:
    """
    Check if relation is in 2NF.

    2NF: No partial dependencies (no non-prime attribute depends on proper
    subset of any candidate key).

    The check uses attribute closures instead of the FDs as written, so
    partial dependencies that are only implied are detected as well. Example:
    with keys {A, B} and {B, C} and FDs A → C, C → A, {A, C} → N, attribute N
    is determined by A alone although no listed FD has a left-hand side that
    is a subset of a key.

    Args:
        attributes: All attributes of the relation.
        fds: (determinant, dependent) pairs of attribute sets.
        keys: Candidate keys of the relation.

    Returns:
        ``(True, message)`` when no partial dependency exists, otherwise
        ``(False, message)`` naming the first one found. Subsets are visited
        in sorted order, so the reported violation is the same on every run.
    """
    # Find prime attributes (attributes in any candidate key)
    prime_attrs = set()
    for key in keys:
        prime_attrs.update(key)

    non_prime = attributes - prime_attrs

    for key in keys:
        key_attrs = sorted(key)
        # range(len(...)) stops one short of the full key: proper subsets only.
        # This enumerates every proper subset, which is fine for the small
        # keys this module works with.
        for size in range(len(key_attrs)):
            for part in combinations(key_attrs, size):
                dependents = compute_closure(set(part), fds) & non_prime
                if dependents:
                    attrs = ', '.join(sorted(dependents))
                    return False, (
                        f"Partial dependency: {{{', '.join(part)}}} → {{{attrs}}} "
                        f"(part of key {{{', '.join(key_attrs)}}})"
                    )

    return True, "No partial dependencies"
157
158
def is_3nf(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]], keys: List[Set[str]]) -> Tuple[bool, str]:
    """
    Check if relation is in 3NF.

    3NF: For every FD X → A, either:
    - A is in X (trivial), OR
    - X is a superkey, OR
    - A is a prime attribute

    Args:
        attributes: All attributes of the relation.
        fds: (determinant, dependent) pairs of attribute sets.
        keys: Candidate keys of the relation.

    Returns:
        ``(True, message)`` when every listed FD satisfies one of the three
        conditions, otherwise ``(False, message)`` naming the first violating
        FD in sorted order.
    """
    prime_attrs = set()
    for key in keys:
        prime_attrs.update(key)

    # Visit the FDs in a fixed order so the same violation is reported on
    # every run regardless of set iteration order.
    ordered_fds = sorted(fds, key=lambda fd: (sorted(fd[0]), sorted(fd[1])))

    for lhs, rhs in ordered_fds:
        lhs_set = set(lhs)

        # Whether the LHS is a superkey does not depend on the RHS attribute,
        # so it is decided once per FD.
        if compute_closure(lhs_set, fds) == attributes:
            continue

        for attr in sorted(rhs):
            # Trivial (attr already in the LHS) or prime attributes are allowed.
            if attr in lhs_set or attr in prime_attrs:
                continue
            return False, f"Transitive dependency: {{{', '.join(sorted(lhs))}}} → {attr} (non-prime, LHS not superkey)"

    return True, "No transitive dependencies on non-prime attributes"
187
188
def is_bcnf(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]) -> Tuple[bool, str]:
    """
    Check if relation is in BCNF.

    BCNF: For every non-trivial FD X → Y, X must be a superkey.

    Args:
        attributes: All attributes of the relation.
        fds: (determinant, dependent) pairs of attribute sets.

    Returns:
        ``(True, message)`` when every listed non-trivial FD has a superkey on
        its left-hand side, otherwise ``(False, message)`` naming the first
        violating FD in sorted order.
    """
    # Visit the FDs in a fixed order so the same violation is reported on
    # every run regardless of set iteration order.
    ordered_fds = sorted(fds, key=lambda fd: (sorted(fd[0]), sorted(fd[1])))

    for lhs, rhs in ordered_fds:
        lhs_set = set(lhs)
        rhs_set = set(rhs)

        # Skip trivial dependencies
        if rhs_set <= lhs_set:
            continue

        # Check if LHS is a superkey
        if compute_closure(lhs_set, fds) != attributes:
            return False, f"Violating FD: {{{', '.join(sorted(lhs))}}} → {{{', '.join(sorted(rhs))}}} (LHS not a superkey)"

    return True, "All determinants are superkeys"
207
208
def demonstrate_2nf():
    """Print a walkthrough of Second Normal Form (2NF).

    Builds the StudentCourse example, computes its candidate key, runs the
    2NF check and prints the verdict followed by the decomposition that
    removes the partial dependencies.
    """
    print("=" * 60)
    print("SECOND NORMAL FORM (2NF)")
    print("=" * 60)
    print("Requirements: 1NF + No partial dependencies")
    print("(No non-prime attribute depends on part of a candidate key)")
    print()

    # Example: Student enrollments
    print("Example: StudentCourse(student_id, course_id, student_name, instructor)")
    print("-" * 60)
    attributes = {'student_id', 'course_id', 'student_name', 'instructor'}
    fds = {
        (frozenset({'student_id', 'course_id'}), frozenset({'instructor'})),
        (frozenset({'student_id'}), frozenset({'student_name'})),  # Partial dependency!
        (frozenset({'course_id'}), frozenset({'instructor'})),  # Partial dependency!
    }

    print("FDs:")
    print(" {student_id, course_id} → instructor")
    print(" student_id → student_name (partial!)")
    print(" course_id → instructor (partial!)")
    print()

    keys = find_candidate_keys(attributes, fds)
    print(f"Candidate key: {{{', '.join(sorted(keys[0]))}}}")

    is_2, reason = is_2nf(attributes, fds, keys)
    # Distinct glyphs for pass and fail so the verdict is readable.
    print(f"\n2NF: {'✓' if is_2 else '✗'} {reason}")

    print("\n✓ Fix: Decompose into 3 relations")
    print(" Student(student_id, student_name)")
    print(" Course(course_id, instructor)")
    print(" Enrollment(student_id, course_id)")
    print()
245
246
def demonstrate_3nf():
    """Print a walkthrough of Third Normal Form (3NF).

    Builds the Employee/department example, computes its candidate key and
    the prime and non-prime attribute sets, runs the 3NF check and prints the
    verdict followed by the decomposition that removes the transitive
    dependency.
    """
    print("=" * 60)
    print("THIRD NORMAL FORM (3NF)")
    print("=" * 60)
    print("Requirements: 2NF + No transitive dependencies")
    print("(Non-prime attributes depend only on candidate keys)")
    print()

    # Example: Employee with department location
    print("Example: Employee(emp_id, dept_id, dept_location)")
    print("-" * 60)
    attributes = {'emp_id', 'dept_id', 'dept_location'}
    fds = {
        (frozenset({'emp_id'}), frozenset({'dept_id'})),
        (frozenset({'dept_id'}), frozenset({'dept_location'})),  # Transitive!
    }

    print("FDs:")
    print(" emp_id → dept_id")
    print(" dept_id → dept_location (transitive!)")
    print()

    keys = find_candidate_keys(attributes, fds)
    print(f"Candidate key: {{{', '.join(sorted(keys[0]))}}}")

    # Derive the prime / non-prime split from the computed keys so the printed
    # sets cannot drift from the example data.
    prime_attrs = set()
    for key in keys:
        prime_attrs.update(key)
    non_prime = attributes - prime_attrs
    print(f"Prime attributes: {{{', '.join(sorted(prime_attrs))}}}")
    print(f"Non-prime attributes: {{{', '.join(sorted(non_prime))}}}")

    is_3, reason = is_3nf(attributes, fds, keys)
    # Distinct glyphs for pass and fail so the verdict is readable.
    print(f"\n3NF: {'✓' if is_3 else '✗'} {reason}")

    print("\nExplanation:")
    print(" emp_id → dept_id → dept_location")
    print(" dept_location depends on emp_id through dept_id (transitive)")

    print("\n✓ Fix: Decompose into 2 relations")
    print(" Employee(emp_id, dept_id)")
    print(" Department(dept_id, dept_location)")
    print()
286
287
def demonstrate_bcnf():
    """Print a walkthrough of Boyce-Codd Normal Form (BCNF).

    Builds the CourseSchedule example, lists its candidate keys, runs the
    3NF and BCNF checks and prints both verdicts followed by the
    decomposition that removes the violating dependency.
    """
    print("=" * 60)
    print("BOYCE-CODD NORMAL FORM (BCNF)")
    print("=" * 60)
    print("Requirements: For every non-trivial FD X→Y, X is a superkey")
    print()

    # Classic BCNF example: each instructor teaches exactly one course
    print("Example: CourseSchedule(student_id, course, instructor)")
    print("-" * 60)
    print("Constraint: Each instructor teaches only one course")
    print()

    attributes = {'student_id', 'course', 'instructor'}
    fds = {
        (frozenset({'student_id', 'course'}), frozenset({'instructor'})),
        (frozenset({'instructor'}), frozenset({'course'})),  # BCNF violation!
    }

    print("FDs:")
    print(" {student_id, course} → instructor")
    print(" instructor → course (BCNF violation!)")
    print()

    keys = find_candidate_keys(attributes, fds)
    # This relation has more than one candidate key, so list them all; the
    # rendered keys are sorted to keep the output stable between runs.
    rendered_keys = sorted(f"{{{', '.join(sorted(key))}}}" for key in keys)
    print(f"Candidate keys: {', '.join(rendered_keys)}")

    # Check 3NF (only the verdict is shown; the reason text is not needed)
    is_3, _ = is_3nf(attributes, fds, keys)
    print(f"\n3NF: {'✓' if is_3 else '✗'}")
    if is_3:
        # For instructor → course it is the right-hand side, course, whose
        # membership in a candidate key satisfies the 3NF condition.
        print(" (course is a prime attribute, so 3NF is satisfied)")

    # Check BCNF
    is_b, reason_b = is_bcnf(attributes, fds)
    print(f"\nBCNF: {'✓' if is_b else '✗'} {reason_b}")

    print("\nExplanation:")
    print(" instructor → course, but instructor is not a superkey")
    print(" This causes redundancy: instructor→course mapping repeated")

    print("\n✓ Fix: Decompose into 2 relations")
    print(" Enrollment(student_id, instructor)")
    print(" Teaching(instructor, course)")
    print()
334
335
def demonstrate_lossless_join():
    """Print a walkthrough of lossless and lossy decompositions.

    An Employee(emp_id, dept_id, dept_name) table is built in an in-memory
    SQLite database, decomposed two ways, and re-joined:

    - R1(emp_id, dept_id) / R2(dept_id, dept_name): joined on dept_id, which
      determines dept_name in the sample data, so the original rows come back.
    - S1(emp_id, dept_name) / S2(dept_id, dept_name): joined on dept_name.
      Two departments in the sample data share the name 'Sales', so the join
      pairs employees with departments they do not belong to.

    Every projection uses DISTINCT so that any extra row in a join is a
    spurious tuple rather than a duplicate.
    """
    print("=" * 60)
    print("LOSSLESS-JOIN DECOMPOSITION")
    print("=" * 60)
    print()

    # closing() releases the connection even if one of the statements raises.
    with closing(sqlite3.connect(':memory:')) as conn:
        cursor = conn.cursor()

        # Original relation
        cursor.execute('''
            CREATE TABLE Employee (
                emp_id INTEGER,
                dept_id INTEGER,
                dept_name TEXT,
                PRIMARY KEY (emp_id)
            )
        ''')

        # Departments 20 and 30 share a name on purpose: dept_id → dept_name
        # holds, but dept_name → dept_id does not.
        cursor.executemany(
            "INSERT INTO Employee VALUES (?, ?, ?)",
            [
                (1, 10, 'Engineering'),
                (2, 10, 'Engineering'),
                (3, 20, 'Sales'),
                (4, 30, 'Sales'),
            ],
        )

        print("Original relation: Employee(emp_id, dept_id, dept_name)")
        print("-" * 60)
        cursor.execute("SELECT * FROM Employee ORDER BY emp_id")
        original_rows = cursor.fetchall()
        for row in original_rows:
            print(f" {row}")

        # Lossless decomposition
        print("\nLossless decomposition:")
        print(" R1(emp_id, dept_id)")
        print(" R2(dept_id, dept_name)")
        print()

        cursor.execute("CREATE TABLE R1 AS SELECT DISTINCT emp_id, dept_id FROM Employee")
        cursor.execute("CREATE TABLE R2 AS SELECT DISTINCT dept_id, dept_name FROM Employee")

        cursor.execute("SELECT * FROM R1 ORDER BY emp_id")
        print("R1:")
        for row in cursor.fetchall():
            print(f" {row}")

        cursor.execute("SELECT * FROM R2 ORDER BY dept_id")
        print("\nR2:")
        for row in cursor.fetchall():
            print(f" {row}")

        # Reconstruct
        cursor.execute("""
            SELECT R1.emp_id, R1.dept_id, R2.dept_name
            FROM R1 JOIN R2 ON R1.dept_id = R2.dept_id
            ORDER BY R1.emp_id
        """)
        print("\nReconstructed (R1 ⋈ R2):")
        for row in cursor.fetchall():
            print(f" {row}")

        print("\n✓ Lossless: Original = R1 ⋈ R2 (FD: dept_id → dept_name)")

        # Lossy decomposition example
        print("\n" + "-" * 60)
        print("Lossy decomposition (bad):")
        print(" S1(emp_id, dept_name)")
        print(" S2(dept_id, dept_name)")
        print()

        cursor.execute("CREATE TABLE S1 AS SELECT DISTINCT emp_id, dept_name FROM Employee")
        cursor.execute("CREATE TABLE S2 AS SELECT DISTINCT dept_id, dept_name FROM Employee")

        cursor.execute("""
            SELECT S1.emp_id, S2.dept_id, S1.dept_name
            FROM S1 JOIN S2 ON S1.dept_name = S2.dept_name
            ORDER BY S1.emp_id, S2.dept_id
        """)
        rejoined_rows = cursor.fetchall()
        print("Reconstructed (S1 ⋈ S2):")
        for row in rejoined_rows:
            print(f" {row}")

        # Rows the join produced that the original relation never contained.
        spurious_rows = sorted(set(rejoined_rows) - set(original_rows))

        print("\n✗ Lossy: Created spurious tuples (extra rows)")
        for row in spurious_rows:
            print(f" {row} is not in the original relation")
        print(" (emp 3 and 4 are each paired with both dept_id 20 and 30 due to same dept_name)")

    print()
419
420
def main():
    """Run every normalization demonstration, then print a summary."""
    # The banner is built from a width instead of a literal so that the box
    # edges always line up with the centered titles.
    box_width = 60
    banner_lines = [
        "",
        "╔" + "═" * box_width + "╗",
        "║" + "DATABASE NORMALIZATION".center(box_width) + "║",
        "║" + "1NF, 2NF, 3NF, BCNF, Lossless Decomposition".center(box_width) + "║",
        "╚" + "═" * box_width + "╝",
        "",
    ]
    print("\n".join(banner_lines))

    demonstrate_1nf()
    demonstrate_2nf()
    demonstrate_3nf()
    demonstrate_bcnf()
    demonstrate_lossless_join()

    print("=" * 60)
    print("SUMMARY OF NORMAL FORMS")
    print("=" * 60)
    print("1NF: Atomic values, no repeating groups")
    print("2NF: 1NF + no partial dependencies")
    print("3NF: 2NF + no transitive dependencies")
    print("BCNF: Every determinant is a superkey")
    print()
    print("Trade-offs:")
    print(" - Higher NF → less redundancy, more tables/joins")
    print(" - 3NF preserves dependencies, BCNF may not")
    print(" - Decomposition must be lossless")
    print("=" * 60)


if __name__ == "__main__":
    main()