06. Authorization and Access Control
06. Authorization and Access Control¶
Previous: 05. Authentication Systems | Next: 07. OWASP Top 10 (2021)
Authorization determines what an authenticated user is allowed to do. While authentication answers "Who are you?", authorization answers "What can you do?" A robust authorization system enforces the principle of least privilege, ensuring that users and services have only the minimum permissions necessary. This lesson covers the major access control models (RBAC, ABAC, ACL), policy engines, token-based authorization with JWT and OAuth scopes, practical implementation patterns in Python/Flask, and common authorization vulnerabilities.
Learning Objectives¶
- Distinguish between authentication and authorization
- Implement Role-Based Access Control (RBAC) systems
- Understand Attribute-Based Access Control (ABAC) and when to use it
- Work with Access Control Lists (ACLs) for resource-level permissions
- Use policy engines like OPA (Open Policy Agent) for externalized authorization
- Leverage JWT claims and OAuth 2.0 scopes for API authorization
- Build authorization middleware and decorators in Flask
- Identify and prevent common authorization vulnerabilities (IDOR, privilege escalation)
1. Authentication vs Authorization¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Authentication vs Authorization β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Authentication (AuthN) Authorization (AuthZ) β
β βββββββββββββββββββββ ββββββββββββββββββββββ β
β "Who are you?" "What can you do?" β
β β
β Verifies identity Verifies permissions β
β Happens FIRST Happens AFTER authentication β
β 401 Unauthorized 403 Forbidden β
β (not authenticated) (authenticated but not allowed) β
β β
β β
β Request Flow: β
β β
β ββββββββββββ ββββββββββββββββ βββββββββββββββ β
β β Request βββββΆβ AuthenticationβββββΆβAuthorizationβ β
β β β β "Who is this?"β β "Can they β β
β ββββββββββββ β β β do this?" β β
β ββββββββ¬ββββββββ ββββββββ¬βββββββ β
β β β β
β ββββββ΄ββββ ββββββ΄ββββ β
β β β β β β
β Valid Invalid Allowed Denied β
β β β β β β
β β 401 Error β 403 Error β
β β β β
β βββββββββ¬ββββββββββββ β
β β β
β βΌ β
β ββββββββββββ β
β β Resource β β
β β Access β β
β ββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2. Role-Based Access Control (RBAC)¶
2.1 RBAC Concepts¶
RBAC assigns permissions to roles, and users are assigned to roles. This simplifies management because you manage role-permission mappings rather than individual user permissions.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β RBAC Model β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Users Roles Permissions β
β ββββββββββ ββββββββββββ ββββββββββββββββββββ β
β β Alice βββββββββΆβ Admin ββββββββΆβ create_user β β
β ββββββββββ β ββββββββΆβ delete_user β β
β ββββββββββ β ββββββββΆβ view_reports β β
β β Bob βββββ βββββββββββββββββββΆβ manage_settings β β
β ββββββββββ β ββββββββββββ ββββββββββββββββββββ β
β βββββΆβ Editor β ββββββββββββββββββββ β
β ββββββββββ β ββββββββΆβ create_post β β
β β Carol βββββββββΆβ ββββββββΆβ edit_post β β
β ββββββββββ βββββββββββββββββββΆβ delete_own_post β β
β ββββββββββ ββββββββββββ ββββββββββββββββββββ β
β β Dan βββββββββΆβ Viewer β ββββββββββββββββββββ β
β ββββββββββ β ββββββββΆβ view_post β β
β βββββββββββββββββββΆβ view_reports β β
β ββββββββββββββββββββ β
β β
β Hierarchy (optional): β
β Admin βββΆ Editor βββΆ Viewer β
β (Admin inherits all Editor and Viewer permissions) β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2.2 RBAC Implementation in Python¶
"""
rbac.py - Role-Based Access Control implementation
"""
from dataclasses import dataclass, field
from typing import Set, Dict, Optional
from enum import Enum
from functools import wraps
# ==============================================================
# Core RBAC Model
# ==============================================================
class Permission(str, Enum):
"""Define all permissions in the system."""
# User management
CREATE_USER = "user:create"
READ_USER = "user:read"
UPDATE_USER = "user:update"
DELETE_USER = "user:delete"
# Post management
CREATE_POST = "post:create"
READ_POST = "post:read"
UPDATE_POST = "post:update"
DELETE_POST = "post:delete"
PUBLISH_POST = "post:publish"
# Report management
VIEW_REPORTS = "report:view"
EXPORT_REPORTS = "report:export"
# System
MANAGE_SETTINGS = "system:settings"
VIEW_AUDIT_LOG = "system:audit"
@dataclass
class Role:
"""A role is a named collection of permissions."""
name: str
permissions: Set[Permission] = field(default_factory=set)
parent: Optional['Role'] = None # For role hierarchy
def get_all_permissions(self) -> Set[Permission]:
"""Get all permissions including inherited ones."""
perms = set(self.permissions)
if self.parent:
perms |= self.parent.get_all_permissions()
return perms
def has_permission(self, permission: Permission) -> bool:
"""Check if this role has a specific permission."""
return permission in self.get_all_permissions()
@dataclass
class User:
"""A user with one or more roles."""
id: int
username: str
roles: Set[Role] = field(default_factory=set)
def has_permission(self, permission: Permission) -> bool:
"""Check if user has a specific permission through any role."""
return any(role.has_permission(permission) for role in self.roles)
def has_role(self, role_name: str) -> bool:
"""Check if user has a specific role."""
return any(role.name == role_name for role in self.roles)
def get_all_permissions(self) -> Set[Permission]:
"""Get all permissions from all roles."""
perms = set()
for role in self.roles:
perms |= role.get_all_permissions()
return perms
# ==============================================================
# Role Definitions
# ==============================================================
def create_default_roles() -> Dict[str, Role]:
"""Create the default role hierarchy."""
# Base role - everyone gets these
viewer = Role(
name="viewer",
permissions={
Permission.READ_POST,
Permission.READ_USER,
Permission.VIEW_REPORTS,
}
)
# Editor inherits from Viewer
editor = Role(
name="editor",
permissions={
Permission.CREATE_POST,
Permission.UPDATE_POST,
Permission.DELETE_POST,
Permission.PUBLISH_POST,
Permission.EXPORT_REPORTS,
},
parent=viewer
)
# Admin inherits from Editor
admin = Role(
name="admin",
permissions={
Permission.CREATE_USER,
Permission.UPDATE_USER,
Permission.DELETE_USER,
Permission.MANAGE_SETTINGS,
Permission.VIEW_AUDIT_LOG,
},
parent=editor
)
return {
"viewer": viewer,
"editor": editor,
"admin": admin,
}
# ==============================================================
# RBAC Manager
# ==============================================================
class RBACManager:
"""Centralized RBAC management."""
def __init__(self):
self.roles = create_default_roles()
self.users: Dict[int, User] = {}
def create_user(self, user_id: int, username: str,
role_names: list = None) -> User:
"""Create a user with specified roles."""
roles = set()
for name in (role_names or ["viewer"]):
if name in self.roles:
roles.add(self.roles[name])
else:
raise ValueError(f"Unknown role: {name}")
user = User(id=user_id, username=username, roles=roles)
self.users[user_id] = user
return user
def assign_role(self, user_id: int, role_name: str):
"""Assign a role to a user."""
if user_id not in self.users:
raise ValueError(f"User {user_id} not found")
if role_name not in self.roles:
raise ValueError(f"Role {role_name} not found")
self.users[user_id].roles.add(self.roles[role_name])
def revoke_role(self, user_id: int, role_name: str):
"""Remove a role from a user."""
if user_id not in self.users:
raise ValueError(f"User {user_id} not found")
self.users[user_id].roles = {
r for r in self.users[user_id].roles
if r.name != role_name
}
def check_access(self, user_id: int, permission: Permission) -> bool:
"""Check if a user has a specific permission."""
user = self.users.get(user_id)
if not user:
return False
return user.has_permission(permission)
# ==============================================================
# Demo
# ==============================================================
if __name__ == "__main__":
rbac = RBACManager()
# Create users
alice = rbac.create_user(1, "alice", ["admin"])
bob = rbac.create_user(2, "bob", ["editor"])
carol = rbac.create_user(3, "carol", ["viewer"])
# Check permissions
print("=== Permission Checks ===")
checks = [
(alice, Permission.DELETE_USER, "Alice can delete users"),
(alice, Permission.READ_POST, "Alice can read posts (inherited)"),
(bob, Permission.CREATE_POST, "Bob can create posts"),
(bob, Permission.DELETE_USER, "Bob can delete users"),
(carol, Permission.READ_POST, "Carol can read posts"),
(carol, Permission.CREATE_POST, "Carol can create posts"),
]
for user, perm, description in checks:
result = user.has_permission(perm)
status = "ALLOWED" if result else "DENIED"
print(f" [{status}] {description}")
# List all permissions
print(f"\nAlice's permissions: {len(alice.get_all_permissions())}")
for p in sorted(alice.get_all_permissions(), key=lambda x: x.value):
print(f" - {p.value}")
2.3 Flask RBAC Middleware¶
"""
flask_rbac.py - RBAC authorization middleware for Flask
"""
from flask import Flask, request, jsonify, g
from functools import wraps
import jwt
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key' # Use env var in production
# ==============================================================
# Authorization Decorators
# ==============================================================
def require_auth(f):
"""Decorator: Require authenticated user."""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return jsonify({"error": "Authentication required"}), 401
try:
payload = jwt.decode(
token,
app.config['SECRET_KEY'],
algorithms=['HS256']
)
g.current_user = {
'id': payload['sub'],
'roles': payload.get('roles', []),
'permissions': payload.get('permissions', []),
}
except jwt.InvalidTokenError as e:
return jsonify({"error": f"Invalid token: {str(e)}"}), 401
return f(*args, **kwargs)
return decorated
def require_role(*allowed_roles):
"""Decorator: Require user to have one of the specified roles."""
def decorator(f):
@wraps(f)
@require_auth
def decorated(*args, **kwargs):
user_roles = set(g.current_user.get('roles', []))
if not user_roles.intersection(set(allowed_roles)):
return jsonify({
"error": "Forbidden",
"detail": f"Required roles: {allowed_roles}"
}), 403
return f(*args, **kwargs)
return decorated
return decorator
def require_permission(*required_permissions):
"""Decorator: Require user to have all specified permissions."""
def decorator(f):
@wraps(f)
@require_auth
def decorated(*args, **kwargs):
user_perms = set(g.current_user.get('permissions', []))
missing = set(required_permissions) - user_perms
if missing:
return jsonify({
"error": "Forbidden",
"detail": f"Missing permissions: {list(missing)}"
}), 403
return f(*args, **kwargs)
return decorated
return decorator
# ==============================================================
# Protected Routes
# ==============================================================
@app.route('/api/posts', methods=['GET'])
@require_auth
def list_posts():
"""Any authenticated user can list posts."""
return jsonify({"posts": []})
@app.route('/api/posts', methods=['POST'])
@require_permission('post:create')
def create_post():
"""Only users with post:create permission."""
data = request.json
# Create post logic...
return jsonify({"message": "Post created"}), 201
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@require_permission('post:delete')
def delete_post(post_id):
"""Only users with post:delete permission."""
# Additional check: can only delete own posts unless admin
post = get_post(post_id) # Your DB lookup
if not post:
return jsonify({"error": "Not found"}), 404
user = g.current_user
is_admin = 'admin' in user.get('roles', [])
is_owner = post['author_id'] == user['id']
if not is_admin and not is_owner:
return jsonify({"error": "Can only delete your own posts"}), 403
# Delete post logic...
return jsonify({"message": "Post deleted"})
@app.route('/api/admin/users', methods=['GET'])
@require_role('admin')
def list_users():
"""Admin-only endpoint."""
return jsonify({"users": []})
@app.route('/api/admin/settings', methods=['PUT'])
@require_role('admin')
def update_settings():
"""Admin-only: modify system settings."""
data = request.json
# Update settings logic...
return jsonify({"message": "Settings updated"})
# ==============================================================
# Dynamic Permission Check (for resource-level auth)
# ==============================================================
def check_resource_access(user_id: int, resource_type: str,
resource_id: int, action: str) -> bool:
"""
Check if a user can perform an action on a specific resource.
This goes beyond role-based checks to resource-level authorization.
"""
# Example: Check if user owns the resource or has admin role
user = get_user(user_id)
resource = get_resource(resource_type, resource_id)
if not user or not resource:
return False
# Admin can do anything
if 'admin' in user.get('roles', []):
return True
# Owner can read/update their own resources
if resource.get('owner_id') == user_id:
if action in ('read', 'update', 'delete'):
return True
# Check shared access
shared_with = resource.get('shared_with', [])
for share in shared_with:
if share['user_id'] == user_id:
if action in share.get('allowed_actions', []):
return True
return False
3. Attribute-Based Access Control (ABAC)¶
3.1 ABAC Concepts¶
ABAC makes access decisions based on attributes of the subject (user), resource, action, and environment. It is more flexible than RBAC but also more complex.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ABAC Model β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Access Decision = β
β f(Subject Attributes, Resource Attributes, β
β Action Attributes, Environment Attributes) β
β β
β Subject Attributes: Resource Attributes: β
β βββ role: "doctor" βββ type: "medical_record" β
β βββ department: "ER" βββ department: "ER" β
β βββ clearance: "L3" βββ sensitivity: "L2" β
β βββ certification: true βββ owner: "patient_123" β
β β
β Action Attributes: Environment Attributes: β
β βββ type: "read" βββ time: "14:30 UTC" β
β βββ purpose: "treatment"βββ ip_address: "10.0.1.50" β
β βββ location: "hospital_network" β
β βββ device_trust: "managed" β
β β
β Policy Example: β
β "A doctor in the ER department can read medical records β
β in the ER department during work hours from hospital β
β network on a managed device." β
β β
β Subject.role == "doctor" AND β
β Subject.department == Resource.department AND β
β Action.type == "read" AND β
β Environment.time BETWEEN "07:00" AND "19:00" AND β
β Environment.location == "hospital_network" β
β βββΆ ALLOW β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
3.2 When to Use RBAC vs ABAC¶
| Criteria | RBAC | ABAC |
|---|---|---|
| Number of roles | Small, well-defined | Many or dynamic |
| Access decisions | Role membership | Multiple attributes |
| Complexity | Simple to implement | Complex but flexible |
| "Role explosion" | Risk (too many roles) | Not an issue |
| Context-aware | No (static roles) | Yes (time, location, etc.) |
| Compliance | Basic needs | Regulatory requirements |
| Best for | Most web apps, APIs | Healthcare, finance, government |
| Performance | Fast (role lookup) | Slower (policy evaluation) |
3.3 ABAC Implementation¶
"""
abac.py - Attribute-Based Access Control implementation
"""
from dataclasses import dataclass, field
from datetime import datetime, time
from typing import Any, Dict, List, Callable
from enum import Enum
class Decision(Enum):
ALLOW = "allow"
DENY = "deny"
NOT_APPLICABLE = "not_applicable"
@dataclass
class AccessRequest:
"""Represents a request for access to a resource."""
# Subject attributes (who)
subject: Dict[str, Any]
# Resource attributes (what)
resource: Dict[str, Any]
# Action attributes (how)
action: Dict[str, Any]
# Environment attributes (context)
environment: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Policy:
"""A single ABAC policy."""
name: str
description: str
priority: int # Lower = higher priority
effect: Decision # ALLOW or DENY
condition: Callable[[AccessRequest], bool]
def evaluate(self, request: AccessRequest) -> Decision:
"""Evaluate this policy against a request."""
try:
if self.condition(request):
return self.effect
return Decision.NOT_APPLICABLE
except (KeyError, TypeError):
return Decision.NOT_APPLICABLE
class PolicyEngine:
"""Evaluates access requests against a set of policies."""
def __init__(self, default_decision: Decision = Decision.DENY):
self.policies: List[Policy] = []
self.default_decision = default_decision
def add_policy(self, policy: Policy):
"""Add a policy to the engine."""
self.policies.append(policy)
# Keep sorted by priority
self.policies.sort(key=lambda p: p.priority)
def evaluate(self, request: AccessRequest) -> Decision:
"""
Evaluate all policies. Uses deny-overrides combining algorithm:
- If any policy says DENY, result is DENY
- If at least one says ALLOW and none say DENY, result is ALLOW
- Otherwise, use default decision
"""
has_allow = False
for policy in self.policies:
decision = policy.evaluate(request)
if decision == Decision.DENY:
return Decision.DENY # Deny overrides
if decision == Decision.ALLOW:
has_allow = True
return Decision.ALLOW if has_allow else self.default_decision
# ==============================================================
# Example Policies
# ==============================================================
def create_medical_policies() -> PolicyEngine:
"""Create policies for a healthcare system."""
engine = PolicyEngine(default_decision=Decision.DENY)
# Policy 1: Doctors can read patient records in their department
engine.add_policy(Policy(
name="doctor_read_department_records",
description="Doctors can read records in their department",
priority=10,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "doctor" and
req.action.get("type") == "read" and
req.resource.get("type") == "medical_record" and
req.subject.get("department") == req.resource.get("department")
)
))
# Policy 2: Doctors can write records for their patients
engine.add_policy(Policy(
name="doctor_write_own_patients",
description="Doctors can update records of patients assigned to them",
priority=10,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "doctor" and
req.action.get("type") in ("write", "update") and
req.resource.get("type") == "medical_record" and
req.subject.get("id") in req.resource.get("assigned_doctors", [])
)
))
# Policy 3: Nurses can read records in their department during shifts
engine.add_policy(Policy(
name="nurse_read_during_shift",
description="Nurses can read records during their shift hours",
priority=20,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "nurse" and
req.action.get("type") == "read" and
req.resource.get("type") == "medical_record" and
req.subject.get("department") == req.resource.get("department") and
_is_during_shift(req.environment.get("time"),
req.subject.get("shift_start"),
req.subject.get("shift_end"))
)
))
# Policy 4: No access from untrusted devices
engine.add_policy(Policy(
name="deny_untrusted_devices",
description="Deny access from non-managed devices",
priority=1, # High priority - evaluated first
effect=Decision.DENY,
condition=lambda req: (
req.resource.get("sensitivity", "low") in ("high", "critical") and
req.environment.get("device_trust") != "managed"
)
))
# Policy 5: Emergency override - on-duty doctors in ER
engine.add_policy(Policy(
name="emergency_override",
description="ER doctors can access any record during emergency",
priority=5,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "doctor" and
req.subject.get("department") == "emergency" and
req.action.get("type") == "read" and
req.environment.get("emergency_mode") is True
)
))
return engine
def _is_during_shift(current_time, shift_start, shift_end):
"""Check if current time is within shift hours."""
if not all([current_time, shift_start, shift_end]):
return False
if isinstance(current_time, str):
current_time = datetime.fromisoformat(current_time).time()
shift_start = time.fromisoformat(shift_start)
shift_end = time.fromisoformat(shift_end)
if shift_start <= shift_end:
return shift_start <= current_time <= shift_end
else: # Overnight shift
return current_time >= shift_start or current_time <= shift_end
# ==============================================================
# Demo
# ==============================================================
if __name__ == "__main__":
engine = create_medical_policies()
# Test Case 1: Doctor reading records in their department
request1 = AccessRequest(
subject={"id": "dr_smith", "role": "doctor", "department": "cardiology"},
resource={"type": "medical_record", "department": "cardiology",
"sensitivity": "high"},
action={"type": "read"},
environment={"device_trust": "managed", "time": "2024-01-15T10:30:00"}
)
print(f"Doctor reads own dept: {engine.evaluate(request1).value}")
# Test Case 2: Doctor reading records in different department
request2 = AccessRequest(
subject={"id": "dr_smith", "role": "doctor", "department": "cardiology"},
resource={"type": "medical_record", "department": "neurology",
"sensitivity": "high"},
action={"type": "read"},
environment={"device_trust": "managed"}
)
print(f"Doctor reads other dept: {engine.evaluate(request2).value}")
# Test Case 3: Access from untrusted device (denied regardless)
request3 = AccessRequest(
subject={"id": "dr_smith", "role": "doctor", "department": "cardiology"},
resource={"type": "medical_record", "department": "cardiology",
"sensitivity": "high"},
action={"type": "read"},
environment={"device_trust": "personal"} # Not managed!
)
print(f"Untrusted device: {engine.evaluate(request3).value}")
# Test Case 4: Emergency override
request4 = AccessRequest(
subject={"id": "dr_jones", "role": "doctor", "department": "emergency"},
resource={"type": "medical_record", "department": "cardiology",
"sensitivity": "critical"},
action={"type": "read"},
environment={"device_trust": "managed", "emergency_mode": True}
)
print(f"Emergency override: {engine.evaluate(request4).value}")
4. Access Control Lists (ACL)¶
4.1 ACL Concepts¶
ACLs define permissions at the individual resource level. Each resource has a list of entries specifying which subjects (users, groups) can perform which actions.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ACL Model β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Resource: "Project Plan.docx" β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β ACL Entry β Permissions β β
β βββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ€ β
β β alice (owner) β read, write, delete, share β β
β β bob β read, write β β
β β carol β read β β
β β engineering (group) β read, comment β β
β β * (everyone) β (no access) β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Resource: "Company Financials.xlsx" β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β ACL Entry β Permissions β β
β βββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ€ β
β β alice (owner) β read, write, delete, share β β
β β finance (group) β read, write β β
β β ceo β read β β
β β * (everyone) β (no access) β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Unix File Permissions (simplified ACL): β
β -rwxr-xr-- owner group file.txt β
β βββ βββ βββ β
β βββ βββ ββ΄β΄ββ Others: read only β
β βββ ββ΄β΄ββββββ Group: read + execute β
β ββ΄β΄ββββββββββ Owner: read + write + execute β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
4.2 ACL Implementation¶
"""
acl.py - Access Control List implementation for resource-level permissions
"""
from dataclasses import dataclass, field
from typing import Set, Dict, Optional, List
from enum import Flag, auto
class ACLPermission(Flag):
"""Permission flags (combinable with bitwise OR)."""
NONE = 0
READ = auto()
WRITE = auto()
DELETE = auto()
SHARE = auto()
ADMIN = READ | WRITE | DELETE | SHARE # All permissions combined
@dataclass
class ACLEntry:
"""A single ACL entry mapping a principal to permissions."""
principal_type: str # "user" or "group"
principal_id: str
permissions: ACLPermission
@dataclass
class Resource:
"""A resource with an access control list."""
id: str
name: str
owner_id: str
acl: List[ACLEntry] = field(default_factory=list)
def grant(self, principal_type: str, principal_id: str,
permissions: ACLPermission):
"""Grant permissions to a principal."""
# Check if entry already exists
for entry in self.acl:
if (entry.principal_type == principal_type and
entry.principal_id == principal_id):
entry.permissions |= permissions # Add permissions
return
# Create new entry
self.acl.append(ACLEntry(
principal_type=principal_type,
principal_id=principal_id,
permissions=permissions,
))
def revoke(self, principal_type: str, principal_id: str,
permissions: ACLPermission = None):
"""Revoke permissions (or all access) from a principal."""
if permissions is None:
# Remove entire entry
self.acl = [
e for e in self.acl
if not (e.principal_type == principal_type and
e.principal_id == principal_id)
]
else:
for entry in self.acl:
if (entry.principal_type == principal_type and
entry.principal_id == principal_id):
entry.permissions &= ~permissions # Remove specific perms
def check_access(self, user_id: str, user_groups: Set[str],
required: ACLPermission) -> bool:
"""Check if a user has the required permissions."""
# Owner always has full access
if user_id == self.owner_id:
return True
effective_perms = ACLPermission.NONE
for entry in self.acl:
if entry.principal_type == "user" and entry.principal_id == user_id:
effective_perms |= entry.permissions
elif (entry.principal_type == "group" and
entry.principal_id in user_groups):
effective_perms |= entry.permissions
return bool(effective_perms & required)
def get_effective_permissions(self, user_id: str,
user_groups: Set[str]) -> ACLPermission:
"""Get all effective permissions for a user."""
if user_id == self.owner_id:
return ACLPermission.ADMIN
effective = ACLPermission.NONE
for entry in self.acl:
if entry.principal_type == "user" and entry.principal_id == user_id:
effective |= entry.permissions
elif (entry.principal_type == "group" and
entry.principal_id in user_groups):
effective |= entry.permissions
return effective
# ==============================================================
# ACL Manager with Database Backend (simplified)
# ==============================================================
class ACLManager:
"""Manage ACLs across multiple resources."""
def __init__(self):
self.resources: Dict[str, Resource] = {}
def create_resource(self, resource_id: str, name: str,
owner_id: str) -> Resource:
"""Create a new resource."""
resource = Resource(id=resource_id, name=name, owner_id=owner_id)
self.resources[resource_id] = resource
return resource
def check(self, user_id: str, user_groups: Set[str],
resource_id: str, permission: ACLPermission) -> bool:
"""Check access to a resource."""
resource = self.resources.get(resource_id)
if not resource:
return False
return resource.check_access(user_id, user_groups, permission)
def share(self, resource_id: str, requesting_user_id: str,
target_type: str, target_id: str,
permissions: ACLPermission) -> bool:
"""Share a resource (only owner or users with SHARE can do this)."""
resource = self.resources.get(resource_id)
if not resource:
return False
# Check if requesting user can share
if requesting_user_id != resource.owner_id:
if not resource.check_access(
requesting_user_id, set(), ACLPermission.SHARE
):
return False
resource.grant(target_type, target_id, permissions)
return True
# ==============================================================
# Demo
# ==============================================================
if __name__ == "__main__":
manager = ACLManager()
# Create a document
doc = manager.create_resource("doc1", "Project Plan.docx", owner_id="alice")
# Share with bob (read + write)
manager.share("doc1", "alice", "user", "bob",
ACLPermission.READ | ACLPermission.WRITE)
# Share with engineering group (read only)
manager.share("doc1", "alice", "group", "engineering", ACLPermission.READ)
# Check access
print("=== ACL Checks ===")
print(f"Alice (owner) read: {manager.check('alice', set(), 'doc1', ACLPermission.READ)}")
print(f"Alice (owner) delete: {manager.check('alice', set(), 'doc1', ACLPermission.DELETE)}")
print(f"Bob read: {manager.check('bob', set(), 'doc1', ACLPermission.READ)}")
print(f"Bob write: {manager.check('bob', set(), 'doc1', ACLPermission.WRITE)}")
print(f"Bob delete: {manager.check('bob', set(), 'doc1', ACLPermission.DELETE)}")
# Carol is in engineering group
print(f"Carol (eng) read: {manager.check('carol', {'engineering'}, 'doc1', ACLPermission.READ)}")
print(f"Carol (eng) write: {manager.check('carol', {'engineering'}, 'doc1', ACLPermission.WRITE)}")
# Dan has no access
print(f"Dan read: {manager.check('dan', set(), 'doc1', ACLPermission.READ)}")
5. Policy Engines: OPA (Open Policy Agent)¶
5.1 What is OPA?¶
Open Policy Agent (OPA) is a general-purpose policy engine that decouples policy decision-making from application logic. Policies are written in Rego, a declarative query language.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β OPA Architecture β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββββββ ββββββββββββββββ β
β β Application ββββββββΆβ OPA β β
β β (Python, β Query β ββββββββ β β
β β Java, ββββββββββββΆβ Rego β β β
β β Go...) β β βPolicyβ β β
β β βββββββββ ββββββββ β β
β β βDecisionβ ββββββββ β β
β ββββββββββββββββ β β Data β β β
β β β(JSON)β β β
β β ββββββββ β β
β ββββββββββββββββ β
β β
β Deployment Options: β
β 1. Library (embedded in app) β
β 2. Sidecar (daemon alongside app) β
β 3. Standalone service (centralized) β
β β
β Key Benefits: β
β - Policy as Code (version controlled, tested, audited) β
β - Language-agnostic (any app can query OPA) β
β - Separation of concerns (dev writes logic, security writes β
β policy) β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
5.2 Rego Policy Language¶
# authz.rego - Example OPA policy for API authorization
package authz
import rego.v1
# Default deny all requests
default allow := false
# Admin users can do anything
allow if {
input.user.roles[_] == "admin"
}
# Users can read their own profile
allow if {
input.action == "read"
input.resource.type == "profile"
input.resource.owner == input.user.id
}
# Editors can create and update posts
allow if {
input.user.roles[_] == "editor"
input.action in {"create", "update"}
input.resource.type == "post"
}
# Users can delete their own posts
allow if {
input.action == "delete"
input.resource.type == "post"
input.resource.owner == input.user.id
}
# Time-based access: deny outside business hours for sensitive data
deny if {
input.resource.sensitivity == "high"
not is_business_hours
}
is_business_hours if {
hour := time.clock(time.now_ns())[0]
hour >= 8
hour < 18
}
# Final decision (deny overrides allow)
decision := "allow" if {
allow
not deny
}
decision := "deny" if {
not allow
}
decision := "deny" if {
deny
}
5.3 Integrating OPA with Python¶
"""
opa_integration.py - Integrating OPA with a Python/Flask application
pip install requests
"""
import requests
import json
from flask import Flask, request, jsonify, g
from functools import wraps
app = Flask(__name__)
# OPA runs as a sidecar or standalone service
OPA_URL = "http://localhost:8181/v1/data/authz/decision"
def check_opa_policy(user: dict, action: str, resource: dict) -> bool:
"""Query OPA for an authorization decision."""
opa_input = {
"input": {
"user": user,
"action": action,
"resource": resource,
}
}
try:
response = requests.post(
OPA_URL,
json=opa_input,
timeout=1, # Fail fast
)
response.raise_for_status()
result = response.json()
return result.get("result") == "allow"
except requests.RequestException as e:
app.logger.error(f"OPA query failed: {e}")
return False # Fail closed (deny on error)
def require_opa(action: str, resource_fn=None):
"""
Decorator that checks OPA policy before allowing access.
resource_fn: function that builds resource dict from request context
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
user = g.get('current_user')
if not user:
return jsonify({"error": "Not authenticated"}), 401
# Build resource context
if resource_fn:
resource = resource_fn(request, **kwargs)
else:
resource = {"type": "unknown"}
# Query OPA
if not check_opa_policy(user, action, resource):
return jsonify({"error": "Access denied by policy"}), 403
return f(*args, **kwargs)
return decorated
return decorator
# Example resource builders
def post_resource(req, post_id=None, **kwargs):
"""Build resource dict for post endpoints."""
if post_id:
post = get_post(post_id) # Your DB lookup
return {
"type": "post",
"id": post_id,
"owner": post.get("author_id") if post else None,
"sensitivity": "normal",
}
return {"type": "post"}
# Protected routes using OPA
@app.route('/api/posts', methods=['POST'])
@require_opa("create", lambda req, **kw: {"type": "post"})
def create_post():
return jsonify({"message": "Post created"}), 201
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@require_opa("delete", post_resource)
def delete_post(post_id):
return jsonify({"message": "Post deleted"})
6. JWT Claims for Authorization¶
6.1 Standard and Custom Claims¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β JWT Claims for Authorization β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Standard (Registered) Claims: β
β ββββββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ β
β β Claim β Purpose β β
β ββββββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ€ β
β β sub β Subject (user ID) β β
β β iss β Issuer (who created the token) β β
β β aud β Audience (who should accept the token) β β
β β exp β Expiration time β β
β β iat β Issued at time β β
β β nbf β Not before time β β
β β jti β JWT ID (unique identifier) β β
β ββββββββββββββ΄βββββββββββββββββββββββββββββββββββββββββββ β
β β
β Custom Authorization Claims: β
β { β
β "sub": "user_123", β
β "roles": ["editor", "reviewer"], β
β "permissions": ["post:create", "post:update", "post:read"], β
β "org_id": "org_456", β
β "department": "engineering", β
β "tier": "premium", β
β "scope": "read write" β
β } β
β β
β WARNING: Keep JWT payload small! β
β - JWT is sent with every request (in Authorization header) β
β - Large payloads increase bandwidth and latency β
β - Put minimal auth info in JWT, fetch details from DB/cache β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
6.2 Claim-Based Authorization Middleware¶
"""
jwt_authz.py - JWT claim-based authorization
"""
from flask import Flask, request, jsonify, g
from functools import wraps
import jwt
app = Flask(__name__)
JWT_SECRET = "your-256-bit-secret" # Use env var in production
def decode_jwt(token: str) -> dict:
"""Decode and validate JWT."""
return jwt.decode(
token,
JWT_SECRET,
algorithms=["HS256"],
options={"require": ["exp", "sub", "iss"]}
)
def require_claims(**required_claims):
"""
Decorator that checks JWT claims match requirements.
Usage:
@require_claims(roles=["admin"], tier="premium")
@require_claims(permissions=["post:create"])
@require_claims(org_id=lambda v: v == g.resource_org_id)
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Extract token
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({"error": "Missing token"}), 401
try:
token = auth_header.split(' ', 1)[1]
claims = decode_jwt(token)
g.claims = claims
except jwt.InvalidTokenError as e:
return jsonify({"error": str(e)}), 401
# Check each required claim
for claim_name, expected in required_claims.items():
actual = claims.get(claim_name)
# Callable check (custom validation function)
if callable(expected):
if not expected(actual):
return jsonify({
"error": f"Claim '{claim_name}' validation failed"
}), 403
# List check (user must have at least one matching value)
elif isinstance(expected, list):
user_values = actual if isinstance(actual, list) else [actual]
if not set(expected).intersection(set(user_values)):
return jsonify({
"error": f"Required claim '{claim_name}': {expected}"
}), 403
# Direct value check
else:
if actual != expected:
return jsonify({
"error": f"Claim '{claim_name}' mismatch"
}), 403
return f(*args, **kwargs)
return decorated
return decorator
# Route examples
@app.route('/api/admin/dashboard')
@require_claims(roles=["admin"])
def admin_dashboard():
return jsonify({"message": "Welcome, admin"})
@app.route('/api/premium/features')
@require_claims(tier="premium")
def premium_features():
return jsonify({"features": ["advanced_analytics", "api_access"]})
@app.route('/api/posts', methods=['POST'])
@require_claims(permissions=["post:create"])
def create_post():
return jsonify({"message": "Post created"})
@app.route('/api/org/<org_id>/data')
@require_claims(org_id=lambda v: v == request.view_args.get('org_id'))
def org_data(org_id):
"""Only users belonging to this org can access."""
return jsonify({"org_id": org_id, "data": []})
7. OAuth 2.0 Scopes¶
7.1 Understanding Scopes¶
OAuth 2.0 scopes limit what an access token can do. They represent the permissions granted by the user to the client application.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β OAuth 2.0 Scopes β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Consent Screen: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β "MyApp" wants to access your account: β β
β β β β
β β [x] Read your profile (scope: profile:read) β β
β β [x] Read your emails (scope: email:read) β β
β β [ ] Send emails on your behalf (email:send) β β
β β [ ] Delete your data (data:delete) β β
β β β β
β β [Allow] [Deny] β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Resulting token: β
β { β
β "scope": "profile:read email:read", β
β "client_id": "myapp", β
β "sub": "user_123" β
β } β
β β
β Common Scope Patterns: β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Pattern β Example β β
β ββββββββββββββββββββΌββββββββββββββββββββββββββββββββ€ β
β β resource:action β posts:read, posts:write β β
β β resource.action β user.email, user.profile β β
β β hierarchical β admin (implies all) β β
β β OIDC standard β openid, profile, email β β
β β GitHub style β repo, user, gist β β
β β Google style β drive.readonly, calendar β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
7.2 Scope Enforcement¶
"""
scope_enforcement.py - OAuth scope checking for APIs
"""
from flask import Flask, request, jsonify, g
from functools import wraps
app = Flask(__name__)
def require_scope(*required_scopes):
"""
Decorator to enforce OAuth 2.0 scopes.
Token must contain ALL required scopes.
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Token scopes are typically space-separated
token_scopes = set(g.claims.get('scope', '').split())
required = set(required_scopes)
missing = required - token_scopes
if missing:
return jsonify({
"error": "insufficient_scope",
"error_description": f"Missing scopes: {', '.join(missing)}",
"required_scopes": list(required),
"granted_scopes": list(token_scopes),
}), 403
return f(*args, **kwargs)
return decorated
return decorator
@app.route('/api/user/profile')
@require_scope('profile:read')
def get_profile():
return jsonify({"name": "Alice", "email": "alice@example.com"})
@app.route('/api/user/profile', methods=['PUT'])
@require_scope('profile:read', 'profile:write')
def update_profile():
return jsonify({"message": "Profile updated"})
@app.route('/api/user/data/export')
@require_scope('data:export')
def export_data():
return jsonify({"download_url": "https://..."})
8. Resource-Level Permissions¶
8.1 Beyond Role-Based: Resource Ownership¶
Many applications need to check not just "does the user have the right role" but "does the user have access to this specific resource?"
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Resource-Level Permission Checks β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Level 1: Role Check β
β "Is the user an editor?" β Yes/No β
β β
β Level 2: Resource Ownership β
β "Is the user the owner of this post?" β Yes/No β
β β
β Level 3: Shared Access β
β "Has the post been shared with this user?" β Yes/No β
β β
β Level 4: Organizational Scope β
β "Does the user belong to the same org?" β Yes/No β
β β
β Combined: β
β ββββββββββββββ ββββββββββββββ ββββββββββββββββ β
β β Role check βββββΆβ Resource βββββΆβ Additional β β
β β (editor?) β β ownership β β constraints β β
β ββββββββββββββ β (owner?) β β (same org?) β β
β ββββββββββββββ ββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
8.2 Implementation: Multi-Layer Authorization¶
"""
resource_auth.py - Multi-layer authorization for resources
"""
from flask import Flask, request, jsonify, g, abort
from functools import wraps
from typing import Optional, Callable
app = Flask(__name__)
# ==============================================================
# Resource Authorization Framework
# ==============================================================
class ResourceAuthorizer:
"""
Multi-layer authorization for resources.
Checks are performed in order: role β ownership β sharing β custom.
"""
def __init__(self):
self.custom_checks = {}
def register_check(self, resource_type: str,
check_fn: Callable) -> None:
"""Register a custom authorization check for a resource type."""
if resource_type not in self.custom_checks:
self.custom_checks[resource_type] = []
self.custom_checks[resource_type].append(check_fn)
def authorize(self, user: dict, resource: dict,
action: str) -> bool:
"""
Check if user can perform action on resource.
Returns True if authorized.
"""
resource_type = resource.get('type')
# Layer 1: Super admin bypass
if 'super_admin' in user.get('roles', []):
return True
# Layer 2: Resource ownership
if resource.get('owner_id') == user.get('id'):
return True
# Layer 3: Role-based for the resource type
role_permissions = self._get_role_permissions(
user.get('roles', []), resource_type
)
if action in role_permissions:
return True
# Layer 4: Shared access
shares = resource.get('shares', [])
for share in shares:
if share.get('user_id') == user.get('id'):
if action in share.get('permissions', []):
return True
if share.get('group_id') in user.get('groups', []):
if action in share.get('permissions', []):
return True
# Layer 5: Custom checks
for check_fn in self.custom_checks.get(resource_type, []):
if check_fn(user, resource, action):
return True
return False
def _get_role_permissions(self, roles: list,
resource_type: str) -> set:
"""Get permissions for roles on a resource type."""
# In production, this comes from a database
role_perms = {
'admin': {
'post': {'read', 'create', 'update', 'delete', 'publish'},
'comment': {'read', 'create', 'update', 'delete'},
'user': {'read', 'create', 'update', 'delete'},
},
'editor': {
'post': {'read', 'create', 'update'},
'comment': {'read', 'create', 'update', 'delete'},
},
'viewer': {
'post': {'read'},
'comment': {'read', 'create'},
},
}
perms = set()
for role in roles:
if role in role_perms and resource_type in role_perms[role]:
perms |= role_perms[role][resource_type]
return perms
# Global authorizer instance
authorizer = ResourceAuthorizer()
# ==============================================================
# Authorization Decorator
# ==============================================================
def authorize_resource(action: str, resource_loader: Callable):
"""
Decorator for resource-level authorization.
resource_loader: function that returns the resource dict.
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
user = g.get('current_user')
if not user:
return jsonify({"error": "Not authenticated"}), 401
resource = resource_loader(**kwargs)
if not resource:
return jsonify({"error": "Resource not found"}), 404
if not authorizer.authorize(user, resource, action):
return jsonify({
"error": "Forbidden",
"detail": f"You don't have '{action}' access to this resource"
}), 403
g.resource = resource
return f(*args, **kwargs)
return decorated
return decorator
# Resource loaders
def load_post(post_id: int = None, **kwargs):
"""Load a post from the database."""
# Simulated DB lookup
posts = {
1: {"type": "post", "id": 1, "owner_id": "alice", "org_id": "org1",
"shares": [{"user_id": "bob", "permissions": ["read"]}]},
2: {"type": "post", "id": 2, "owner_id": "bob", "org_id": "org1",
"shares": []},
}
return posts.get(post_id)
# Routes
@app.route('/api/posts/<int:post_id>', methods=['GET'])
@authorize_resource('read', load_post)
def get_post(post_id):
return jsonify(g.resource)
@app.route('/api/posts/<int:post_id>', methods=['PUT'])
@authorize_resource('update', load_post)
def update_post(post_id):
return jsonify({"message": "Updated"})
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@authorize_resource('delete', load_post)
def delete_post(post_id):
return jsonify({"message": "Deleted"})
9. Common Authorization Vulnerabilities¶
9.1 IDOR (Insecure Direct Object Reference)¶
IDOR occurs when an application exposes internal object references (like database IDs) without verifying that the requesting user is authorized to access them.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β IDOR Vulnerability β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Vulnerable: β
β GET /api/invoices/12345 β
β β Returns invoice 12345 (no ownership check!) β
β β
β Attacker changes ID: β
β GET /api/invoices/12346 β
β β Returns someone else's invoice! π¨ β
β β
β GET /api/users/100/profile β Attacker's profile β
β GET /api/users/101/profile β Another user's profile! β
β GET /api/users/102/profile β Yet another! β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
idor_example.py - IDOR vulnerability and fix
"""
# ============================================================
# VULNERABLE: No authorization check on resource access
# ============================================================
@app.route('/api/invoices/<int:invoice_id>')
@require_auth # Only checks authentication, NOT authorization!
def get_invoice_vulnerable(invoice_id):
# Any authenticated user can access ANY invoice by ID
invoice = db.get_invoice(invoice_id)
if not invoice:
return jsonify({"error": "Not found"}), 404
return jsonify(invoice) # IDOR: No ownership check!
# ============================================================
# FIXED: Verify resource ownership
# ============================================================
@app.route('/api/invoices/<int:invoice_id>')
@require_auth
def get_invoice_secure(invoice_id):
invoice = db.get_invoice(invoice_id)
if not invoice:
return jsonify({"error": "Not found"}), 404
# Check: Does this invoice belong to the current user?
if invoice['user_id'] != g.current_user['id']:
# Return 404 (not 403) to avoid information disclosure
# A 403 tells attacker "the resource exists but you can't access it"
return jsonify({"error": "Not found"}), 404
return jsonify(invoice)
# ============================================================
# BETTER: Use indirect references
# ============================================================
@app.route('/api/my/invoices')
@require_auth
def list_my_invoices():
"""Only return the current user's invoices."""
invoices = db.get_invoices_for_user(g.current_user['id'])
return jsonify({"invoices": invoices})
9.2 Privilege Escalation¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Privilege Escalation Types β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Vertical Escalation (gaining higher privileges): β
β ββββββββββββ ββββββββββββ β
β β Regular β βββββββΆ β Admin β β
β β User β attacks β User β β
β ββββββββββββ ββββββββββββ β
β β
β Example: Modifying JWT role claim β
β Original: {"sub": "user1", "role": "user"} β
β Tampered: {"sub": "user1", "role": "admin"} β
β β
β Horizontal Escalation (accessing other users' data): β
β ββββββββββββ ββββββββββββ β
β β User A β βββββββΆ β User B β β
β β (self) β accessesβ (other) β β
β ββββββββββββ ββββββββββββ β
β β
β Example: Changing user_id parameter β
β Own data: GET /api/users/100/settings β
β Other's data: GET /api/users/101/settings β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
privilege_escalation.py - Privilege escalation vulnerabilities and fixes
"""
# ============================================================
# VULNERABLE: Role in client-controlled input
# ============================================================
@app.route('/api/register', methods=['POST'])
def register_vulnerable():
data = request.json
user = {
'username': data['username'],
'email': data['email'],
'role': data.get('role', 'user'), # Attacker sends role=admin!
}
db.create_user(user)
return jsonify(user), 201
# Attack:
# POST /api/register
# {"username": "attacker", "email": "a@b.com", "role": "admin"}
# ============================================================
# FIXED: Never trust client-supplied role
# ============================================================
@app.route('/api/register', methods=['POST'])
def register_secure():
data = request.json
user = {
'username': data['username'],
'email': data['email'],
'role': 'user', # Always set server-side!
}
db.create_user(user)
return jsonify(user), 201
# ============================================================
# VULNERABLE: Mass assignment (updating fields not intended)
# ============================================================
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
@require_auth
def update_user_vulnerable(user_id):
data = request.json
# Blindly update all provided fields
db.update_user(user_id, **data) # Attacker sends {"role": "admin"}!
return jsonify({"message": "Updated"})
# ============================================================
# FIXED: Whitelist allowed fields
# ============================================================
ALLOWED_UPDATE_FIELDS = {'username', 'email', 'bio', 'avatar_url'}
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
@require_auth
def update_user_secure(user_id):
# Ensure user can only update their own profile
if user_id != g.current_user['id']:
return jsonify({"error": "Forbidden"}), 403
data = request.json
# Only allow whitelisted fields
safe_data = {
k: v for k, v in data.items()
if k in ALLOWED_UPDATE_FIELDS
}
if not safe_data:
return jsonify({"error": "No valid fields to update"}), 400
db.update_user(user_id, **safe_data)
return jsonify({"message": "Updated"})
# ============================================================
# VULNERABLE: Broken function-level access control
# ============================================================
# Admin endpoint with no authorization check
@app.route('/api/admin/delete-user/<int:user_id>', methods=['DELETE'])
@require_auth # Only checks if user is logged in, not if they're admin!
def delete_user_vulnerable(user_id):
db.delete_user(user_id)
return jsonify({"message": "User deleted"})
# ============================================================
# FIXED: Proper role check on admin endpoints
# ============================================================
@app.route('/api/admin/delete-user/<int:user_id>', methods=['DELETE'])
@require_role('admin') # Checks both auth AND admin role
def delete_user_secure(user_id):
# Additional safety: prevent deleting self or other admins
target = db.get_user(user_id)
if not target:
return jsonify({"error": "User not found"}), 404
if target['id'] == g.current_user['id']:
return jsonify({"error": "Cannot delete yourself"}), 400
if 'admin' in target.get('roles', []):
return jsonify({"error": "Cannot delete other admins via API"}), 403
db.delete_user(user_id)
return jsonify({"message": "User deleted"})
9.3 Authorization Vulnerability Checklist¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Authorization Security Checklist β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β [ ] Every endpoint has explicit authorization checks β
β [ ] Authorization is enforced server-side (never client-only) β
β [ ] Direct object references are validated for ownership β
β [ ] Admin functions require admin role verification β
β [ ] Role/permission changes require admin authorization β
β [ ] User input never controls role or permission assignments β
β [ ] API responses don't expose other users' data β
β [ ] Failed authorization returns 403 (or 404 for IDOR) β
β [ ] Authorization logic is centralized (not duplicated) β
β [ ] Mass assignment is prevented (field whitelisting) β
β [ ] Horizontal access is checked (user A can't access B's data)β
β [ ] Authorization decisions are logged for audit β
β [ ] Default-deny policy (deny unless explicitly allowed) β
β [ ] Token-based auth checks scope/claims β
β [ ] Multi-tenant isolation is enforced at data layer β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
10. Exercises¶
Exercise 1: Implement RBAC System¶
Build a complete RBAC system with role hierarchy:
"""
Exercise: Implement a complete RBAC system for a blogging platform.
Requirements:
- Roles: super_admin, admin, moderator, author, reader
- Role hierarchy: super_admin > admin > moderator > author > reader
- Permissions: user:*, post:*, comment:*, settings:*
- Users can have multiple roles
- Implement role assignment with authorization (only admin+ can assign)
"""
class BlogRBAC:
def create_role(self, name: str, permissions: set,
parent: str = None) -> bool:
"""Create a new role with optional parent."""
pass
def assign_role(self, admin_id: int, target_user_id: int,
role: str) -> bool:
"""Assign role (only admins can do this)."""
pass
def check_permission(self, user_id: int,
permission: str) -> bool:
"""Check if user has permission (including inherited)."""
pass
def get_accessible_resources(self, user_id: int,
resource_type: str) -> list:
"""Get all resources a user can access."""
pass
Exercise 2: Build ABAC Policy Engine¶
Create a healthcare ABAC system:
"""
Exercise: Build an ABAC engine for a hospital system.
Policies to implement:
1. Doctors can read patient records in their department
2. Nurses can read records during their shift only
3. Emergency doctors can override department restrictions
4. No one can access records from non-hospital IP addresses
5. Psychiatry records require additional clearance level
6. Research access requires IRB approval attribute
"""
class HospitalABAC:
def add_policy(self, name: str, condition: callable,
effect: str) -> None:
pass
def evaluate(self, subject: dict, resource: dict,
action: dict, context: dict) -> str:
pass
Exercise 3: Fix Authorization Vulnerabilities¶
Find and fix all authorization issues in this code:
"""
Exercise: This API has at least 7 authorization vulnerabilities.
Find and fix ALL of them.
"""
@app.route('/api/users', methods=['GET'])
def list_all_users():
# Issue 1: ???
return jsonify(db.get_all_users())
@app.route('/api/users/<int:id>/password', methods=['PUT'])
@require_auth
def change_password(id):
# Issue 2: ???
new_password = request.json['password']
db.update_password(id, new_password)
return jsonify({"status": "updated"})
@app.route('/api/posts/<int:id>', methods=['DELETE'])
@require_auth
def delete_post(id):
post = db.get_post(id)
# Issue 3: ???
db.delete_post(id)
return jsonify({"status": "deleted"})
@app.route('/api/admin/promote', methods=['POST'])
@require_auth
def promote_user():
# Issue 4: ???
user_id = request.json['user_id']
role = request.json['role'] # Issue 5: ???
db.set_role(user_id, role)
return jsonify({"status": "promoted"})
@app.route('/api/files/<path:filepath>')
@require_auth
def download_file(filepath):
# Issue 6: ???
return send_file(f'/uploads/{filepath}')
@app.route('/api/settings', methods=['PUT'])
@require_auth
def update_settings():
# Issue 7: ???
settings = request.json
db.update_all_settings(settings)
return jsonify({"status": "updated"})
Exercise 4: Multi-Tenant Authorization¶
Design and implement a multi-tenant authorization system:
"""
Exercise: Build authorization for a multi-tenant SaaS application.
Requirements:
- Each tenant (organization) has isolated data
- Users belong to exactly one organization
- Roles are per-organization (admin in org A != admin in org B)
- Cross-tenant access is never allowed
- Super-admins (platform level) can access any tenant
"""
class MultiTenantAuth:
def create_org(self, org_id: str, owner_id: str) -> dict:
pass
def check_tenant_access(self, user_id: str, org_id: str,
resource_id: str, action: str) -> bool:
pass
def ensure_tenant_isolation(self, query: str,
user_org_id: str) -> str:
"""Add tenant filter to database queries."""
pass
Exercise 5: OPA Policy Writing¶
Write Rego policies for these scenarios:
# Exercise: Write Rego policies for:
#
# 1. Users can only access resources in their department
# 2. Managers can access resources in their department and
# departments they manage
# 3. Financial reports require "finance" role AND "senior" level
# 4. API rate limiting: users with "basic" tier limited to 100 req/hour
# 5. Data classification: "top_secret" resources require MFA
# authentication within the last 30 minutes
package exercise
import rego.v1
# Write your policies here:
default allow := false
# Policy 1: Department access
# ...
# Policy 2: Manager cross-department access
# ...
# Policy 3: Financial reports
# ...
# Policy 4: Rate limiting
# ...
# Policy 5: MFA requirement for classified data
# ...
11. Summary¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Authorization and Access Control Summary β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Models: β
β - RBAC: Roles β Permissions. Simple, widely used. β
β - ABAC: Attributes β Policy β Decision. Flexible, context-awareβ
β - ACL: Per-resource permission lists. Fine-grained. β
β - Combine models as needed (RBAC + resource ownership is common)β
β β
β Key Principles: β
β - Least Privilege: Grant minimum necessary permissions β
β - Default Deny: Block everything not explicitly allowed β
β - Separation of Duties: No single role can do everything β
β - Defense in Depth: Check at multiple layers β
β - Centralize: Authorization logic in one place β
β β
β Implementation: β
β - Server-side enforcement (never trust the client) β
β - JWT claims for stateless API authorization β
β - OAuth scopes for third-party access delegation β
β - Policy engines (OPA) for complex or externalized policies β
β - Decorators/middleware for DRY authorization in Flask β
β β
β Common Vulnerabilities: β
β - IDOR: Always validate resource ownership β
β - Privilege escalation: Never trust client-supplied roles β
β - Mass assignment: Whitelist updateable fields β
β - Missing function-level checks: Every endpoint needs auth β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Previous: 05. Authentication Systems | Next: 07. OWASP Top 10 (2021)