authz_demo.py

Download
python 560 lines 17.9 KB
  1"""
  2Authorization Models Demo
  3=========================
  4
  5Educational demonstration of authorization concepts:
  6- RBAC (Role-Based Access Control)
  7- ABAC (Attribute-Based Access Control)
  8- ACL (Access Control List)
  9- Permission inheritance
 10
 11Uses only Python standard library. No external dependencies required.
 12"""
 13
 14from __future__ import annotations
 15from dataclasses import dataclass, field
 16from enum import Enum, Flag, auto
 17from typing import Any
 18from datetime import datetime, time
 19
 20print("=" * 65)
 21print("  Authorization Models Demo")
 22print("=" * 65)
 23print()
 24
 25
 26# ============================================================
 27# Section 1: RBAC (Role-Based Access Control)
 28# ============================================================
 29
 30print("-" * 65)
 31print("  Section 1: RBAC (Role-Based Access Control)")
 32print("-" * 65)
 33
 34print("""
 35  RBAC assigns permissions to roles, then roles to users.
 36  Users inherit all permissions from their assigned roles.
 37
 38  User  -->  Role  -->  Permission
 39  Alice -->  Admin -->  [read, write, delete, manage_users]
 40  Bob   -->  Editor --> [read, write]
 41  Carol -->  Viewer --> [read]
 42""")
 43
 44
 45class Permission(Flag):
 46    """Bitwise permission flags for efficient storage."""
 47    NONE = 0
 48    READ = auto()
 49    WRITE = auto()
 50    DELETE = auto()
 51    MANAGE_USERS = auto()
 52    MANAGE_ROLES = auto()
 53    ADMIN = READ | WRITE | DELETE | MANAGE_USERS | MANAGE_ROLES
 54
 55
 56@dataclass
 57class Role:
 58    """A named role with a set of permissions."""
 59    name: str
 60    permissions: Permission
 61    description: str = ""
 62
 63    def has_permission(self, perm: Permission) -> bool:
 64        return (self.permissions & perm) == perm
 65
 66
 67@dataclass
 68class RBACUser:
 69    """A user with one or more roles."""
 70    username: str
 71    roles: list[Role] = field(default_factory=list)
 72
 73    def has_permission(self, perm: Permission) -> bool:
 74        return any(role.has_permission(perm) for role in self.roles)
 75
 76    def get_all_permissions(self) -> Permission:
 77        result = Permission.NONE
 78        for role in self.roles:
 79            result |= role.permissions
 80        return result
 81
 82
 83# Define roles
 84admin_role = Role("admin", Permission.ADMIN, "Full system access")
 85editor_role = Role("editor", Permission.READ | Permission.WRITE, "Can read and write")
 86viewer_role = Role("viewer", Permission.READ, "Read-only access")
 87moderator_role = Role(
 88    "moderator",
 89    Permission.READ | Permission.WRITE | Permission.DELETE,
 90    "Can moderate content",
 91)
 92
 93# Assign roles to users
 94alice = RBACUser("alice", [admin_role])
 95bob = RBACUser("bob", [editor_role])
 96carol = RBACUser("carol", [viewer_role])
 97dave = RBACUser("dave", [editor_role, moderator_role])  # Multiple roles
 98
 99print("\n  Users and their roles:")
100for user in [alice, bob, carol, dave]:
101    role_names = ", ".join(r.name for r in user.roles)
102    perms = user.get_all_permissions()
103    perm_names = [p.name for p in Permission if p in perms and p.name != "ADMIN"]
104    print(f"    {user.username:<8} roles=[{role_names}]")
105    print(f"             permissions={perm_names}")
106print()
107
108# Permission checks
109print("  Permission checks:")
110checks = [
111    (alice, Permission.MANAGE_USERS, "Alice: manage users"),
112    (bob, Permission.WRITE, "Bob: write content"),
113    (bob, Permission.DELETE, "Bob: delete content"),
114    (carol, Permission.READ, "Carol: read content"),
115    (carol, Permission.WRITE, "Carol: write content"),
116    (dave, Permission.DELETE, "Dave: delete content"),
117]
118for user, perm, desc in checks:
119    result = user.has_permission(perm)
120    status = "ALLOWED" if result else "DENIED"
121    print(f"    {desc:<28} -> {status}")
122print()
123
124
125# ============================================================
126# Section 2: ABAC (Attribute-Based Access Control)
127# ============================================================
128
129print("-" * 65)
130print("  Section 2: ABAC (Attribute-Based Access Control)")
131print("-" * 65)
132
133print("""
134  ABAC makes access decisions based on attributes of:
135  - Subject (user): role, department, clearance level
136  - Resource: type, classification, owner
137  - Action: read, write, delete
138  - Environment: time of day, IP address, location
139
140  More flexible than RBAC but more complex to manage.
141""")
142
143
144@dataclass
145class Subject:
146    """The entity requesting access."""
147    user_id: str
148    department: str
149    clearance_level: int  # 1=Public, 2=Internal, 3=Confidential, 4=Secret
150    roles: list[str] = field(default_factory=list)
151
152
153@dataclass
154class Resource:
155    """The object being accessed."""
156    resource_id: str
157    resource_type: str
158    classification: int  # 1=Public, 2=Internal, 3=Confidential, 4=Secret
159    owner_department: str
160    owner_id: str
161
162
163@dataclass
164class Environment:
165    """Contextual attributes."""
166    current_time: time
167    ip_address: str
168    is_vpn: bool = False
169
170
171class ABACPolicy:
172    """A single ABAC policy rule."""
173
174    def __init__(self, name: str, description: str,
175                 condition: callable, effect: str = "allow"):
176        self.name = name
177        self.description = description
178        self.condition = condition
179        self.effect = effect  # "allow" or "deny"
180
181    def evaluate(self, subject: Subject, resource: Resource,
182                 action: str, environment: Environment) -> str | None:
183        """Returns effect if condition matches, None otherwise."""
184        if self.condition(subject, resource, action, environment):
185            return self.effect
186        return None
187
188
189class ABACEngine:
190    """Policy evaluation engine using deny-overrides combining."""
191
192    def __init__(self):
193        self.policies: list[ABACPolicy] = []
194
195    def add_policy(self, policy: ABACPolicy):
196        self.policies.append(policy)
197
198    def evaluate(self, subject: Subject, resource: Resource,
199                 action: str, environment: Environment) -> tuple[bool, list[str]]:
200        """Evaluate all policies. Deny overrides allow."""
201        reasons = []
202        has_allow = False
203
204        for policy in self.policies:
205            result = policy.evaluate(subject, resource, action, environment)
206            if result == "deny":
207                reasons.append(f"DENIED by '{policy.name}': {policy.description}")
208                return False, reasons
209            elif result == "allow":
210                has_allow = True
211                reasons.append(f"ALLOWED by '{policy.name}'")
212
213        if has_allow:
214            return True, reasons
215        reasons.append("DENIED: no matching allow policy")
216        return False, reasons
217
218
219# Create ABAC engine with policies
220engine = ABACEngine()
221
222# Policy 1: Clearance level must meet or exceed resource classification
223engine.add_policy(ABACPolicy(
224    "clearance_check",
225    "Subject clearance must >= resource classification",
226    lambda s, r, a, e: s.clearance_level >= r.classification,
227    "allow",
228))
229
230# Policy 2: Deny access to Secret resources outside business hours
231engine.add_policy(ABACPolicy(
232    "business_hours_secret",
233    "Secret resources only during business hours (9-18)",
234    lambda s, r, a, e: (
235        r.classification == 4 and not (time(9, 0) <= e.current_time <= time(18, 0))
236    ),
237    "deny",
238))
239
240# Policy 3: Deny write/delete to resources from other departments
241engine.add_policy(ABACPolicy(
242    "department_write_restriction",
243    "Write/delete only within own department",
244    lambda s, r, a, e: (
245        a in ("write", "delete") and s.department != r.owner_department
246    ),
247    "deny",
248))
249
250# Policy 4: Require VPN for confidential+ resources
251engine.add_policy(ABACPolicy(
252    "vpn_required_confidential",
253    "Confidential+ resources require VPN",
254    lambda s, r, a, e: r.classification >= 3 and not e.is_vpn,
255    "deny",
256))
257
258# Test scenarios
259print("\n  ABAC Policy Evaluation Scenarios:")
260print()
261
262scenarios = [
263    {
264        "desc": "Engineer reads public docs (clearance 2, public resource)",
265        "subject": Subject("eng_01", "engineering", 2, ["engineer"]),
266        "resource": Resource("doc_1", "document", 1, "engineering", "eng_01"),
267        "action": "read",
268        "env": Environment(time(10, 0), "10.0.0.1", is_vpn=True),
269    },
270    {
271        "desc": "Engineer reads secret doc without VPN",
272        "subject": Subject("eng_02", "engineering", 4, ["engineer"]),
273        "resource": Resource("doc_s", "document", 4, "engineering", "eng_02"),
274        "action": "read",
275        "env": Environment(time(14, 0), "203.0.113.5", is_vpn=False),
276    },
277    {
278        "desc": "Marketing writes to engineering resource",
279        "subject": Subject("mkt_01", "marketing", 2, ["analyst"]),
280        "resource": Resource("spec_1", "spec", 2, "engineering", "eng_01"),
281        "action": "write",
282        "env": Environment(time(11, 0), "10.0.0.2", is_vpn=True),
283    },
284    {
285        "desc": "Admin reads secret doc at night via VPN",
286        "subject": Subject("adm_01", "security", 4, ["admin"]),
287        "resource": Resource("doc_s2", "document", 4, "security", "adm_01"),
288        "action": "read",
289        "env": Environment(time(23, 0), "10.0.0.3", is_vpn=True),
290    },
291    {
292        "desc": "Low-clearance user reads confidential resource",
293        "subject": Subject("int_01", "intern", 1, ["intern"]),
294        "resource": Resource("plan_1", "plan", 3, "engineering", "eng_01"),
295        "action": "read",
296        "env": Environment(time(10, 0), "10.0.0.4", is_vpn=True),
297    },
298]
299
300for scenario in scenarios:
301    allowed, reasons = engine.evaluate(
302        scenario["subject"], scenario["resource"],
303        scenario["action"], scenario["env"],
304    )
305    status = "ALLOWED" if allowed else "DENIED"
306    print(f"  Scenario: {scenario['desc']}")
307    print(f"    Result: {status}")
308    for reason in reasons:
309        print(f"    Reason: {reason}")
310    print()
311
312
313# ============================================================
314# Section 3: ACL (Access Control List)
315# ============================================================
316
317print("-" * 65)
318print("  Section 3: ACL (Access Control List)")
319print("-" * 65)
320
321print("""
322  ACL attaches permissions directly to resources.
323  Each resource has a list of (subject, permissions) entries.
324
325  Similar to Unix file permissions:
326    -rwxr-xr--  owner=rwx  group=r-x  others=r--
327""")
328
329
330class ACLPermission(Enum):
331    READ = "read"
332    WRITE = "write"
333    EXECUTE = "execute"
334    DELETE = "delete"
335    ADMIN = "admin"
336
337
338@dataclass
339class ACLEntry:
340    """Single entry in an access control list."""
341    principal: str      # user_id or group_id
342    principal_type: str  # "user" or "group"
343    permissions: set[ACLPermission]
344
345
346class ACL:
347    """Access Control List for a resource."""
348
349    def __init__(self, resource_id: str, owner: str):
350        self.resource_id = resource_id
351        self.owner = owner
352        self.entries: list[ACLEntry] = []
353
354    def add_entry(self, principal: str, principal_type: str,
355                  permissions: set[ACLPermission]):
356        self.entries.append(ACLEntry(principal, principal_type, permissions))
357
358    def check_access(self, user_id: str, groups: list[str],
359                     permission: ACLPermission) -> bool:
360        """Check if user has the specified permission."""
361        # Owner always has full access
362        if user_id == self.owner:
363            return True
364
365        for entry in self.entries:
366            if entry.principal_type == "user" and entry.principal == user_id:
367                if permission in entry.permissions:
368                    return True
369            elif entry.principal_type == "group" and entry.principal in groups:
370                if permission in entry.permissions:
371                    return True
372        return False
373
374    def display(self):
375        """Display ACL in a readable format."""
376        print(f"    Resource: {self.resource_id}  (owner: {self.owner})")
377        for entry in self.entries:
378            perms = ", ".join(p.value for p in entry.permissions)
379            print(f"      {entry.principal_type}:{entry.principal} -> [{perms}]")
380
381
382# Create ACL for a document
383doc_acl = ACL("project_plan.docx", "alice")
384doc_acl.add_entry("bob", "user", {ACLPermission.READ, ACLPermission.WRITE})
385doc_acl.add_entry("carol", "user", {ACLPermission.READ})
386doc_acl.add_entry("engineering", "group", {ACLPermission.READ})
387doc_acl.add_entry("managers", "group",
388                   {ACLPermission.READ, ACLPermission.WRITE, ACLPermission.DELETE})
389
390print("\n  Document ACL:")
391doc_acl.display()
392print()
393
394# ACL checks
395acl_checks = [
396    ("alice", [], ACLPermission.DELETE, "Alice (owner) delete"),
397    ("bob", ["engineering"], ACLPermission.WRITE, "Bob write"),
398    ("bob", ["engineering"], ACLPermission.DELETE, "Bob delete"),
399    ("carol", [], ACLPermission.READ, "Carol read"),
400    ("dave", ["engineering"], ACLPermission.READ, "Dave (engineering group) read"),
401    ("dave", ["engineering"], ACLPermission.WRITE, "Dave (engineering group) write"),
402    ("eve", ["managers"], ACLPermission.DELETE, "Eve (managers group) delete"),
403]
404
405print("  ACL Permission Checks:")
406for user_id, groups, perm, desc in acl_checks:
407    result = doc_acl.check_access(user_id, groups, perm)
408    status = "ALLOWED" if result else "DENIED"
409    print(f"    {desc:<36} -> {status}")
410print()
411
412
413# ============================================================
414# Section 4: Permission Inheritance
415# ============================================================
416
417print("-" * 65)
418print("  Section 4: Permission Inheritance")
419print("-" * 65)
420
421print("""
422  Permissions can be inherited through hierarchies:
423  - Role hierarchy:  Admin > Manager > Editor > Viewer
424  - Resource hierarchy: Organization > Department > Project > File
425
426  Example:
427    /company
428      /company/engineering        (engineering team: read/write)
429      /company/engineering/proj1  (inherits from parent)
430      /company/engineering/proj1/secret.md  (override: restricted)
431""")
432
433
434class PermissionNode:
435    """A node in a hierarchical permission tree."""
436
437    def __init__(self, name: str, parent: PermissionNode | None = None):
438        self.name = name
439        self.parent = parent
440        self.children: list[PermissionNode] = []
441        self.acl: dict[str, set[str]] = {}  # user -> permissions
442        self.inherit: bool = True  # Whether to inherit from parent
443
444        if parent:
445            parent.children.append(self)
446
447    def set_permissions(self, user: str, perms: set[str]):
448        self.acl[user] = perms
449
450    def get_effective_permissions(self, user: str) -> set[str]:
451        """Get permissions considering inheritance chain."""
452        # Own explicit permissions
453        own_perms = self.acl.get(user, None)
454
455        if own_perms is not None:
456            # Explicit permissions override inheritance
457            return own_perms
458
459        # Inherit from parent if allowed
460        if self.inherit and self.parent:
461            return self.parent.get_effective_permissions(user)
462
463        return set()  # No permissions
464
465    def get_full_path(self) -> str:
466        if self.parent:
467            return f"{self.parent.get_full_path()}/{self.name}"
468        return f"/{self.name}"
469
470
471# Build resource hierarchy
472company = PermissionNode("company")
473engineering = PermissionNode("engineering", company)
474marketing = PermissionNode("marketing", company)
475proj1 = PermissionNode("proj1", engineering)
476proj2 = PermissionNode("proj2", engineering)
477secret_file = PermissionNode("secret.md", proj1)
478
479# Set permissions at various levels
480company.set_permissions("admin", {"read", "write", "delete", "manage"})
481company.set_permissions("ceo", {"read"})
482engineering.set_permissions("eng_team", {"read", "write"})
483marketing.set_permissions("mkt_team", {"read", "write"})
484proj1.set_permissions("proj1_lead", {"read", "write", "delete"})
485
486# Override: secret.md has restricted access (no inheritance)
487secret_file.inherit = False
488secret_file.set_permissions("admin", {"read", "write"})
489secret_file.set_permissions("proj1_lead", {"read"})
490
491# Display hierarchy and effective permissions
492print("\n  Resource Hierarchy and Permissions:")
493
494all_nodes = [company, engineering, marketing, proj1, proj2, secret_file]
495all_users = ["admin", "ceo", "eng_team", "mkt_team", "proj1_lead"]
496
497for node in all_nodes:
498    depth = 0
499    n = node
500    while n.parent:
501        depth += 1
502        n = n.parent
503    indent = "  " * depth
504    inherit_mark = "" if node.inherit else " [NO INHERIT]"
505    print(f"    {indent}{node.name}{inherit_mark}")
506    if node.acl:
507        for user, perms in node.acl.items():
508            print(f"    {indent}  -> {user}: {sorted(perms)}")
509
510print()
511
512# Test effective permissions
513print("  Effective Permission Resolution:")
514test_cases = [
515    ("admin", company, "Admin at /company"),
516    ("admin", proj1, "Admin at /company/engineering/proj1 (inherited)"),
517    ("admin", secret_file, "Admin at secret.md (explicit override)"),
518    ("eng_team", engineering, "eng_team at /engineering"),
519    ("eng_team", proj1, "eng_team at /proj1 (inherited from engineering)"),
520    ("eng_team", secret_file, "eng_team at secret.md (no inherit)"),
521    ("proj1_lead", proj1, "proj1_lead at /proj1"),
522    ("proj1_lead", secret_file, "proj1_lead at secret.md (read only)"),
523    ("ceo", marketing, "CEO at /marketing (inherited from company)"),
524]
525
526for user, node, desc in test_cases:
527    perms = node.get_effective_permissions(user)
528    perm_str = sorted(perms) if perms else "NONE"
529    print(f"    {desc}")
530    print(f"      -> {perm_str}")
531print()
532
533
534# ============================================================
535# Section 5: Summary
536# ============================================================
537
538print("=" * 65)
539print("  Summary: Authorization Models Comparison")
540print("=" * 65)
541print("""
542  Model | Complexity | Flexibility | Best For
543  ------+------------+-------------+----------------------------
544  RBAC  | Low        | Medium      | Most applications
545  ABAC  | High       | Very High   | Complex enterprises, gov
546  ACL   | Medium     | Medium      | File systems, documents
547  ReBAC | Medium     | High        | Social networks, sharing
548
549  RBAC:  Simple, role-centric. Good default choice.
550  ABAC:  Policy-based, context-aware. Maximum flexibility.
551  ACL:   Per-resource control. Good for file/document systems.
552
553  Key Principles:
554  - Principle of Least Privilege: grant minimum needed access
555  - Separation of Duties: split critical tasks across roles
556  - Defense in Depth: layer authorization checks
557  - Deny by Default: require explicit allow
558  - Audit everything: log all access decisions
559""")