api_security_demo.py

Download
python 434 lines 14.8 KB
  1"""
  2API 보안 기법 데모
  3API Security Techniques Demo
  4
  5토큰 버킷 레이트 리미터, 입력 검증, CORS 헤더, API 키 해싱,
  6HMAC 기반 요청 서명 등 API 보안 핵심 기법을 구현합니다.
  7
  8Demonstrates token bucket rate limiting, input validation, CORS headers,
  9API key hashing/verification, and HMAC-based request signing.
 10"""
 11
 12import hashlib
 13import hmac
 14import re
 15import time
 16import secrets
 17import json
 18import base64
 19from datetime import datetime, timedelta, timezone
 20
 21
 22# =============================================================================
 23# 1. Token Bucket Rate Limiter (토큰 버킷 레이트 리미터)
 24# =============================================================================
 25
 26class TokenBucketRateLimiter:
 27    """
 28    Token bucket algorithm for rate limiting.
 29    Tokens are added at a fixed rate. Each request consumes one token.
 30    If no tokens are available, the request is rejected.
 31    """
 32
 33    def __init__(self, capacity: int, refill_rate: float):
 34        """
 35        Args:
 36            capacity: Maximum number of tokens the bucket can hold
 37            refill_rate: Tokens added per second
 38        """
 39        self.capacity = capacity
 40        self.refill_rate = refill_rate
 41        self.tokens = capacity
 42        self.last_refill = time.monotonic()
 43
 44    def _refill(self):
 45        """Add tokens based on elapsed time."""
 46        now = time.monotonic()
 47        elapsed = now - self.last_refill
 48        new_tokens = elapsed * self.refill_rate
 49        self.tokens = min(self.capacity, self.tokens + new_tokens)
 50        self.last_refill = now
 51
 52    def allow_request(self) -> bool:
 53        """Check if a request is allowed and consume a token."""
 54        self._refill()
 55        if self.tokens >= 1:
 56            self.tokens -= 1
 57            return True
 58        return False
 59
 60    def wait_time(self) -> float:
 61        """Estimated seconds until a token is available."""
 62        self._refill()
 63        if self.tokens >= 1:
 64            return 0.0
 65        return (1 - self.tokens) / self.refill_rate
 66
 67
 68def demo_rate_limiter():
 69    print("=" * 60)
 70    print("1. Token Bucket Rate Limiter")
 71    print("=" * 60)
 72
 73    limiter = TokenBucketRateLimiter(capacity=5, refill_rate=2.0)
 74
 75    print(f"  Bucket capacity: 5 tokens, refill rate: 2 tokens/sec\n")
 76
 77    # Burst: send 7 requests rapidly
 78    for i in range(1, 8):
 79        allowed = limiter.allow_request()
 80        status = "ALLOWED" if allowed else "REJECTED"
 81        print(f"  Request {i}: {status}  (tokens left: {limiter.tokens:.1f})")
 82
 83    # Wait for refill
 84    print("\n  Waiting 1.5 seconds for token refill...")
 85    time.sleep(1.5)
 86
 87    for i in range(8, 12):
 88        allowed = limiter.allow_request()
 89        status = "ALLOWED" if allowed else "REJECTED"
 90        print(f"  Request {i}: {status}  (tokens left: {limiter.tokens:.1f})")
 91
 92    print()
 93
 94
 95# =============================================================================
 96# 2. Input Validation Functions (입력 검증 함수)
 97# =============================================================================
 98
 99class InputValidator:
100    """Collection of input validation methods for API endpoints."""
101
102    # Email: simplified RFC 5322 pattern
103    EMAIL_REGEX = re.compile(
104        r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
105    )
106
107    # Username: alphanumeric + underscore, 3-30 chars
108    USERNAME_REGEX = re.compile(r'^[a-zA-Z0-9_]{3,30}$')
109
110    @staticmethod
111    def validate_email(email: str) -> tuple[bool, str]:
112        if not email or len(email) > 254:
113            return False, "Email is empty or exceeds 254 characters"
114        if not InputValidator.EMAIL_REGEX.match(email):
115            return False, "Invalid email format"
116        return True, "Valid"
117
118    @staticmethod
119    def validate_username(username: str) -> tuple[bool, str]:
120        if not username:
121            return False, "Username is empty"
122        if not InputValidator.USERNAME_REGEX.match(username):
123            return False, "Username must be 3-30 alphanumeric/underscore characters"
124        return True, "Valid"
125
126    @staticmethod
127    def validate_password_strength(password: str) -> tuple[bool, str]:
128        issues = []
129        if len(password) < 8:
130            issues.append("at least 8 characters")
131        if not re.search(r'[A-Z]', password):
132            issues.append("an uppercase letter")
133        if not re.search(r'[a-z]', password):
134            issues.append("a lowercase letter")
135        if not re.search(r'[0-9]', password):
136            issues.append("a digit")
137        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
138            issues.append("a special character")
139        if issues:
140            return False, "Password needs: " + ", ".join(issues)
141        return True, "Strong password"
142
143    @staticmethod
144    def sanitize_string(value: str, max_length: int = 200) -> str:
145        """Remove potentially dangerous characters and truncate."""
146        value = value[:max_length]
147        # Remove null bytes and control characters
148        value = re.sub(r'[\x00-\x1f\x7f]', '', value)
149        # Escape HTML special characters
150        value = (value.replace('&', '&amp;').replace('<', '&lt;')
151                 .replace('>', '&gt;').replace('"', '&quot;')
152                 .replace("'", '&#x27;'))
153        return value
154
155
def demo_input_validation():
    """Print validation results for sample emails, usernames and passwords,
    followed by one sanitized string."""
    rule = "=" * 60
    print(rule)
    print("2. Input Validation")
    print(rule)

    checker = InputValidator()

    def tag(passed):
        # Short status label shown in front of every sample.
        return "OK" if passed else "FAIL"

    # Email samples
    print("\n  Email Validation:")
    for address in ("user@example.com", "bad-email", "a@b.c", "test@domain.co.uk"):
        passed, note = checker.validate_email(address)
        print(f"    [{tag(passed)}] {address:30s} -> {note}")

    # Username samples; long names are abbreviated for display only.
    print("\n  Username Validation:")
    for name in ("alice_01", "ab", "valid_user", "no spaces!", "x" * 31):
        shown = name
        if len(name) > 20:
            shown = name[:17] + "..."
        passed, note = checker.validate_username(name)
        print(f"    [{tag(passed)}] {shown:25s} -> {note}")

    # Password samples
    print("\n  Password Strength:")
    for candidate in ("short", "alllowercase1!", "NoDigits!here", "Str0ng!Pass"):
        passed, note = checker.validate_password_strength(candidate)
        print(f"    [{tag(passed)}] {candidate:25s} -> {note}")

    # Sanitization of a script-injection sample
    payload = '<script>alert("xss")</script>'
    cleaned = checker.sanitize_string(payload)
    print("\n  Sanitization:")
    print(f"    Input:     {payload}")
    print(f"    Sanitized: {cleaned}")
    print()
195
196
197# =============================================================================
198# 3. CORS Header Generation (CORS 헤더 생성)
199# =============================================================================
200
class CORSPolicy:
    """
    Generate CORS (Cross-Origin Resource Sharing) response headers.

    An origin is accepted when it is listed in ``allowed_origins`` or when
    the list contains the wildcard ``"*"``. Accepted origins are echoed back
    in ``Access-Control-Allow-Origin`` together with ``Vary: Origin``.
    """

    def __init__(self, allowed_origins: list[str], allowed_methods: list[str],
                 allowed_headers: list[str], max_age: int = 3600):
        """
        Args:
            allowed_origins: Origins allowed to call the API ("*" = any)
            allowed_methods: HTTP methods advertised to the browser
            allowed_headers: Request headers advertised to the browser
            max_age: Seconds the browser may cache the preflight response
        """
        self.allowed_origins = set(allowed_origins)
        # Copy the lists so later mutation by the caller cannot silently
        # change the policy.
        self.allowed_methods = list(allowed_methods)
        self.allowed_headers = list(allowed_headers)
        self.max_age = max_age

    def get_headers(self, request_origin: str) -> dict[str, str]:
        """
        Generate CORS headers for a given request origin.

        Returns:
            The CORS response headers when the origin is allowed, or an
            empty dict when it is not allowed or no origin was supplied.
        """
        # A request without an Origin value gets no CORS headers; without
        # this guard a wildcard policy would emit an empty Allow-Origin.
        if not request_origin:
            return {}

        if (request_origin not in self.allowed_origins
                and "*" not in self.allowed_origins):
            # If origin not allowed, return empty headers (browser will block)
            return {}

        return {
            "Access-Control-Allow-Origin": request_origin,
            "Access-Control-Allow-Methods": ", ".join(self.allowed_methods),
            "Access-Control-Allow-Headers": ", ".join(self.allowed_headers),
            "Access-Control-Max-Age": str(self.max_age),
            "Vary": "Origin",
        }
224
225
def demo_cors():
    """Show the headers a sample CORS policy produces for three origins."""
    rule = "=" * 60
    print(rule)
    print("3. CORS Header Generation")
    print(rule)

    policy = CORSPolicy(
        allowed_origins=["https://myapp.com", "https://admin.myapp.com"],
        allowed_methods=["GET", "POST", "PUT", "DELETE"],
        allowed_headers=["Content-Type", "Authorization"],
        max_age=7200,
    )

    # Two allowed origins with an unlisted one in between.
    for candidate in ("https://myapp.com", "https://evil.com", "https://admin.myapp.com"):
        response_headers = policy.get_headers(candidate)

        if not response_headers:
            print(f"\n  Origin: {candidate}  [BLOCKED]")
            print("    (no CORS headers -> browser will block request)")
            continue

        print(f"\n  Origin: {candidate}  [ALLOWED]")
        for header_name, header_value in response_headers.items():
            print(f"    {header_name}: {header_value}")
    print()
249
250
251# =============================================================================
252# 4. API Key Hashing and Verification (API 키 해싱 및 검증)
253# =============================================================================
254
class APIKeyManager:
    """
    Generate and verify API keys using salted SHA-256 hashing.
    Only the hash is stored; the raw key is shown once at creation.
    """

    def __init__(self):
        # key_id -> {"hash", "salt", "owner", "created"}
        self.key_store: dict[str, dict] = {}

    @staticmethod
    def _hash_key(salt: bytes, raw_key: str) -> str:
        """Return the hex SHA-256 digest of ``salt`` followed by the key."""
        return hashlib.sha256(salt + raw_key.encode()).hexdigest()

    def generate_key(self, owner: str) -> tuple[str, str]:
        """
        Generate a new API key and store its salted hash.

        Args:
            owner: Label recorded alongside the stored hash

        Returns:
            (key_id, raw_key). The raw key is not stored and cannot be
            recovered from the manager afterwards.
        """
        # The ID is only 32 random bits, so regenerate on a collision rather
        # than silently overwriting another owner's record.
        key_id = f"key_{secrets.token_hex(4)}"
        while key_id in self.key_store:
            key_id = f"key_{secrets.token_hex(4)}"

        raw_key = secrets.token_urlsafe(32)
        salt = secrets.token_bytes(16)

        self.key_store[key_id] = {
            "hash": self._hash_key(salt, raw_key),
            "salt": salt,
            "owner": owner,
            "created": datetime.now(timezone.utc).isoformat(),
        }
        return key_id, raw_key

    def verify_key(self, key_id: str, raw_key: str) -> bool:
        """
        Verify a raw API key against stored hash.

        Returns:
            True only when ``key_id`` is known and ``raw_key`` hashes to the
            stored value; False for unknown IDs, wrong keys, or a
            non-string key.
        """
        entry = self.key_store.get(key_id)
        if entry is None or not isinstance(raw_key, str):
            return False
        computed = self._hash_key(entry["salt"], raw_key)
        # Constant-time comparison of the two hex digests.
        return hmac.compare_digest(computed, entry["hash"])
287
288
def demo_api_key():
    """Issue one key for 'alice' and print three verification outcomes."""
    bar = "=" * 60
    print(bar)
    print("4. API Key Hashing & Verification")
    print(bar)

    manager = APIKeyManager()
    issued_id, secret_value = manager.generate_key("alice")
    stored_hash = manager.key_store[issued_id]['hash']

    # Only prefixes of the key and hash are displayed.
    print("\n  Generated key for 'alice':")
    print(f"    Key ID:  {issued_id}")
    print(f"    Raw Key: {secret_value[:12]}... (show once, then discard)")
    print(f"    Stored hash: {stored_hash[:24]}...")

    def verdict(candidate_id, candidate_key):
        # Label for a single verification attempt.
        return "PASS" if manager.verify_key(candidate_id, candidate_key) else "FAIL"

    # Correct key, wrong key, then an unknown key ID.
    print(f"\n  Verify with correct key:  {verdict(issued_id, secret_value)}")
    print(f"  Verify with wrong key:    {verdict(issued_id, 'wrong-key-value')}")
    print(f"  Verify non-existent ID:   {verdict('key_nonexistent', secret_value)}")
    print()
314
315
316# =============================================================================
317# 5. HMAC-Based Request Signing (HMAC 기반 요청 서명)
318# =============================================================================
319
320class RequestSigner:
321    """
322    Sign and verify API requests using HMAC-SHA256.
323    Prevents request tampering and replay attacks.
324    """
325
326    def __init__(self, secret_key: bytes):
327        self.secret_key = secret_key
328        self.replay_window = 300  # 5 minutes
329
330    def sign_request(self, method: str, path: str, body: str,
331                     timestamp: str | None = None) -> dict[str, str]:
332        """Create signature headers for an outgoing request."""
333        if timestamp is None:
334            timestamp = datetime.now(timezone.utc).isoformat()
335
336        # Canonical string: method + path + body + timestamp
337        canonical = f"{method}\n{path}\n{body}\n{timestamp}"
338
339        signature = hmac.new(
340            self.secret_key, canonical.encode(), hashlib.sha256
341        ).hexdigest()
342
343        return {
344            "X-Timestamp": timestamp,
345            "X-Signature": signature,
346        }
347
348    def verify_request(self, method: str, path: str, body: str,
349                       timestamp: str, signature: str) -> tuple[bool, str]:
350        """Verify an incoming signed request."""
351        # Check replay window
352        try:
353            req_time = datetime.fromisoformat(timestamp)
354            now = datetime.now(timezone.utc)
355            age = abs((now - req_time).total_seconds())
356            if age > self.replay_window:
357                return False, f"Request expired ({age:.0f}s old, max {self.replay_window}s)"
358        except ValueError:
359            return False, "Invalid timestamp format"
360
361        # Recompute signature
362        canonical = f"{method}\n{path}\n{body}\n{timestamp}"
363        expected = hmac.new(
364            self.secret_key, canonical.encode(), hashlib.sha256
365        ).hexdigest()
366
367        if hmac.compare_digest(signature, expected):
368            return True, "Signature valid"
369        return False, "Signature mismatch"
370
371
def demo_request_signing():
    """Sign a sample order request, then verify it intact, tampered and expired."""
    line = "=" * 60
    print(line)
    print("5. HMAC-Based Request Signing")
    print(line)

    signer = RequestSigner(secrets.token_bytes(32))

    method = "POST"
    path = "/api/v1/orders"
    body = json.dumps({"item": "widget", "qty": 3})

    # Sign the request and show the resulting headers (long values abbreviated).
    signed = signer.sign_request(method, path, body)
    print(f"\n  Request: {method} {path}")
    print(f"  Body: {body}")
    print("  Signature headers:")
    for header_name, header_value in signed.items():
        if len(header_value) > 40:
            header_value = header_value[:37] + "..."
        print(f"    {header_name}: {header_value}")

    stamp = signed["X-Timestamp"]
    sig = signed["X-Signature"]

    # Case 1: the request exactly as it was signed.
    _, outcome = signer.verify_request(method, path, body, stamp, sig)
    print(f"\n  Verify original request:  [{outcome}]")

    # Case 2: same headers, but the body was altered after signing.
    forged_body = json.dumps({"item": "widget", "qty": 999})
    _, outcome = signer.verify_request(method, path, forged_body, stamp, sig)
    print(f"  Verify tampered body:     [{outcome}]")

    # Case 3: correctly signed, but with a timestamp ten minutes in the past.
    stale_stamp = (datetime.now(timezone.utc) - timedelta(minutes=10)).isoformat()
    stale_sig = signer.sign_request(
        method, path, body, timestamp=stale_stamp
    )["X-Signature"]
    _, outcome = signer.verify_request(method, path, body, stale_stamp, stale_sig)
    print(f"  Verify expired request:   [{outcome}]")
    print()
413
414
415# =============================================================================
416# Main
417# =============================================================================
418
if __name__ == "__main__":
    # Title banner, each demo in section order, then the closing banner.
    divider = "=" * 60

    print("\n" + divider)
    print("  API Security Techniques Demo")
    print("  API 보안 기법 데모")
    print(divider + "\n")

    for run_demo in (
        demo_rate_limiter,
        demo_input_validation,
        demo_cors,
        demo_api_key,
        demo_request_signing,
    ):
        run_demo()

    print(divider)
    print("  Demo complete. All examples use stdlib only.")
    print(divider)