05_normalization.py

Download
python 448 lines 14.8 KB
"""
Normalization and Normal Forms

Demonstrates database normalization process:
- 1NF (First Normal Form): Atomic values, no repeating groups
- 2NF (Second Normal Form): 1NF + no partial dependencies
- 3NF (Third Normal Form): 2NF + no transitive dependencies
- BCNF (Boyce-Codd Normal Form): 3NF + every determinant is a candidate key
- Lossless-join decomposition
- Dependency-preserving decomposition

Theory:
- Normalization eliminates redundancy and update anomalies
- Higher normal forms reduce redundancy but may require more joins
- BCNF is stricter than 3NF (every FD X→Y requires X to be a superkey)
- 3NF preserves dependencies; BCNF may not
- Decomposition must be lossless and preferably dependency-preserving
"""
 19
 20import sqlite3
 21from typing import Set, List, Tuple
 22from itertools import combinations
 23
 24
 25class Relation:
 26    """Represents a database relation with attributes and FDs."""
 27
 28    def __init__(self, name: str, attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]):
 29        self.name = name
 30        self.attributes = attributes
 31        self.fds = fds
 32
 33    def __repr__(self):
 34        return f"{self.name}({', '.join(sorted(self.attributes))})"
 35
 36
 37def demonstrate_1nf():
 38    """Demonstrate First Normal Form (1NF)."""
 39    print("=" * 60)
 40    print("FIRST NORMAL FORM (1NF)")
 41    print("=" * 60)
 42    print("Requirements:")
 43    print("  1. Each attribute contains atomic (indivisible) values")
 44    print("  2. No repeating groups")
 45    print("  3. Each row is unique (has a primary key)")
 46    print()
 47
 48    conn = sqlite3.connect(':memory:')
 49    cursor = conn.cursor()
 50
 51    # Violation: Non-atomic values
 52    print("VIOLATION 1: Non-atomic values")
 53    print("-" * 60)
 54    print("Employee(emp_id, name, phones)")
 55    print("Data:")
 56    print("  (1, 'Alice', '555-1234, 555-5678')  ← phones is non-atomic")
 57    print("  (2, 'Bob', '555-9999')")
 58
 59    print("\nβœ“ Fix: Separate table for phones")
 60    print("Employee(emp_id, name)")
 61    print("Phone(emp_id, phone_number)")
 62    cursor.execute("CREATE TABLE Employee (emp_id INTEGER PRIMARY KEY, name TEXT)")
 63    cursor.execute("CREATE TABLE Phone (emp_id INTEGER, phone_number TEXT, PRIMARY KEY(emp_id, phone_number))")
 64    cursor.execute("INSERT INTO Employee VALUES (1, 'Alice'), (2, 'Bob')")
 65    cursor.execute("INSERT INTO Phone VALUES (1, '555-1234'), (1, '555-5678'), (2, '555-9999')")
 66
 67    # Violation: Repeating groups
 68    print("\n\nVIOLATION 2: Repeating groups")
 69    print("-" * 60)
 70    print("Student(student_id, name, course1, course2, course3)")
 71    print("Data:")
 72    print("  (1, 'Alice', 'CS101', 'MATH101', NULL)  ← fixed number of courses")
 73
 74    print("\nβœ“ Fix: Separate table for enrollments")
 75    print("Student(student_id, name)")
 76    print("Enrollment(student_id, course_id)")
 77
 78    conn.close()
 79    print()
 80
 81
 82def compute_closure(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]) -> Set[str]:
 83    """Compute attribute closure."""
 84    closure = set(attributes)
 85    changed = True
 86    while changed:
 87        changed = False
 88        for lhs, rhs in fds:
 89            if set(lhs).issubset(closure):
 90                before = len(closure)
 91                closure.update(rhs)
 92                if len(closure) > before:
 93                    changed = True
 94    return closure
 95
 96
 97def find_candidate_keys(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]) -> List[Set[str]]:
 98    """Find all candidate keys."""
 99    # Attributes that never appear on RHS must be in every key
100    rhs_attrs = set()
101    for _, rhs in fds:
102        rhs_attrs.update(rhs)
103    must_include = attributes - rhs_attrs
104
105    if compute_closure(must_include, fds) == attributes:
106        return [must_include]
107
108    keys = []
109    other_attrs = list(attributes - must_include)
110
111    for size in range(1, len(other_attrs) + 1):
112        for combo in combinations(other_attrs, size):
113            candidate = must_include | set(combo)
114            if compute_closure(candidate, fds) == attributes:
115                # Check minimality
116                is_minimal = True
117                for attr in combo:
118                    subset = candidate - {attr}
119                    if compute_closure(subset, fds) == attributes:
120                        is_minimal = False
121                        break
122                if is_minimal:
123                    keys.append(candidate)
124
125    return keys if keys else [must_include]
126
127
def is_2nf(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]], keys: List[Set[str]]) -> Tuple[bool, str]:
    """
    Check if relation is in 2NF.
    2NF: No partial dependencies (no non-prime attribute depends on proper subset of any candidate key)

    The check uses the attribute closure of every proper subset of every
    candidate key, so dependencies that are only implied by ``fds`` (not
    listed literally) are detected as well. Keys and subsets are visited in
    sorted order, which makes the reported violation deterministic. The
    number of subsets is exponential in the key size.

    Args:
        attributes: Attribute names of the relation.
        fds: ``(lhs, rhs)`` pairs of attribute collections.
        keys: Candidate keys of the relation.

    Returns:
        ``(True, reason)`` when no partial dependency exists, otherwise
        ``(False, description of the first violation found)``.
    """
    # Find prime attributes (attributes in any candidate key)
    prime_attrs = set()
    for key in keys:
        prime_attrs.update(key)

    non_prime = attributes - prime_attrs

    for key in sorted(keys, key=sorted):
        members = sorted(key)
        # Sizes 0 .. len(key) - 1 enumerate exactly the proper subsets.
        for size in range(len(members)):
            for part in combinations(members, size):
                determined = compute_closure(set(part), fds) & non_prime
                if determined:
                    lhs_text = ', '.join(part)
                    attrs = ', '.join(sorted(determined))
                    key_text = ', '.join(members)
                    return False, f"Partial dependency: {{{lhs_text}}} → {{{attrs}}} (part of key {{{key_text}}})"

    return True, "No partial dependencies"
157
158
def is_3nf(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]], keys: List[Set[str]]) -> Tuple[bool, str]:
    """
    Check if relation is in 3NF.
    3NF: For every FD X→A, either:
      - A is in X (trivial), OR
      - X is a superkey, OR
      - A is a prime attribute

    Dependencies are examined in sorted order so the reported violation is
    the same on every run.

    Args:
        attributes: Attribute names of the relation.
        fds: ``(lhs, rhs)`` pairs of attribute collections.
        keys: Candidate keys of the relation.

    Returns:
        ``(True, reason)`` when every dependency satisfies the rule above,
        otherwise ``(False, description of the first violation found)``.
    """
    prime_attrs = set()
    for key in keys:
        prime_attrs.update(key)

    ordered_fds = sorted(fds, key=lambda fd: (sorted(fd[0]), sorted(fd[1])))
    for lhs, rhs in ordered_fds:
        lhs_set = set(lhs)

        # Only RHS attributes that are neither trivial nor prime can violate.
        offending = sorted(set(rhs) - lhs_set - prime_attrs)
        if not offending:
            continue

        # The superkey test depends on the LHS alone, so run it once per FD.
        if compute_closure(lhs_set, fds) == attributes:
            continue

        attr = offending[0]
        return False, f"Transitive dependency: {{{', '.join(sorted(lhs))}}} → {attr} (non-prime, LHS not superkey)"

    return True, "No transitive dependencies on non-prime attributes"
187
188
def is_bcnf(attributes: Set[str], fds: Set[Tuple[Set[str], Set[str]]]) -> Tuple[bool, str]:
    """
    Check if relation is in BCNF.
    BCNF: For every non-trivial FD X→Y, X must be a superkey

    Dependencies are examined in sorted order so the reported violation is
    the same on every run.

    Args:
        attributes: Attribute names of the relation.
        fds: ``(lhs, rhs)`` pairs of attribute collections.

    Returns:
        ``(True, reason)`` when every non-trivial determinant is a superkey,
        otherwise ``(False, description of the first violating FD)``.
    """
    ordered_fds = sorted(fds, key=lambda fd: (sorted(fd[0]), sorted(fd[1])))
    for lhs, rhs in ordered_fds:
        lhs_set = set(lhs)

        # Skip trivial dependencies
        if set(rhs) <= lhs_set:
            continue

        # Check if LHS is a superkey
        if compute_closure(lhs_set, fds) != attributes:
            return False, f"Violating FD: {{{', '.join(sorted(lhs))}}} → {{{', '.join(sorted(rhs))}}} (LHS not a superkey)"

    return True, "All determinants are superkeys"
207
208
def demonstrate_2nf():
    """Print a walkthrough of Second Normal Form (2NF).

    Uses a StudentCourse relation whose composite key has two partial
    dependencies, runs the 2NF check on it, and prints the decomposition
    that removes them.
    """
    print("=" * 60)
    print("SECOND NORMAL FORM (2NF)")
    print("=" * 60)
    print("Requirements: 1NF + No partial dependencies")
    print("(No non-prime attribute depends on part of a candidate key)")
    print()

    # Example: Student enrollments
    print("Example: StudentCourse(student_id, course_id, student_name, instructor)")
    print("-" * 60)
    attributes = {'student_id', 'course_id', 'student_name', 'instructor'}
    # Each side is a frozenset because the pairs are stored in a set and
    # therefore must be hashable.
    fds = {
        (frozenset({'student_id', 'course_id'}), frozenset({'instructor'})),
        (frozenset({'student_id'}), frozenset({'student_name'})),  # Partial dependency!
        (frozenset({'course_id'}), frozenset({'instructor'})),     # Partial dependency!
    }

    print("FDs:")
    print("  {student_id, course_id} → instructor")
    print("  student_id → student_name (partial!)")
    print("  course_id → instructor (partial!)")
    print()

    keys = find_candidate_keys(attributes, fds)
    print(f"Candidate key: {{{', '.join(sorted(keys[0]))}}}")

    is_2, reason = is_2nf(attributes, fds, keys)
    print(f"\n2NF: {'✓' if is_2 else '✗'} {reason}")

    print("\n✓ Fix: Decompose into 3 relations")
    print("  Student(student_id, student_name)")
    print("  Course(course_id, instructor)")
    print("  Enrollment(student_id, course_id)")
    print()
245
246
def demonstrate_3nf():
    """Print a walkthrough of Third Normal Form (3NF).

    Uses an Employee relation with the chain emp_id → dept_id →
    dept_location, runs the 3NF check on it, and prints the decomposition
    that removes the transitive dependency.
    """
    print("=" * 60)
    print("THIRD NORMAL FORM (3NF)")
    print("=" * 60)
    print("Requirements: 2NF + No transitive dependencies")
    print("(Non-prime attributes depend only on candidate keys)")
    print()

    # Example: Employee with department location
    print("Example: Employee(emp_id, dept_id, dept_location)")
    print("-" * 60)
    attributes = {'emp_id', 'dept_id', 'dept_location'}
    fds = {
        (frozenset({'emp_id'}), frozenset({'dept_id'})),
        (frozenset({'dept_id'}), frozenset({'dept_location'})),  # Transitive!
    }

    print("FDs:")
    print("  emp_id → dept_id")
    print("  dept_id → dept_location (transitive!)")
    print()

    keys = find_candidate_keys(attributes, fds)
    print(f"Candidate key: {{{', '.join(sorted(keys[0]))}}}")

    # Derive the prime/non-prime split from the computed keys so the printed
    # sets always agree with the key shown above.
    prime_attrs = set()
    for key in keys:
        prime_attrs.update(key)
    non_prime = attributes - prime_attrs
    print(f"Prime attributes: {{{', '.join(sorted(prime_attrs))}}}")
    print(f"Non-prime attributes: {{{', '.join(sorted(non_prime))}}}")

    is_3, reason = is_3nf(attributes, fds, keys)
    print(f"\n3NF: {'✓' if is_3 else '✗'} {reason}")

    print("\nExplanation:")
    print("  emp_id → dept_id → dept_location")
    print("  dept_location depends on emp_id through dept_id (transitive)")

    print("\n✓ Fix: Decompose into 2 relations")
    print("  Employee(emp_id, dept_id)")
    print("  Department(dept_id, dept_location)")
    print()
286
287
def demonstrate_bcnf():
    """Print a walkthrough of Boyce-Codd Normal Form (BCNF).

    Uses a CourseSchedule relation that satisfies 3NF but not BCNF: the
    dependency instructor → course has a determinant that is not a
    superkey, while its right-hand side is a prime attribute.
    """
    print("=" * 60)
    print("BOYCE-CODD NORMAL FORM (BCNF)")
    print("=" * 60)
    print("Requirements: For every non-trivial FD X→Y, X is a superkey")
    print()

    # Classic BCNF example: students, courses, and single-course instructors
    print("Example: CourseSchedule(student_id, course, instructor)")
    print("-" * 60)
    print("Constraint: Each instructor teaches only one course")
    print()

    attributes = {'student_id', 'course', 'instructor'}
    fds = {
        (frozenset({'student_id', 'course'}), frozenset({'instructor'})),
        (frozenset({'instructor'}), frozenset({'course'})),  # BCNF violation!
    }

    print("FDs:")
    print("  {student_id, course} → instructor")
    print("  instructor → course (BCNF violation!)")
    print()

    keys = find_candidate_keys(attributes, fds)
    # This relation has two candidate keys; list all of them, in sorted
    # order, rather than whichever one happens to come first.
    key_texts = sorted('{' + ', '.join(sorted(key)) + '}' for key in keys)
    print(f"Candidate keys: {', '.join(key_texts)}")

    # Check 3NF
    is_3, reason_3 = is_3nf(attributes, fds, keys)
    print(f"\n3NF: {'✓' if is_3 else '✗'} {reason_3}")
    if is_3:
        print("  (course is a prime attribute, so instructor → course does not violate 3NF)")

    # Check BCNF
    is_b, reason_b = is_bcnf(attributes, fds)
    print(f"\nBCNF: {'✓' if is_b else '✗'} {reason_b}")

    print("\nExplanation:")
    print("  instructor → course, but instructor is not a superkey")
    print("  This causes redundancy: instructor→course mapping repeated")

    print("\n✓ Fix: Decompose into 2 relations")
    print("  Enrollment(student_id, instructor)")
    print("  Teaching(instructor, course)")
    print()
334
335
def demonstrate_lossless_join():
    """Print a lossless and a lossy decomposition of an Employee relation.

    The data satisfies dept_id → dept_name but not dept_name → dept_id
    (departments 20 and 30 are both named 'Sales'). Joining on dept_id
    therefore rebuilds the relation exactly, while joining on dept_name
    produces rows that were never in it. All projections use DISTINCT so
    any extra row is a genuine spurious tuple and not a duplicate.

    Everything runs in an in-memory SQLite database whose connection is
    closed even if a statement raises.
    """
    print("=" * 60)
    print("LOSSLESS-JOIN DECOMPOSITION")
    print("=" * 60)
    print()

    conn = sqlite3.connect(':memory:')
    try:
        cursor = conn.cursor()

        # Original relation
        cursor.execute('''
            CREATE TABLE Employee (
                emp_id INTEGER,
                dept_id INTEGER,
                dept_name TEXT,
                PRIMARY KEY (emp_id)
            )
        ''')
        cursor.executemany(
            "INSERT INTO Employee VALUES (?, ?, ?)",
            [
                (1, 10, 'Engineering'),
                (2, 10, 'Engineering'),
                (3, 20, 'Sales'),
                (4, 30, 'Sales'),  # second department sharing the name 'Sales'
            ],
        )

        print("Original relation: Employee(emp_id, dept_id, dept_name)")
        print("-" * 60)
        original = cursor.execute("SELECT * FROM Employee ORDER BY emp_id").fetchall()
        for row in original:
            print(f"  {row}")

        # Lossless decomposition
        print("\nLossless decomposition:")
        print("  R1(emp_id, dept_id)")
        print("  R2(dept_id, dept_name)")
        print()

        cursor.execute("CREATE TABLE R1 AS SELECT emp_id, dept_id FROM Employee")
        cursor.execute("CREATE TABLE R2 AS SELECT DISTINCT dept_id, dept_name FROM Employee")

        print("R1:")
        for row in cursor.execute("SELECT * FROM R1 ORDER BY emp_id").fetchall():
            print(f"  {row}")

        print("\nR2:")
        for row in cursor.execute("SELECT * FROM R2 ORDER BY dept_id").fetchall():
            print(f"  {row}")

        # Reconstruct
        rebuilt = cursor.execute("""
            SELECT R1.emp_id, R1.dept_id, R2.dept_name
            FROM R1 JOIN R2 ON R1.dept_id = R2.dept_id
            ORDER BY R1.emp_id
        """).fetchall()
        print("\nReconstructed (R1 ⋈ R2):")
        for row in rebuilt:
            print(f"  {row}")

        # Compare against the original instead of merely asserting equality.
        if sorted(rebuilt) == sorted(original):
            print("\n✓ Lossless: Original = R1 ⋈ R2 (FD: dept_id → dept_name)")
        else:
            print("\n✗ Unexpected: R1 ⋈ R2 differs from the original relation")

        # Lossy decomposition example
        print("\n" + "-" * 60)
        print("Lossy decomposition (bad):")
        print("  S1(emp_id, dept_name)")
        print("  S2(dept_id, dept_name)")
        print()

        cursor.execute("CREATE TABLE S1 AS SELECT DISTINCT emp_id, dept_name FROM Employee")
        cursor.execute("CREATE TABLE S2 AS SELECT DISTINCT dept_id, dept_name FROM Employee")

        lossy = cursor.execute("""
            SELECT S1.emp_id, S2.dept_id, S1.dept_name
            FROM S1 JOIN S2 ON S1.dept_name = S2.dept_name
            ORDER BY S1.emp_id, S2.dept_id
        """).fetchall()
        original_rows = set(original)
        print("Reconstructed (S1 ⋈ S2):")
        for row in lossy:
            marker = "" if row in original_rows else "  ← spurious"
            print(f"  {row}{marker}")

        print("\n✗ Lossy: Created spurious tuples (extra rows)")
        print("  (dept_name 'Sales' belongs to dept_id 20 and 30, so emp 3 and 4 each join to both)")
    finally:
        # Release the database handle on both the success and error paths.
        conn.close()
    print()
419
420
def main():
    """Run every demonstration in order, then print a summary of the normal forms."""
    # The banner is assembled from its parts so the box edges always line up
    # with the text rows.
    inner_width = 62
    banner_rows = (
        " " * 14 + "DATABASE NORMALIZATION",
        " " * 2 + "1NF, 2NF, 3NF, BCNF, Lossless Decomposition",
    )
    print()
    print("╔" + "═" * inner_width + "╗")
    for row in banner_rows:
        print("║" + row.ljust(inner_width) + "║")
    print("╚" + "═" * inner_width + "╝")
    print()

    demonstrate_1nf()
    demonstrate_2nf()
    demonstrate_3nf()
    demonstrate_bcnf()
    demonstrate_lossless_join()

    print("=" * 60)
    print("SUMMARY OF NORMAL FORMS")
    print("=" * 60)
    print("1NF: Atomic values, no repeating groups")
    print("2NF: 1NF + no partial dependencies")
    print("3NF: 2NF + no transitive dependencies")
    print("BCNF: Every determinant is a superkey")
    print()
    print("Trade-offs:")
    print("  - Higher NF → less redundancy, more tables/joins")
    print("  - 3NF preserves dependencies, BCNF may not")
    print("  - Decomposition must be lossless")
    print("=" * 60)


if __name__ == "__main__":
    main()