06. 인가 및 접근 제어
06. 인가 및 접근 제어¶
이전: 05. 인증 시스템 | 다음: 07. OWASP Top 10 (2021)
인가(Authorization)는 인증된 사용자가 무엇을 할 수 있는지를 결정합니다. 인증이 "당신은 누구입니까?"라는 질문에 답한다면, 인가는 "당신은 무엇을 할 수 있습니까?"라는 질문에 답합니다. 강력한 인가 시스템은 최소 권한의 원칙을 강제하여, 사용자와 서비스가 필요한 최소한의 권한만 가지도록 보장합니다. 이 레슨에서는 주요 접근 제어 모델(RBAC, ABAC, ACL), 정책 엔진, JWT 및 OAuth 스코프를 사용한 토큰 기반 인가, Python/Flask에서의 실용적인 구현 패턴, 그리고 일반적인 인가 취약점을 다룹니다.
학습 목표¶
- 인증과 인가의 차이점 구분
- 역할 기반 접근 제어(RBAC) 시스템 구현
- 속성 기반 접근 제어(ABAC)와 사용 시점 이해
- 리소스 수준 권한을 위한 접근 제어 목록(ACL) 작업
- 외부화된 인가를 위한 OPA(Open Policy Agent)와 같은 정책 엔진 사용
- API 인가를 위한 JWT 클레임 및 OAuth 2.0 스코프 활용
- Flask에서 인가 미들웨어 및 데코레이터 구축
- 일반적인 인가 취약점(IDOR, 권한 상승) 식별 및 방지
1. 인증 vs 인가¶
┌─────────────────────────────────────────────────────────────────┐
│ 인증 vs 인가 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 인증 (AuthN) 인가 (AuthZ) │
│ ───────────────────── ────────────────────── │
│ "당신은 누구입니까?" "무엇을 할 수 있습니까?" │
│ │
│ 신원 확인 권한 확인 │
│ 먼저 발생 인증 후 발생 │
│ 401 Unauthorized 403 Forbidden │
│ (인증되지 않음) (인증됐지만 허용되지 않음) │
│ │
│ │
│ 요청 흐름: │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ 요청 │───▶│ 인증 │───▶│ 인가 │ │
│ │ │ │ "이 사람은 │ │ "이 작업을 │ │
│ └──────────┘ │ 누구인가?" │ │ 할 수 있나?"│ │
│ └──────┬───────┘ └──────┬──────┘ │
│ │ │ │
│ ┌────┴───┐ ┌────┴───┐ │
│ │ │ │ │ │
│ 유효 무효 허용 거부 │
│ │ │ │ │ │
│ │ 401 에러 │ 403 에러 │
│ │ │ │
│ └───────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ 리소스 │ │
│ │ 접근 │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
2. 역할 기반 접근 제어 (RBAC)¶
2.1 RBAC 개념¶
RBAC는 권한을 역할(roles)에 할당하고, 사용자는 역할에 할당됩니다. 개별 사용자 권한이 아닌 역할-권한 매핑을 관리하기 때문에 관리가 간단합니다.
┌─────────────────────────────────────────────────────────────────┐
│ RBAC 모델 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 사용자 역할 권한 │
│ ┌────────┐ ┌──────────┐ ┌──────────────────┐ │
│ │ Alice │───────▶│ Admin │──────▶│ create_user │ │
│ └────────┘ │ │──────▶│ delete_user │ │
│ ┌────────┐ │ │──────▶│ view_reports │ │
│ │ Bob │───┐ └──────────┘──────▶│ manage_settings │ │
│ └────────┘ │ ┌──────────┐ └──────────────────┘ │
│ └───▶│ Editor │ ┌──────────────────┐ │
│ ┌────────┐ │ │──────▶│ create_post │ │
│ │ Carol │───────▶│ │──────▶│ edit_post │ │
│ └────────┘ └──────────┘──────▶│ delete_own_post │ │
│ ┌────────┐ ┌──────────┐ └──────────────────┘ │
│ │ Dan │───────▶│ Viewer │ ┌──────────────────┐ │
│ └────────┘ │ │──────▶│ view_post │ │
│ └──────────┘──────▶│ view_reports │ │
│ └──────────────────┘ │
│ │
│ 계층 구조 (선택 사항): │
│ Admin ──▶ Editor ──▶ Viewer │
│ (Admin은 모든 Editor 및 Viewer 권한을 상속) │
│ │
└─────────────────────────────────────────────────────────────────┘
2.2 Python에서 RBAC 구현¶
"""
rbac.py - Role-Based Access Control implementation
"""
from dataclasses import dataclass, field
from typing import Set, Dict, Optional
from enum import Enum
from functools import wraps
# ==============================================================
# Core RBAC Model
# ==============================================================
class Permission(str, Enum):
"""Define all permissions in the system."""
# User management
CREATE_USER = "user:create"
READ_USER = "user:read"
UPDATE_USER = "user:update"
DELETE_USER = "user:delete"
# Post management
CREATE_POST = "post:create"
READ_POST = "post:read"
UPDATE_POST = "post:update"
DELETE_POST = "post:delete"
PUBLISH_POST = "post:publish"
# Report management
VIEW_REPORTS = "report:view"
EXPORT_REPORTS = "report:export"
# System
MANAGE_SETTINGS = "system:settings"
VIEW_AUDIT_LOG = "system:audit"
@dataclass
class Role:
"""A role is a named collection of permissions."""
name: str
permissions: Set[Permission] = field(default_factory=set)
parent: Optional['Role'] = None # For role hierarchy
def get_all_permissions(self) -> Set[Permission]:
"""Get all permissions including inherited ones."""
perms = set(self.permissions)
if self.parent:
perms |= self.parent.get_all_permissions()
return perms
def has_permission(self, permission: Permission) -> bool:
"""Check if this role has a specific permission."""
return permission in self.get_all_permissions()
@dataclass
class User:
"""A user with one or more roles."""
id: int
username: str
roles: Set[Role] = field(default_factory=set)
def has_permission(self, permission: Permission) -> bool:
"""Check if user has a specific permission through any role."""
return any(role.has_permission(permission) for role in self.roles)
def has_role(self, role_name: str) -> bool:
"""Check if user has a specific role."""
return any(role.name == role_name for role in self.roles)
def get_all_permissions(self) -> Set[Permission]:
"""Get all permissions from all roles."""
perms = set()
for role in self.roles:
perms |= role.get_all_permissions()
return perms
# ==============================================================
# Role Definitions
# ==============================================================
def create_default_roles() -> Dict[str, Role]:
"""Create the default role hierarchy."""
# Base role - everyone gets these
viewer = Role(
name="viewer",
permissions={
Permission.READ_POST,
Permission.READ_USER,
Permission.VIEW_REPORTS,
}
)
# Editor inherits from Viewer
editor = Role(
name="editor",
permissions={
Permission.CREATE_POST,
Permission.UPDATE_POST,
Permission.DELETE_POST,
Permission.PUBLISH_POST,
Permission.EXPORT_REPORTS,
},
parent=viewer
)
# Admin inherits from Editor
admin = Role(
name="admin",
permissions={
Permission.CREATE_USER,
Permission.UPDATE_USER,
Permission.DELETE_USER,
Permission.MANAGE_SETTINGS,
Permission.VIEW_AUDIT_LOG,
},
parent=editor
)
return {
"viewer": viewer,
"editor": editor,
"admin": admin,
}
# ==============================================================
# RBAC Manager
# ==============================================================
class RBACManager:
"""Centralized RBAC management."""
def __init__(self):
self.roles = create_default_roles()
self.users: Dict[int, User] = {}
def create_user(self, user_id: int, username: str,
role_names: list = None) -> User:
"""Create a user with specified roles."""
roles = set()
for name in (role_names or ["viewer"]):
if name in self.roles:
roles.add(self.roles[name])
else:
raise ValueError(f"Unknown role: {name}")
user = User(id=user_id, username=username, roles=roles)
self.users[user_id] = user
return user
def assign_role(self, user_id: int, role_name: str):
"""Assign a role to a user."""
if user_id not in self.users:
raise ValueError(f"User {user_id} not found")
if role_name not in self.roles:
raise ValueError(f"Role {role_name} not found")
self.users[user_id].roles.add(self.roles[role_name])
def revoke_role(self, user_id: int, role_name: str):
"""Remove a role from a user."""
if user_id not in self.users:
raise ValueError(f"User {user_id} not found")
self.users[user_id].roles = {
r for r in self.users[user_id].roles
if r.name != role_name
}
def check_access(self, user_id: int, permission: Permission) -> bool:
"""Check if a user has a specific permission."""
user = self.users.get(user_id)
if not user:
return False
return user.has_permission(permission)
# ==============================================================
# Demo
# ==============================================================
if __name__ == "__main__":
rbac = RBACManager()
# Create users
alice = rbac.create_user(1, "alice", ["admin"])
bob = rbac.create_user(2, "bob", ["editor"])
carol = rbac.create_user(3, "carol", ["viewer"])
# Check permissions
print("=== Permission Checks ===")
checks = [
(alice, Permission.DELETE_USER, "Alice can delete users"),
(alice, Permission.READ_POST, "Alice can read posts (inherited)"),
(bob, Permission.CREATE_POST, "Bob can create posts"),
(bob, Permission.DELETE_USER, "Bob can delete users"),
(carol, Permission.READ_POST, "Carol can read posts"),
(carol, Permission.CREATE_POST, "Carol can create posts"),
]
for user, perm, description in checks:
result = user.has_permission(perm)
status = "ALLOWED" if result else "DENIED"
print(f" [{status}] {description}")
# List all permissions
print(f"\nAlice's permissions: {len(alice.get_all_permissions())}")
for p in sorted(alice.get_all_permissions(), key=lambda x: x.value):
print(f" - {p.value}")
2.3 Flask RBAC 미들웨어¶
"""
flask_rbac.py - RBAC authorization middleware for Flask
"""
from flask import Flask, request, jsonify, g
from functools import wraps
import jwt
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key' # Use env var in production
# ==============================================================
# Authorization Decorators
# ==============================================================
def require_auth(f):
"""Decorator: Require authenticated user."""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return jsonify({"error": "Authentication required"}), 401
try:
payload = jwt.decode(
token,
app.config['SECRET_KEY'],
algorithms=['HS256']
)
g.current_user = {
'id': payload['sub'],
'roles': payload.get('roles', []),
'permissions': payload.get('permissions', []),
}
except jwt.InvalidTokenError as e:
return jsonify({"error": f"Invalid token: {str(e)}"}), 401
return f(*args, **kwargs)
return decorated
def require_role(*allowed_roles):
"""Decorator: Require user to have one of the specified roles."""
def decorator(f):
@wraps(f)
@require_auth
def decorated(*args, **kwargs):
user_roles = set(g.current_user.get('roles', []))
if not user_roles.intersection(set(allowed_roles)):
return jsonify({
"error": "Forbidden",
"detail": f"Required roles: {allowed_roles}"
}), 403
return f(*args, **kwargs)
return decorated
return decorator
def require_permission(*required_permissions):
"""Decorator: Require user to have all specified permissions."""
def decorator(f):
@wraps(f)
@require_auth
def decorated(*args, **kwargs):
user_perms = set(g.current_user.get('permissions', []))
missing = set(required_permissions) - user_perms
if missing:
return jsonify({
"error": "Forbidden",
"detail": f"Missing permissions: {list(missing)}"
}), 403
return f(*args, **kwargs)
return decorated
return decorator
# ==============================================================
# Protected Routes
# ==============================================================
@app.route('/api/posts', methods=['GET'])
@require_auth
def list_posts():
"""Any authenticated user can list posts."""
return jsonify({"posts": []})
@app.route('/api/posts', methods=['POST'])
@require_permission('post:create')
def create_post():
"""Only users with post:create permission."""
data = request.json
# Create post logic...
return jsonify({"message": "Post created"}), 201
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@require_permission('post:delete')
def delete_post(post_id):
"""Only users with post:delete permission."""
# Additional check: can only delete own posts unless admin
post = get_post(post_id) # Your DB lookup
if not post:
return jsonify({"error": "Not found"}), 404
user = g.current_user
is_admin = 'admin' in user.get('roles', [])
is_owner = post['author_id'] == user['id']
if not is_admin and not is_owner:
return jsonify({"error": "Can only delete your own posts"}), 403
# Delete post logic...
return jsonify({"message": "Post deleted"})
@app.route('/api/admin/users', methods=['GET'])
@require_role('admin')
def list_users():
"""Admin-only endpoint."""
return jsonify({"users": []})
@app.route('/api/admin/settings', methods=['PUT'])
@require_role('admin')
def update_settings():
"""Admin-only: modify system settings."""
data = request.json
# Update settings logic...
return jsonify({"message": "Settings updated"})
# ==============================================================
# Dynamic Permission Check (for resource-level auth)
# ==============================================================
def check_resource_access(user_id: int, resource_type: str,
resource_id: int, action: str) -> bool:
"""
Check if a user can perform an action on a specific resource.
This goes beyond role-based checks to resource-level authorization.
"""
# Example: Check if user owns the resource or has admin role
user = get_user(user_id)
resource = get_resource(resource_type, resource_id)
if not user or not resource:
return False
# Admin can do anything
if 'admin' in user.get('roles', []):
return True
# Owner can read/update their own resources
if resource.get('owner_id') == user_id:
if action in ('read', 'update', 'delete'):
return True
# Check shared access
shared_with = resource.get('shared_with', [])
for share in shared_with:
if share['user_id'] == user_id:
if action in share.get('allowed_actions', []):
return True
return False
3. 속성 기반 접근 제어 (ABAC)¶
3.1 ABAC 개념¶
ABAC는 주체(사용자), 리소스, 작업 및 환경의 속성(attributes)을 기반으로 접근 결정을 내립니다. RBAC보다 더 유연하지만 더 복잡합니다.
┌─────────────────────────────────────────────────────────────────┐
│ ABAC 모델 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 접근 결정 = │
│ f(주체 속성, 리소스 속성, │
│ 작업 속성, 환경 속성) │
│ │
│ 주체 속성: 리소스 속성: │
│ ├── role: "doctor" ├── type: "medical_record" │
│ ├── department: "ER" ├── department: "ER" │
│ ├── clearance: "L3" ├── sensitivity: "L2" │
│ └── certification: true └── owner: "patient_123" │
│ │
│ 작업 속성: 환경 속성: │
│ ├── type: "read" ├── time: "14:30 UTC" │
│ └── purpose: "treatment"├── ip_address: "10.0.1.50" │
│ ├── location: "hospital_network" │
│ └── device_trust: "managed" │
│ │
│ 정책 예시: │
│ "응급실 부서의 의사는 병원 네트워크에서 관리되는 기기를 통해 │
│ 근무 시간 중에 응급실 부서의 의료 기록을 읽을 수 있다." │
│ │
│ Subject.role == "doctor" AND │
│ Subject.department == Resource.department AND │
│ Action.type == "read" AND │
│ Environment.time BETWEEN "07:00" AND "19:00" AND │
│ Environment.location == "hospital_network" │
│ ──▶ ALLOW │
│ │
└─────────────────────────────────────────────────────────────────┘
3.2 RBAC vs ABAC 사용 시점¶
| 기준 | RBAC | ABAC |
|---|---|---|
| 역할 수 | 소수, 명확히 정의됨 | 많거나 동적 |
| 접근 결정 | 역할 멤버십 | 다중 속성 |
| 복잡성 | 구현이 간단 | 복잡하지만 유연함 |
| "역할 폭발" | 위험 (역할이 너무 많음) | 문제 없음 |
| 컨텍스트 인식 | 없음 (정적 역할) | 있음 (시간, 위치 등) |
| 규정 준수 | 기본 요구사항 | 규제 요구사항 |
| 적합한 경우 | 대부분의 웹 앱, API | 의료, 금융, 정부 |
| 성능 | 빠름 (역할 조회) | 느림 (정책 평가) |
3.3 ABAC 구현¶
"""
abac.py - Attribute-Based Access Control implementation
"""
from dataclasses import dataclass, field
from datetime import datetime, time
from typing import Any, Dict, List, Callable
from enum import Enum
class Decision(Enum):
ALLOW = "allow"
DENY = "deny"
NOT_APPLICABLE = "not_applicable"
@dataclass
class AccessRequest:
"""Represents a request for access to a resource."""
# Subject attributes (who)
subject: Dict[str, Any]
# Resource attributes (what)
resource: Dict[str, Any]
# Action attributes (how)
action: Dict[str, Any]
# Environment attributes (context)
environment: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Policy:
"""A single ABAC policy."""
name: str
description: str
priority: int # Lower = higher priority
effect: Decision # ALLOW or DENY
condition: Callable[[AccessRequest], bool]
def evaluate(self, request: AccessRequest) -> Decision:
"""Evaluate this policy against a request."""
try:
if self.condition(request):
return self.effect
return Decision.NOT_APPLICABLE
except (KeyError, TypeError):
return Decision.NOT_APPLICABLE
class PolicyEngine:
"""Evaluates access requests against a set of policies."""
def __init__(self, default_decision: Decision = Decision.DENY):
self.policies: List[Policy] = []
self.default_decision = default_decision
def add_policy(self, policy: Policy):
"""Add a policy to the engine."""
self.policies.append(policy)
# Keep sorted by priority
self.policies.sort(key=lambda p: p.priority)
def evaluate(self, request: AccessRequest) -> Decision:
"""
Evaluate all policies. Uses deny-overrides combining algorithm:
- If any policy says DENY, result is DENY
- If at least one says ALLOW and none say DENY, result is ALLOW
- Otherwise, use default decision
"""
has_allow = False
for policy in self.policies:
decision = policy.evaluate(request)
if decision == Decision.DENY:
return Decision.DENY # Deny overrides
if decision == Decision.ALLOW:
has_allow = True
return Decision.ALLOW if has_allow else self.default_decision
# ==============================================================
# Example Policies
# ==============================================================
def create_medical_policies() -> PolicyEngine:
"""Create policies for a healthcare system."""
engine = PolicyEngine(default_decision=Decision.DENY)
# Policy 1: Doctors can read patient records in their department
engine.add_policy(Policy(
name="doctor_read_department_records",
description="Doctors can read records in their department",
priority=10,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "doctor" and
req.action.get("type") == "read" and
req.resource.get("type") == "medical_record" and
req.subject.get("department") == req.resource.get("department")
)
))
# Policy 2: Doctors can write records for their patients
engine.add_policy(Policy(
name="doctor_write_own_patients",
description="Doctors can update records of patients assigned to them",
priority=10,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "doctor" and
req.action.get("type") in ("write", "update") and
req.resource.get("type") == "medical_record" and
req.subject.get("id") in req.resource.get("assigned_doctors", [])
)
))
# Policy 3: Nurses can read records in their department during shifts
engine.add_policy(Policy(
name="nurse_read_during_shift",
description="Nurses can read records during their shift hours",
priority=20,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "nurse" and
req.action.get("type") == "read" and
req.resource.get("type") == "medical_record" and
req.subject.get("department") == req.resource.get("department") and
_is_during_shift(req.environment.get("time"),
req.subject.get("shift_start"),
req.subject.get("shift_end"))
)
))
# Policy 4: No access from untrusted devices
engine.add_policy(Policy(
name="deny_untrusted_devices",
description="Deny access from non-managed devices",
priority=1, # High priority - evaluated first
effect=Decision.DENY,
condition=lambda req: (
req.resource.get("sensitivity", "low") in ("high", "critical") and
req.environment.get("device_trust") != "managed"
)
))
# Policy 5: Emergency override - on-duty doctors in ER
engine.add_policy(Policy(
name="emergency_override",
description="ER doctors can access any record during emergency",
priority=5,
effect=Decision.ALLOW,
condition=lambda req: (
req.subject.get("role") == "doctor" and
req.subject.get("department") == "emergency" and
req.action.get("type") == "read" and
req.environment.get("emergency_mode") is True
)
))
return engine
def _is_during_shift(current_time, shift_start, shift_end):
"""Check if current time is within shift hours."""
if not all([current_time, shift_start, shift_end]):
return False
if isinstance(current_time, str):
current_time = datetime.fromisoformat(current_time).time()
shift_start = time.fromisoformat(shift_start)
shift_end = time.fromisoformat(shift_end)
if shift_start <= shift_end:
return shift_start <= current_time <= shift_end
else: # Overnight shift
return current_time >= shift_start or current_time <= shift_end
# ==============================================================
# Demo
# ==============================================================
if __name__ == "__main__":
engine = create_medical_policies()
# Test Case 1: Doctor reading records in their department
request1 = AccessRequest(
subject={"id": "dr_smith", "role": "doctor", "department": "cardiology"},
resource={"type": "medical_record", "department": "cardiology",
"sensitivity": "high"},
action={"type": "read"},
environment={"device_trust": "managed", "time": "2024-01-15T10:30:00"}
)
print(f"Doctor reads own dept: {engine.evaluate(request1).value}")
# Test Case 2: Doctor reading records in different department
request2 = AccessRequest(
subject={"id": "dr_smith", "role": "doctor", "department": "cardiology"},
resource={"type": "medical_record", "department": "neurology",
"sensitivity": "high"},
action={"type": "read"},
environment={"device_trust": "managed"}
)
print(f"Doctor reads other dept: {engine.evaluate(request2).value}")
# Test Case 3: Access from untrusted device (denied regardless)
request3 = AccessRequest(
subject={"id": "dr_smith", "role": "doctor", "department": "cardiology"},
resource={"type": "medical_record", "department": "cardiology",
"sensitivity": "high"},
action={"type": "read"},
environment={"device_trust": "personal"} # Not managed!
)
print(f"Untrusted device: {engine.evaluate(request3).value}")
# Test Case 4: Emergency override
request4 = AccessRequest(
subject={"id": "dr_jones", "role": "doctor", "department": "emergency"},
resource={"type": "medical_record", "department": "cardiology",
"sensitivity": "critical"},
action={"type": "read"},
environment={"device_trust": "managed", "emergency_mode": True}
)
print(f"Emergency override: {engine.evaluate(request4).value}")
4. 접근 제어 목록 (ACL)¶
4.1 ACL 개념¶
ACL은 개별 리소스 수준에서 권한을 정의합니다. 각 리소스에는 어떤 주체(사용자, 그룹)가 어떤 작업을 수행할 수 있는지 지정하는 항목 목록이 있습니다.
┌─────────────────────────────────────────────────────────────────┐
│ ACL 모델 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 리소스: "Project Plan.docx" │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ ACL 항목 │ 권한 │ │
│ ├─────────────────────┼─────────────────────────────────┤ │
│ │ alice (소유자) │ read, write, delete, share │ │
│ │ bob │ read, write │ │
│ │ carol │ read │ │
│ │ engineering (그룹) │ read, comment │ │
│ │ * (모든 사람) │ (접근 권한 없음) │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ 리소스: "Company Financials.xlsx" │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ ACL 항목 │ 권한 │ │
│ ├─────────────────────┼─────────────────────────────────┤ │
│ │ alice (소유자) │ read, write, delete, share │ │
│ │ finance (그룹) │ read, write │ │
│ │ ceo │ read │ │
│ │ * (모든 사람) │ (접근 권한 없음) │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ Unix 파일 권한 (단순화된 ACL): │
│ -rwxr-xr-- owner group file.txt │
│ │││ │││ │││ │
│ │││ │││ └┴┴── Others: 읽기만 │
│ │││ └┴┴────── Group: 읽기 + 실행 │
│ └┴┴────────── Owner: 읽기 + 쓰기 + 실행 │
│ │
└─────────────────────────────────────────────────────────────────┘
4.2 ACL 구현¶
"""
acl.py - Access Control List implementation for resource-level permissions
"""
from dataclasses import dataclass, field
from typing import Set, Dict, Optional, List
from enum import Flag, auto
class ACLPermission(Flag):
"""Permission flags (combinable with bitwise OR)."""
NONE = 0
READ = auto()
WRITE = auto()
DELETE = auto()
SHARE = auto()
ADMIN = READ | WRITE | DELETE | SHARE # All permissions combined
@dataclass
class ACLEntry:
"""A single ACL entry mapping a principal to permissions."""
principal_type: str # "user" or "group"
principal_id: str
permissions: ACLPermission
@dataclass
class Resource:
"""A resource with an access control list."""
id: str
name: str
owner_id: str
acl: List[ACLEntry] = field(default_factory=list)
def grant(self, principal_type: str, principal_id: str,
permissions: ACLPermission):
"""Grant permissions to a principal."""
# Check if entry already exists
for entry in self.acl:
if (entry.principal_type == principal_type and
entry.principal_id == principal_id):
entry.permissions |= permissions # Add permissions
return
# Create new entry
self.acl.append(ACLEntry(
principal_type=principal_type,
principal_id=principal_id,
permissions=permissions,
))
def revoke(self, principal_type: str, principal_id: str,
permissions: ACLPermission = None):
"""Revoke permissions (or all access) from a principal."""
if permissions is None:
# Remove entire entry
self.acl = [
e for e in self.acl
if not (e.principal_type == principal_type and
e.principal_id == principal_id)
]
else:
for entry in self.acl:
if (entry.principal_type == principal_type and
entry.principal_id == principal_id):
entry.permissions &= ~permissions # Remove specific perms
def check_access(self, user_id: str, user_groups: Set[str],
required: ACLPermission) -> bool:
"""Check if a user has the required permissions."""
# Owner always has full access
if user_id == self.owner_id:
return True
effective_perms = ACLPermission.NONE
for entry in self.acl:
if entry.principal_type == "user" and entry.principal_id == user_id:
effective_perms |= entry.permissions
elif (entry.principal_type == "group" and
entry.principal_id in user_groups):
effective_perms |= entry.permissions
return bool(effective_perms & required)
def get_effective_permissions(self, user_id: str,
user_groups: Set[str]) -> ACLPermission:
"""Get all effective permissions for a user."""
if user_id == self.owner_id:
return ACLPermission.ADMIN
effective = ACLPermission.NONE
for entry in self.acl:
if entry.principal_type == "user" and entry.principal_id == user_id:
effective |= entry.permissions
elif (entry.principal_type == "group" and
entry.principal_id in user_groups):
effective |= entry.permissions
return effective
# ==============================================================
# ACL Manager with Database Backend (simplified)
# ==============================================================
class ACLManager:
"""Manage ACLs across multiple resources."""
def __init__(self):
self.resources: Dict[str, Resource] = {}
def create_resource(self, resource_id: str, name: str,
owner_id: str) -> Resource:
"""Create a new resource."""
resource = Resource(id=resource_id, name=name, owner_id=owner_id)
self.resources[resource_id] = resource
return resource
def check(self, user_id: str, user_groups: Set[str],
resource_id: str, permission: ACLPermission) -> bool:
"""Check access to a resource."""
resource = self.resources.get(resource_id)
if not resource:
return False
return resource.check_access(user_id, user_groups, permission)
def share(self, resource_id: str, requesting_user_id: str,
target_type: str, target_id: str,
permissions: ACLPermission) -> bool:
"""Share a resource (only owner or users with SHARE can do this)."""
resource = self.resources.get(resource_id)
if not resource:
return False
# Check if requesting user can share
if requesting_user_id != resource.owner_id:
if not resource.check_access(
requesting_user_id, set(), ACLPermission.SHARE
):
return False
resource.grant(target_type, target_id, permissions)
return True
# ==============================================================
# Demo
# ==============================================================
if __name__ == "__main__":
manager = ACLManager()
# Create a document
doc = manager.create_resource("doc1", "Project Plan.docx", owner_id="alice")
# Share with bob (read + write)
manager.share("doc1", "alice", "user", "bob",
ACLPermission.READ | ACLPermission.WRITE)
# Share with engineering group (read only)
manager.share("doc1", "alice", "group", "engineering", ACLPermission.READ)
# Check access
print("=== ACL Checks ===")
print(f"Alice (owner) read: {manager.check('alice', set(), 'doc1', ACLPermission.READ)}")
print(f"Alice (owner) delete: {manager.check('alice', set(), 'doc1', ACLPermission.DELETE)}")
print(f"Bob read: {manager.check('bob', set(), 'doc1', ACLPermission.READ)}")
print(f"Bob write: {manager.check('bob', set(), 'doc1', ACLPermission.WRITE)}")
print(f"Bob delete: {manager.check('bob', set(), 'doc1', ACLPermission.DELETE)}")
# Carol is in engineering group
print(f"Carol (eng) read: {manager.check('carol', {'engineering'}, 'doc1', ACLPermission.READ)}")
print(f"Carol (eng) write: {manager.check('carol', {'engineering'}, 'doc1', ACLPermission.WRITE)}")
# Dan has no access
print(f"Dan read: {manager.check('dan', set(), 'doc1', ACLPermission.READ)}")
5. 정책 엔진: OPA (Open Policy Agent)¶
5.1 OPA란?¶
Open Policy Agent (OPA)는 애플리케이션 로직에서 정책 의사 결정을 분리하는 범용 정책 엔진입니다. 정책은 선언적 쿼리 언어인 Rego로 작성됩니다.
┌─────────────────────────────────────────────────────────────────┐
│ OPA 아키텍처 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 애플리케이션 │──────▶│ OPA │ │
│ │ (Python, │ Query │ ┌──────┐ │ │
│ │ Java, │───────│──▶│ Rego │ │ │
│ │ Go...) │ │ │Policy│ │ │
│ │ │◀──────│ └──────┘ │ │
│ │ │Decision│ ┌──────┐ │ │
│ └──────────────┘ │ │ Data │ │ │
│ │ │(JSON)│ │ │
│ │ └──────┘ │ │
│ └──────────────┘ │
│ │
│ 배포 옵션: │
│ 1. 라이브러리 (앱에 내장) │
│ 2. 사이드카 (앱과 함께 실행되는 데몬) │
│ 3. 독립 실행형 서비스 (중앙 집중식) │
│ │
│ 주요 이점: │
│ - 코드로서의 정책 (버전 관리, 테스트, 감사) │
│ - 언어 독립적 (모든 앱이 OPA를 쿼리 가능) │
│ - 관심사 분리 (개발자는 로직 작성, 보안팀은 정책 작성) │
│ │
└─────────────────────────────────────────────────────────────────┘
5.2 Rego 정책 언어¶
# authz.rego - Example OPA policy for API authorization
package authz
import rego.v1
# Default deny all requests
default allow := false
# Admin users can do anything
allow if {
input.user.roles[_] == "admin"
}
# Users can read their own profile
allow if {
input.action == "read"
input.resource.type == "profile"
input.resource.owner == input.user.id
}
# Editors can create and update posts
allow if {
input.user.roles[_] == "editor"
input.action in {"create", "update"}
input.resource.type == "post"
}
# Users can delete their own posts
allow if {
input.action == "delete"
input.resource.type == "post"
input.resource.owner == input.user.id
}
# Time-based access: deny outside business hours for sensitive data
deny if {
input.resource.sensitivity == "high"
not is_business_hours
}
is_business_hours if {
hour := time.clock(time.now_ns())[0]
hour >= 8
hour < 18
}
# Final decision (deny overrides allow)
decision := "allow" if {
allow
not deny
}
decision := "deny" if {
not allow
}
decision := "deny" if {
deny
}
5.3 Python과 OPA 통합¶
"""
opa_integration.py - Integrating OPA with a Python/Flask application
pip install requests
"""
import requests
import json
from flask import Flask, request, jsonify, g
from functools import wraps
app = Flask(__name__)
# OPA runs as a sidecar or standalone service
OPA_URL = "http://localhost:8181/v1/data/authz/decision"
def check_opa_policy(user: dict, action: str, resource: dict) -> bool:
"""Query OPA for an authorization decision."""
opa_input = {
"input": {
"user": user,
"action": action,
"resource": resource,
}
}
try:
response = requests.post(
OPA_URL,
json=opa_input,
timeout=1, # Fail fast
)
response.raise_for_status()
result = response.json()
return result.get("result") == "allow"
except requests.RequestException as e:
app.logger.error(f"OPA query failed: {e}")
return False # Fail closed (deny on error)
def require_opa(action: str, resource_fn=None):
"""
Decorator that checks OPA policy before allowing access.
resource_fn: function that builds resource dict from request context
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
user = g.get('current_user')
if not user:
return jsonify({"error": "Not authenticated"}), 401
# Build resource context
if resource_fn:
resource = resource_fn(request, **kwargs)
else:
resource = {"type": "unknown"}
# Query OPA
if not check_opa_policy(user, action, resource):
return jsonify({"error": "Access denied by policy"}), 403
return f(*args, **kwargs)
return decorated
return decorator
# Example resource builders
def post_resource(req, post_id=None, **kwargs):
"""Build resource dict for post endpoints."""
if post_id:
post = get_post(post_id) # Your DB lookup
return {
"type": "post",
"id": post_id,
"owner": post.get("author_id") if post else None,
"sensitivity": "normal",
}
return {"type": "post"}
# Protected routes using OPA
@app.route('/api/posts', methods=['POST'])
@require_opa("create", lambda req, **kw: {"type": "post"})
def create_post():
return jsonify({"message": "Post created"}), 201
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@require_opa("delete", post_resource)
def delete_post(post_id):
return jsonify({"message": "Post deleted"})
6. 인가를 위한 JWT 클레임¶
6.1 표준 및 사용자 정의 클레임¶
┌─────────────────────────────────────────────────────────────────┐
│ 인가를 위한 JWT 클레임 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 표준 (등록된) 클레임: │
│ ┌────────────┬──────────────────────────────────────────┐ │
│ │ 클레임 │ 목적 │ │
│ ├────────────┼──────────────────────────────────────────┤ │
│ │ sub │ 주체 (사용자 ID) │ │
│ │ iss │ 발급자 (토큰을 생성한 주체) │ │
│ │ aud │ 대상자 (토큰을 수락해야 하는 주체) │ │
│ │ exp │ 만료 시간 │ │
│ │ iat │ 발급 시간 │ │
│ │ nbf │ 이전 시간 │ │
│ │ jti │ JWT ID (고유 식별자) │ │
│ └────────────┴──────────────────────────────────────────┘ │
│ │
│ 사용자 정의 인가 클레임: │
│ { │
│ "sub": "user_123", │
│ "roles": ["editor", "reviewer"], │
│ "permissions": ["post:create", "post:update", "post:read"], │
│ "org_id": "org_456", │
│ "department": "engineering", │
│ "tier": "premium", │
│ "scope": "read write" │
│ } │
│ │
│ 경고: JWT 페이로드를 작게 유지하세요! │
│ - JWT는 모든 요청과 함께 전송됩니다 (Authorization 헤더) │
│ - 큰 페이로드는 대역폭과 지연 시간을 증가시킵니다 │
│ - JWT에는 최소한의 인증 정보만 넣고, 세부 정보는 DB/캐시에서 │
│ 가져오세요 │
│ │
└─────────────────────────────────────────────────────────────────┘
6.2 클레임 기반 인가 미들웨어¶
"""
jwt_authz.py - JWT claim-based authorization
"""
from flask import Flask, request, jsonify, g
from functools import wraps
import jwt
app = Flask(__name__)
JWT_SECRET = "your-256-bit-secret" # Use env var in production
def decode_jwt(token: str) -> dict:
"""Decode and validate JWT."""
return jwt.decode(
token,
JWT_SECRET,
algorithms=["HS256"],
options={"require": ["exp", "sub", "iss"]}
)
def require_claims(**required_claims):
"""
Decorator that checks JWT claims match requirements.
Usage:
@require_claims(roles=["admin"], tier="premium")
@require_claims(permissions=["post:create"])
@require_claims(org_id=lambda v: v == g.resource_org_id)
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Extract token
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({"error": "Missing token"}), 401
try:
token = auth_header.split(' ', 1)[1]
claims = decode_jwt(token)
g.claims = claims
except jwt.InvalidTokenError as e:
return jsonify({"error": str(e)}), 401
# Check each required claim
for claim_name, expected in required_claims.items():
actual = claims.get(claim_name)
# Callable check (custom validation function)
if callable(expected):
if not expected(actual):
return jsonify({
"error": f"Claim '{claim_name}' validation failed"
}), 403
# List check (user must have at least one matching value)
elif isinstance(expected, list):
user_values = actual if isinstance(actual, list) else [actual]
if not set(expected).intersection(set(user_values)):
return jsonify({
"error": f"Required claim '{claim_name}': {expected}"
}), 403
# Direct value check
else:
if actual != expected:
return jsonify({
"error": f"Claim '{claim_name}' mismatch"
}), 403
return f(*args, **kwargs)
return decorated
return decorator
# Route examples
@app.route('/api/admin/dashboard')
@require_claims(roles=["admin"])
def admin_dashboard():
return jsonify({"message": "Welcome, admin"})
@app.route('/api/premium/features')
@require_claims(tier="premium")
def premium_features():
return jsonify({"features": ["advanced_analytics", "api_access"]})
@app.route('/api/posts', methods=['POST'])
@require_claims(permissions=["post:create"])
def create_post():
return jsonify({"message": "Post created"})
@app.route('/api/org/<org_id>/data')
@require_claims(org_id=lambda v: v == request.view_args.get('org_id'))
def org_data(org_id):
"""Only users belonging to this org can access."""
return jsonify({"org_id": org_id, "data": []})
7. OAuth 2.0 스코프¶
7.1 스코프 이해하기¶
OAuth 2.0 스코프는 액세스 토큰이 할 수 있는 작업을 제한합니다. 클라이언트 애플리케이션에 사용자가 부여한 권한을 나타냅니다.
┌─────────────────────────────────────────────────────────────────┐
│ OAuth 2.0 스코프 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 동의 화면: │
│ ┌─────────────────────────────────────────────┐ │
│ │ "MyApp"이 귀하의 계정에 접근하려고 합니다: │ │
│ │ │ │
│ │ [x] 프로필 읽기 (scope: profile:read) │ │
│ │ [x] 이메일 읽기 (scope: email:read) │ │
│ │ [ ] 대신 이메일 보내기 (email:send) │ │
│ │ [ ] 데이터 삭제 (data:delete) │ │
│ │ │ │
│ │ [허용] [거부] │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 결과 토큰: │
│ { │
│ "scope": "profile:read email:read", │
│ "client_id": "myapp", │
│ "sub": "user_123" │
│ } │
│ │
│ 일반적인 스코프 패턴: │
│ ┌──────────────────────────────────────────────────┐ │
│ │ 패턴 │ 예시 │ │
│ ├──────────────────┼───────────────────────────────┤ │
│ │ resource:action │ posts:read, posts:write │ │
│ │ resource.action │ user.email, user.profile │ │
│ │ hierarchical │ admin (모든 것을 의미) │ │
│ │ OIDC standard │ openid, profile, email │ │
│ │ GitHub style │ repo, user, gist │ │
│ │ Google style │ drive.readonly, calendar │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
7.2 스코프 강제¶
"""
scope_enforcement.py - OAuth scope checking for APIs
"""
from flask import Flask, request, jsonify, g
from functools import wraps
app = Flask(__name__)
def require_scope(*required_scopes):
"""
Decorator to enforce OAuth 2.0 scopes.
Token must contain ALL required scopes.
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Token scopes are typically space-separated
token_scopes = set(g.claims.get('scope', '').split())
required = set(required_scopes)
missing = required - token_scopes
if missing:
return jsonify({
"error": "insufficient_scope",
"error_description": f"Missing scopes: {', '.join(missing)}",
"required_scopes": list(required),
"granted_scopes": list(token_scopes),
}), 403
return f(*args, **kwargs)
return decorated
return decorator
@app.route('/api/user/profile')
@require_scope('profile:read')
def get_profile():
return jsonify({"name": "Alice", "email": "alice@example.com"})
@app.route('/api/user/profile', methods=['PUT'])
@require_scope('profile:read', 'profile:write')
def update_profile():
return jsonify({"message": "Profile updated"})
@app.route('/api/user/data/export')
@require_scope('data:export')
def export_data():
return jsonify({"download_url": "https://..."})
8. 리소스 수준 권한¶
8.1 역할 기반을 넘어서: 리소스 소유권¶
많은 애플리케이션은 "사용자가 적절한 역할을 가지고 있는가"뿐만 아니라 "사용자가 이 특정 리소스에 접근할 수 있는가"를 확인해야 합니다.
┌─────────────────────────────────────────────────────────────────┐
│ 리소스 수준 권한 확인 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 레벨 1: 역할 확인 │
│ "사용자가 편집자인가?" → 예/아니오 │
│ │
│ 레벨 2: 리소스 소유권 │
│ "사용자가 이 게시물의 소유자인가?" → 예/아니오 │
│ │
│ 레벨 3: 공유 접근 │
│ "게시물이 이 사용자와 공유되었는가?" → 예/아니오 │
│ │
│ 레벨 4: 조직 범위 │
│ "사용자가 같은 조직에 속하는가?" → 예/아니오 │
│ │
│ 결합: │
│ ┌────────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ 역할 확인 │───▶│ 리소스 │───▶│ 추가 │ │
│ │ (편집자?) │ │ 소유권 │ │ 제약 조건 │ │
│ └────────────┘ │ (소유자?) │ │ (같은 조직?) │ │
│ └────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
8.2 구현: 다층 인가¶
"""
resource_auth.py - Multi-layer authorization for resources
"""
from flask import Flask, request, jsonify, g, abort
from functools import wraps
from typing import Optional, Callable
app = Flask(__name__)
# ==============================================================
# Resource Authorization Framework
# ==============================================================
class ResourceAuthorizer:
"""
Multi-layer authorization for resources.
Checks are performed in order: role → ownership → sharing → custom.
"""
def __init__(self):
self.custom_checks = {}
def register_check(self, resource_type: str,
check_fn: Callable) -> None:
"""Register a custom authorization check for a resource type."""
if resource_type not in self.custom_checks:
self.custom_checks[resource_type] = []
self.custom_checks[resource_type].append(check_fn)
def authorize(self, user: dict, resource: dict,
action: str) -> bool:
"""
Check if user can perform action on resource.
Returns True if authorized.
"""
resource_type = resource.get('type')
# Layer 1: Super admin bypass
if 'super_admin' in user.get('roles', []):
return True
# Layer 2: Resource ownership
if resource.get('owner_id') == user.get('id'):
return True
# Layer 3: Role-based for the resource type
role_permissions = self._get_role_permissions(
user.get('roles', []), resource_type
)
if action in role_permissions:
return True
# Layer 4: Shared access
shares = resource.get('shares', [])
for share in shares:
if share.get('user_id') == user.get('id'):
if action in share.get('permissions', []):
return True
if share.get('group_id') in user.get('groups', []):
if action in share.get('permissions', []):
return True
# Layer 5: Custom checks
for check_fn in self.custom_checks.get(resource_type, []):
if check_fn(user, resource, action):
return True
return False
def _get_role_permissions(self, roles: list,
resource_type: str) -> set:
"""Get permissions for roles on a resource type."""
# In production, this comes from a database
role_perms = {
'admin': {
'post': {'read', 'create', 'update', 'delete', 'publish'},
'comment': {'read', 'create', 'update', 'delete'},
'user': {'read', 'create', 'update', 'delete'},
},
'editor': {
'post': {'read', 'create', 'update'},
'comment': {'read', 'create', 'update', 'delete'},
},
'viewer': {
'post': {'read'},
'comment': {'read', 'create'},
},
}
perms = set()
for role in roles:
if role in role_perms and resource_type in role_perms[role]:
perms |= role_perms[role][resource_type]
return perms
# Global authorizer instance
authorizer = ResourceAuthorizer()
# ==============================================================
# Authorization Decorator
# ==============================================================
def authorize_resource(action: str, resource_loader: Callable):
"""
Decorator for resource-level authorization.
resource_loader: function that returns the resource dict.
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
user = g.get('current_user')
if not user:
return jsonify({"error": "Not authenticated"}), 401
resource = resource_loader(**kwargs)
if not resource:
return jsonify({"error": "Resource not found"}), 404
if not authorizer.authorize(user, resource, action):
return jsonify({
"error": "Forbidden",
"detail": f"You don't have '{action}' access to this resource"
}), 403
g.resource = resource
return f(*args, **kwargs)
return decorated
return decorator
# Resource loaders
def load_post(post_id: int = None, **kwargs):
"""Load a post from the database."""
# Simulated DB lookup
posts = {
1: {"type": "post", "id": 1, "owner_id": "alice", "org_id": "org1",
"shares": [{"user_id": "bob", "permissions": ["read"]}]},
2: {"type": "post", "id": 2, "owner_id": "bob", "org_id": "org1",
"shares": []},
}
return posts.get(post_id)
# Routes
@app.route('/api/posts/<int:post_id>', methods=['GET'])
@authorize_resource('read', load_post)
def get_post(post_id):
return jsonify(g.resource)
@app.route('/api/posts/<int:post_id>', methods=['PUT'])
@authorize_resource('update', load_post)
def update_post(post_id):
return jsonify({"message": "Updated"})
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@authorize_resource('delete', load_post)
def delete_post(post_id):
return jsonify({"message": "Deleted"})
9. 일반적인 인가 취약점¶
9.1 IDOR (안전하지 않은 직접 객체 참조)¶
IDOR은 애플리케이션이 내부 객체 참조(예: 데이터베이스 ID)를 노출하면서 요청 사용자가 접근할 권한이 있는지 확인하지 않을 때 발생합니다.
┌─────────────────────────────────────────────────────────────────┐
│ IDOR 취약점 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 취약한 경우: │
│ GET /api/invoices/12345 │
│ → invoice 12345 반환 (소유권 확인 없음!) │
│ │
│ 공격자가 ID 변경: │
│ GET /api/invoices/12346 │
│ → 다른 사람의 invoice 반환! 🚨 │
│ │
│ GET /api/users/100/profile → 공격자의 프로필 │
│ GET /api/users/101/profile → 다른 사용자의 프로필! │
│ GET /api/users/102/profile → 또 다른 사용자! │
│ │
└─────────────────────────────────────────────────────────────────┘
"""
idor_example.py - IDOR vulnerability and fix
"""
# ============================================================
# VULNERABLE: No authorization check on resource access
# ============================================================
@app.route('/api/invoices/<int:invoice_id>')
@require_auth # Only checks authentication, NOT authorization!
def get_invoice_vulnerable(invoice_id):
# Any authenticated user can access ANY invoice by ID
invoice = db.get_invoice(invoice_id)
if not invoice:
return jsonify({"error": "Not found"}), 404
return jsonify(invoice) # IDOR: No ownership check!
# ============================================================
# FIXED: Verify resource ownership
# ============================================================
@app.route('/api/invoices/<int:invoice_id>')
@require_auth
def get_invoice_secure(invoice_id):
invoice = db.get_invoice(invoice_id)
if not invoice:
return jsonify({"error": "Not found"}), 404
# Check: Does this invoice belong to the current user?
if invoice['user_id'] != g.current_user['id']:
# Return 404 (not 403) to avoid information disclosure
# A 403 tells attacker "the resource exists but you can't access it"
return jsonify({"error": "Not found"}), 404
return jsonify(invoice)
# ============================================================
# BETTER: Use indirect references
# ============================================================
@app.route('/api/my/invoices')
@require_auth
def list_my_invoices():
"""Only return the current user's invoices."""
invoices = db.get_invoices_for_user(g.current_user['id'])
return jsonify({"invoices": invoices})
9.2 권한 상승¶
┌─────────────────────────────────────────────────────────────────┐
│ 권한 상승 유형 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 수직 상승 (더 높은 권한 획득): │
│ ┌──────────┐ ┌──────────┐ │
│ │ 일반 │ ──────▶ │ 관리자 │ │
│ │ 사용자 │ 공격 │ 사용자 │ │
│ └──────────┘ └──────────┘ │
│ │
│ 예시: JWT 역할 클레임 수정 │
│ 원본: {"sub": "user1", "role": "user"} │
│ 변조됨: {"sub": "user1", "role": "admin"} │
│ │
│ 수평 상승 (다른 사용자의 데이터 접근): │
│ ┌──────────┐ ┌──────────┐ │
│ │ 사용자 A │ ──────▶ │ 사용자 B │ │
│ │ (본인) │ 접근 │ (타인) │ │
│ └──────────┘ └──────────┘ │
│ │
│ 예시: user_id 파라미터 변경 │
│ 본인 데이터: GET /api/users/100/settings │
│ 타인 데이터: GET /api/users/101/settings │
│ │
└─────────────────────────────────────────────────────────────────┘
"""
privilege_escalation.py - Privilege escalation vulnerabilities and fixes
"""
# ============================================================
# VULNERABLE: Role in client-controlled input
# ============================================================
@app.route('/api/register', methods=['POST'])
def register_vulnerable():
data = request.json
user = {
'username': data['username'],
'email': data['email'],
'role': data.get('role', 'user'), # Attacker sends role=admin!
}
db.create_user(user)
return jsonify(user), 201
# Attack:
# POST /api/register
# {"username": "attacker", "email": "a@b.com", "role": "admin"}
# ============================================================
# FIXED: Never trust client-supplied role
# ============================================================
@app.route('/api/register', methods=['POST'])
def register_secure():
data = request.json
user = {
'username': data['username'],
'email': data['email'],
'role': 'user', # Always set server-side!
}
db.create_user(user)
return jsonify(user), 201
# ============================================================
# VULNERABLE: Mass assignment (updating fields not intended)
# ============================================================
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
@require_auth
def update_user_vulnerable(user_id):
data = request.json
# Blindly update all provided fields
db.update_user(user_id, **data) # Attacker sends {"role": "admin"}!
return jsonify({"message": "Updated"})
# ============================================================
# FIXED: Whitelist allowed fields
# ============================================================
ALLOWED_UPDATE_FIELDS = {'username', 'email', 'bio', 'avatar_url'}
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
@require_auth
def update_user_secure(user_id):
# Ensure user can only update their own profile
if user_id != g.current_user['id']:
return jsonify({"error": "Forbidden"}), 403
data = request.json
# Only allow whitelisted fields
safe_data = {
k: v for k, v in data.items()
if k in ALLOWED_UPDATE_FIELDS
}
if not safe_data:
return jsonify({"error": "No valid fields to update"}), 400
db.update_user(user_id, **safe_data)
return jsonify({"message": "Updated"})
# ============================================================
# VULNERABLE: Broken function-level access control
# ============================================================
# Admin endpoint with no authorization check
@app.route('/api/admin/delete-user/<int:user_id>', methods=['DELETE'])
@require_auth # Only checks if user is logged in, not if they're admin!
def delete_user_vulnerable(user_id):
db.delete_user(user_id)
return jsonify({"message": "User deleted"})
# ============================================================
# FIXED: Proper role check on admin endpoints
# ============================================================
@app.route('/api/admin/delete-user/<int:user_id>', methods=['DELETE'])
@require_role('admin') # Checks both auth AND admin role
def delete_user_secure(user_id):
# Additional safety: prevent deleting self or other admins
target = db.get_user(user_id)
if not target:
return jsonify({"error": "User not found"}), 404
if target['id'] == g.current_user['id']:
return jsonify({"error": "Cannot delete yourself"}), 400
if 'admin' in target.get('roles', []):
return jsonify({"error": "Cannot delete other admins via API"}), 403
db.delete_user(user_id)
return jsonify({"message": "User deleted"})
9.3 인가 취약점 체크리스트¶
┌─────────────────────────────────────────────────────────────────┐
│ 인가 보안 체크리스트 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [ ] 모든 엔드포인트에 명시적인 인가 확인이 있음 │
│ [ ] 인가는 서버 측에서 강제됨 (클라이언트 전용 금지) │
│ [ ] 직접 객체 참조는 소유권에 대해 검증됨 │
│ [ ] 관리자 기능은 관리자 역할 확인이 필요함 │
│ [ ] 역할/권한 변경은 관리자 인가가 필요함 │
│ [ ] 사용자 입력은 역할 또는 권한 할당을 제어하지 않음 │
│ [ ] API 응답은 다른 사용자의 데이터를 노출하지 않음 │
│ [ ] 실패한 인가는 403 (또는 IDOR의 경우 404)을 반환함 │
│ [ ] 인가 로직은 중앙 집중화됨 (중복되지 않음) │
│ [ ] 대량 할당이 방지됨 (필드 화이트리스팅) │
│ [ ] 수평 접근이 확인됨 (사용자 A는 B의 데이터에 접근 불가) │
│ [ ] 인가 결정이 감사를 위해 로깅됨 │
│ [ ] 기본 거부 정책 (명시적으로 허용되지 않는 한 거부) │
│ [ ] 토큰 기반 인증은 스코프/클레임을 확인함 │
│ [ ] 다중 테넌트 격리가 데이터 계층에서 강제됨 │
│ │
└─────────────────────────────────────────────────────────────────┘
10. 연습 문제¶
연습 문제 1: RBAC 시스템 구현¶
역할 계층을 가진 완전한 RBAC 시스템 구축:
"""
Exercise: Implement a complete RBAC system for a blogging platform.
Requirements:
- Roles: super_admin, admin, moderator, author, reader
- Role hierarchy: super_admin > admin > moderator > author > reader
- Permissions: user:*, post:*, comment:*, settings:*
- Users can have multiple roles
- Implement role assignment with authorization (only admin+ can assign)
"""
class BlogRBAC:
def create_role(self, name: str, permissions: set,
parent: str = None) -> bool:
"""Create a new role with optional parent."""
pass
def assign_role(self, admin_id: int, target_user_id: int,
role: str) -> bool:
"""Assign role (only admins can do this)."""
pass
def check_permission(self, user_id: int,
permission: str) -> bool:
"""Check if user has permission (including inherited)."""
pass
def get_accessible_resources(self, user_id: int,
resource_type: str) -> list:
"""Get all resources a user can access."""
pass
연습 문제 2: ABAC 정책 엔진 구축¶
의료 ABAC 시스템 생성:
"""
Exercise: Build an ABAC engine for a hospital system.
Policies to implement:
1. Doctors can read patient records in their department
2. Nurses can read records during their shift only
3. Emergency doctors can override department restrictions
4. No one can access records from non-hospital IP addresses
5. Psychiatry records require additional clearance level
6. Research access requires IRB approval attribute
"""
class HospitalABAC:
def add_policy(self, name: str, condition: callable,
effect: str) -> None:
pass
def evaluate(self, subject: dict, resource: dict,
action: dict, context: dict) -> str:
pass
연습 문제 3: 인가 취약점 수정¶
이 코드의 모든 인가 문제를 찾아 수정:
"""
Exercise: This API has at least 7 authorization vulnerabilities.
Find and fix ALL of them.
"""
@app.route('/api/users', methods=['GET'])
def list_all_users():
# Issue 1: ???
return jsonify(db.get_all_users())
@app.route('/api/users/<int:id>/password', methods=['PUT'])
@require_auth
def change_password(id):
# Issue 2: ???
new_password = request.json['password']
db.update_password(id, new_password)
return jsonify({"status": "updated"})
@app.route('/api/posts/<int:id>', methods=['DELETE'])
@require_auth
def delete_post(id):
post = db.get_post(id)
# Issue 3: ???
db.delete_post(id)
return jsonify({"status": "deleted"})
@app.route('/api/admin/promote', methods=['POST'])
@require_auth
def promote_user():
# Issue 4: ???
user_id = request.json['user_id']
role = request.json['role'] # Issue 5: ???
db.set_role(user_id, role)
return jsonify({"status": "promoted"})
@app.route('/api/files/<path:filepath>')
@require_auth
def download_file(filepath):
# Issue 6: ???
return send_file(f'/uploads/{filepath}')
@app.route('/api/settings', methods=['PUT'])
@require_auth
def update_settings():
# Issue 7: ???
settings = request.json
db.update_all_settings(settings)
return jsonify({"status": "updated"})
연습 문제 4: 다중 테넌트 인가¶
다중 테넌트 인가 시스템 설계 및 구현:
"""
Exercise: Build authorization for a multi-tenant SaaS application.
Requirements:
- Each tenant (organization) has isolated data
- Users belong to exactly one organization
- Roles are per-organization (admin in org A != admin in org B)
- Cross-tenant access is never allowed
- Super-admins (platform level) can access any tenant
"""
class MultiTenantAuth:
def create_org(self, org_id: str, owner_id: str) -> dict:
pass
def check_tenant_access(self, user_id: str, org_id: str,
resource_id: str, action: str) -> bool:
pass
def ensure_tenant_isolation(self, query: str,
user_org_id: str) -> str:
"""Add tenant filter to database queries."""
pass
연습 문제 5: OPA 정책 작성¶
다음 시나리오에 대한 Rego 정책 작성:
# Exercise: Write Rego policies for:
#
# 1. Users can only access resources in their department
# 2. Managers can access resources in their department and
# departments they manage
# 3. Financial reports require "finance" role AND "senior" level
# 4. API rate limiting: users with "basic" tier limited to 100 req/hour
# 5. Data classification: "top_secret" resources require MFA
# authentication within the last 30 minutes
package exercise
import rego.v1
# Write your policies here:
default allow := false
# Policy 1: Department access
# ...
# Policy 2: Manager cross-department access
# ...
# Policy 3: Financial reports
# ...
# Policy 4: Rate limiting
# ...
# Policy 5: MFA requirement for classified data
# ...
11. 요약¶
┌─────────────────────────────────────────────────────────────────┐
│ 인가 및 접근 제어 요약 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 모델: │
│ - RBAC: 역할 → 권한. 간단하고 널리 사용됨. │
│ - ABAC: 속성 → 정책 → 결정. 유연하고 컨텍스트 인식. │
│ - ACL: 리소스별 권한 목록. 세밀한 제어. │
│ - 모델 결합 (RBAC + 리소스 소유권이 일반적) │
│ │
│ 주요 원칙: │
│ - 최소 권한: 필요한 최소 권한만 부여 │
│ - 기본 거부: 명시적으로 허용되지 않은 모든 것을 차단 │
│ - 직무 분리: 단일 역할이 모든 것을 할 수 없음 │
│ - 심층 방어: 여러 계층에서 확인 │
│ - 중앙 집중화: 인가 로직을 한 곳에 │
│ │
│ 구현: │
│ - 서버 측 강제 (클라이언트를 절대 신뢰하지 않음) │
│ - 상태 비저장 API 인가를 위한 JWT 클레임 │
│ - 제3자 접근 위임을 위한 OAuth 스코프 │
│ - 복잡하거나 외부화된 정책을 위한 정책 엔진 (OPA) │
│ - Flask에서 DRY 인가를 위한 데코레이터/미들웨어 │
│ │
│ 일반적인 취약점: │
│ - IDOR: 항상 리소스 소유권을 검증 │
│ - 권한 상승: 클라이언트가 제공한 역할을 절대 신뢰하지 않음 │
│ - 대량 할당: 업데이트 가능한 필드를 화이트리스트로 관리 │
│ - 기능 수준 확인 누락: 모든 엔드포인트에 인증이 필요 │
│ │
└─────────────────────────────────────────────────────────────────┘
이전: 05. 인증 시스템 | 다음: 07. OWASP Top 10 (2021)