# app.py (python, 791 lines, 27.8 KB)
"""
보안 API 서버 데모 (stdlib only)
Secure API Server Demo (stdlib only)

패스워드 해싱(PBKDF2), JWT 인증, RBAC, 입력 검증, 보안 헤더,
레이트 리미팅, 구조화 로깅을 포함하는 완전한 보안 API 서버를 구현합니다.
http.server 기반으로 외부 의존성 없이 동작합니다.

Implements a complete secure API server with password hashing (PBKDF2),
JWT authentication, RBAC, input validation, security headers, rate limiting,
and structured logging. Uses only Python stdlib (http.server).
"""
 13
 14import hashlib
 15import hmac
 16import json
 17import os
 18import re
 19import secrets
 20import time
 21import base64
 22import logging
 23import sys
 24from datetime import datetime, timedelta, timezone
 25from http.server import HTTPServer, BaseHTTPRequestHandler
 26from urllib.parse import urlparse, parse_qs
 27from functools import wraps
 28
 29
 30# =============================================================================
# Configuration (설정)
 32# =============================================================================
 33
# Signing key for JWTs. It is generated randomly each time the module is
# loaded, so tokens issued by one process cannot be verified by another.
JWT_SECRET = secrets.token_bytes(32)
# Informational only: JWT.encode writes the same value into the token header
# itself and nothing in the code shown here reads this constant.
JWT_ALGORITHM = "HS256"
# Token lifetime used by the login handler when it builds the "exp" claim.
JWT_EXPIRY_MINUTES = 30
# NOTE: despite the name, this is the PBKDF2 iteration count consumed by
# PasswordHasher.ITERATIONS; no bcrypt is involved.
BCRYPT_ITERATIONS = 100_000
# Not read by any code shown here; run_server() takes its own port argument.
SERVER_PORT = 0  # Assigned dynamically for demo

# Structured logger: one pipe-separated line per event, written to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-5s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout,
)
# Shared logger used by SecurityLogger for all security events.
logger = logging.getLogger("secure_api")
 48
 49
 50# =============================================================================
# 1. Password Hashing (패스워드 해싱 - PBKDF2)
 52# =============================================================================
 53
 54class PasswordHasher:
 55    """Hash and verify passwords using PBKDF2-HMAC-SHA256."""
 56
 57    ITERATIONS = BCRYPT_ITERATIONS
 58    SALT_LENGTH = 16
 59    KEY_LENGTH = 32
 60
 61    @staticmethod
 62    def hash_password(password: str) -> str:
 63        """Hash a password, returning 'salt$hash' as hex strings."""
 64        salt = os.urandom(PasswordHasher.SALT_LENGTH)
 65        dk = hashlib.pbkdf2_hmac(
 66            "sha256", password.encode(), salt,
 67            PasswordHasher.ITERATIONS, dklen=PasswordHasher.KEY_LENGTH,
 68        )
 69        return salt.hex() + "$" + dk.hex()
 70
 71    @staticmethod
 72    def verify_password(password: str, stored: str) -> bool:
 73        """Verify a password against a stored hash."""
 74        try:
 75            salt_hex, hash_hex = stored.split("$")
 76            salt = bytes.fromhex(salt_hex)
 77            expected = bytes.fromhex(hash_hex)
 78            dk = hashlib.pbkdf2_hmac(
 79                "sha256", password.encode(), salt,
 80                PasswordHasher.ITERATIONS, dklen=PasswordHasher.KEY_LENGTH,
 81            )
 82            return hmac.compare_digest(dk, expected)
 83        except (ValueError, AttributeError):
 84            return False
 85
 86
 87# =============================================================================
# 2. JWT Implementation (JWT 구현)
 89# =============================================================================
 90
 91class JWT:
 92    """Minimal JWT (HS256) implementation using stdlib only."""
 93
 94    @staticmethod
 95    def _b64url_encode(data: bytes) -> str:
 96        return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
 97
 98    @staticmethod
 99    def _b64url_decode(s: str) -> bytes:
100        padding = 4 - len(s) % 4
101        if padding != 4:
102            s += "=" * padding
103        return base64.urlsafe_b64decode(s)
104
105    @staticmethod
106    def encode(payload: dict, secret: bytes) -> str:
107        """Create a JWT token."""
108        header = {"alg": "HS256", "typ": "JWT"}
109        header_b64 = JWT._b64url_encode(json.dumps(header).encode())
110        payload_b64 = JWT._b64url_encode(json.dumps(payload).encode())
111
112        message = f"{header_b64}.{payload_b64}".encode()
113        signature = hmac.new(secret, message, hashlib.sha256).digest()
114        sig_b64 = JWT._b64url_encode(signature)
115
116        return f"{header_b64}.{payload_b64}.{sig_b64}"
117
118    @staticmethod
119    def decode(token: str, secret: bytes) -> dict | None:
120        """Decode and verify a JWT token. Returns None if invalid."""
121        try:
122            parts = token.split(".")
123            if len(parts) != 3:
124                return None
125
126            header_b64, payload_b64, sig_b64 = parts
127
128            # Verify signature
129            message = f"{header_b64}.{payload_b64}".encode()
130            expected_sig = hmac.new(secret, message, hashlib.sha256).digest()
131            actual_sig = JWT._b64url_decode(sig_b64)
132
133            if not hmac.compare_digest(expected_sig, actual_sig):
134                return None
135
136            # Decode payload
137            payload = json.loads(JWT._b64url_decode(payload_b64))
138
139            # Check expiration
140            if "exp" in payload:
141                exp_time = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
142                if datetime.now(timezone.utc) > exp_time:
143                    return None
144
145            return payload
146        except Exception:
147            return None
148
149
150# =============================================================================
# 3. RBAC - Role-Based Access Control (역할 기반 접근 제어)
152# =============================================================================
153
class RBAC:
    """Static lookup from role name to the set of permissions it holds."""

    # Every role lists its permissions explicitly; the sets happen to nest
    # (admin contains editor's, editor contains viewer's).
    PERMISSIONS = {
        "admin":  {"read", "write", "delete", "admin"},
        "editor": {"read", "write"},
        "viewer": {"read"},
    }

    @staticmethod
    def has_permission(role: str, required_permission: str) -> bool:
        """Return True when *role* is known and holds *required_permission*."""
        table = RBAC.PERMISSIONS
        granted = table[role] if role in table else set()
        return required_permission in granted

    @staticmethod
    def require_permission(role: str, permission: str) -> tuple[bool, str]:
        """Check *permission* for *role* and describe the outcome.

        Returns:
            ``(allowed, message)`` where *message* is a human-readable
            "Access granted" / "Access denied" explanation.
        """
        if not RBAC.has_permission(role, permission):
            return False, f"Access denied: role '{role}' lacks '{permission}'"
        return True, f"Access granted: role '{role}' has '{permission}'"
174
175
176# =============================================================================
# 4. Input Validation (입력 검증)
178# =============================================================================
179
class Validator:
    """Input validation for API requests.

    Each ``validate_*`` method returns ``(ok, message)``; *message* is empty
    on success. Non-string input is reported as invalid instead of raising,
    because the values come straight from client-supplied JSON.
    """

    @staticmethod
    def validate_username(value: str) -> tuple[bool, str]:
        """Accept 3-30 ASCII letters, digits or underscores and nothing else."""
        # fullmatch (unlike match with "$") also rejects a trailing newline.
        if not isinstance(value, str) or not re.fullmatch(r'[a-zA-Z0-9_]{3,30}', value):
            return False, "Username must be 3-30 alphanumeric/underscore characters"
        return True, ""

    @staticmethod
    def validate_password(value: str) -> tuple[bool, str]:
        """Require at least 8 characters, one uppercase letter and one digit."""
        if not isinstance(value, str):
            return False, "Password must be a string"
        if len(value) < 8:
            return False, "Password must be at least 8 characters"
        if not re.search(r'[A-Z]', value):
            return False, "Password must contain an uppercase letter"
        if not re.search(r'[0-9]', value):
            return False, "Password must contain a digit"
        return True, ""

    @staticmethod
    def validate_email(value: str) -> tuple[bool, str]:
        """Apply a simple ``local@domain.tld`` shape check to *value*."""
        if not isinstance(value, str) or not re.fullmatch(
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', value
        ):
            return False, "Invalid email format"
        return True, ""

    @staticmethod
    def sanitize(value: str, max_len: int = 200) -> str:
        """Truncate *value* to *max_len* characters, then drop control characters.

        Because truncation happens first, the result may be shorter than
        *max_len* when control characters were present.
        """
        value = value[:max_len]
        value = re.sub(r'[\x00-\x1f\x7f]', '', value)
        return value
210
211
212# =============================================================================
# 5. Rate Limiter (레이트 리미터)
214# =============================================================================
215
class RateLimiter:
    """Per-IP sliding window rate limiter.

    ``requests`` maps a client IP to the monotonic timestamps of its
    accepted requests inside the current window. Clients whose timestamps
    have all expired are evicted periodically so the table cannot grow
    without bound as new addresses show up.
    """

    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: dict[str, list[float]] = {}
        # Time of the last table-wide eviction pass; -inf forces one on the
        # first call.
        self._last_sweep = float("-inf")

    def _recent(self, client_ip: str, now: float) -> list[float]:
        """Return the client's timestamps that are still inside the window."""
        return [t for t in self.requests.get(client_ip, ()) if now - t < self.window]

    def _sweep(self, now: float) -> None:
        """Drop clients with no live timestamps, at most once per window."""
        if now - self._last_sweep < self.window:
            return
        self._last_sweep = now
        idle = [
            ip for ip, stamps in self.requests.items()
            if not any(now - t < self.window for t in stamps)
        ]
        for ip in idle:
            del self.requests[ip]

    def is_allowed(self, client_ip: str) -> bool:
        """Record a request from *client_ip* and report whether it is permitted.

        Rejected requests are not recorded, so they do not extend the time a
        client stays blocked.
        """
        now = time.monotonic()
        self._sweep(now)

        recent = self._recent(client_ip, now)
        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(now)
        self.requests[client_ip] = recent
        return allowed

    def remaining(self, client_ip: str) -> int:
        """Return how many more requests *client_ip* may make in this window."""
        recent = self._recent(client_ip, time.monotonic())
        return max(0, self.max_requests - len(recent))
244
245
246# =============================================================================
# 6. Security Headers (보안 헤더)
248# =============================================================================
249
# Response headers that SecureAPIHandler._send_json attaches to every JSON
# response, in this order.
SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    # "0" switches the legacy browser XSS filter off; the CSP below is the
    # intended protection.
    "X-XSS-Protection": "0",  # Modern browsers: use CSP instead
    "Content-Security-Policy": "default-src 'none'; frame-ancestors 'none'",
    # NOTE(review): run_server() serves plain HTTP on 127.0.0.1; presumably a
    # TLS terminator is expected in front for this header to matter - confirm.
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    "Referrer-Policy": "strict-origin-when-cross-origin",
    # Responses may carry tokens and user data, so they must not be cached.
    "Cache-Control": "no-store",
    "Permissions-Policy": "camera=(), microphone=(), geolocation=()",
}
260
261
262# =============================================================================
# 7. In-Memory User Store (메모리 기반 사용자 저장소)
264# =============================================================================
265
266class UserStore:
267    """Simple in-memory user database for demo purposes."""
268
269    def __init__(self):
270        self.users: dict[str, dict] = {}
271        self.hasher = PasswordHasher()
272
273    def register(self, username: str, password: str, email: str,
274                 role: str = "viewer") -> tuple[bool, str]:
275        if username in self.users:
276            return False, "Username already exists"
277
278        ok, msg = Validator.validate_username(username)
279        if not ok:
280            return False, msg
281        ok, msg = Validator.validate_password(password)
282        if not ok:
283            return False, msg
284        ok, msg = Validator.validate_email(email)
285        if not ok:
286            return False, msg
287
288        self.users[username] = {
289            "password_hash": self.hasher.hash_password(password),
290            "email": email,
291            "role": role,
292            "created": datetime.now(timezone.utc).isoformat(),
293        }
294        return True, "User registered successfully"
295
296    def authenticate(self, username: str, password: str) -> dict | None:
297        user = self.users.get(username)
298        if not user:
299            # Constant-time comparison to prevent timing attacks
300            self.hasher.verify_password(password, "0" * 32 + "$" + "0" * 64)
301            return None
302        if self.hasher.verify_password(password, user["password_hash"]):
303            return {"username": username, "role": user["role"], "email": user["email"]}
304        return None
305
306    def get_user(self, username: str) -> dict | None:
307        user = self.users.get(username)
308        if user:
309            return {"username": username, "role": user["role"], "email": user["email"]}
310        return None
311
312
313# =============================================================================
# 8. Structured Logging (구조화 로깅)
315# =============================================================================
316
class SecurityLogger:
    """Structured logging for security events.

    Every interpolated value passes through ``_clean`` first: usernames and
    request paths are attacker-controlled, and a raw CR/LF inside them would
    let a client forge additional log lines.
    """

    @staticmethod
    def _clean(value) -> str:
        """Return ``str(value)`` with non-printable characters backslash-escaped.

        Printable text (including non-ASCII letters) is returned unchanged, so
        log lines for well-formed values look exactly as before.
        """
        text = str(value)
        if text.isprintable():
            return text
        return "".join(
            ch if ch.isprintable() else ch.encode("unicode_escape").decode("ascii")
            for ch in text
        )

    @staticmethod
    def log_auth_attempt(username: str, success: bool, ip: str):
        """Log a login attempt and whether it succeeded."""
        status = "SUCCESS" if success else "FAILURE"
        clean = SecurityLogger._clean
        logger.info("AUTH | %s | user=%s ip=%s", status, clean(username), clean(ip))

    @staticmethod
    def log_access(method: str, path: str, ip: str, status: int, user: str = "anonymous"):
        """Log one handled request with its response status and acting user."""
        clean = SecurityLogger._clean
        logger.info(
            "ACCESS | %s %s | status=%s user=%s ip=%s",
            clean(method), clean(path), clean(status), clean(user), clean(ip),
        )

    @staticmethod
    def log_rate_limit(ip: str, path: str):
        """Log a request rejected by the rate limiter."""
        clean = SecurityLogger._clean
        logger.warning("RATE_LIMIT | ip=%s path=%s", clean(ip), clean(path))

    @staticmethod
    def log_validation_error(field: str, error: str, ip: str):
        """Log a rejected input together with the validation message."""
        clean = SecurityLogger._clean
        logger.warning(
            "VALIDATION | field=%s error=%s ip=%s", clean(field), clean(error), clean(ip)
        )
336
337
338# =============================================================================
# 9. API Request Handler (API 요청 핸들러)
340# =============================================================================
341
# Global state for the handler.
# Module-level singletons shared by every SecureAPIHandler method below.
user_store = UserStore()  # registered users; seeded by run_server()
rate_limiter = RateLimiter(max_requests=20, window_seconds=60)  # 20 requests per IP per 60 s
sec_logger = SecurityLogger()  # instance used to call the static log_* helpers
346
347
348class SecureAPIHandler(BaseHTTPRequestHandler):
349    """HTTP request handler with security features."""
350
351    def _get_client_ip(self) -> str:
352        return self.client_address[0] if self.client_address else "unknown"
353
354    def _send_json(self, status: int, data: dict):
355        self.send_response(status)
356        self.send_header("Content-Type", "application/json")
357        for key, value in SECURITY_HEADERS.items():
358            self.send_header(key, value)
359        remaining = rate_limiter.remaining(self._get_client_ip())
360        self.send_header("X-RateLimit-Remaining", str(remaining))
361        self.end_headers()
362        self.wfile.write(json.dumps(data).encode())
363
364    def _read_body(self) -> dict:
365        length = int(self.headers.get("Content-Length", 0))
366        if length > 10_000:  # Max 10KB body
367            return {}
368        try:
369            return json.loads(self.rfile.read(length)) if length > 0 else {}
370        except (json.JSONDecodeError, UnicodeDecodeError):
371            return {}
372
373    def _get_current_user(self) -> dict | None:
374        auth = self.headers.get("Authorization", "")
375        if not auth.startswith("Bearer "):
376            return None
377        token = auth[7:]
378        payload = JWT.decode(token, JWT_SECRET)
379        if payload and "sub" in payload:
380            return user_store.get_user(payload["sub"])
381        return None
382
383    def _check_rate_limit(self) -> bool:
384        ip = self._get_client_ip()
385        if not rate_limiter.is_allowed(ip):
386            sec_logger.log_rate_limit(ip, self.path)
387            self._send_json(429, {"error": "Too many requests", "retry_after": 60})
388            return False
389        return True
390
391    # --- Routes ---
392
393    def do_GET(self):
394        if not self._check_rate_limit():
395            return
396        ip = self._get_client_ip()
397
398        parsed = urlparse(self.path)
399        path = parsed.path
400
401        if path == "/api/health":
402            self._handle_health()
403        elif path == "/api/profile":
404            self._handle_profile()
405        elif path == "/api/users":
406            self._handle_list_users()
407        else:
408            self._send_json(404, {"error": "Not found"})
409            sec_logger.log_access("GET", path, ip, 404)
410
411    def do_POST(self):
412        if not self._check_rate_limit():
413            return
414        ip = self._get_client_ip()
415
416        parsed = urlparse(self.path)
417        path = parsed.path
418
419        if path == "/api/register":
420            self._handle_register()
421        elif path == "/api/login":
422            self._handle_login()
423        elif path == "/api/data":
424            self._handle_create_data()
425        else:
426            self._send_json(404, {"error": "Not found"})
427            sec_logger.log_access("POST", path, ip, 404)
428
429    def do_DELETE(self):
430        if not self._check_rate_limit():
431            return
432        ip = self._get_client_ip()
433
434        parsed = urlparse(self.path)
435        path = parsed.path
436
437        if path.startswith("/api/users/"):
438            self._handle_delete_user(path)
439        else:
440            self._send_json(404, {"error": "Not found"})
441            sec_logger.log_access("DELETE", path, ip, 404)
442
443    # --- Handlers ---
444
445    def _handle_health(self):
446        self._send_json(200, {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()})
447
448    def _handle_register(self):
449        ip = self._get_client_ip()
450        body = self._read_body()
451
452        username = Validator.sanitize(body.get("username", ""))
453        password = body.get("password", "")
454        email = Validator.sanitize(body.get("email", ""))
455
456        ok, msg = user_store.register(username, password, email)
457        if ok:
458            sec_logger.log_access("POST", "/api/register", ip, 201, username)
459            self._send_json(201, {"message": msg, "username": username})
460        else:
461            sec_logger.log_validation_error("registration", msg, ip)
462            self._send_json(400, {"error": msg})
463
464    def _handle_login(self):
465        ip = self._get_client_ip()
466        body = self._read_body()
467
468        username = body.get("username", "")
469        password = body.get("password", "")
470
471        user = user_store.authenticate(username, password)
472        if user:
473            payload = {
474                "sub": username,
475                "role": user["role"],
476                "exp": int((datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRY_MINUTES)).timestamp()),
477                "iat": int(datetime.now(timezone.utc).timestamp()),
478            }
479            token = JWT.encode(payload, JWT_SECRET)
480            sec_logger.log_auth_attempt(username, True, ip)
481            self._send_json(200, {"token": token, "expires_in": JWT_EXPIRY_MINUTES * 60})
482        else:
483            sec_logger.log_auth_attempt(username, False, ip)
484            self._send_json(401, {"error": "Invalid credentials"})
485
486    def _handle_profile(self):
487        ip = self._get_client_ip()
488        user = self._get_current_user()
489        if not user:
490            self._send_json(401, {"error": "Authentication required"})
491            return
492        sec_logger.log_access("GET", "/api/profile", ip, 200, user["username"])
493        self._send_json(200, {"user": user})
494
495    def _handle_list_users(self):
496        ip = self._get_client_ip()
497        user = self._get_current_user()
498        if not user:
499            self._send_json(401, {"error": "Authentication required"})
500            return
501
502        ok, msg = RBAC.require_permission(user["role"], "admin")
503        if not ok:
504            sec_logger.log_access("GET", "/api/users", ip, 403, user["username"])
505            self._send_json(403, {"error": msg})
506            return
507
508        users = [user_store.get_user(u) for u in user_store.users]
509        sec_logger.log_access("GET", "/api/users", ip, 200, user["username"])
510        self._send_json(200, {"users": users})
511
512    def _handle_create_data(self):
513        ip = self._get_client_ip()
514        user = self._get_current_user()
515        if not user:
516            self._send_json(401, {"error": "Authentication required"})
517            return
518
519        ok, msg = RBAC.require_permission(user["role"], "write")
520        if not ok:
521            sec_logger.log_access("POST", "/api/data", ip, 403, user["username"])
522            self._send_json(403, {"error": msg})
523            return
524
525        body = self._read_body()
526        sec_logger.log_access("POST", "/api/data", ip, 201, user["username"])
527        self._send_json(201, {"message": "Data created", "data": body})
528
529    def _handle_delete_user(self, path: str):
530        ip = self._get_client_ip()
531        user = self._get_current_user()
532        if not user:
533            self._send_json(401, {"error": "Authentication required"})
534            return
535
536        ok, msg = RBAC.require_permission(user["role"], "admin")
537        if not ok:
538            sec_logger.log_access("DELETE", path, ip, 403, user["username"])
539            self._send_json(403, {"error": msg})
540            return
541
542        target = path.split("/")[-1]
543        if target in user_store.users:
544            del user_store.users[target]
545            sec_logger.log_access("DELETE", path, ip, 200, user["username"])
546            self._send_json(200, {"message": f"User '{target}' deleted"})
547        else:
548            self._send_json(404, {"error": "User not found"})
549
550    def log_message(self, format, *args):
551        """Suppress default HTTP server logging (we use our own)."""
552        pass
553
554
555# =============================================================================
# 10. Demo: Simulate API Interactions (데모: API 상호작용 시뮬레이션)
557# =============================================================================
558
def _print_section(title: str) -> None:
    """Print a numbered demo section title framed by two rules."""
    rule = "=" * 60
    print("\n" + rule)
    print(title)
    print(rule)


def run_demo():
    """Demonstrate all security features without starting a real server.

    Everything is printed to stdout; the only shared state touched is the
    module-level ``sec_logger`` used for the logging section.
    """
    print("=" * 60)
    print("  Secure API Server Demo (Simulation)")
    print("  보안 API 서버 데모 (시뮬레이션)")
    print("=" * 60)

    # --- Password Hashing ---
    _print_section("1. Password Hashing (PBKDF2-HMAC-SHA256)")

    hasher = PasswordHasher()
    pw = "MyStr0ng!Pass"
    hashed = hasher.hash_password(pw)
    print(f"\n  Password: {pw}")
    print(f"  Hash:     {hashed[:20]}...{hashed[-20:]}")
    print(f"  Verify correct:  {hasher.verify_password(pw, hashed)}")
    print(f"  Verify wrong:    {hasher.verify_password('wrong', hashed)}")

    # --- JWT ---
    _print_section("2. JWT Token (HS256)")

    now = datetime.now(timezone.utc)
    payload = {
        "sub": "alice",
        "role": "admin",
        "exp": int((now + timedelta(hours=1)).timestamp()),
        "iat": int(now.timestamp()),
    }
    token = JWT.encode(payload, JWT_SECRET)
    print(f"\n  Payload: {json.dumps(payload, indent=2)}")
    print(f"  Token:   {token[:40]}...{token[-20:]}")

    decoded = JWT.decode(token, JWT_SECRET)
    print(f"  Decode valid token:  sub={decoded['sub']}, role={decoded['role']}")

    bad_decode = JWT.decode(token + "tampered", JWT_SECRET)
    print(f"  Decode tampered:     {bad_decode}")

    expired_payload = {**payload, "exp": int((now - timedelta(hours=1)).timestamp())}
    expired_token = JWT.encode(expired_payload, JWT_SECRET)
    exp_decode = JWT.decode(expired_token, JWT_SECRET)
    print(f"  Decode expired:      {exp_decode}")

    # --- RBAC ---
    _print_section("3. Role-Based Access Control")

    roles = ["admin", "editor", "viewer"]
    perms = ["read", "write", "delete", "admin"]
    # One width per column, shared by header, separator and rows so the
    # table stays aligned (the "read" column is wider than its label).
    widths = {perm: max(len(perm), 5) for perm in perms}
    header = f"  {'Role':<10}" + "".join(f" | {perm:>{widths[perm]}}" for perm in perms)
    separator = "  " + "-" * 10 + "".join("-+-" + "-" * widths[perm] for perm in perms)
    print("\n" + header)
    print(separator)
    for role in roles:
        row = f"  {role:<10}"
        for perm in perms:
            mark = "YES" if RBAC.has_permission(role, perm) else "-"
            row += f" | {mark:>{widths[perm]}}"
        print(row)

    # --- User Registration & Auth Flow ---
    _print_section("4. Registration & Authentication Flow")

    store = UserStore()

    # Register users
    users_to_register = [
        ("alice", "Str0ng!Admin", "alice@example.com", "admin"),
        ("bob", "B0bEdit0r!", "bob@example.com", "editor"),
        ("charlie", "Ch@rlie99", "charlie@example.com", "viewer"),
    ]

    print("\n  Registration:")
    for uname, pw, email, role in users_to_register:
        _, msg = store.register(uname, pw, email, role)
        print(f"    {uname:10s} ({role:6s}): {msg}")

    # Duplicate registration
    _, msg = store.register("alice", "Another!1", "a@b.com")
    print(f"    {'alice':10s} (dup)  : {msg}")

    # Weak password
    _, msg = store.register("weak_user", "short", "w@b.com")
    print(f"    {'weak_user':10s} (weak) : {msg}")

    # Authentication
    print("\n  Authentication:")
    for uname, pw in [("alice", "Str0ng!Admin"), ("bob", "WrongPass!"), ("nonexist", "any")]:
        user = store.authenticate(uname, pw)
        status = f"role={user['role']}" if user else "FAILED"
        print(f"    {uname:10s}: {status}")

    # --- Input Validation ---
    _print_section("5. Input Validation")

    test_inputs = [
        ("username", "valid_user", Validator.validate_username),
        ("username", "ab", Validator.validate_username),
        ("password", "Str0ng!Pass", Validator.validate_password),
        ("password", "weak", Validator.validate_password),
        ("email", "user@example.com", Validator.validate_email),
        ("email", "not-an-email", Validator.validate_email),
    ]

    print()
    for field, value, fn in test_inputs:
        ok, msg = fn(value)
        status = "OK" if ok else f"FAIL: {msg}"
        print(f"    {field:10s} = {value:20s} -> {status}")

    # Sanitization
    dangerous = '<script>alert("xss")</script>'
    print(f"\n    Sanitize: '{dangerous[:30]}...' -> '{Validator.sanitize(dangerous)[:40]}...'")

    # --- Security Headers ---
    _print_section("6. Security Response Headers")

    print()
    for header_name, value in SECURITY_HEADERS.items():
        print(f"    {header_name}: {value}")

    # --- Rate Limiting ---
    _print_section("7. Rate Limiting")

    rl = RateLimiter(max_requests=5, window_seconds=60)
    print(f"\n  Config: max={rl.max_requests} requests per {rl.window} seconds\n")
    for i in range(1, 8):
        allowed = rl.is_allowed("192.168.1.1")
        remaining = rl.remaining("192.168.1.1")
        status = "ALLOWED" if allowed else "RATE LIMITED"
        print(f"    Request {i}: {status} (remaining: {remaining})")

    # --- Structured Logging ---
    _print_section("8. Structured Security Logging")
    print()

    sec_logger.log_auth_attempt("alice", True, "10.0.0.1")
    sec_logger.log_auth_attempt("hacker", False, "10.0.0.99")
    sec_logger.log_access("GET", "/api/profile", "10.0.0.1", 200, "alice")
    sec_logger.log_access("DELETE", "/api/users/bob", "10.0.0.99", 403, "hacker")
    sec_logger.log_rate_limit("10.0.0.99", "/api/login")
    sec_logger.log_validation_error("password", "too short", "10.0.0.50")

    # --- Full Flow Summary ---
    _print_section("9. Complete API Flow Summary")

    print("""
  Typical secure API request lifecycle:

    1. Client sends request
       -> Rate limiter checks per-IP request count
    2. If POST /api/register:
       -> Validate username, password strength, email format
       -> Hash password with PBKDF2 (100k iterations)
       -> Store user with role
    3. If POST /api/login:
       -> Authenticate with constant-time comparison
       -> Issue JWT token (HS256, 30 min expiry)
       -> Log authentication attempt
    4. For protected endpoints:
       -> Extract & verify JWT from Authorization header
       -> Check RBAC permissions for user's role
       -> Process request if authorized
    5. Response:
       -> Add security headers (CSP, HSTS, X-Frame-Options, etc.)
       -> Include rate limit remaining count
       -> Return JSON response
    6. Logging:
       -> Structured logs for all auth events
       -> Access logs with user, IP, status code
       -> Rate limit violation warnings
""")

    print("=" * 60)
    print("  Server code ready at: examples/Security/15_secure_api/app.py")
    print("  To run as actual server: python app.py --serve")
    print("=" * 60)
750
751
752# =============================================================================
753# 11. Optional: Run as Actual HTTP Server
754# =============================================================================
755
def run_server(port: int = 8443):
    """Start the actual HTTP server (for manual testing).

    Seeds three demo accounts, binds to 127.0.0.1 only, and serves until
    interrupted with Ctrl+C. The listening socket is closed on every exit
    path, not just on KeyboardInterrupt.

    Args:
        port: TCP port to listen on.
    """
    # Seed with demo users
    user_store.register("admin", "Adm1n!Pass", "admin@example.com", "admin")
    user_store.register("editor", "Ed1tor!Pass", "editor@example.com", "editor")
    user_store.register("viewer", "V1ewer!Pass", "viewer@example.com", "viewer")

    # The context manager calls server_close() when the block exits.
    with HTTPServer(("127.0.0.1", port), SecureAPIHandler) as server:
        print(f"\n  Secure API server running on http://127.0.0.1:{port}")
        print("  Demo users: admin/Adm1n!Pass, editor/Ed1tor!Pass, viewer/V1ewer!Pass")
        print("  Press Ctrl+C to stop.\n")
        print(f"  Try: curl -X POST http://127.0.0.1:{port}/api/login \\")
        print('        -H "Content-Type: application/json" \\')
        print("        -d '{\"username\":\"admin\",\"password\":\"Adm1n!Pass\"}'\n")

        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\n  Server stopped.")
776
777
778# =============================================================================
779# Main
780# =============================================================================
781
if __name__ == "__main__":
    # Usage: app.py [--serve [--port N]]; without --serve the demo runs.
    if "--serve" in sys.argv:
        port = 8443
        # Pair every argument with its successor; the last "--port N" wins.
        for flag, raw_port in zip(sys.argv, sys.argv[1:]):
            if flag != "--port":
                continue
            try:
                port = int(raw_port)
            except ValueError:
                sys.exit(f"error: --port expects an integer, got {raw_port!r}")
        if not 0 <= port <= 65535:
            sys.exit(f"error: --port must be between 0 and 65535, got {port}")
        run_server(port)
    else:
        run_demo()