1"""
2API 보안 기법 데모
3API Security Techniques Demo
4
5토큰 버킷 레이트 리미터, 입력 검증, CORS 헤더, API 키 해싱,
6HMAC 기반 요청 서명 등 API 보안 핵심 기법을 구현합니다.
7
8Demonstrates token bucket rate limiting, input validation, CORS headers,
9API key hashing/verification, and HMAC-based request signing.
10"""
11
12import hashlib
13import hmac
14import re
15import time
16import secrets
17import json
18import base64
19from datetime import datetime, timedelta, timezone
20
21
22# =============================================================================
23# 1. Token Bucket Rate Limiter (토큰 버킷 레이트 리미터)
24# =============================================================================
25
26class TokenBucketRateLimiter:
27 """
28 Token bucket algorithm for rate limiting.
29 Tokens are added at a fixed rate. Each request consumes one token.
30 If no tokens are available, the request is rejected.
31 """
32
33 def __init__(self, capacity: int, refill_rate: float):
34 """
35 Args:
36 capacity: Maximum number of tokens the bucket can hold
37 refill_rate: Tokens added per second
38 """
39 self.capacity = capacity
40 self.refill_rate = refill_rate
41 self.tokens = capacity
42 self.last_refill = time.monotonic()
43
44 def _refill(self):
45 """Add tokens based on elapsed time."""
46 now = time.monotonic()
47 elapsed = now - self.last_refill
48 new_tokens = elapsed * self.refill_rate
49 self.tokens = min(self.capacity, self.tokens + new_tokens)
50 self.last_refill = now
51
52 def allow_request(self) -> bool:
53 """Check if a request is allowed and consume a token."""
54 self._refill()
55 if self.tokens >= 1:
56 self.tokens -= 1
57 return True
58 return False
59
60 def wait_time(self) -> float:
61 """Estimated seconds until a token is available."""
62 self._refill()
63 if self.tokens >= 1:
64 return 0.0
65 return (1 - self.tokens) / self.refill_rate
66
67
68def demo_rate_limiter():
69 print("=" * 60)
70 print("1. Token Bucket Rate Limiter")
71 print("=" * 60)
72
73 limiter = TokenBucketRateLimiter(capacity=5, refill_rate=2.0)
74
75 print(f" Bucket capacity: 5 tokens, refill rate: 2 tokens/sec\n")
76
77 # Burst: send 7 requests rapidly
78 for i in range(1, 8):
79 allowed = limiter.allow_request()
80 status = "ALLOWED" if allowed else "REJECTED"
81 print(f" Request {i}: {status} (tokens left: {limiter.tokens:.1f})")
82
83 # Wait for refill
84 print("\n Waiting 1.5 seconds for token refill...")
85 time.sleep(1.5)
86
87 for i in range(8, 12):
88 allowed = limiter.allow_request()
89 status = "ALLOWED" if allowed else "REJECTED"
90 print(f" Request {i}: {status} (tokens left: {limiter.tokens:.1f})")
91
92 print()
93
94
95# =============================================================================
96# 2. Input Validation Functions (입력 검증 함수)
97# =============================================================================
98
99class InputValidator:
100 """Collection of input validation methods for API endpoints."""
101
102 # Email: simplified RFC 5322 pattern
103 EMAIL_REGEX = re.compile(
104 r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
105 )
106
107 # Username: alphanumeric + underscore, 3-30 chars
108 USERNAME_REGEX = re.compile(r'^[a-zA-Z0-9_]{3,30}$')
109
110 @staticmethod
111 def validate_email(email: str) -> tuple[bool, str]:
112 if not email or len(email) > 254:
113 return False, "Email is empty or exceeds 254 characters"
114 if not InputValidator.EMAIL_REGEX.match(email):
115 return False, "Invalid email format"
116 return True, "Valid"
117
118 @staticmethod
119 def validate_username(username: str) -> tuple[bool, str]:
120 if not username:
121 return False, "Username is empty"
122 if not InputValidator.USERNAME_REGEX.match(username):
123 return False, "Username must be 3-30 alphanumeric/underscore characters"
124 return True, "Valid"
125
126 @staticmethod
127 def validate_password_strength(password: str) -> tuple[bool, str]:
128 issues = []
129 if len(password) < 8:
130 issues.append("at least 8 characters")
131 if not re.search(r'[A-Z]', password):
132 issues.append("an uppercase letter")
133 if not re.search(r'[a-z]', password):
134 issues.append("a lowercase letter")
135 if not re.search(r'[0-9]', password):
136 issues.append("a digit")
137 if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
138 issues.append("a special character")
139 if issues:
140 return False, "Password needs: " + ", ".join(issues)
141 return True, "Strong password"
142
143 @staticmethod
144 def sanitize_string(value: str, max_length: int = 200) -> str:
145 """Remove potentially dangerous characters and truncate."""
146 value = value[:max_length]
147 # Remove null bytes and control characters
148 value = re.sub(r'[\x00-\x1f\x7f]', '', value)
149 # Escape HTML special characters
150 value = (value.replace('&', '&').replace('<', '<')
151 .replace('>', '>').replace('"', '"')
152 .replace("'", '''))
153 return value
154
155
156def demo_input_validation():
157 print("=" * 60)
158 print("2. Input Validation")
159 print("=" * 60)
160
161 validator = InputValidator()
162
163 # Email validation
164 emails = ["user@example.com", "bad-email", "a@b.c", "test@domain.co.uk"]
165 print("\n Email Validation:")
166 for email in emails:
167 valid, msg = validator.validate_email(email)
168 symbol = "OK" if valid else "FAIL"
169 print(f" [{symbol}] {email:30s} -> {msg}")
170
171 # Username validation
172 usernames = ["alice_01", "ab", "valid_user", "no spaces!", "x" * 31]
173 print("\n Username Validation:")
174 for uname in usernames:
175 display = uname if len(uname) <= 20 else uname[:17] + "..."
176 valid, msg = validator.validate_username(uname)
177 symbol = "OK" if valid else "FAIL"
178 print(f" [{symbol}] {display:25s} -> {msg}")
179
180 # Password strength
181 passwords = ["short", "alllowercase1!", "NoDigits!here", "Str0ng!Pass"]
182 print("\n Password Strength:")
183 for pw in passwords:
184 valid, msg = validator.validate_password_strength(pw)
185 symbol = "OK" if valid else "FAIL"
186 print(f" [{symbol}] {pw:25s} -> {msg}")
187
188 # Sanitization
189 dangerous = '<script>alert("xss")</script>'
190 sanitized = validator.sanitize_string(dangerous)
191 print(f"\n Sanitization:")
192 print(f" Input: {dangerous}")
193 print(f" Sanitized: {sanitized}")
194 print()
195
196
197# =============================================================================
198# 3. CORS Header Generation (CORS 헤더 생성)
199# =============================================================================
200
201class CORSPolicy:
202 """Generate CORS (Cross-Origin Resource Sharing) response headers."""
203
204 def __init__(self, allowed_origins: list[str], allowed_methods: list[str],
205 allowed_headers: list[str], max_age: int = 3600):
206 self.allowed_origins = set(allowed_origins)
207 self.allowed_methods = allowed_methods
208 self.allowed_headers = allowed_headers
209 self.max_age = max_age
210
211 def get_headers(self, request_origin: str) -> dict[str, str]:
212 """Generate CORS headers for a given request origin."""
213 headers = {}
214
215 if request_origin in self.allowed_origins or "*" in self.allowed_origins:
216 headers["Access-Control-Allow-Origin"] = request_origin
217 headers["Access-Control-Allow-Methods"] = ", ".join(self.allowed_methods)
218 headers["Access-Control-Allow-Headers"] = ", ".join(self.allowed_headers)
219 headers["Access-Control-Max-Age"] = str(self.max_age)
220 headers["Vary"] = "Origin"
221 # If origin not allowed, return empty headers (browser will block)
222
223 return headers
224
225
226def demo_cors():
227 print("=" * 60)
228 print("3. CORS Header Generation")
229 print("=" * 60)
230
231 cors = CORSPolicy(
232 allowed_origins=["https://myapp.com", "https://admin.myapp.com"],
233 allowed_methods=["GET", "POST", "PUT", "DELETE"],
234 allowed_headers=["Content-Type", "Authorization"],
235 max_age=7200,
236 )
237
238 origins = ["https://myapp.com", "https://evil.com", "https://admin.myapp.com"]
239 for origin in origins:
240 headers = cors.get_headers(origin)
241 status = "ALLOWED" if headers else "BLOCKED"
242 print(f"\n Origin: {origin} [{status}]")
243 if headers:
244 for k, v in headers.items():
245 print(f" {k}: {v}")
246 else:
247 print(f" (no CORS headers -> browser will block request)")
248 print()
249
250
251# =============================================================================
252# 4. API Key Hashing and Verification (API 키 해싱 및 검증)
253# =============================================================================
254
255class APIKeyManager:
256 """
257 Generate and verify API keys using salted SHA-256 hashing.
258 Only the hash is stored; the raw key is shown once at creation.
259 """
260
261 def __init__(self):
262 self.key_store: dict[str, dict] = {} # key_id -> {hash, salt, created}
263
264 def generate_key(self, owner: str) -> tuple[str, str]:
265 """Generate a new API key. Returns (key_id, raw_key)."""
266 key_id = f"key_{secrets.token_hex(4)}"
267 raw_key = secrets.token_urlsafe(32)
268 salt = secrets.token_bytes(16)
269
270 key_hash = hashlib.sha256(salt + raw_key.encode()).hexdigest()
271
272 self.key_store[key_id] = {
273 "hash": key_hash,
274 "salt": salt,
275 "owner": owner,
276 "created": datetime.now(timezone.utc).isoformat(),
277 }
278 return key_id, raw_key
279
280 def verify_key(self, key_id: str, raw_key: str) -> bool:
281 """Verify a raw API key against stored hash."""
282 if key_id not in self.key_store:
283 return False
284 entry = self.key_store[key_id]
285 computed = hashlib.sha256(entry["salt"] + raw_key.encode()).hexdigest()
286 return hmac.compare_digest(computed, entry["hash"])
287
288
289def demo_api_key():
290 print("=" * 60)
291 print("4. API Key Hashing & Verification")
292 print("=" * 60)
293
294 mgr = APIKeyManager()
295
296 key_id, raw_key = mgr.generate_key("alice")
297 print(f"\n Generated key for 'alice':")
298 print(f" Key ID: {key_id}")
299 print(f" Raw Key: {raw_key[:12]}... (show once, then discard)")
300 print(f" Stored hash: {mgr.key_store[key_id]['hash'][:24]}...")
301
302 # Verify correct key
303 result = mgr.verify_key(key_id, raw_key)
304 print(f"\n Verify with correct key: {'PASS' if result else 'FAIL'}")
305
306 # Verify wrong key
307 result = mgr.verify_key(key_id, "wrong-key-value")
308 print(f" Verify with wrong key: {'PASS' if result else 'FAIL'}")
309
310 # Verify non-existent key ID
311 result = mgr.verify_key("key_nonexistent", raw_key)
312 print(f" Verify non-existent ID: {'PASS' if result else 'FAIL'}")
313 print()
314
315
316# =============================================================================
317# 5. HMAC-Based Request Signing (HMAC 기반 요청 서명)
318# =============================================================================
319
320class RequestSigner:
321 """
322 Sign and verify API requests using HMAC-SHA256.
323 Prevents request tampering and replay attacks.
324 """
325
326 def __init__(self, secret_key: bytes):
327 self.secret_key = secret_key
328 self.replay_window = 300 # 5 minutes
329
330 def sign_request(self, method: str, path: str, body: str,
331 timestamp: str | None = None) -> dict[str, str]:
332 """Create signature headers for an outgoing request."""
333 if timestamp is None:
334 timestamp = datetime.now(timezone.utc).isoformat()
335
336 # Canonical string: method + path + body + timestamp
337 canonical = f"{method}\n{path}\n{body}\n{timestamp}"
338
339 signature = hmac.new(
340 self.secret_key, canonical.encode(), hashlib.sha256
341 ).hexdigest()
342
343 return {
344 "X-Timestamp": timestamp,
345 "X-Signature": signature,
346 }
347
348 def verify_request(self, method: str, path: str, body: str,
349 timestamp: str, signature: str) -> tuple[bool, str]:
350 """Verify an incoming signed request."""
351 # Check replay window
352 try:
353 req_time = datetime.fromisoformat(timestamp)
354 now = datetime.now(timezone.utc)
355 age = abs((now - req_time).total_seconds())
356 if age > self.replay_window:
357 return False, f"Request expired ({age:.0f}s old, max {self.replay_window}s)"
358 except ValueError:
359 return False, "Invalid timestamp format"
360
361 # Recompute signature
362 canonical = f"{method}\n{path}\n{body}\n{timestamp}"
363 expected = hmac.new(
364 self.secret_key, canonical.encode(), hashlib.sha256
365 ).hexdigest()
366
367 if hmac.compare_digest(signature, expected):
368 return True, "Signature valid"
369 return False, "Signature mismatch"
370
371
372def demo_request_signing():
373 print("=" * 60)
374 print("5. HMAC-Based Request Signing")
375 print("=" * 60)
376
377 secret = secrets.token_bytes(32)
378 signer = RequestSigner(secret)
379
380 method, path = "POST", "/api/v1/orders"
381 body = json.dumps({"item": "widget", "qty": 3})
382
383 # Sign request
384 headers = signer.sign_request(method, path, body)
385 print(f"\n Request: {method} {path}")
386 print(f" Body: {body}")
387 print(f" Signature headers:")
388 for k, v in headers.items():
389 display = v if len(v) <= 40 else v[:37] + "..."
390 print(f" {k}: {display}")
391
392 # Verify valid request
393 ok, msg = signer.verify_request(
394 method, path, body, headers["X-Timestamp"], headers["X-Signature"]
395 )
396 print(f"\n Verify original request: [{msg}]")
397
398 # Tampered body
399 tampered_body = json.dumps({"item": "widget", "qty": 999})
400 ok, msg = signer.verify_request(
401 method, path, tampered_body, headers["X-Timestamp"], headers["X-Signature"]
402 )
403 print(f" Verify tampered body: [{msg}]")
404
405 # Expired timestamp
406 old_time = (datetime.now(timezone.utc) - timedelta(minutes=10)).isoformat()
407 old_headers = signer.sign_request(method, path, body, timestamp=old_time)
408 ok, msg = signer.verify_request(
409 method, path, body, old_time, old_headers["X-Signature"]
410 )
411 print(f" Verify expired request: [{msg}]")
412 print()
413
414
415# =============================================================================
416# Main
417# =============================================================================
418
419if __name__ == "__main__":
420 print("\n" + "=" * 60)
421 print(" API Security Techniques Demo")
422 print(" API 보안 기법 데모")
423 print("=" * 60 + "\n")
424
425 demo_rate_limiter()
426 demo_input_validation()
427 demo_cors()
428 demo_api_key()
429 demo_request_signing()
430
431 print("=" * 60)
432 print(" Demo complete. All examples use stdlib only.")
433 print("=" * 60)