1"""
2๋ณด์ API ์๋ฒ ๋ฐ๋ชจ (stdlib only)
3Secure API Server Demo (stdlib only)
4
5ํจ์ค์๋ ํด์ฑ(PBKDF2), JWT ์ธ์ฆ, RBAC, ์
๋ ฅ ๊ฒ์ฆ, ๋ณด์ ํค๋,
6๋ ์ดํธ ๋ฆฌ๋ฏธํ
, ๊ตฌ์กฐํ ๋ก๊น
์ ํฌํจํ๋ ์์ ํ ๋ณด์ API ์๋ฒ๋ฅผ ๊ตฌํํฉ๋๋ค.
7http.server ๊ธฐ๋ฐ์ผ๋ก ์ธ๋ถ ์์กด์ฑ ์์ด ๋์ํฉ๋๋ค.
8
9Implements a complete secure API server with password hashing (PBKDF2),
10JWT authentication, RBAC, input validation, security headers, rate limiting,
11and structured logging. Uses only Python stdlib (http.server).
12"""
13
14import hashlib
15import hmac
16import json
17import os
18import re
19import secrets
20import time
21import base64
22import logging
23import sys
24from datetime import datetime, timedelta, timezone
25from http.server import HTTPServer, BaseHTTPRequestHandler
26from urllib.parse import urlparse, parse_qs
27from functools import wraps
28
29
30# =============================================================================
31# Configuration (์ค์ )
32# =============================================================================
33
34JWT_SECRET = secrets.token_bytes(32)
35JWT_ALGORITHM = "HS256"
36JWT_EXPIRY_MINUTES = 30
37BCRYPT_ITERATIONS = 100_000
38SERVER_PORT = 0 # Assigned dynamically for demo
39
40# Structured logger
41logging.basicConfig(
42 level=logging.INFO,
43 format='%(asctime)s | %(levelname)-5s | %(message)s',
44 datefmt='%Y-%m-%d %H:%M:%S',
45 stream=sys.stdout,
46)
47logger = logging.getLogger("secure_api")
48
49
50# =============================================================================
51# 1. Password Hashing (ํจ์ค์๋ ํด์ฑ - PBKDF2)
52# =============================================================================
53
54class PasswordHasher:
55 """Hash and verify passwords using PBKDF2-HMAC-SHA256."""
56
57 ITERATIONS = BCRYPT_ITERATIONS
58 SALT_LENGTH = 16
59 KEY_LENGTH = 32
60
61 @staticmethod
62 def hash_password(password: str) -> str:
63 """Hash a password, returning 'salt$hash' as hex strings."""
64 salt = os.urandom(PasswordHasher.SALT_LENGTH)
65 dk = hashlib.pbkdf2_hmac(
66 "sha256", password.encode(), salt,
67 PasswordHasher.ITERATIONS, dklen=PasswordHasher.KEY_LENGTH,
68 )
69 return salt.hex() + "$" + dk.hex()
70
71 @staticmethod
72 def verify_password(password: str, stored: str) -> bool:
73 """Verify a password against a stored hash."""
74 try:
75 salt_hex, hash_hex = stored.split("$")
76 salt = bytes.fromhex(salt_hex)
77 expected = bytes.fromhex(hash_hex)
78 dk = hashlib.pbkdf2_hmac(
79 "sha256", password.encode(), salt,
80 PasswordHasher.ITERATIONS, dklen=PasswordHasher.KEY_LENGTH,
81 )
82 return hmac.compare_digest(dk, expected)
83 except (ValueError, AttributeError):
84 return False
85
86
87# =============================================================================
88# 2. JWT Implementation (JWT ๊ตฌํ)
89# =============================================================================
90
91class JWT:
92 """Minimal JWT (HS256) implementation using stdlib only."""
93
94 @staticmethod
95 def _b64url_encode(data: bytes) -> str:
96 return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
97
98 @staticmethod
99 def _b64url_decode(s: str) -> bytes:
100 padding = 4 - len(s) % 4
101 if padding != 4:
102 s += "=" * padding
103 return base64.urlsafe_b64decode(s)
104
105 @staticmethod
106 def encode(payload: dict, secret: bytes) -> str:
107 """Create a JWT token."""
108 header = {"alg": "HS256", "typ": "JWT"}
109 header_b64 = JWT._b64url_encode(json.dumps(header).encode())
110 payload_b64 = JWT._b64url_encode(json.dumps(payload).encode())
111
112 message = f"{header_b64}.{payload_b64}".encode()
113 signature = hmac.new(secret, message, hashlib.sha256).digest()
114 sig_b64 = JWT._b64url_encode(signature)
115
116 return f"{header_b64}.{payload_b64}.{sig_b64}"
117
118 @staticmethod
119 def decode(token: str, secret: bytes) -> dict | None:
120 """Decode and verify a JWT token. Returns None if invalid."""
121 try:
122 parts = token.split(".")
123 if len(parts) != 3:
124 return None
125
126 header_b64, payload_b64, sig_b64 = parts
127
128 # Verify signature
129 message = f"{header_b64}.{payload_b64}".encode()
130 expected_sig = hmac.new(secret, message, hashlib.sha256).digest()
131 actual_sig = JWT._b64url_decode(sig_b64)
132
133 if not hmac.compare_digest(expected_sig, actual_sig):
134 return None
135
136 # Decode payload
137 payload = json.loads(JWT._b64url_decode(payload_b64))
138
139 # Check expiration
140 if "exp" in payload:
141 exp_time = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
142 if datetime.now(timezone.utc) > exp_time:
143 return None
144
145 return payload
146 except Exception:
147 return None
148
149
150# =============================================================================
151# 3. RBAC - Role-Based Access Control (์ญํ ๊ธฐ๋ฐ ์ ๊ทผ ์ ์ด)
152# =============================================================================
153
154class RBAC:
155 """Simple role-based access control system."""
156
157 # Role hierarchy: admin > editor > viewer
158 PERMISSIONS = {
159 "admin": {"read", "write", "delete", "admin"},
160 "editor": {"read", "write"},
161 "viewer": {"read"},
162 }
163
164 @staticmethod
165 def has_permission(role: str, required_permission: str) -> bool:
166 perms = RBAC.PERMISSIONS.get(role, set())
167 return required_permission in perms
168
169 @staticmethod
170 def require_permission(role: str, permission: str) -> tuple[bool, str]:
171 if RBAC.has_permission(role, permission):
172 return True, f"Access granted: role '{role}' has '{permission}'"
173 return False, f"Access denied: role '{role}' lacks '{permission}'"
174
175
176# =============================================================================
177# 4. Input Validation (์
๋ ฅ ๊ฒ์ฆ)
178# =============================================================================
179
180class Validator:
181 """Input validation for API requests."""
182
183 @staticmethod
184 def validate_username(value: str) -> tuple[bool, str]:
185 if not value or not re.match(r'^[a-zA-Z0-9_]{3,30}$', value):
186 return False, "Username must be 3-30 alphanumeric/underscore characters"
187 return True, ""
188
189 @staticmethod
190 def validate_password(value: str) -> tuple[bool, str]:
191 if len(value) < 8:
192 return False, "Password must be at least 8 characters"
193 if not re.search(r'[A-Z]', value):
194 return False, "Password must contain an uppercase letter"
195 if not re.search(r'[0-9]', value):
196 return False, "Password must contain a digit"
197 return True, ""
198
199 @staticmethod
200 def validate_email(value: str) -> tuple[bool, str]:
201 if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
202 return False, "Invalid email format"
203 return True, ""
204
205 @staticmethod
206 def sanitize(value: str, max_len: int = 200) -> str:
207 value = value[:max_len]
208 value = re.sub(r'[\x00-\x1f\x7f]', '', value)
209 return value
210
211
212# =============================================================================
213# 5. Rate Limiter (๋ ์ดํธ ๋ฆฌ๋ฏธํฐ)
214# =============================================================================
215
216class RateLimiter:
217 """Per-IP sliding window rate limiter."""
218
219 def __init__(self, max_requests: int = 10, window_seconds: int = 60):
220 self.max_requests = max_requests
221 self.window = window_seconds
222 self.requests: dict[str, list[float]] = {}
223
224 def is_allowed(self, client_ip: str) -> bool:
225 now = time.monotonic()
226 if client_ip not in self.requests:
227 self.requests[client_ip] = []
228
229 # Remove expired entries
230 self.requests[client_ip] = [
231 t for t in self.requests[client_ip] if now - t < self.window
232 ]
233
234 if len(self.requests[client_ip]) >= self.max_requests:
235 return False
236
237 self.requests[client_ip].append(now)
238 return True
239
240 def remaining(self, client_ip: str) -> int:
241 now = time.monotonic()
242 recent = [t for t in self.requests.get(client_ip, []) if now - t < self.window]
243 return max(0, self.max_requests - len(recent))
244
245
246# =============================================================================
247# 6. Security Headers (๋ณด์ ํค๋)
248# =============================================================================
249
250SECURITY_HEADERS = {
251 "X-Content-Type-Options": "nosniff",
252 "X-Frame-Options": "DENY",
253 "X-XSS-Protection": "0", # Modern browsers: use CSP instead
254 "Content-Security-Policy": "default-src 'none'; frame-ancestors 'none'",
255 "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
256 "Referrer-Policy": "strict-origin-when-cross-origin",
257 "Cache-Control": "no-store",
258 "Permissions-Policy": "camera=(), microphone=(), geolocation=()",
259}
260
261
262# =============================================================================
263# 7. In-Memory User Store (๋ฉ๋ชจ๋ฆฌ ๊ธฐ๋ฐ ์ฌ์ฉ์ ์ ์ฅ์)
264# =============================================================================
265
266class UserStore:
267 """Simple in-memory user database for demo purposes."""
268
269 def __init__(self):
270 self.users: dict[str, dict] = {}
271 self.hasher = PasswordHasher()
272
273 def register(self, username: str, password: str, email: str,
274 role: str = "viewer") -> tuple[bool, str]:
275 if username in self.users:
276 return False, "Username already exists"
277
278 ok, msg = Validator.validate_username(username)
279 if not ok:
280 return False, msg
281 ok, msg = Validator.validate_password(password)
282 if not ok:
283 return False, msg
284 ok, msg = Validator.validate_email(email)
285 if not ok:
286 return False, msg
287
288 self.users[username] = {
289 "password_hash": self.hasher.hash_password(password),
290 "email": email,
291 "role": role,
292 "created": datetime.now(timezone.utc).isoformat(),
293 }
294 return True, "User registered successfully"
295
296 def authenticate(self, username: str, password: str) -> dict | None:
297 user = self.users.get(username)
298 if not user:
299 # Constant-time comparison to prevent timing attacks
300 self.hasher.verify_password(password, "0" * 32 + "$" + "0" * 64)
301 return None
302 if self.hasher.verify_password(password, user["password_hash"]):
303 return {"username": username, "role": user["role"], "email": user["email"]}
304 return None
305
306 def get_user(self, username: str) -> dict | None:
307 user = self.users.get(username)
308 if user:
309 return {"username": username, "role": user["role"], "email": user["email"]}
310 return None
311
312
313# =============================================================================
314# 8. Structured Logging (๊ตฌ์กฐํ ๋ก๊น
)
315# =============================================================================
316
317class SecurityLogger:
318 """Structured logging for security events."""
319
320 @staticmethod
321 def log_auth_attempt(username: str, success: bool, ip: str):
322 status = "SUCCESS" if success else "FAILURE"
323 logger.info(f"AUTH | {status} | user={username} ip={ip}")
324
325 @staticmethod
326 def log_access(method: str, path: str, ip: str, status: int, user: str = "anonymous"):
327 logger.info(f"ACCESS | {method} {path} | status={status} user={user} ip={ip}")
328
329 @staticmethod
330 def log_rate_limit(ip: str, path: str):
331 logger.warning(f"RATE_LIMIT | ip={ip} path={path}")
332
333 @staticmethod
334 def log_validation_error(field: str, error: str, ip: str):
335 logger.warning(f"VALIDATION | field={field} error={error} ip={ip}")
336
337
338# =============================================================================
339# 9. API Request Handler (API ์์ฒญ ํธ๋ค๋ฌ)
340# =============================================================================
341
342# Global state for the handler
343user_store = UserStore()
344rate_limiter = RateLimiter(max_requests=20, window_seconds=60)
345sec_logger = SecurityLogger()
346
347
348class SecureAPIHandler(BaseHTTPRequestHandler):
349 """HTTP request handler with security features."""
350
351 def _get_client_ip(self) -> str:
352 return self.client_address[0] if self.client_address else "unknown"
353
354 def _send_json(self, status: int, data: dict):
355 self.send_response(status)
356 self.send_header("Content-Type", "application/json")
357 for key, value in SECURITY_HEADERS.items():
358 self.send_header(key, value)
359 remaining = rate_limiter.remaining(self._get_client_ip())
360 self.send_header("X-RateLimit-Remaining", str(remaining))
361 self.end_headers()
362 self.wfile.write(json.dumps(data).encode())
363
364 def _read_body(self) -> dict:
365 length = int(self.headers.get("Content-Length", 0))
366 if length > 10_000: # Max 10KB body
367 return {}
368 try:
369 return json.loads(self.rfile.read(length)) if length > 0 else {}
370 except (json.JSONDecodeError, UnicodeDecodeError):
371 return {}
372
373 def _get_current_user(self) -> dict | None:
374 auth = self.headers.get("Authorization", "")
375 if not auth.startswith("Bearer "):
376 return None
377 token = auth[7:]
378 payload = JWT.decode(token, JWT_SECRET)
379 if payload and "sub" in payload:
380 return user_store.get_user(payload["sub"])
381 return None
382
383 def _check_rate_limit(self) -> bool:
384 ip = self._get_client_ip()
385 if not rate_limiter.is_allowed(ip):
386 sec_logger.log_rate_limit(ip, self.path)
387 self._send_json(429, {"error": "Too many requests", "retry_after": 60})
388 return False
389 return True
390
391 # --- Routes ---
392
393 def do_GET(self):
394 if not self._check_rate_limit():
395 return
396 ip = self._get_client_ip()
397
398 parsed = urlparse(self.path)
399 path = parsed.path
400
401 if path == "/api/health":
402 self._handle_health()
403 elif path == "/api/profile":
404 self._handle_profile()
405 elif path == "/api/users":
406 self._handle_list_users()
407 else:
408 self._send_json(404, {"error": "Not found"})
409 sec_logger.log_access("GET", path, ip, 404)
410
411 def do_POST(self):
412 if not self._check_rate_limit():
413 return
414 ip = self._get_client_ip()
415
416 parsed = urlparse(self.path)
417 path = parsed.path
418
419 if path == "/api/register":
420 self._handle_register()
421 elif path == "/api/login":
422 self._handle_login()
423 elif path == "/api/data":
424 self._handle_create_data()
425 else:
426 self._send_json(404, {"error": "Not found"})
427 sec_logger.log_access("POST", path, ip, 404)
428
429 def do_DELETE(self):
430 if not self._check_rate_limit():
431 return
432 ip = self._get_client_ip()
433
434 parsed = urlparse(self.path)
435 path = parsed.path
436
437 if path.startswith("/api/users/"):
438 self._handle_delete_user(path)
439 else:
440 self._send_json(404, {"error": "Not found"})
441 sec_logger.log_access("DELETE", path, ip, 404)
442
443 # --- Handlers ---
444
445 def _handle_health(self):
446 self._send_json(200, {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()})
447
448 def _handle_register(self):
449 ip = self._get_client_ip()
450 body = self._read_body()
451
452 username = Validator.sanitize(body.get("username", ""))
453 password = body.get("password", "")
454 email = Validator.sanitize(body.get("email", ""))
455
456 ok, msg = user_store.register(username, password, email)
457 if ok:
458 sec_logger.log_access("POST", "/api/register", ip, 201, username)
459 self._send_json(201, {"message": msg, "username": username})
460 else:
461 sec_logger.log_validation_error("registration", msg, ip)
462 self._send_json(400, {"error": msg})
463
464 def _handle_login(self):
465 ip = self._get_client_ip()
466 body = self._read_body()
467
468 username = body.get("username", "")
469 password = body.get("password", "")
470
471 user = user_store.authenticate(username, password)
472 if user:
473 payload = {
474 "sub": username,
475 "role": user["role"],
476 "exp": int((datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRY_MINUTES)).timestamp()),
477 "iat": int(datetime.now(timezone.utc).timestamp()),
478 }
479 token = JWT.encode(payload, JWT_SECRET)
480 sec_logger.log_auth_attempt(username, True, ip)
481 self._send_json(200, {"token": token, "expires_in": JWT_EXPIRY_MINUTES * 60})
482 else:
483 sec_logger.log_auth_attempt(username, False, ip)
484 self._send_json(401, {"error": "Invalid credentials"})
485
486 def _handle_profile(self):
487 ip = self._get_client_ip()
488 user = self._get_current_user()
489 if not user:
490 self._send_json(401, {"error": "Authentication required"})
491 return
492 sec_logger.log_access("GET", "/api/profile", ip, 200, user["username"])
493 self._send_json(200, {"user": user})
494
495 def _handle_list_users(self):
496 ip = self._get_client_ip()
497 user = self._get_current_user()
498 if not user:
499 self._send_json(401, {"error": "Authentication required"})
500 return
501
502 ok, msg = RBAC.require_permission(user["role"], "admin")
503 if not ok:
504 sec_logger.log_access("GET", "/api/users", ip, 403, user["username"])
505 self._send_json(403, {"error": msg})
506 return
507
508 users = [user_store.get_user(u) for u in user_store.users]
509 sec_logger.log_access("GET", "/api/users", ip, 200, user["username"])
510 self._send_json(200, {"users": users})
511
512 def _handle_create_data(self):
513 ip = self._get_client_ip()
514 user = self._get_current_user()
515 if not user:
516 self._send_json(401, {"error": "Authentication required"})
517 return
518
519 ok, msg = RBAC.require_permission(user["role"], "write")
520 if not ok:
521 sec_logger.log_access("POST", "/api/data", ip, 403, user["username"])
522 self._send_json(403, {"error": msg})
523 return
524
525 body = self._read_body()
526 sec_logger.log_access("POST", "/api/data", ip, 201, user["username"])
527 self._send_json(201, {"message": "Data created", "data": body})
528
529 def _handle_delete_user(self, path: str):
530 ip = self._get_client_ip()
531 user = self._get_current_user()
532 if not user:
533 self._send_json(401, {"error": "Authentication required"})
534 return
535
536 ok, msg = RBAC.require_permission(user["role"], "admin")
537 if not ok:
538 sec_logger.log_access("DELETE", path, ip, 403, user["username"])
539 self._send_json(403, {"error": msg})
540 return
541
542 target = path.split("/")[-1]
543 if target in user_store.users:
544 del user_store.users[target]
545 sec_logger.log_access("DELETE", path, ip, 200, user["username"])
546 self._send_json(200, {"message": f"User '{target}' deleted"})
547 else:
548 self._send_json(404, {"error": "User not found"})
549
550 def log_message(self, format, *args):
551 """Suppress default HTTP server logging (we use our own)."""
552 pass
553
554
555# =============================================================================
556# 10. Demo: Simulate API Interactions (๋ฐ๋ชจ: API ์ํธ์์ฉ ์๋ฎฌ๋ ์ด์
)
557# =============================================================================
558
559def run_demo():
560 """Demonstrate all security features without starting a real server."""
561 print("=" * 60)
562 print(" Secure API Server Demo (Simulation)")
563 print(" ๋ณด์ API ์๋ฒ ๋ฐ๋ชจ (์๋ฎฌ๋ ์ด์
)")
564 print("=" * 60)
565
566 # --- Password Hashing ---
567 print("\n" + "=" * 60)
568 print("1. Password Hashing (PBKDF2-HMAC-SHA256)")
569 print("=" * 60)
570
571 hasher = PasswordHasher()
572 pw = "MyStr0ng!Pass"
573 hashed = hasher.hash_password(pw)
574 print(f"\n Password: {pw}")
575 print(f" Hash: {hashed[:20]}...{hashed[-20:]}")
576 print(f" Verify correct: {hasher.verify_password(pw, hashed)}")
577 print(f" Verify wrong: {hasher.verify_password('wrong', hashed)}")
578
579 # --- JWT ---
580 print("\n" + "=" * 60)
581 print("2. JWT Token (HS256)")
582 print("=" * 60)
583
584 payload = {
585 "sub": "alice",
586 "role": "admin",
587 "exp": int((datetime.now(timezone.utc) + timedelta(hours=1)).timestamp()),
588 "iat": int(datetime.now(timezone.utc).timestamp()),
589 }
590 token = JWT.encode(payload, JWT_SECRET)
591 print(f"\n Payload: {json.dumps(payload, indent=2)}")
592 print(f" Token: {token[:40]}...{token[-20:]}")
593
594 decoded = JWT.decode(token, JWT_SECRET)
595 print(f" Decode valid token: sub={decoded['sub']}, role={decoded['role']}")
596
597 bad_decode = JWT.decode(token + "tampered", JWT_SECRET)
598 print(f" Decode tampered: {bad_decode}")
599
600 expired_payload = {**payload, "exp": int((datetime.now(timezone.utc) - timedelta(hours=1)).timestamp())}
601 expired_token = JWT.encode(expired_payload, JWT_SECRET)
602 exp_decode = JWT.decode(expired_token, JWT_SECRET)
603 print(f" Decode expired: {exp_decode}")
604
605 # --- RBAC ---
606 print("\n" + "=" * 60)
607 print("3. Role-Based Access Control")
608 print("=" * 60)
609
610 roles = ["admin", "editor", "viewer"]
611 perms = ["read", "write", "delete", "admin"]
612 print(f"\n {'Role':<10} | {'read':>5} | {'write':>5} | {'delete':>6} | {'admin':>5}")
613 print(f" {'-'*10}-+-{'-'*5}-+-{'-'*5}-+-{'-'*6}-+-{'-'*5}")
614 for role in roles:
615 row = f" {role:<10}"
616 for perm in perms:
617 allowed = RBAC.has_permission(role, perm)
618 row += f" | {'YES':>{len(perm)}}" if allowed else f" | {'-':>{len(perm)}}"
619 print(row)
620
621 # --- User Registration & Auth Flow ---
622 print("\n" + "=" * 60)
623 print("4. Registration & Authentication Flow")
624 print("=" * 60)
625
626 store = UserStore()
627
628 # Register users
629 users_to_register = [
630 ("alice", "Str0ng!Admin", "alice@example.com", "admin"),
631 ("bob", "B0bEdit0r!", "bob@example.com", "editor"),
632 ("charlie", "Ch@rlie99", "charlie@example.com", "viewer"),
633 ]
634
635 print("\n Registration:")
636 for uname, pw, email, role in users_to_register:
637 ok, msg = store.register(uname, pw, email, role)
638 print(f" {uname:10s} ({role:6s}): {msg}")
639
640 # Duplicate registration
641 ok, msg = store.register("alice", "Another!1", "a@b.com")
642 print(f" {'alice':10s} (dup) : {msg}")
643
644 # Weak password
645 ok, msg = store.register("weak_user", "short", "w@b.com")
646 print(f" {'weak_user':10s} (weak) : {msg}")
647
648 # Authentication
649 print("\n Authentication:")
650 for uname, pw in [("alice", "Str0ng!Admin"), ("bob", "WrongPass!"), ("nonexist", "any")]:
651 user = store.authenticate(uname, pw)
652 status = f"role={user['role']}" if user else "FAILED"
653 print(f" {uname:10s}: {status}")
654
655 # --- Input Validation ---
656 print("\n" + "=" * 60)
657 print("5. Input Validation")
658 print("=" * 60)
659
660 test_inputs = [
661 ("username", "valid_user", Validator.validate_username),
662 ("username", "ab", Validator.validate_username),
663 ("password", "Str0ng!Pass", Validator.validate_password),
664 ("password", "weak", Validator.validate_password),
665 ("email", "user@example.com", Validator.validate_email),
666 ("email", "not-an-email", Validator.validate_email),
667 ]
668
669 print()
670 for field, value, fn in test_inputs:
671 ok, msg = fn(value)
672 status = "OK" if ok else f"FAIL: {msg}"
673 print(f" {field:10s} = {value:20s} -> {status}")
674
675 # Sanitization
676 dangerous = '<script>alert("xss")</script>'
677 print(f"\n Sanitize: '{dangerous[:30]}...' -> '{Validator.sanitize(dangerous)[:40]}...'")
678
679 # --- Security Headers ---
680 print("\n" + "=" * 60)
681 print("6. Security Response Headers")
682 print("=" * 60)
683
684 print()
685 for header, value in SECURITY_HEADERS.items():
686 print(f" {header}: {value}")
687
688 # --- Rate Limiting ---
689 print("\n" + "=" * 60)
690 print("7. Rate Limiting")
691 print("=" * 60)
692
693 rl = RateLimiter(max_requests=5, window_seconds=60)
694 print(f"\n Config: max=5 requests per 60 seconds\n")
695 for i in range(1, 8):
696 allowed = rl.is_allowed("192.168.1.1")
697 remaining = rl.remaining("192.168.1.1")
698 status = "ALLOWED" if allowed else "RATE LIMITED"
699 print(f" Request {i}: {status} (remaining: {remaining})")
700
701 # --- Structured Logging ---
702 print("\n" + "=" * 60)
703 print("8. Structured Security Logging")
704 print("=" * 60)
705 print()
706
707 sec_logger.log_auth_attempt("alice", True, "10.0.0.1")
708 sec_logger.log_auth_attempt("hacker", False, "10.0.0.99")
709 sec_logger.log_access("GET", "/api/profile", "10.0.0.1", 200, "alice")
710 sec_logger.log_access("DELETE", "/api/users/bob", "10.0.0.99", 403, "hacker")
711 sec_logger.log_rate_limit("10.0.0.99", "/api/login")
712 sec_logger.log_validation_error("password", "too short", "10.0.0.50")
713
714 # --- Full Flow Summary ---
715 print("\n" + "=" * 60)
716 print("9. Complete API Flow Summary")
717 print("=" * 60)
718
719 print("""
720 Typical secure API request lifecycle:
721
722 1. Client sends request
723 -> Rate limiter checks per-IP request count
724 2. If POST /api/register:
725 -> Validate username, password strength, email format
726 -> Hash password with PBKDF2 (100k iterations)
727 -> Store user with role
728 3. If POST /api/login:
729 -> Authenticate with constant-time comparison
730 -> Issue JWT token (HS256, 30 min expiry)
731 -> Log authentication attempt
732 4. For protected endpoints:
733 -> Extract & verify JWT from Authorization header
734 -> Check RBAC permissions for user's role
735 -> Process request if authorized
736 5. Response:
737 -> Add security headers (CSP, HSTS, X-Frame-Options, etc.)
738 -> Include rate limit remaining count
739 -> Return JSON response
740 6. Logging:
741 -> Structured logs for all auth events
742 -> Access logs with user, IP, status code
743 -> Rate limit violation warnings
744""")
745
746 print("=" * 60)
747 print(" Server code ready at: examples/Security/15_secure_api/app.py")
748 print(" To run as actual server: python app.py --serve")
749 print("=" * 60)
750
751
752# =============================================================================
753# 11. Optional: Run as Actual HTTP Server
754# =============================================================================
755
756def run_server(port: int = 8443):
757 """Start the actual HTTP server (for manual testing)."""
758 # Seed with demo users
759 user_store.register("admin", "Adm1n!Pass", "admin@example.com", "admin")
760 user_store.register("editor", "Ed1tor!Pass", "editor@example.com", "editor")
761 user_store.register("viewer", "V1ewer!Pass", "viewer@example.com", "viewer")
762
763 server = HTTPServer(("127.0.0.1", port), SecureAPIHandler)
764 print(f"\n Secure API server running on http://127.0.0.1:{port}")
765 print(f" Demo users: admin/Adm1n!Pass, editor/Ed1tor!Pass, viewer/V1ewer!Pass")
766 print(f" Press Ctrl+C to stop.\n")
767 print(f" Try: curl -X POST http://127.0.0.1:{port}/api/login \\")
768 print(f' -H "Content-Type: application/json" \\')
769 print(f" -d '{{\"username\":\"admin\",\"password\":\"Adm1n!Pass\"}}'\n")
770
771 try:
772 server.serve_forever()
773 except KeyboardInterrupt:
774 print("\n Server stopped.")
775 server.server_close()
776
777
778# =============================================================================
779# Main
780# =============================================================================
781
782if __name__ == "__main__":
783 if "--serve" in sys.argv:
784 port = 8443
785 for i, arg in enumerate(sys.argv):
786 if arg == "--port" and i + 1 < len(sys.argv):
787 port = int(sys.argv[i + 1])
788 run_server(port)
789 else:
790 run_demo()