API ๋ณด์
API ๋ณด์¶
์ด์ : 09_Web_Security_Headers.md | ๋ค์: 11_Secrets_Management.md
API๋ ํ๋ ์ํํธ์จ์ด ์์คํ ์ ํต์ฌ์ ๋๋ค. ๋ง์ดํฌ๋ก์๋น์ค๋ฅผ ์ฐ๊ฒฐํ๊ณ , ๋ชจ๋ฐ์ผ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ตฌ๋ํ๋ฉฐ, ํ์ฌ ํตํฉ์ ๊ฐ๋ฅํ๊ฒ ํฉ๋๋ค. API๋ ์ ํ๋ฆฌ์ผ์ด์ ๋ก์ง๊ณผ ๋ฐ์ดํฐ๋ฅผ ์ง์ ๋ ธ์ถํ๊ธฐ ๋๋ฌธ์ ๊ณต๊ฒฉ์์ ์ฃผ์ ํ์ ์ ๋๋ค. ๋จ์ผ ์๋ํฌ์ธํธ์ ์๋ชป๋ ๊ตฌ์ฑ์ผ๋ก ์๋ฐฑ๋ง ๊ฐ์ ๋ ์ฝ๋๊ฐ ๋ ธ์ถ๋ ์ ์์ต๋๋ค. ์ด ๋ ์จ์ API๋ฅผ ๊ตฌ์ถ, ๋ฐฐํฌ ๋ฐ ์ ์งํ๊ธฐ ์ํ ํ์ ๋ณด์ ๊ดํ์ ๋ค๋ฃน๋๋ค โ ์ธ์ฆ ๋ฐ ์๋ ์ ํ๋ถํฐ ์ ๋ ฅ ๊ฒ์ฆ ๋ฐ CORS ๊ตฌ์ฑ๊น์ง.
ํ์ต ๋ชฉํ¶
- API ํค, OAuth 2.0 ๋ฐ JWT๋ฅผ ์ฌ์ฉํ ๊ฐ๋ ฅํ API ์ธ์ฆ ๊ตฌํ
- ๋จ์ฉ์ ๋ฐฉ์งํ๊ธฐ ์ํ ์๋ ์ ํ ์ ๋ต ์ค๊ณ ๋ฐ ๋ฐฐํฌ
- ์ฃผ์ ๊ณต๊ฒฉ์ ๋ฐฉ์งํ๊ธฐ ์ํ ๋ชจ๋ API ์ ๋ ฅ ๊ฒ์ฆ ๋ฐ ์ ์
- cross-origin ์ ๊ทผ์ ์ ์ดํ๊ธฐ ์ํ CORS ์ฌ๋ฐ๋ฅด๊ฒ ๊ตฌ์ฑ
- ์ผ๋ฐ์ ์ธ ๊ณต๊ฒฉ ํจํด์ผ๋ก๋ถํฐ GraphQL ์๋ํฌ์ธํธ ๋ณดํธ
- OpenAPI/Swagger ๋ช ์ธ์์ ๋ณด์ ์คํด ์ ์
- ํ๋ก๋์ ํ๊ฒฝ์ ์ํ API ๊ฒ์ดํธ์จ์ด ๋ณด์ ํจํด ์ ์ฉ
1. API ์ํ ํ๊ฒฝ¶
1.1 OWASP API ๋ณด์ Top 10 (2023)¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ OWASP API ๋ณด์ Top 10 (2023) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ API1 BOLA (Broken Object Level Authorization) โ
โ ๋ค๋ฅธ ์ฌ์ฉ์์ ๋ฆฌ์์ค ์ ๊ทผ โ
โ GET /api/users/OTHER_USER_ID/orders โ
โ โ
โ API2 Broken Authentication โ
โ ์ฝํ ์ธ์ฆ ๋ฉ์ปค๋์ฆ, ํ ํฐ ์ ์ถ โ
โ โ
โ API3 Broken Object Property Level Authorization โ
โ ๋๋ ํ ๋น, ๋ฏผ๊ฐํ ์์ฑ ๋
ธ์ถ โ
โ โ
โ API4 Unrestricted Resource Consumption โ
โ ์๋ ์ ํ ์์, ํฐ ํ์ด๋ก๋, ๋น์ฉ์ด ๋ง์ด ๋๋ ์ฟผ๋ฆฌ โ
โ โ
โ API5 Broken Function Level Authorization โ
โ ์ผ๋ฐ ์ฌ์ฉ์๋ก์ ๊ด๋ฆฌ ๊ธฐ๋ฅ ์ ๊ทผ โ
โ POST /api/admin/users/delete โ
โ โ
โ API6 Unrestricted Access to Sensitive Business Flows โ
โ ๋น์ฆ๋์ค ๊ธฐ๋ฅ์ ์๋ํ๋ ๋จ์ฉ โ
โ โ
โ API7 Server-Side Request Forgery (SSRF) โ
โ ๊ฒ์ฆ ์์ด ์ฌ์ฉ์ ์
๋ ฅ์์ URL ๊ฐ์ ธ์ค๊ธฐ โ
โ โ
โ API8 Security Misconfiguration โ
โ ํค๋ ๋๋ฝ, ์์ธํ ์ค๋ฅ, ๊ธฐ๋ณธ ์๊ฒฉ ์ฆ๋ช
โ
โ โ
โ API9 Improper Inventory Management โ
โ ๊ด๋ฆฌ๋์ง ์๋ API ๋ฒ์ , ์๋์ฐ API โ
โ โ
โ API10 Unsafe Consumption of APIs โ
โ ํ์ฌ API ์๋ต์ ๋งน๋ชฉ์ ์ผ๋ก ์ ๋ขฐ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
1.2 API ๊ณต๊ฒฉ ํ๋ฉด¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ API ๊ณต๊ฒฉ ํ๋ฉด โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โ Client โโโโโถโ Network โโโโโถโ API โโโโโถโ Database โ โ
โ โ โ โ โ โ Server โ โ โ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โ
โ ๊ฐ ๊ณ์ธต์ ๊ณต๊ฒฉ ๋ฒกํฐ: โ
โ โ
โ Client: ๋ณ์กฐ๋ ์์ฒญ, ๋๋๋นํ ํ ํฐ, ์ฌ์ ๊ณต๊ฒฉ โ
โ Network: ์ค๊ฐ์ ๊ณต๊ฒฉ, ๋์ฒญ, DNS ํ์ด์ฌํน โ
โ API Server: ์ฃผ์
, ์ธ์ฆ ์์, BOLA, ๋๋ ํ ๋น โ
โ Database: SQL ์ฃผ์
, ๋ฐ์ดํฐ ์ ์ถ, ๋ฌด๋จ ์ ๊ทผ โ
โ โ
โ ํฌ๊ด์ ์ธ ์ฐ๋ ค ์ฌํญ: โ
โ โโโ ์ธ์ฆ (๋น์ ์ ๋๊ตฌ์ธ๊ฐ?) โ
โ โโโ ์ธ๊ฐ (๋ฌด์์ ์ ๊ทผํ ์ ์๋?) โ
โ โโโ ์
๋ ฅ ๊ฒ์ฆ (์ด ์์ฒญ์ ์์ ํ๊ฐ?) โ
โ โโโ ์๋ ์ ํ (API๋ฅผ ๋จ์ฉํ๊ณ ์๋?) โ
โ โโโ ์ํธํ (์ ์ก ์ค ๋ฐ์ดํฐ๊ฐ ๋ณดํธ๋๋๊ฐ?) โ
โ โโโ ๋ก๊น
/๋ชจ๋ํฐ๋ง (๊ณต๊ฒฉ์ ๊ฐ์งํ ์ ์๋?) โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
2. API ์ธ์ฆ ํจํด¶
2.1 API ํค¶
"""
API ํค ์ธ์ฆ โ ๋จ์ํ์ง๋ง ์ ํ์ .
์๋ฒ ๊ฐ ํต์ ๋ฐ ์ฌ์ฉ ์ถ์ ์ด ์๋ ๊ณต๊ฐ API์ ์ ํฉ.
"""
import secrets
import hashlib
from flask import Flask, request, jsonify, abort
from functools import wraps
from datetime import datetime
app = Flask(__name__)
# โโ API ํค ์์ฑ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
def generate_api_key() -> tuple[str, str]:
"""API ํค์ ์ ์ฅ์ฉ ํด์๋ฅผ ์์ฑ."""
# ์ํธํ์ ์ผ๋ก ์์ ํ ํค ์์ฑ
# ์ ๋์ฌ๋ ํค ์ ํ๊ณผ ๋ฒ์ ์ ์๋ณํ๋ ๋ฐ ๋์
raw_key = f"sk_live_{secrets.token_urlsafe(32)}"
# ์์ ํค๋ ์ ๋ ์ ์ฅํ์ง ๋ง๊ณ ํด์๋ง ์ ์ฅ
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
return raw_key, key_hash
# raw_key: sk_live_Abc123... (ํด๋ผ์ด์ธํธ์ ํ ๋ฒ ์ ์ก, ์ ์ฅ ์ ํจ)
# key_hash: e3b0c4... (๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅ)
# โโ ์๋ฎฌ๋ ์ด์
๋ ํค ์ ์ฅ์ (ํ๋ก๋์
์์๋ ์ค์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฌ์ฉ) โโโโ
API_KEYS = {
# ํด์ -> ๋ฉํ๋ฐ์ดํฐ
hashlib.sha256(b"sk_live_test_key_12345").hexdigest(): {
"client_id": "client_001",
"name": "Test Client",
"rate_limit": 100, # ๋ถ๋น ์์ฒญ ์
"scopes": ["read", "write"],
"created_at": "2025-01-01",
"last_used": None,
}
}
def require_api_key(f):
"""์ ํจํ API ํค๋ฅผ ์๊ตฌํ๋ ๋ฐ์ฝ๋ ์ดํฐ."""
@wraps(f)
def decorated(*args, **kwargs):
# API ํค๋ฅผ ์ฌ๋ฌ ์์น์์ ํ์ธ
api_key = (
request.headers.get('X-API-Key') or
request.headers.get('Authorization', '').replace('Bearer ', '') or
request.args.get('api_key') # ๋ ์์ , ๊ฐ๋ฅํ๋ฉด ํผํ๊ธฐ
)
if not api_key:
return jsonify({
"error": "missing_api_key",
"message": "API ํค๊ฐ ํ์ํฉ๋๋ค. "
"X-API-Key ํค๋์ ์ ๋ฌํ์ธ์."
}), 401
# ์ ๊ณต๋ ํค๋ฅผ ํด์ํ๊ณ ์กฐํ
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
key_data = API_KEYS.get(key_hash)
if not key_data:
return jsonify({
"error": "invalid_api_key",
"message": "์ ๊ณต๋ API ํค๊ฐ ์ ํจํ์ง ์์ต๋๋ค."
}), 401
# ๋ง์ง๋ง ์ฌ์ฉ ํ์์คํฌํ ์
๋ฐ์ดํธ
key_data["last_used"] = datetime.utcnow().isoformat()
# ์์ฒญ ์ปจํ
์คํธ์ ํค ๋ฉํ๋ฐ์ดํฐ ์ ์ฅ
request.api_client = key_data
return f(*args, **kwargs)
return decorated
@app.route('/api/data')
@require_api_key
def get_data():
"""API ํค๊ฐ ํ์ํ ๋ณดํธ๋ ์๋ํฌ์ธํธ."""
client = request.api_client
return jsonify({
"client": client["name"],
"data": [1, 2, 3]
})
# โโ API ํค ๋ณด์ ๋ชจ๋ฒ ์ฌ๋ก โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
1. ์ ์ก:
- ํญ์ HTTPS ์ฌ์ฉ
- ์ฟผ๋ฆฌ ๋งค๊ฐ๋ณ์๋ณด๋ค ํค๋ ์ ํธ (์ฟผ๋ฆฌ ๋งค๊ฐ๋ณ์๋ ๋ก๊ทธ์ ๋ํ๋จ)
- X-API-Key ํค๋ ๋๋ Authorization: Bearer <key> ์ฌ์ฉ
2. ์ ์ฅ:
- ์ ์ฅํ๊ธฐ ์ ์ ํค ํด์ (์ต์ SHA-256)
- ์ ์ฒด API ํค๋ฅผ ๋ก๊ทธ์ ๋จ๊ธฐ์ง ๋ง ๊ฒ
- UI์๋ ๋ง์ง๋ง 4์๋ง ํ์: sk_live_****5678
3. ์ํ:
- ํด๋ผ์ด์ธํธ๋น ์ฌ๋ฌ ํ์ฑ ํค ์ง์
- ๋ค์ดํ์ ์์ด ํค ์ํ ํ์ฉ
- ํค์ ๋ง๋ฃ์ผ ์ค์
4. ๋ฒ์ ์ง์ :
- ๊ฐ ํค์ ๋ฒ์/๊ถํ ํ ๋น
- ์ฝ๊ธฐ ๋ ์ฐ๊ธฐ ์์
์ ๋ณ๋ ํค ์ฌ์ฉ
- ํ
์คํธ ๋ ํ๋ก๋์
์ ๋ณ๋ ํค ์ฌ์ฉ
5. ์ ํ ์ฌํญ:
- API ํค๋ ์ฌ์ฉ์๊ฐ ์๋ ์ ํ๋ฆฌ์ผ์ด์
์ ์๋ณ
- ๋ด์ฅ ๋ง๋ฃ๊ฐ ๋ถ์กฑ (ํ ํฐ๊ณผ ๋ฌ๋ฆฌ)
- ์์ฒญ๋ณ๋ก ์ฝ๊ฒ ๋ฒ์๋ฅผ ์ง์ ํ ์ ์์
- ์ฌ์ฉ์ ์ปจํ
์คํธ ์ธ๊ฐ์๋ OAuth 2.0 ์ฌ์ฉ
"""
2.2 OAuth 2.0¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ OAuth 2.0 Authorization Code ํ๋ฆ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ 1. ์ฌ์ฉ์๊ฐ "์ ๊ณต์๋ก ๋ก๊ทธ์ธ" ํด๋ฆญ โ
โ Client โโโโโโโโโโโโโโโโโโโโโโโโโโโถ Auth Server โ
โ GET /authorize?response_type=code โ
โ &client_id=CLIENT_ID โ
โ &redirect_uri=CALLBACK_URL โ
โ &scope=read+write โ
โ &state=RANDOM_STATE โ
โ โ
โ 2. ์ฌ์ฉ์ ์ธ์ฆ ๋ฐ ๋ฒ์ ์น์ธ โ
โ Auth Server โโโโโโโโโโโโโโโโโโโโโโถ Client Callback โ
โ GET /callback?code=AUTH_CODE&state=RANDOM_STATE โ
โ โ
โ 3. ํด๋ผ์ด์ธํธ๊ฐ ์ฝ๋๋ฅผ ํ ํฐ์ผ๋ก ๊ตํ โ
โ Client โโโโโโโโโโโโโโโโโโโโโโโโโโโถ Auth Server โ
โ POST /token โ
โ grant_type=authorization_code โ
โ &code=AUTH_CODE โ
โ &client_id=CLIENT_ID โ
โ &client_secret=CLIENT_SECRET โ
โ &redirect_uri=CALLBACK_URL โ
โ โ
โ 4. ์ธ์ฆ ์๋ฒ๊ฐ ํ ํฐ ๋ฐํ โ
โ Auth Server โโโโโโโโโโโโโโโโโโโโโโถ Client โ
โ { "access_token": "...", โ
โ "refresh_token": "...", โ
โ "token_type": "Bearer", โ
โ "expires_in": 3600 } โ
โ โ
โ 5. ํด๋ผ์ด์ธํธ๊ฐ ์ก์ธ์ค ํ ํฐ ์ฌ์ฉ โ
โ Client โโโโโโโโโโโโโโโโโโโโโโโโโโโถ Resource Server โ
โ Authorization: Bearer ACCESS_TOKEN โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
Flask๋ฅผ ์ฌ์ฉํ OAuth 2.0 ๊ตฌํ (์๋ฒ ์ธก).
"""
import requests
import secrets
from flask import Flask, redirect, request, session, jsonify
from urllib.parse import urlencode
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)
# OAuth 2.0 ๊ตฌ์ฑ (์: GitHub)
OAUTH_CONFIG = {
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"authorize_url": "https://github.com/login/oauth/authorize",
"token_url": "https://github.com/login/oauth/access_token",
"api_url": "https://api.github.com/user",
"redirect_uri": "http://localhost:5000/callback",
"scope": "read:user user:email",
}
@app.route('/login')
def login():
"""OAuth 2.0 Authorization Code ํ๋ฆ ์์."""
# CSRF๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํ state ๋งค๊ฐ๋ณ์ ์์ฑ
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
# ์ธ๊ฐ URL ๊ตฌ์ถ
params = {
"client_id": OAUTH_CONFIG["client_id"],
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
"scope": OAUTH_CONFIG["scope"],
"state": state,
"response_type": "code",
}
auth_url = f"{OAUTH_CONFIG['authorize_url']}?{urlencode(params)}"
return redirect(auth_url)
@app.route('/callback')
def callback():
"""OAuth 2.0 ์ฝ๋ฐฑ ์ฒ๋ฆฌ."""
# โโ state ๋งค๊ฐ๋ณ์ ํ์ธ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
state = request.args.get('state')
if state != session.pop('oauth_state', None):
return jsonify({"error": "์ ํจํ์ง ์์ state ๋งค๊ฐ๋ณ์"}), 400
# โโ ์ค๋ฅ ์๋ต ํ์ธ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
error = request.args.get('error')
if error:
return jsonify({
"error": error,
"description": request.args.get('error_description', '')
}), 400
# โโ ์ธ๊ฐ ์ฝ๋๋ฅผ ํ ํฐ์ผ๋ก ๊ตํ โโโโโโโโโโโโโโโโโโโโโโโโโโโ
code = request.args.get('code')
token_response = requests.post(
OAUTH_CONFIG["token_url"],
data={
"client_id": OAUTH_CONFIG["client_id"],
"client_secret": OAUTH_CONFIG["client_secret"],
"code": code,
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
"grant_type": "authorization_code",
},
headers={"Accept": "application/json"},
timeout=10,
)
token_data = token_response.json()
if "error" in token_data:
return jsonify(token_data), 400
access_token = token_data["access_token"]
# โโ ์ฌ์ฉ์ ์ ๋ณด ๊ฐ์ ธ์ค๊ธฐ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
user_response = requests.get(
OAUTH_CONFIG["api_url"],
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
},
timeout=10,
)
user_data = user_response.json()
# ์ธ์
์ ์ ์ฅ (๋๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ์ฌ์ฉ์ ์์ฑ/์
๋ฐ์ดํธ)
session['user'] = {
"id": user_data["id"],
"name": user_data.get("name", user_data["login"]),
"email": user_data.get("email"),
}
return redirect('/dashboard')
# โโ OAuth 2.0 ๋ณด์ ์ฒดํฌ๋ฆฌ์คํธ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
1. ํญ์ state ๋งค๊ฐ๋ณ์ ์ฌ์ฉ (CSRF ๋ฐฉ์ง)
2. redirect_uri๋ฅผ ์ ํํ ๊ฒ์ฆ (์คํ ๋ฆฌ๋๋ ํธ ๋ฐฉ์ง)
3. ์๋ฒ ์ฑ์ Authorization Code ํ๋ฆ ์ฌ์ฉ (Implicit ์๋)
4. ํ ํฐ์ ์์ ํ๊ฒ ์ ์ฅ (์ํธํ, ์๋ฒ ์ธก)
5. ๊ณต๊ฐ ํด๋ผ์ด์ธํธ(SPA, ๋ชจ๋ฐ์ผ ์ฑ)์ PKCE ์ฌ์ฉ
6. ๋ชจ๋ ์์ฒญ์์ ํ ํฐ ๋ฒ์ ๊ฒ์ฆ
7. ๋จ๊ธฐ ์ก์ธ์ค ํ ํฐ + ๋ฆฌํ๋ ์ ํ ํฐ ์ฌ์ฉ
8. ๋ก๊ทธ์์ ์ ํ ํฐ ํ๊ธฐ
"""
2.3 JWT (JSON Web Tokens)¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ JWT ๊ตฌ์กฐ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ eyJhbGci... . eyJzdWIi... . SflKxwRJ... โ
โ โโโโโโโโโ โโโโโโโโโ โโโโโโโโโโ โ
โ Header Payload Signature โ
โ โ
โ Header (base64url): โ
โ { โ
โ "alg": "RS256", โ
โ "typ": "JWT", โ
โ "kid": "key-id-123" โ
โ } โ
โ โ
โ Payload (base64url): โ
โ { โ
โ "sub": "user_123", // Subject (์ฌ์ฉ์ ID) โ
โ "iss": "api.example.com", // Issuer โ
โ "aud": "app.example.com", // Audience โ
โ "exp": 1700000000, // ๋ง๋ฃ ์๊ฐ โ
โ "iat": 1699996400, // ๋ฐ๊ธ ์๊ฐ โ
โ "nbf": 1699996400, // Not before โ
โ "jti": "unique-token-id", // JWT ID (ํ๊ธฐ์ฉ) โ
โ "scope": "read write", // ์ฌ์ฉ์ ์ ์ ํด๋ ์ โ
โ "role": "admin" โ
โ } โ
โ โ
โ Signature: โ
โ RSASHA256(base64url(header) + "." + base64url(payload), key) โ
โ โ
โ ์ค์: Payload๋ ์ํธํ๋์ง ์์ โ base64url ์ธ์ฝ๋ฉ๋ง ๋จ โ
โ ๋๊ตฌ๋ ์ฝ์ ์ ์์. JWT์ ๋น๋ฐ์ ์ ์ฅํ์ง ๋ง ๊ฒ. โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
PyJWT๋ฅผ ์ฌ์ฉํ JWT ์ธ์ฆ โ ์์ ํ ๊ตฌํ.
"""
import jwt
import time
import uuid
from datetime import datetime, timedelta, timezone
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
# โโ ํค ๊ตฌ์ฑ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# ํ๋ก๋์
์๋ RS256 (๋น๋์นญ) ์ฌ์ฉ
# ๊ฐ์ธ ํค๋ ํ ํฐ์ ์๋ช
; ๊ณต๊ฐ ํค๋ ๊ฒ์ฆ
# ์ด๋ฅผ ํตํด ๋ง์ดํฌ๋ก์๋น์ค๊ฐ ๊ฐ์ธ ํค ์์ด ๊ฒ์ฆ ๊ฐ๋ฅ
# ์ด ์์ ์์๋ ๋จ์ํ๋ฅผ ์ํด HS256 (๋์นญ) ์ฌ์ฉ
JWT_SECRET = "your-256-bit-secret-change-this" # ํ๋ก๋์
์์ ํ๊ฒฝ ๋ณ์ ์ฌ์ฉ
JWT_ALGORITHM = "HS256"
JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
# โโ ํ ํฐ ๋ธ๋๋ฆฌ์คํธ (ํ๋ก๋์
์์ Redis ์ฌ์ฉ) โโโโโโโโโโโโโโโโโโโโ
revoked_tokens = set()
def create_access_token(user_id: str, role: str = "user",
scopes: list[str] = None) -> str:
"""๋จ๊ธฐ ์ก์ธ์ค ํ ํฐ ์์ฑ."""
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"iss": "api.example.com",
"aud": "app.example.com",
"iat": now,
"exp": now + JWT_ACCESS_TOKEN_EXPIRES,
"nbf": now,
"jti": str(uuid.uuid4()), # ํ๊ธฐ๋ฅผ ์ํ ๊ณ ์ ID
"type": "access",
"role": role,
"scopes": scopes or ["read"],
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def create_refresh_token(user_id: str) -> str:
"""์ฅ๊ธฐ ๋ฆฌํ๋ ์ ํ ํฐ ์์ฑ."""
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"iss": "api.example.com",
"iat": now,
"exp": now + JWT_REFRESH_TOKEN_EXPIRES,
"jti": str(uuid.uuid4()),
"type": "refresh",
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decode_token(token: str) -> dict:
"""JWT ํ ํฐ ๋์ฝ๋ ๋ฐ ๊ฒ์ฆ."""
try:
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
issuer="api.example.com",
audience="app.example.com",
options={
"require": ["exp", "iat", "sub", "iss", "aud", "jti"],
"verify_exp": True,
"verify_iss": True,
"verify_aud": True,
"verify_nbf": True,
}
)
# ํ ํฐ์ด ํ๊ธฐ๋์๋์ง ํ์ธ
if payload["jti"] in revoked_tokens:
raise jwt.InvalidTokenError("ํ ํฐ์ด ํ๊ธฐ๋์์ต๋๋ค")
return payload
except jwt.ExpiredSignatureError:
raise ValueError("ํ ํฐ์ด ๋ง๋ฃ๋์์ต๋๋ค")
except jwt.InvalidAudienceError:
raise ValueError("์ ํจํ์ง ์์ ํ ํฐ ๋์")
except jwt.InvalidIssuerError:
raise ValueError("์ ํจํ์ง ์์ ํ ํฐ ๋ฐ๊ธ์")
except jwt.InvalidTokenError as e:
raise ValueError(f"์ ํจํ์ง ์์ ํ ํฐ: {e}")
def require_auth(scopes: list[str] = None):
"""์ ํ์ ๋ฒ์ ํ์ธ๊ณผ ํจ๊ป JWT ์ธ์ฆ์ ์๊ตฌํ๋ ๋ฐ์ฝ๋ ์ดํฐ."""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Authorization ํค๋์์ ํ ํฐ ์ถ์ถ
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"error": "missing_token",
"message": "Bearer ํ ํฐ์ด ์๋ Authorization ํค๋ ํ์"
}), 401
token = auth_header.split(' ', 1)[1]
try:
payload = decode_token(token)
except ValueError as e:
return jsonify({
"error": "invalid_token",
"message": str(e)
}), 401
# ํ ํฐ ์ ํ ํ์ธ
if payload.get("type") != "access":
return jsonify({
"error": "wrong_token_type",
"message": "์ก์ธ์ค ํ ํฐ ํ์"
}), 401
# ๋ฒ์ ํ์ธ
if scopes:
token_scopes = set(payload.get("scopes", []))
required_scopes = set(scopes)
if not required_scopes.issubset(token_scopes):
missing = required_scopes - token_scopes
return jsonify({
"error": "insufficient_scope",
"message": f"๋๋ฝ๋ ๋ฒ์: {', '.join(missing)}"
}), 403
request.user = payload
return f(*args, **kwargs)
return decorated
return decorator
# โโ ๋ผ์ฐํธ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
@app.route('/api/login', methods=['POST'])
def login():
"""์ธ์ฆํ๊ณ JWT ํ ํฐ ๋ฐํ."""
data = request.get_json()
username = data.get('username')
password = data.get('password')
# ์๊ฒฉ ์ฆ๋ช
๊ฒ์ฆ (bcrypt/argon2 ํด์ ๋น๊ต ์ฌ์ฉ)
# ๋จ์ํ๋จ โ ์์ธํ ๋ด์ฉ์ ์ธ์ฆ ๋ ์จ ์ฐธ์กฐ
user = authenticate_user(username, password)
if not user:
# ์ฌ์ฉ์ ์ด๊ฑฐ ๋ฐฉ์ง๋ฅผ ์ํ ์ผ๊ด๋ ํ์ด๋ฐ ์ฌ์ฉ
return jsonify({
"error": "invalid_credentials",
"message": "์ ํจํ์ง ์์ ์ฌ์ฉ์ ์ด๋ฆ ๋๋ ๋น๋ฐ๋ฒํธ"
}), 401
# ํ ํฐ ์ ์์ฑ
access_token = create_access_token(
user_id=user["id"],
role=user["role"],
scopes=user["scopes"]
)
refresh_token = create_refresh_token(user_id=user["id"])
return jsonify({
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": int(JWT_ACCESS_TOKEN_EXPIRES.total_seconds()),
})
@app.route('/api/refresh', methods=['POST'])
def refresh():
"""๋ฆฌํ๋ ์ ํ ํฐ์ ์ ์ก์ธ์ค ํ ํฐ์ผ๋ก ๊ตํ."""
data = request.get_json()
refresh_token = data.get('refresh_token')
if not refresh_token:
return jsonify({"error": "missing_refresh_token"}), 400
try:
# ๋์ ํ์ธ ์์ด ๋์ฝ๋ (๋ฆฌํ๋ ์ ํ ํฐ์ ๋ค๋ฅผ ์ ์์)
payload = jwt.decode(
refresh_token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
issuer="api.example.com",
options={"require": ["exp", "sub", "jti", "iss"]}
)
if payload.get("type") != "refresh":
raise ValueError("๋ฆฌํ๋ ์ ํ ํฐ์ด ์๋")
if payload["jti"] in revoked_tokens:
raise ValueError("๋ฆฌํ๋ ์ ํ ํฐ์ด ํ๊ธฐ๋์์ต๋๋ค")
except (jwt.InvalidTokenError, ValueError) as e:
return jsonify({"error": "invalid_refresh_token",
"message": str(e)}), 401
# ์ค๋๋ ๋ฆฌํ๋ ์ ํ ํฐ ํ๊ธฐ (์ํ)
revoked_tokens.add(payload["jti"])
# ์ ํ ํฐ ์ ๋ฐ๊ธ
user_id = payload["sub"]
# ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ํ์ฌ ์ฌ์ฉ์ ์ญํ /๋ฒ์ ์กฐํ
user = get_user_by_id(user_id)
new_access = create_access_token(
user_id=user_id,
role=user["role"],
scopes=user["scopes"]
)
new_refresh = create_refresh_token(user_id=user_id)
return jsonify({
"access_token": new_access,
"refresh_token": new_refresh,
"token_type": "Bearer",
"expires_in": int(JWT_ACCESS_TOKEN_EXPIRES.total_seconds()),
})
@app.route('/api/logout', methods=['POST'])
@require_auth()
def logout():
"""ํ์ฌ ์ก์ธ์ค ํ ํฐ ํ๊ธฐ."""
revoked_tokens.add(request.user["jti"])
return jsonify({"message": "์ฑ๊ณต์ ์ผ๋ก ๋ก๊ทธ์์ํ์ต๋๋ค"}), 200
@app.route('/api/protected')
@require_auth(scopes=["read"])
def protected_resource():
"""'read' ๋ฒ์๊ฐ ํ์ํ ๋ณดํธ๋ ์๋ํฌ์ธํธ."""
return jsonify({
"user_id": request.user["sub"],
"message": "์ด ๋ณดํธ๋ ๋ฆฌ์์ค์ ์ ๊ทผํ ์ ์์ต๋๋ค"
})
# ์์ ์ฑ์ ์ํ ํ๋ ์ด์คํ๋ ํจ์
def authenticate_user(username, password):
"""ํ๋ ์ด์คํ๋ โ ์ ์ ํ ๋น๋ฐ๋ฒํธ ํด์ฑ์ผ๋ก ๊ตฌํ."""
return None
def get_user_by_id(user_id):
"""ํ๋ ์ด์คํ๋ โ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์กฐํ๋ก ๊ตฌํ."""
return {"id": user_id, "role": "user", "scopes": ["read"]}
2.4 JWT ๋ณด์ ๋ชจ๋ฒ ์ฌ๋ก¶
"""
JWT ๋ณด์ ๋ชจ๋ฒ ์ฌ๋ก ๋ฐ ์ผ๋ฐ์ ์ธ ํจ์ .
"""
# โโ ํจ์ 1: 'none' ์๊ณ ๋ฆฌ์ฆ ์ฌ์ฉ โโโโโโโโโโโโโโโโโโโโโโโโโโโ
# ๊ณต๊ฒฉ: ํค๋๋ฅผ {"alg": "none"}์ผ๋ก ๋ณ๊ฒฝํ๊ณ ์๋ช
์ ๊ฑฐ
# ๋ฐฉ์ด: ํญ์ ํ์ฉ๋ ์๊ณ ๋ฆฌ์ฆ์ ๋ช
์์ ์ผ๋ก ์ง์
payload = jwt.decode(
token, key,
algorithms=["RS256"], # "none"์ ์ ๋ ํฌํจํ๊ฑฐ๋ ๋ชจ๋ ํ์ฉํ์ง ๋ง ๊ฒ
)
# โโ ํจ์ 2: HS256๊ณผ RS256 ํผ๋ โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# ๊ณต๊ฒฉ: ์๋ฒ๊ฐ RS256์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ, ๊ณต๊ฒฉ์๊ฐ:
# 1. ๊ณต๊ฐ ํค ๊ฐ์ ธ์ค๊ธฐ (๊ณต๊ฐ์)
# 2. ๊ณต๊ฐ ํค๋ฅผ ๋น๋ฐ๋ก ์ฌ์ฉํ์ฌ HS256์ผ๋ก ์ ํ ํฐ ์๋ช
# 3. ์๋ฒ๊ฐ ๊ณต๊ฐ ํค๋ฅผ HMAC ๋น๋ฐ๋ก ์ทจ๊ธ
# ๋ฐฉ์ด: ํค๋์ ์๊ณ ๋ฆฌ์ฆ์ ์๊ฒฉํ๊ฒ ๊ฒ์ฆ
payload = jwt.decode(
token, rsa_public_key,
algorithms=["RS256"], # ์์๋ ์๊ณ ๋ฆฌ์ฆ๋ง ํ์ฉ
)
# โโ ํจ์ 3: ๋ง๋ฃ ๋๋ฝ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# exp ํด๋ ์์ด ์๋ ํ ํฐ์ ์์ํ ์ ํจ
# ๋ฐฉ์ด: ํญ์ ์งง์ ๋ง๋ฃ ์ค์
payload = {
"sub": "user_123",
"exp": datetime.now(timezone.utc) + timedelta(minutes=15),
}
# โโ ํจ์ 4: ํ์ด๋ก๋์ ๋ฏผ๊ฐํ ๋ฐ์ดํฐ ์ ์ฅ โโโโโโโโโโโโโโโโโ
# JWT ํ์ด๋ก๋๋ base64url ์ธ์ฝ๋ฉ๋๋ฉฐ ์ํธํ๋์ง ์์
# ํค ์์ด๋ ๋๊ตฌ๋ ๋์ฝ๋ ๊ฐ๋ฅ
import base64
header, payload_b64, signature = token.split('.')
decoded = base64.urlsafe_b64decode(payload_b64 + '==')
# ์ ์ฒด ํ์ด๋ก๋๋ฅผ ์ด์ ์ฝ์ ์ ์์!
# ์ ๋ ํฌํจํ์ง ๋ง ๊ฒ: ๋น๋ฐ๋ฒํธ, SSN, ์ ์ฉ์นด๋, PII๋ฅผ JWT์
# ํฌํจํ ๊ฒ๋ง: ์ฌ์ฉ์ ID, ์ญํ , ๋ฒ์, ๋ง๋ฃ
# โโ ํจ์ 5: ํ ํฐ ํ๊ธฐ ๋ฉ์ปค๋์ฆ ์์ โโโโโโโโโโโโโโโโโโโโโ
# JWT๋ ์์ฒด ํฌํจ โ ์๋ฒ๊ฐ ๋ฌดํจํํ ์ ์์
# ํด๊ฒฐ์ฑ
:
# 1. ๋จ๊ธฐ ์ก์ธ์ค ํ ํฐ (15๋ถ) + ๋ฆฌํ๋ ์ ํ ํฐ ์ํ
# 2. ํ ํฐ ๋ธ๋๋ฆฌ์คํธ (TTL = ํ ํฐ exp๊ฐ ์๋ Redis)
# 3. ํ ํฐ ๋ฒ์ ๊ด๋ฆฌ (DB์ ํ ํฐ ๋ฒ์ ์ ์ฅ, ๊ฐ ์์ฒญ์์ ํ์ธ)
# 4. ์๋ช
ํค ๋ณ๊ฒฝ (๋ชจ๋ ํ ํฐ ๋ฌดํจํ โ ๊ทน๋จ์ ์ต์
)
# โโ RS256 ์ค์ (ํ๋ก๋์
๊ถ์ฅ) โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
# RSA ํค ์ ์์ฑ
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
# ๊ฐ์ธ ํค: ์ธ์ฆ ์๋ฒ๊ฐ ํ ํฐ์ ์๋ช
ํ๋ ๋ฐ ์ฌ์ฉ
# ๊ณต๊ฐ ํค: ๋ชจ๋ ์๋น์ค๊ฐ ํ ํฐ์ ๊ฒ์ฆํ๋ ๋ฐ ์ฌ์ฉ
# ๊ณต๊ฐ ํค๋ฅผ ์์ ๋กญ๊ฒ ๊ณต์ ; ๊ฐ์ธ ํค ๋ณดํธ
"""
from cryptography.hazmat.primitives import serialization
# ํค ๋ก๋
with open('private.pem', 'rb') as f:
private_key = serialization.load_pem_private_key(f.read(), password=None)
with open('public.pem', 'rb') as f:
public_key = serialization.load_pem_public_key(f.read())
# ๊ฐ์ธ ํค๋ก ์๋ช
token = jwt.encode(payload, private_key, algorithm="RS256")
# ๊ณต๊ฐ ํค๋ก ๊ฒ์ฆ (๋ชจ๋ ์๋น์ค๊ฐ ํ ์ ์์)
decoded = jwt.decode(token, public_key, algorithms=["RS256"])
3. ์๋ ์ ํ¶
3.1 ์๋ ์ ํ ์๊ณ ๋ฆฌ์ฆ¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ์๋ ์ ํ ์๊ณ ๋ฆฌ์ฆ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ 1. ๊ณ ์ ์๋์ฐ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Window 1 โโ Window 2 โโ Window 3 โ โ
โ โ โ โ โ โ โ โโ โ โ โ โโ โ โ โ โ โ โ โ โ โ
โ โ 5/10 โโ 3/10 โโ 7/10 โ (์ ํ: ์๋์ฐ๋น 10) โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ ์ฅ์ : ๊ฐ๋จ. ๋จ์ : ์๋์ฐ ๊ฒฝ๊ณ์์ ๋ฒ์คํธ (2๋ฐฐ ์ ํ) โ
โ โ
โ 2. ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ก๊ทธ โ
โ Time: โโโโโโโโโ[====ํ์ฌ ์๋์ฐ====]โโโโโโโโ โ
โ ๊ฐ ์์ฒญ์ ์ ํํ ํ์์คํฌํ ์ถ์ โ
โ ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ด ์์ฒญ ์นด์ดํธ โ
โ ์ฅ์ : ์ ํ. ๋จ์ : ๋ฉ๋ชจ๋ฆฌ ์ง์ฝ์ (๋ชจ๋ ํ์์คํฌํ ์ ์ฅ) โ
โ โ
โ 3. ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ์นด์ดํฐ โ
โ ๊ณ ์ ์๋์ฐ ์นด์ดํธ๋ฅผ ๊ฐ์ค์น ์ค๋ฒ๋ฉ๊ณผ ๊ฒฐํฉ โ
โ weight = (window_size - elapsed) / window_size โ
โ count = prev_count * weight + current_count โ
โ ์ฅ์ : ๋ฉ๋ชจ๋ฆฌ ํจ์จ์ . ๋จ์ : ๊ทผ์ฌ์น โ
โ โ
โ 4. ํ ํฐ ๋ฒํท โ
โ โโโโโโโโโโโ โ
โ โ โ โ โ โ โ โ ๋ฒํท (์ฉ๋: 10 ํ ํฐ) โ
โ โ โ โ โ โ โ
โ โโโโโโโโโโโ โ
โ โ โ
โ ๋ฆฌํ: ์ด๋น 1 ํ ํฐ โ
โ ๊ฐ ์์ฒญ์ 1 ํ ํฐ ์๋น โ
โ ์ฅ์ : ๋ฒ์คํธ ํ์ฉ. ๋จ์ : ๊ด๋ฆฌํ ์ํ๊ฐ ๋ ๋ง์ โ
โ โ
โ 5. ๋ฆฌํค ๋ฒํท โ
โ ์์ฒญ์ด ํ(๋ฒํท)์ ๋ค์ด๊ฐ โ
โ ๊ณ ์ ์๋๋ก ์ฒ๋ฆฌ (๋์ถ ์๋) โ
โ ์ค๋ฒํ๋ก๋ ๊ฑฐ๋ถ๋จ โ
โ ์ฅ์ : ๋ถ๋๋ฌ์ด ์ถ๋ ฅ ์๋. ๋จ์ : ์ฉ๋์ด ์์ด๋ ์ง์ฐ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
3.2 Flask-Limiter๋ฅผ ์ฌ์ฉํ Flask ์๋ ์ ํ¶
"""
Flask-Limiter๋ฅผ ์ฌ์ฉํ ์๋ ์ ํ ๊ตฌํ.
pip install Flask-Limiter
"""
from flask import Flask, jsonify, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
# โโ ๊ธฐ๋ณธ ์ค์ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
limiter = Limiter(
app=app,
key_func=get_remote_address, # IP ์ฃผ์๋ก ์๋ ์ ํ
default_limits=["200 per day", "50 per hour"],
storage_uri="redis://localhost:6379", # ๋ถ์ฐ์ฉ Redis ์ฌ์ฉ
# storage_uri="memory://", # ๊ฐ๋ฐ์ฉ ์ธ๋ฉ๋ชจ๋ฆฌ
strategy="fixed-window-elastic-expiry",
)
# โโ ์ ์ญ ์๋ ์ ํ (๋ชจ๋ ๋ผ์ฐํธ์ ์ ์ฉ) โโโโโโโโโโโโโโโโโโโ
# ์ default_limits๋ฅผ ํตํด ์ด๋ฏธ ์ค์ ๋จ
# โโ ๋ผ์ฐํธ๋ณ ์๋ ์ ํ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
@app.route('/api/search')
@limiter.limit("10 per minute")
def search():
"""๊ฒ์ ์๋ํฌ์ธํธ โ ์คํฌ๋ํ ๋ฐฉ์ง๋ฅผ ์ํ ์๊ฒฉํ ์ ํ."""
query = request.args.get('q', '')
return jsonify({"query": query, "results": []})
@app.route('/api/login', methods=['POST'])
@limiter.limit("5 per minute") # ๋ฌด์ฐจ๋ณ ๋์
๋ฐฉ์ง
def login():
"""๊ณต๊ฒฉ์ ์ธ ์๋ ์ ํ์ด ์๋ ๋ก๊ทธ์ธ ์๋ํฌ์ธํธ."""
return jsonify({"message": "login"})
@app.route('/api/data')
@limiter.limit("100 per hour")
@limiter.limit("10 per minute") # ์ฌ๋ฌ ์ ํ
def get_data():
"""๊ณ์ธตํ๋ ์๋ ์ ํ์ด ์๋ ๋ฐ์ดํฐ ์๋ํฌ์ธํธ."""
return jsonify({"data": []})
# โโ API ํค ํฐ์ด์ ๋ฐ๋ฅธ ๋์ ์๋ ์ ํ โโโโโโโโโโโโโโโโโโโโโโโโ
def get_rate_limit_by_tier():
"""ํด๋ผ์ด์ธํธ์ API ํฐ์ด์ ๋ฐ๋ผ ์๋ ์ ํ ๋ฌธ์์ด ๋ฐํ."""
api_key = request.headers.get('X-API-Key', '')
# ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ํฐ์ด ์กฐํ (๋จ์ํ๋จ)
tiers = {
"free": "100 per hour",
"pro": "1000 per hour",
"enterprise": "10000 per hour",
}
tier = get_tier_for_key(api_key)
return tiers.get(tier, "50 per hour") # ๊ธฐ๋ณธ๊ฐ์ free
@app.route('/api/premium')
@limiter.limit(get_rate_limit_by_tier)
def premium_endpoint():
"""ํฐ์ด ๊ธฐ๋ฐ ์๋ ์ ํ์ด ์๋ ์๋ํฌ์ธํธ."""
return jsonify({"data": "premium"})
# โโ ์๋ ์ ํ ํค๋ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Flask-Limiter๊ฐ ์๋์ผ๋ก ์ด ํค๋๋ค์ ์ถ๊ฐ:
# X-RateLimit-Limit: 100
# X-RateLimit-Remaining: 95
# X-RateLimit-Reset: 1699999999
# Retry-After: 60 (์ ํ ์ด๊ณผ ์)
# โโ ์ฌ์ฉ์ ์ ์ ์ค๋ฅ ํธ๋ค๋ฌ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
@app.errorhandler(429)
def ratelimit_handler(e):
"""์๋ ์ ํ ์ด๊ณผ ์ ์ฌ์ฉ์ ์ ์ ์๋ต."""
return jsonify({
"error": "rate_limit_exceeded",
"message": "๋๋ฌด ๋ง์ ์์ฒญ. ๋์ค์ ๋ค์ ์๋ํ์ธ์.",
"retry_after": e.description,
}), 429
# ํ๋ ์ด์คํ๋
def get_tier_for_key(api_key):
return "free"
3.3 ์ฌ์ฉ์ ์ ์ ํ ํฐ ๋ฒํท ๊ตฌํ¶
"""
์ฒ์๋ถํฐ ํ ํฐ ๋ฒํท ์๋ ์ ํ๊ธฐ ๊ตฌํ.
"""
import time
import threading
from dataclasses import dataclass, field
@dataclass
class TokenBucket:
"""ํ ํฐ ๋ฒํท ์๋ ์ ํ๊ธฐ.
Args:
capacity: ๋ฒํท์ ์ต๋ ํ ํฐ ์
refill_rate: ์ด๋น ์ถ๊ฐ๋๋ ํ ํฐ
"""
capacity: int
refill_rate: float
tokens: float = field(init=False)
last_refill: float = field(init=False)
lock: threading.Lock = field(default_factory=threading.Lock, init=False)
def __post_init__(self):
self.tokens = float(self.capacity)
self.last_refill = time.monotonic()
def _refill(self):
"""๊ฒฝ๊ณผ ์๊ฐ์ ๋ฐ๋ผ ํ ํฐ ์ถ๊ฐ."""
now = time.monotonic()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now
def consume(self, tokens: int = 1) -> bool:
"""ํ ํฐ ์๋น ์๋. ํ์ฉ๋๋ฉด True ๋ฐํ."""
with self.lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_time(self) -> float:
"""์ต์ 1๊ฐ์ ํ ํฐ์ ์ฌ์ฉํ ์ ์์ ๋๊น์ง์ ์ด ๋ฐํ."""
with self.lock:
self._refill()
if self.tokens >= 1:
return 0.0
return (1 - self.tokens) / self.refill_rate
class RateLimiterStore:
"""ํด๋ผ์ด์ธํธ ํค๋ณ ์๋ ์ ํ๊ธฐ ๊ด๋ฆฌ."""
def __init__(self, capacity: int = 100, refill_rate: float = 10.0):
self.capacity = capacity
self.refill_rate = refill_rate
self._buckets: dict[str, TokenBucket] = {}
self._lock = threading.Lock()
def get_bucket(self, key: str) -> TokenBucket:
"""์ฃผ์ด์ง ํค์ ๋ํ ํ ํฐ ๋ฒํท ๊ฐ์ ธ์ค๊ธฐ ๋๋ ์์ฑ."""
if key not in self._buckets:
with self._lock:
if key not in self._buckets:
self._buckets[key] = TokenBucket(
capacity=self.capacity,
refill_rate=self.refill_rate
)
return self._buckets[key]
def is_allowed(self, key: str, tokens: int = 1) -> bool:
"""์ฃผ์ด์ง ํค์ ๋ํ ์์ฒญ์ด ํ์ฉ๋๋์ง ํ์ธ."""
bucket = self.get_bucket(key)
return bucket.consume(tokens)
# โโ Flask์ ํจ๊ป ์ฌ์ฉ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
rate_limiter = RateLimiterStore(capacity=100, refill_rate=10.0)
def rate_limit(capacity=100, refill_rate=10.0):
"""์ฌ์ฉ์ ์ ์ ์๋ ์ ํ ๋ฐ์ฝ๋ ์ดํฐ."""
store = RateLimiterStore(capacity=capacity, refill_rate=refill_rate)
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# IP + ์๋ํฌ์ธํธ๋ฅผ ์๋ ์ ํ ํค๋ก ์ฌ์ฉ
client_ip = request.remote_addr
key = f"{client_ip}:{request.endpoint}"
bucket = store.get_bucket(key)
if not bucket.consume():
wait = bucket.wait_time()
return jsonify({
"error": "rate_limit_exceeded",
"retry_after": round(wait, 2)
}), 429
response = f(*args, **kwargs)
return response
return decorated
return decorator
@app.route('/api/resource')
@rate_limit(capacity=10, refill_rate=1.0) # 10 ๋ฒ์คํธ, ์ด๋น 1 ์ง์
def get_resource():
return jsonify({"data": "resource"})
4. ์ ๋ ฅ ๊ฒ์ฆ ๋ฐ ์ ์ ¶
4.1 ๊ฒ์ฆ ์ํคํ ์ฒ¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ์
๋ ฅ ๊ฒ์ฆ ๊ณ์ธต โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Request โโฌโโ ๊ณ์ธต 1: ์คํค๋ง ๊ฒ์ฆ โ
โ โ (๊ตฌ์กฐ, ํ์
, ํ์ ํ๋) โ
โ โ โ
โ โโโ ๊ณ์ธต 2: ๋น์ฆ๋์ค ๊ฒ์ฆ โ
โ โ (๋ฒ์, ํ์, ์ผ๊ด์ฑ) โ
โ โ โ
โ โโโ ๊ณ์ธต 3: ์ ์ โ
โ โ (๊ณต๋ฐฑ ์ ๊ฑฐ, ์ ๊ทํ, ์ธ์ฝ๋ฉ) โ
โ โ โ
โ โโโ ๊ณ์ธต 4: ๋งค๊ฐ๋ณ์ํ๋ ์์
โ
โ (SQL ๋งค๊ฐ๋ณ์ํ, ํ
ํ๋ฆฟ ์ด์ค์ผ์ดํ) โ
โ โ
โ ์์น: ์กฐ๊ธฐ ๊ฒ์ฆ, ๋น ๋ฅธ ์คํจ, ํด๋ผ์ด์ธํธ ๋ฐ์ดํฐ๋ฅผ ์ ๋ ์ ๋ขฐํ์ง ๋ง ๊ฒ. โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
4.2 Marshmallow๋ฅผ ์ฌ์ฉํ ์คํค๋ง ๊ฒ์ฆ¶
"""
Marshmallow ์คํค๋ง๋ฅผ ์ฌ์ฉํ API ์
๋ ฅ ๊ฒ์ฆ.
pip install marshmallow
"""
from marshmallow import (
Schema, fields, validate, validates, validates_schema,
ValidationError, pre_load, RAISE
)
from flask import Flask, request, jsonify
app = Flask(__name__)
# โโ ์คํค๋ง ์ ์ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
class UserCreateSchema(Schema):
"""์ ์ฌ์ฉ์ ์์ฑ์ฉ ์คํค๋ง."""
class Meta:
# ์ ์ ์๋ ํ๋์ ๋ํด ์ค๋ฅ ๋ฐ์ (๋๋ ํ ๋น ๋ฐฉ์ง)
unknown = RAISE
username = fields.String(
required=True,
validate=[
validate.Length(min=3, max=30),
validate.Regexp(
r'^[a-zA-Z0-9_]+$',
error="์ฌ์ฉ์ ์ด๋ฆ์ ๋ฌธ์, ์ซ์, ๋ฐ์ค๋ง ํฌํจํด์ผ ํฉ๋๋ค"
),
]
)
email = fields.Email(required=True)
password = fields.String(
required=True,
load_only=True, # ์ง๋ ฌํ๋ ์ถ๋ ฅ์ ์ ๋ ํฌํจํ์ง ์์
validate=validate.Length(min=12, max=128),
)
age = fields.Integer(
validate=validate.Range(min=13, max=150),
load_default=None,
)
role = fields.String(
validate=validate.OneOf(["user", "moderator"]),
load_default="user",
# ์ฐธ๊ณ : "admin"์ API๋ฅผ ํตํด ํ์ฉ๋์ง ์์ โ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ํตํด์๋ง
)
@validates('password')
def validate_password_complexity(self, value):
"""๋น๋ฐ๋ฒํธ ๋ณต์ก์ฑ ์๊ตฌ์ฌํญ ์ ์ฉ."""
errors = []
if not any(c.isupper() for c in value):
errors.append("์ต์ ํ๋์ ๋๋ฌธ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค")
if not any(c.islower() for c in value):
errors.append("์ต์ ํ๋์ ์๋ฌธ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค")
if not any(c.isdigit() for c in value):
errors.append("์ต์ ํ๋์ ์ซ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค")
if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in value):
errors.append("์ต์ ํ๋์ ํน์ ๋ฌธ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค")
if errors:
raise ValidationError(errors)
@pre_load
def normalize_input(self, data, **kwargs):
"""๊ฒ์ฆ ์ ์ ์
๋ ฅ ๋ฐ์ดํฐ ์ ๊ทํ."""
if 'email' in data:
data['email'] = data['email'].strip().lower()
if 'username' in data:
data['username'] = data['username'].strip()
return data
class SearchQuerySchema(Schema):
"""๊ฒ์ ์ฟผ๋ฆฌ์ฉ ์คํค๋ง."""
class Meta:
unknown = RAISE
q = fields.String(
required=True,
validate=validate.Length(min=1, max=200),
)
page = fields.Integer(
validate=validate.Range(min=1, max=1000),
load_default=1,
)
per_page = fields.Integer(
validate=validate.Range(min=1, max=100),
load_default=20,
)
sort = fields.String(
validate=validate.OneOf(["relevance", "date", "name"]),
load_default="relevance",
)
order = fields.String(
validate=validate.OneOf(["asc", "desc"]),
load_default="desc",
)
# โโ ๊ฒ์ฆ ๋ฐ์ฝ๋ ์ดํฐ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
def validate_input(schema_class, location="json"):
"""์คํค๋ง์ ๋ํด ์์ฒญ ์
๋ ฅ์ ๊ฒ์ฆํ๋ ๋ฐ์ฝ๋ ์ดํฐ."""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
schema = schema_class()
if location == "json":
data = request.get_json(silent=True)
if data is None:
return jsonify({
"error": "invalid_request",
"message": "์์ฒญ ๋ณธ๋ฌธ์ ์ ํจํ JSON์ด์ด์ผ ํฉ๋๋ค"
}), 400
elif location == "args":
data = request.args.to_dict()
elif location == "form":
data = request.form.to_dict()
else:
raise ValueError(f"์ ์ ์๋ ์์น: {location}")
try:
validated = schema.load(data)
except ValidationError as err:
return jsonify({
"error": "validation_error",
"messages": err.messages,
}), 422
request.validated_data = validated
return f(*args, **kwargs)
return decorated
return decorator
# โโ ๊ฒ์ฆ ๋ฐ์ฝ๋ ์ดํฐ ์ฌ์ฉ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
@app.route('/api/users', methods=['POST'])
@validate_input(UserCreateSchema, location="json")
def create_user():
"""๊ฒ์ฆ๋ ์
๋ ฅ์ผ๋ก ์ ์ฌ์ฉ์ ์์ฑ."""
data = request.validated_data
# ์ด ์์ ์์ data๋ ์ ํจํจ์ด ๋ณด์ฅ๋จ
return jsonify({
"message": "์ฌ์ฉ์ ์์ฑ๋จ",
"username": data["username"],
"email": data["email"],
}), 201
@app.route('/api/search')
@validate_input(SearchQuerySchema, location="args")
def search():
"""๊ฒ์ฆ๋ ์ฟผ๋ฆฌ ๋งค๊ฐ๋ณ์๋ก ๊ฒ์."""
data = request.validated_data
return jsonify({
"query": data["q"],
"page": data["page"],
"per_page": data["per_page"],
})
4.3 Pydantic ๊ฒ์ฆ (๋์)¶
"""
Pydantic v2๋ฅผ ์ฌ์ฉํ API ๊ฒ์ฆ.
pip install pydantic
"""
from pydantic import (
BaseModel, Field, field_validator, model_validator,
EmailStr, ConfigDict
)
from typing import Optional
import re
class UserCreate(BaseModel):
"""์ฌ์ฉ์ ์์ฑ์ฉ Pydantic ๋ชจ๋ธ."""
model_config = ConfigDict(
str_strip_whitespace=True,
extra='forbid', # ์ ์ ์๋ ํ๋ ๊ฑฐ๋ถ
)
username: str = Field(
min_length=3,
max_length=30,
pattern=r'^[a-zA-Z0-9_]+$',
)
email: EmailStr
password: str = Field(min_length=12, max_length=128)
age: Optional[int] = Field(default=None, ge=13, le=150)
role: str = Field(default="user", pattern=r'^(user|moderator)$')
@field_validator('password')
@classmethod
def validate_password(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('๋๋ฌธ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค')
if not re.search(r'[a-z]', v):
raise ValueError('์๋ฌธ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค')
if not re.search(r'\d', v):
raise ValueError('์ซ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค')
if not re.search(r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]', v):
raise ValueError('ํน์ ๋ฌธ์๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค')
return v
@field_validator('email')
@classmethod
def normalize_email(cls, v):
return v.lower()
# โโ Flask์์ ์ฌ์ฉ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
from flask import Flask, request, jsonify
from pydantic import ValidationError as PydanticValidationError
app = Flask(__name__)
@app.route('/api/users', methods=['POST'])
def create_user():
try:
user = UserCreate(**request.get_json())
except PydanticValidationError as e:
return jsonify({
"error": "validation_error",
"details": e.errors(),
}), 422
return jsonify({"username": user.username}), 201
5. CORS (Cross-Origin Resource Sharing)¶
5.1 CORS ์๋ ๋ฐฉ์¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ CORS ํ๋ฆ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ ๊ฐ๋จํ ์์ฒญ (GET, HEAD, ๊ฐ๋จํ ํค๋๊ฐ ์๋ POST): โ
โ โ
โ Browser โโโโGET /api/dataโโโโโโโถ Server โ
โ (origin: https://app.com) โ
โ โ
โ Server โโโโResponseโโโโโโโโโโโโโโถ Browser โ
โ Access-Control-Allow-Origin: https://app.com โ
โ โโโ ๋ธ๋ผ์ฐ์ ๊ฐ ์๋ต ํ์ฉ โ โ
โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โ ํ๋ฆฌํ๋ผ์ดํธ ์์ฒญ (PUT, DELETE, ์ฌ์ฉ์ ์ ์ ํค๋, JSON): โ
โ โ
โ ๋จ๊ณ 1: ๋ธ๋ผ์ฐ์ ๊ฐ OPTIONS ํ๋ฆฌํ๋ผ์ดํธ ์ ์ก โ
โ Browser โโโโOPTIONS /api/dataโโโโถ Server โ
โ Origin: https://app.com โ
โ Access-Control-Request-Method: PUT โ
โ Access-Control-Request-Headers: Content-Type, Authorization โ
โ โ
โ ๋จ๊ณ 2: ์๋ฒ๊ฐ ํ์ฉ๋ ๋ฉ์๋/ํค๋๋ก ์๋ต โ
โ Server โโโโ204 No Contentโโโโโโโโถ Browser โ
โ Access-Control-Allow-Origin: https://app.com โ
โ Access-Control-Allow-Methods: GET, POST, PUT, DELETE โ
โ Access-Control-Allow-Headers: Content-Type, Authorization โ
โ Access-Control-Max-Age: 86400 โ
โ โ
โ ๋จ๊ณ 3: ๋ธ๋ผ์ฐ์ ๊ฐ ์ค์ ์์ฒญ ์ ์ก โ
โ Browser โโโโPUT /api/dataโโโโโโโโถ Server โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
5.2 Flask CORS ๊ตฌ์ฑ¶
"""
Flask์์ CORS ๊ตฌ์ฑ.
pip install flask-cors
"""
from flask import Flask, jsonify
from flask_cors import CORS
app = Flask(__name__)
# โโ ์ต์
1: ํน์ ์ถ์ฒ ํ์ฉ (๊ถ์ฅ) โโโโโโโโโโโโโโ
CORS(app, resources={
r"/api/*": {
"origins": [
"https://app.example.com",
"https://admin.example.com",
],
"methods": ["GET", "POST", "PUT", "DELETE"],
"allow_headers": ["Content-Type", "Authorization"],
"expose_headers": ["X-Request-Id", "X-RateLimit-Remaining"],
"supports_credentials": True,
"max_age": 86400,
}
})
# โโ ์ต์
2: ๋ค๋ฅธ ๋ผ์ฐํธ์ ๋ํ ๋ค๋ฅธ CORS โโโโโโโโโโโโโโโโ
app2 = Flask(__name__)
# ๊ณต๊ฐ API: ๋ชจ๋ ์ถ์ฒ ํ์ฉ (์๊ฒฉ ์ฆ๋ช
์์)
CORS(app2, resources={
r"/api/public/*": {
"origins": "*",
"methods": ["GET"],
"allow_headers": ["Content-Type"],
"supports_credentials": False, # origin: *์ธ ๊ฒฝ์ฐ False์ฌ์ผ ํจ
}
})
# ๋น๊ณต๊ฐ API: ์๊ฒฉ ์ฆ๋ช
์ด ์๋ ํน์ ์ถ์ฒ
CORS(app2, resources={
r"/api/private/*": {
"origins": ["https://app.example.com"],
"methods": ["GET", "POST", "PUT", "DELETE"],
"allow_headers": ["Content-Type", "Authorization"],
"supports_credentials": True,
"max_age": 86400,
}
})
# โโ ์ต์
3: ์๋ CORS ๊ตฌํ โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
from flask import Flask, request, make_response
app3 = Flask(__name__)
ALLOWED_ORIGINS = {
"https://app.example.com",
"https://admin.example.com",
}
@app3.after_request
def add_cors_headers(response):
origin = request.headers.get('Origin')
if origin in ALLOWED_ORIGINS:
response.headers['Access-Control-Allow-Origin'] = origin
response.headers['Access-Control-Allow-Credentials'] = 'true'
response.headers['Access-Control-Allow-Methods'] = (
'GET, POST, PUT, DELETE, OPTIONS'
)
response.headers['Access-Control-Allow-Headers'] = (
'Content-Type, Authorization, X-Requested-With'
)
response.headers['Access-Control-Expose-Headers'] = (
'X-Request-Id, X-RateLimit-Remaining'
)
response.headers['Access-Control-Max-Age'] = '86400'
# Vary: Origin์ ์บ์์ ์๋ต์ด ์ถ์ฒ์ ๋ฐ๋ผ ๋ฌ๋ผ์ง๋ค๊ณ ์๋ฆผ
response.headers.add('Vary', 'Origin')
return response
@app3.route('/api/data', methods=['OPTIONS'])
def preflight():
"""CORS ํ๋ฆฌํ๋ผ์ดํธ ์์ฒญ ์ฒ๋ฆฌ."""
return make_response('', 204)
# โโ CORS ๋ณด์ ๊ท์น โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
1. ์๊ฒฉ ์ฆ๋ช
๊ณผ ํจ๊ป Access-Control-Allow-Origin: *๋ฅผ ์ ๋ ์ฌ์ฉํ์ง ๋ง ๊ฒ
- ์ค์ ๋ก ๋ธ๋ผ์ฐ์ ์์ ์ฐจ๋จ๋จ
- ์๊ฒฉ ์ฆ๋ช
์ด ํ์ํ ๊ฒฝ์ฐ ํน์ ์ถ์ฒ ๋์ด
2. ํ์ธ ์์ด Origin ํค๋๋ฅผ Allow-Origin์ผ๋ก ์ ๋ ๋ฐ์ํ์ง ๋ง ๊ฒ
- ์ด๋ ๋ชจ๋ ์ถ์ฒ๋ฅผ ํ์ฉํ๋ ๊ฒ๊ณผ ๋์ผ
- ๋์จ: response.headers['ACAO'] = request.headers['Origin']
- ์ข์: ๋จผ์ ํ์ฉ ๋ชฉ๋ก์ ๋ํด ํ์ธ
3. Access-Control-Allow-Methods๋ฅผ ์ค์ ๋ก ํ์ํ ๊ฒ์ผ๋ก ์ ํ
- ๋ผ์ฐํธ๊ฐ ์ง์ํ์ง ์๋ ๊ฒฝ์ฐ DELETE๋ฅผ ํ์ฉํ์ง ๋ง ๊ฒ
4. ํ๋ฆฌํ๋ผ์ดํธ ์์ฒญ์ ์ค์ด๊ธฐ ์ํด Access-Control-Max-Age ์ค์
- 86400 (24์๊ฐ)์ด ํฉ๋ฆฌ์
5. ์ฌ์ฉ์ ์ ์ ํค๋์ Access-Control-Expose-Headers ์ฌ์ฉ
- ๊ธฐ๋ณธ์ ์ผ๋ก ๊ฐ๋จํ ํค๋๋ง JavaScript์์ ์ฝ์ ์ ์์
6. ACAO๊ฐ ์์ฒญ๋ณ๋ก ๋ณ๊ฒฝ๋๋ ๊ฒฝ์ฐ ํญ์ Vary: Origin ์ถ๊ฐ
- ์บ์ ํฌ์ด์ฆ๋ ๋ฐฉ์ง
"""
6. GraphQL ๋ณด์¶
6.1 GraphQL ํน์ ์ํ¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ GraphQL ๋ณด์ ์ํ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ 1. ์ฟผ๋ฆฌ ๊น์ด ๊ณต๊ฒฉ โ
โ query { user { posts { comments { author { posts { ... } } } } }โ
โ โโโถ ๊น์ด ์ค์ฒฉ ์ฟผ๋ฆฌ๊ฐ ๊ธฐํ๊ธ์์ DB ๋ถํ ์ ๋ฐ โ
โ โ
โ 2. ์ฟผ๋ฆฌ ํญ ๊ณต๊ฒฉ โ
โ query { user1: user(id:1) {...} user2: user(id:2) {...} ... } โ
โ โโโถ ๋ง์ ๋ณ์นญ์ด ์ฟผ๋ฆฌ ๋น์ฉ์ ๊ณฑํจ โ
โ โ
โ 3. ์ธํธ๋ก์คํ์
๋จ์ฉ โ
โ query { __schema { types { name fields { name } } } } โ
โ โโโถ ๊ณต๊ฒฉ์์๊ฒ ์ ์ฒด API ์คํค๋ง ๋
ธ์ถ โ
โ โ
โ 4. ๋ฐฐ์น ๊ณต๊ฒฉ โ
โ [{"query": "..."}, {"query": "..."}, ... x 1000] โ
โ โโโถ ๋จ์ผ ์์ฒญ์ ์ฌ๋ฌ ์ฟผ๋ฆฌ โ
โ โ
โ 5. ๋ณ์๋ฅผ ํตํ ์ฃผ์
โ
โ query ($id: String!) { user(id: $id) { ... } } โ
โ variables: { "id": "1 OR 1=1" } โ
โ โโโถ ๊ฒ์ฆ๋์ง ์์ ๋ณ์๋ฅผ ํตํ SQL ์ฃผ์
โ
โ โ
โ 6. ์ ๋ณด ๋
ธ์ถ โ
โ ๋ด๋ถ ์ธ๋ถ ์ ๋ณด๋ฅผ ๋๋ฌ๋ด๋ ์์ธํ ์ค๋ฅ ๋ฉ์์ง โ
โ ํ๋ ์ ์: "secretAdminField๋ฅผ ์๋ฏธํ์
จ๋์?" โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
6.2 GraphQL ๋ณด์ ์ํ¶
"""
graphene (Python)์ ์ฌ์ฉํ GraphQL ๋ณด์ ์กฐ์น.
pip install graphene flask-graphql
"""
# โโ 1. ์ฟผ๋ฆฌ ๊น์ด ์ ํ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
class DepthAnalyzer:
"""GraphQL ์ฟผ๋ฆฌ ๊น์ด ๋ถ์ ๋ฐ ์ ํ."""
def __init__(self, max_depth: int = 10):
self.max_depth = max_depth
def analyze(self, query_ast, depth: int = 0) -> int:
"""์ฟผ๋ฆฌ์ ์ต๋ ๊น์ด ๊ณ์ฐ."""
if depth > self.max_depth:
raise ValueError(
f"์ฟผ๋ฆฌ ๊น์ด {depth}๊ฐ ํ์ฉ๋ ์ต๋๊ฐ({self.max_depth})์ ์ด๊ณผํฉ๋๋ค"
)
max_child_depth = depth
if hasattr(query_ast, 'selection_set') and query_ast.selection_set:
for selection in query_ast.selection_set.selections:
child_depth = self.analyze(selection, depth + 1)
max_child_depth = max(max_child_depth, child_depth)
return max_child_depth
# โโ 2. ์ฟผ๋ฆฌ ๋น์ฉ ๋ถ์ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
class QueryCostAnalyzer:
"""GraphQL ์ฟผ๋ฆฌ์ ๋น์ฉ ์ถ์ ."""
# ํ๋ ์ ํ๋ณ ๋น์ฉ ์ ์
FIELD_COSTS = {
"user": 1,
"posts": 5, # ๋ฆฌ์คํธ ํ๋, ์ ์ฌ์ ์ผ๋ก ๋น์
"comments": 3,
"search": 10, # ์ ์ฒด ํ
์คํธ ๊ฒ์์ ๋น์
}
def __init__(self, max_cost: int = 1000):
self.max_cost = max_cost
def calculate_cost(self, query_ast, multiplier: int = 1) -> int:
"""์์ ์ฟผ๋ฆฌ ๋น์ฉ ๊ณ์ฐ."""
total_cost = 0
if hasattr(query_ast, 'selection_set') and query_ast.selection_set:
for selection in query_ast.selection_set.selections:
field_name = selection.name.value
field_cost = self.FIELD_COSTS.get(field_name, 1)
# ๋น์ฉ์ ๊ณฑํ๋ ํ์ด์ง๋ค์ด์
์ธ์ ํ์ธ
args = {
arg.name.value: arg.value.value
for arg in (selection.arguments or [])
if hasattr(arg.value, 'value')
}
limit = int(args.get('first', args.get('limit', 1)))
cost = field_cost * multiplier * max(limit, 1)
total_cost += cost
# ์์ ์ ํ์ผ๋ก ์ฌ๊ท
total_cost += self.calculate_cost(selection, limit)
if total_cost > self.max_cost:
raise ValueError(
f"์ฟผ๋ฆฌ ๋น์ฉ {total_cost}๊ฐ ์ต๋๊ฐ({self.max_cost})์ ์ด๊ณผํฉ๋๋ค"
)
return total_cost
# โโ 3. ํ๋ก๋์
์์ ์ธํธ๋ก์คํ์
๋นํ์ฑํ โโโโโโโโโโโโโโโโโโโโโโ
"""
์ธํธ๋ก์คํ์
์ ์ ์ฒด API ์คํค๋ง๋ฅผ ๋๋ฌ๋
๋๋ค.
ํ๋ก๋์
์์๋ ๋นํ์ฑํํ์ธ์.
"""
from graphql import GraphQLError
class DisableIntrospection:
"""ํ๋ก๋์
์์ ์ธํธ๋ก์คํ์
์ฟผ๋ฆฌ๋ฅผ ๋นํ์ฑํํ๋ ๋ฏธ๋ค์จ์ด."""
def resolve(self, next, root, info, **kwargs):
# __schema ๋ฐ __type ์ฟผ๋ฆฌ ์ฐจ๋จ
if info.field_name in ('__schema', '__type'):
raise GraphQLError("์ธํธ๋ก์คํ์
์ด ๋นํ์ฑํ๋์์ต๋๋ค")
return next(root, info, **kwargs)
# โโ 4. ์๋ ์ ํ + ๋ฐฐ์น ์ ํ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
from flask import Flask, request, jsonify
app = Flask(__name__)
MAX_BATCH_SIZE = 5 # ๋ฐฐ์น ์์ฒญ๋น ์ต๋ ์ฟผ๋ฆฌ ์
@app.before_request
def limit_batch_queries():
"""๋ฐฐ์น ์์ฒญ์ ์ฟผ๋ฆฌ ์ ์ ํ."""
if request.is_json:
data = request.get_json(silent=True)
if isinstance(data, list):
if len(data) > MAX_BATCH_SIZE:
return jsonify({
"error": "batch_limit_exceeded",
"message": f"๋ฐฐ์น๋น ์ต๋ {MAX_BATCH_SIZE}๊ฐ ์ฟผ๋ฆฌ"
}), 400
# โโ 5. ์ง์ ์ฟผ๋ฆฌ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
์์์ ์ฟผ๋ฆฌ๋ฅผ ์๋ฝํ๋ ๋์ , ์ฌ์ ๋ฑ๋ก๋
์ฟผ๋ฆฌ ํด์๋ง ์๋ฝํฉ๋๋ค. ์ด๋ ์ฃผ์
๋ฐ ์ฟผ๋ฆฌ ์กฐ์์ ๋ฐฉ์งํฉ๋๋ค.
"""
import hashlib
# ์ฌ์ ๋ฑ๋ก๋ ์ฟผ๋ฆฌ (๋น๋ ํ์์ ์์ฑ)
PERSISTED_QUERIES = {
"abc123": "query { users { id name } }",
"def456": "query GetUser($id: ID!) { user(id: $id) { id name email } }",
}
@app.route('/graphql', methods=['POST'])
def graphql_endpoint():
data = request.get_json()
# ํ๋ก๋์
์์๋ ์ง์ ์ฟผ๋ฆฌ๋ง ์๋ฝ
query_hash = data.get('extensions', {}).get('persistedQuery', {}).get('sha256Hash')
if query_hash:
query = PERSISTED_QUERIES.get(query_hash)
if not query:
return jsonify({"error": "query_not_found"}), 404
else:
# ๊ฐ๋ฐ์์๋ ์์์ ์ฟผ๋ฆฌ ํ์ฉ
# ํ๋ก๋์
์์๋ ๊ฑฐ๋ถ:
return jsonify({
"error": "persisted_queries_only",
"message": "์ง์ ์ฟผ๋ฆฌ๋ง ์๋ฝ๋ฉ๋๋ค"
}), 400
# ์ฟผ๋ฆฌ ์คํ...
return jsonify({"data": {}})
7. API ๊ฒ์ดํธ์จ์ด ๋ณด์¶
7.1 ๊ฒ์ดํธ์จ์ด ์ํคํ ์ฒ¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ API ๊ฒ์ดํธ์จ์ด ๋ณด์ ์ํคํ
์ฒ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Client โ
โ โ โ
โ โผ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ API ๊ฒ์ดํธ์จ์ด โ โ
โ โ โ โ
โ โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ โ
โ โ โ TLS โ โ ์ธ์ฆ โ โ ์๋ โ โ โ
โ โ โ ์ข
๋ฃ โ โ ๊ฒ์ฆ โ โ ์ ํ โ โ โ
โ โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ โ
โ โ โ โ
โ โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ โ
โ โ โ ์
๋ ฅ โ โ ์์ฒญ โ โ CORS โ โ โ
โ โ โ ๊ฒ์ฆ โ โ ๋ก๊น
โ โ ์ฒ๋ฆฌ โ โ โ
โ โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ โ
โ โ โ โ
โ โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ โ
โ โ โ WAF โ โ IP ํ์ฉ/ โ โ ์๋ต โ โ โ
โ โ โ ๊ท์น โ โ ์ฐจ๋จ ๋ชฉ๋กโ โ ํํฐ๋ง โ โ โ
โ โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ โ โ
โ โผ โผ โผ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โ์๋น์ค A โ โ์๋น์ค B โ โ์๋น์ค C โ โ
โ โ(์ฌ์ฉ์) โ โ(์ฃผ๋ฌธ) โ โ(๊ฒ์) โ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โ
โ ์ค์ ๊ฒ์ดํธ์จ์ด์ ๋ณด์ ์ด์ : โ
โ โข ์ธ์ฆ ์ ์ฉ์ ์ํ ๋จ์ผ ์ง์ โ
โ โข ์๋น์ค ์ ๋ฐ์ ๊ฑธ์น ์ผ๊ด๋ ์๋ ์ ํ โ
โ โข ์ค์ ์ง์ค์ ๋ก๊น
๋ฐ ๋ชจ๋ํฐ๋ง โ
โ โข ๋ฐฑ์๋ ์๋น์ค๊ฐ TLS๋ฅผ ์ฒ๋ฆฌํ์ง ์์ โ
โ โข ๋จ์ํ๋ ๋ณด์ ์ ์ฑ
๊ด๋ฆฌ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
7.2 ์์ฒญ/์๋ต ํํฐ๋ง¶
"""
API ๊ฒ์ดํธ์จ์ด ์์ฒญ/์๋ต ํํฐ๋ง ๋ฏธ๋ค์จ์ด.
"""
from flask import Flask, request, jsonify, g
import re
import uuid
import time
import logging
app = Flask(__name__)
logger = logging.getLogger('api_gateway')
# โโ ์์ฒญ ID ์ถ์ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
@app.before_request
def add_request_id():
"""์ถ์ ์ ์ํ ๊ณ ์ ์์ฒญ ID ์ถ๊ฐ."""
g.request_id = request.headers.get(
'X-Request-Id',
str(uuid.uuid4())
)
g.request_start = time.monotonic()
@app.after_request
def add_response_headers(response):
"""์๋ต์ ๋ณด์ ๋ฐ ์ถ์ ํค๋ ์ถ๊ฐ."""
response.headers['X-Request-Id'] = g.request_id
# ํ์ด๋ฐ ์ถ๊ฐ
elapsed = time.monotonic() - g.request_start
response.headers['X-Response-Time'] = f"{elapsed:.3f}s"
return response
# โโ ์์ฒญ ํฌ๊ธฐ ์ ํ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
MAX_CONTENT_LENGTH = 1 * 1024 * 1024 # 1 MB
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
@app.before_request
def check_content_length():
"""๊ณผ๋ํ ํฌ๊ธฐ์ ์์ฒญ ๊ฑฐ๋ถ."""
content_length = request.content_length
if content_length and content_length > MAX_CONTENT_LENGTH:
return jsonify({
"error": "payload_too_large",
"max_bytes": MAX_CONTENT_LENGTH,
}), 413
# โโ IP ํ์ฉ ๋ชฉ๋ก/์ฐจ๋จ ๋ชฉ๋ก โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
BLOCKED_IPS = {"192.168.1.100", "10.0.0.50"}
ADMIN_ALLOWED_IPS = {"10.0.0.1", "10.0.0.2"}
@app.before_request
def check_ip():
"""๊ธ์ง๋ IP๋ก๋ถํฐ์ ์์ฒญ ์ฐจ๋จ."""
client_ip = request.remote_addr
if client_ip in BLOCKED_IPS:
logger.warning(f"๊ธ์ง๋ IP์์ ์์ฒญ ์ฐจ๋จ: {client_ip}")
return jsonify({"error": "forbidden"}), 403
# ๊ด๋ฆฌ์ ๋ผ์ฐํธ๋ ํน์ IP ํ์
if request.path.startswith('/admin/'):
if client_ip not in ADMIN_ALLOWED_IPS:
return jsonify({"error": "forbidden"}), 403
# โโ ์๋ต ๋ฐ์ดํฐ ํํฐ๋ง โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
SENSITIVE_FIELDS = {'password', 'ssn', 'credit_card', 'secret_key',
'token', 'api_key'}
def filter_sensitive_data(data):
"""์๋ต ๋ฐ์ดํฐ์์ ๋ฏผ๊ฐํ ํ๋๋ฅผ ์ฌ๊ท์ ์ผ๋ก ์ ๊ฑฐ."""
if isinstance(data, dict):
return {
k: filter_sensitive_data(v)
for k, v in data.items()
if k.lower() not in SENSITIVE_FIELDS
}
elif isinstance(data, list):
return [filter_sensitive_data(item) for item in data]
return data
# โโ ๊ฐ์ฌ ๋ก๊น
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
@app.after_request
def audit_log(response):
"""๊ฐ์ฌ ์ถ์ ์ ์ํ ๋ชจ๋ API ์์ฒญ ๋ก๊ทธ."""
logger.info(
"API ์์ฒญ: method=%s path=%s status=%s "
"ip=%s user_agent=%s request_id=%s duration=%s",
request.method,
request.path,
response.status_code,
request.remote_addr,
request.user_agent.string[:100],
g.request_id,
response.headers.get('X-Response-Time', 'N/A'),
)
return response
8. OpenAPI ๋ณด์ ์ ์¶
8.1 OpenAPI 3.0์ ๋ณด์ ์คํด¶
# openapi.yaml - ๋ณด์ ์คํด ์ ์
openapi: 3.0.3
info:
title: Secure API
version: 1.0.0
# โโ ๋ณด์ ์คํด ์ ์ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
components:
securitySchemes:
# API ํค ์ธ์ฆ
ApiKeyAuth:
type: apiKey
in: header
name: X-API-Key
description: ์๋ฒ ๊ฐ ํต์ ์ ์ํ API ํค
# JWT Bearer ํ ํฐ
BearerAuth:
type: http
scheme: bearer
bearerFormat: JWT
description: JWT ์ก์ธ์ค ํ ํฐ
# OAuth 2.0
OAuth2:
type: oauth2
flows:
authorizationCode:
authorizationUrl: https://auth.example.com/authorize
tokenUrl: https://auth.example.com/token
refreshUrl: https://auth.example.com/refresh
scopes:
read: ๋ฆฌ์์ค ์ฝ๊ธฐ ์ ๊ทผ
write: ๋ฆฌ์์ค ์ฐ๊ธฐ ์ ๊ทผ
admin: ๊ด๋ฆฌ ์ ๊ทผ
# OpenID Connect
OpenIdConnect:
type: openIdConnect
openIdConnectUrl: https://auth.example.com/.well-known/openid-configuration
# โโ ์ ์ญ ๋ณด์ (๋ชจ๋ ์๋ํฌ์ธํธ์ ์ ์ฉ) โโโโโโโโโโโโโโโโโโ
security:
- BearerAuth: []
# โโ ์๋ํฌ์ธํธ๋ณ ๋ณด์ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
paths:
/api/public/status:
get:
summary: ์ํ ํ์ธ (์ธ์ฆ ๋ถํ์)
security: [] # ์ฌ์ ์: ์ธ์ฆ ์์
responses:
'200':
description: OK
/api/users:
get:
summary: ์ฌ์ฉ์ ๋ชฉ๋ก
security:
- BearerAuth: []
- ApiKeyAuth: [] # ๋์ ์ธ์ฆ (OR)
responses:
'200':
description: ์ฌ์ฉ์ ๋ชฉ๋ก
/api/admin/settings:
put:
summary: ์ค์ ์
๋ฐ์ดํธ (๊ด๋ฆฌ์๋ง)
security:
- OAuth2: [admin] # 'admin' ๋ฒ์ ํ์
responses:
'200':
description: ์ค์ ์
๋ฐ์ดํธ๋จ
/api/data:
post:
summary: ๋ฐ์ดํฐ ์์ฑ (read + write ํ์)
security:
- OAuth2: [read, write] # ๋ ๋ฒ์ ๋ชจ๋ ํ์
responses:
'201':
description: ๋ฐ์ดํฐ ์์ฑ๋จ
8.2 API ๋ฒ์ ๊ด๋ฆฌ ๋ณด์¶
"""
๋ณด์์ ์ํ API ๋ฒ์ ๊ด๋ฆฌ ๊ณ ๋ ค ์ฌํญ.
"""
# โโ URL ๊ฒฝ๋ก ๋ฒ์ ๊ด๋ฆฌ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# /api/v1/users (๊ตฌ๋ฒ์ , ์๋ ค์ง ์ทจ์ฝ์ ์ด ์์ ์ ์์)
# /api/v2/users (ํ์ฌ, ํจ์น๋จ)
# โโ ํค๋ ๋ฒ์ ๊ด๋ฆฌ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Accept: application/vnd.example.v2+json
# โโ ๋ฒ์ ๊ด๋ฆฌ์ ๊ด๋ จ๋ ๋ณด์ ๋ฌธ์ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
"""
1. ๊ตฌ API ๋ฒ์ ์๋ ์๋ ค์ง ์ทจ์ฝ์ ์ด ์์ ์ ์์
- ํ๊ธฐ ๋ ์ง๋ฅผ ์ค์ ํ๊ณ ์ ์ฉ
- Deprecation ๋ฐ Sunset ํค๋ ๋ฐํ
2. ํ๊ธฐ๋ ๋ฒ์ ์ ๋ํด ๋ณด์ ํจ์น๋ฅผ ์ ์งํ์ง ๋ง ๊ฒ
- ์ต์ ๋ฒ์ ์ผ๋ก ๊ฐ์ ๋ง์ด๊ทธ๋ ์ด์
3. ํ๊ธฐ๋ ๋ฒ์ ์ ์ฌ์ฉ ๋ชจ๋ํฐ๋ง
- ๊ตฌ๋ฒ์ ์ด ์ฌ์ ํ ์ฌ์ฉ ์ค์ผ ๋ ๊ฒฝ๊ณ
"""
from flask import Flask, request, jsonify
from datetime import datetime
app = Flask(__name__)
API_VERSIONS = {
"v1": {
"status": "deprecated",
"sunset": "2025-06-01",
"successor": "v2",
},
"v2": {
"status": "current",
"sunset": None,
"successor": None,
},
}
@app.before_request
def check_api_version():
"""ํ๊ธฐ๋ API ๋ฒ์ ๊ฒฝ๊ณ ๋๋ ์ฐจ๋จ."""
# URL ๊ฒฝ๋ก์์ ๋ฒ์ ์ถ์ถ
path_parts = request.path.strip('/').split('/')
if len(path_parts) >= 2 and path_parts[0] == 'api':
version = path_parts[1]
version_info = API_VERSIONS.get(version)
if not version_info:
return jsonify({
"error": "invalid_version",
"supported": list(API_VERSIONS.keys()),
}), 400
if version_info["status"] == "deprecated":
sunset_date = version_info["sunset"]
if sunset_date:
# ์ผ๋ชฐ ๋ ์ง๊ฐ ์ง๋ฌ๋์ง ํ์ธ
if datetime.now() > datetime.fromisoformat(sunset_date):
return jsonify({
"error": "version_retired",
"message": f"API {version}์(๋) {sunset_date}์ ํ๊ธฐ๋์์ต๋๋ค",
"upgrade_to": version_info["successor"],
}), 410 # 410 Gone
@app.after_request
def add_deprecation_headers(response):
"""์๋ต ํค๋์ ํ๊ธฐ ๊ฒฝ๊ณ ์ถ๊ฐ."""
path_parts = request.path.strip('/').split('/')
if len(path_parts) >= 2 and path_parts[0] == 'api':
version = path_parts[1]
version_info = API_VERSIONS.get(version, {})
if version_info.get("status") == "deprecated":
response.headers['Deprecation'] = 'true'
if version_info.get("sunset"):
response.headers['Sunset'] = version_info["sunset"]
response.headers['Link'] = (
f'</api/{version_info["successor"]}>; rel="successor-version"'
)
return response
9. ์์ฒญ/์๋ต ์ํธํ¶
9.1 ์ ์ก ๊ณ์ธต ๋ณด์¶
"""
TLS ๊ตฌ์ฑ ๋ฐ ์ธ์ฆ์ ๊ณ ์ .
"""
# โโ TLS๋ฅผ ์ฌ์ฉํ Flask (๊ฐ๋ฐ) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# ๊ฐ๋ฐ์ฉ ์์ฒด ์๋ช
์ธ์ฆ์ ์์ฑ:
# openssl req -x509 -newkey rsa:4096 -nodes \
# -out cert.pem -keyout key.pem -days 365
from flask import Flask
app = Flask(__name__)
# TLS๋ก ์คํ (๊ฐ๋ฐ ์ ์ฉ)
# ํ๋ก๋์
์์๋ TLS๊ฐ ์ญ๋ฐฉํฅ ํ๋ก์(nginx, ๋ก๋ ๋ฐธ๋ฐ์)์์ ์ฒ๋ฆฌ๋จ
if __name__ == '__main__':
app.run(
ssl_context=('cert.pem', 'key.pem'),
host='0.0.0.0',
port=443,
)
# โโ ์ธ์ฆ์ ๊ณ ์ (API ํด๋ผ์ด์ธํธ์ฉ) โโโโโโโโโโโโโโโโโโโโโโโโโโโ
import requests
import hashlib
import ssl
def verify_certificate_pin(host: str, expected_pin: str) -> bool:
"""์๋ฒ ์ธ์ฆ์๊ฐ ์์ ํ๊ณผ ์ผ์นํ๋์ง ํ์ธ."""
import socket
context = ssl.create_default_context()
with socket.create_connection((host, 443)) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
cert_der = ssock.getpeercert(True)
cert_hash = hashlib.sha256(cert_der).hexdigest()
return cert_hash == expected_pin
# โโ ํ์ด๋ก๋ ์์ค ์ํธํ (๋ฏผ๊ฐํ ํ๋์ฉ) โโโโโโโโโโโโโโโโโโโโโโโโโ
from cryptography.fernet import Fernet
# ํค ์์ฑ (์ฝ๋๊ฐ ์๋ ์์ ํ๊ฒ ์ ์ฅ)
encryption_key = Fernet.generate_key()
cipher = Fernet(encryption_key)
def encrypt_sensitive_fields(data: dict, fields: list[str]) -> dict:
"""์๋ต ํ์ด๋ก๋์์ ํน์ ํ๋ ์ํธํ."""
result = data.copy()
for field in fields:
if field in result:
value = str(result[field]).encode()
result[field] = cipher.encrypt(value).decode()
return result
def decrypt_sensitive_fields(data: dict, fields: list[str]) -> dict:
"""์์ฒญ ํ์ด๋ก๋์์ ํน์ ํ๋ ๋ณตํธํ."""
result = data.copy()
for field in fields:
if field in result:
value = result[field].encode()
result[field] = cipher.decrypt(value).decode()
return result
10. ์ฐ์ต ๋ฌธ์ ¶
์ฐ์ต 1: ์์ ํ JWT ์ธ์ฆ ์๋น์ค¶
๋ค์์ ํฌํจํ๋ Flask๋ฅผ ์ฌ์ฉํ ์์ ํ JWT ์ธ์ฆ ์๋น์ค ๊ตฌ์ถ:
- ๋น๋ฐ๋ฒํธ ํด์ฑ์ด ์๋ ์ฌ์ฉ์ ๋ฑ๋ก (argon2 ๋๋ bcrypt)
- ์ก์ธ์ค + ๋ฆฌํ๋ ์ ํ ํฐ์ ๋ฐํํ๋ ๋ก๊ทธ์ธ ์๋ํฌ์ธํธ
- ๋ฆฌํ๋ ์ ํ ํฐ ์ํ์ด ์๋ ํ ํฐ ๋ฆฌํ๋ ์ ์๋ํฌ์ธํธ
- ํ ํฐ ๋ธ๋๋ฆฌ์คํธ๊ฐ ์๋ ๋ก๊ทธ์์ ์๋ํฌ์ธํธ (Redis ๋๋ ์ธ๋ฉ๋ชจ๋ฆฌ ์ธํธ ์ฌ์ฉ)
- "admin" ๋ฒ์๊ฐ ํ์ํ ๋ณดํธ๋ ์๋ํฌ์ธํธ
- ๋ง๋ฃ, ์๋ชป๋ ํ์ ๋ฐ ํ๊ธฐ๋ ํ ํฐ์ ๋ํ ์ ์ ํ ์ค๋ฅ ์ฒ๋ฆฌ
- ๋ก๊ทธ์ธ ์๋ํฌ์ธํธ์ ๋ํ ์๋ ์ ํ (IP๋น ๋ถ๋น 5ํ ์๋)
์ฐ์ต 2: CORS ๋ณด์ ๊ฐ์ฌ¶
๋ค์์ ์ํํ๋ Python ์คํฌ๋ฆฝํธ ์์ฑ:
- API URL ๋ชฉ๋ก ๊ฐ์ ธ์ค๊ธฐ
- ๋ค์ํ Origin ํค๋๋ก ์์ฒญ ์ ์ก
- API๊ฐ ์์์ ์ถ์ฒ๋ฅผ ๋ฐ์ํ๋์ง ํ ์คํธ (์ทจ์ฝ์ )
- ์๊ฒฉ ์ฆ๋ช ์ด ์์ผ๋์นด๋ ์ถ์ฒ์ ํจ๊ป ํ์ฉ๋๋์ง ํ์ธ
- ์ผ๋ฐ์ ์ธ ๋ฉ์๋ ๋ฐ ํค๋์ ๋ํ ํ๋ฆฌํ๋ผ์ดํธ ์ฒ๋ฆฌ ํ ์คํธ
- ๊ฐ ์๋ํฌ์ธํธ์ ๋ํ ๋ณด์ ๋ณด๊ณ ์ ์์ฑ
์ฐ์ต 3: GraphQL ๋ณด์ ๋ฏธ๋ค์จ์ด¶
๋ค์์ ์ํํ๋ GraphQL ๋ณด์ ๋ฏธ๋ค์จ์ด ๊ตฌํ:
- ๊ตฌ์ฑ ๊ฐ๋ฅํ ์ต๋๊ฐ์ผ๋ก ์ฟผ๋ฆฌ ๊น์ด ์ ํ (๊ธฐ๋ณธ๊ฐ: 10)
- ์ฟผ๋ฆฌ ๋น์ฉ ๊ณ์ฐ ๋ฐ ๋น์ฉ์ด ๋ง์ด ๋๋ ์ฟผ๋ฆฌ ๊ฑฐ๋ถ
- ํ๋ก๋์ ์์ ์ธํธ๋ก์คํ์ ๋นํ์ฑํ
- ๋ฐฐ์น ์ฟผ๋ฆฌ ํฌ๊ธฐ ์ ํ
- ๋น์ฉ ๋ฐ ์คํ ์๊ฐ๊ณผ ํจ๊ป ๋ชจ๋ ์ฟผ๋ฆฌ ๋ก๊ทธ
- ํด์ ํ์ฉ ๋ชฉ๋ก์ด ์๋ ์ง์ ์ฟผ๋ฆฌ ๊ตฌํ
์ฐ์ต 4: ์ฌ๋ผ์ด๋ฉ ์๋์ฐ๊ฐ ์๋ ์๋ ์ ํ๊ธฐ¶
๋ค์์ ์ํํ๋ ์ฌ๋ผ์ด๋ฉ ์๋์ฐ ๋ก๊ทธ ์๋ ์ ํ๊ธฐ ๊ตฌํ:
- Redis๋ฅผ ๋ฐฑ์ ์ ์ฅ์๋ก ์ฌ์ฉ (๋๋ ์ธ๋ฉ๋ชจ๋ฆฌ ์๋ฎฌ๋ ์ด์ )
- ๊ฐ ์์ฒญ์ ์ ํํ ํ์์คํฌํ ์ถ์
- ๊ตฌ์ฑ ๊ฐ๋ฅํ ์๋์ฐ ์ง์ (์ด๋น, ๋ถ๋น, ์๊ฐ๋น)
- ๋ค๋ฅธ API ํค ํฐ์ด์ ๋ํ ๋ค๋ฅธ ์ ํ ์ง์
- ์ ์ ํ ์๋ ์ ํ ํค๋ ๋ฐํ (X-RateLimit-Limit, Remaining, Reset)
- ๋ถ์ฐ ๋ฐฐํฌ ์ฒ๋ฆฌ (์ฌ๋ฌ ์๋ฒ ์ธ์คํด์ค)
์ฐ์ต 5: API ์ ๋ ฅ ๊ฒ์ฆ ํ๋ ์์ํฌ¶
๋ค์์ ์ํํ๋ ์ฌ์ฌ์ฉ ๊ฐ๋ฅํ ๊ฒ์ฆ ํ๋ ์์ํฌ ๊ตฌ์ถ:
- ์คํค๋ง์ ๋ํ JSON ์์ฒญ ๋ณธ๋ฌธ ๊ฒ์ฆ
- ํ์ ๊ฐ์ ๋ฅผ ์ฌ์ฉํ ์ฟผ๋ฆฌ ๋งค๊ฐ๋ณ์ ๊ฒ์ฆ
- ๊ฒฝ๋ก ๋งค๊ฐ๋ณ์ ๊ฒ์ฆ
- ์ค์ฒฉ ๊ฐ์ฒด ๊ฒ์ฆ ์ง์
- ์ผ๊ด๋ ์ค๋ฅ ์๋ต ํ์ ๋ฐํ (RFC 7807)
- ๋๋ ํ ๋น์ผ๋ก๋ถํฐ ๋ณดํธ (์ ์ ์๋ ํ๋ ๊ฑฐ๋ถ)
- ๋ฌธ์์ด ์ ๋ ฅ ์ ์ (๊ณต๋ฐฑ ์ ๊ฑฐ, ์ ๋์ฝ๋ ์ ๊ทํ)
์ฐ์ต 6: API ๋ณด์ ์ค์บ๋¶
๋ค์์ ๋ํ API๋ฅผ ํ ์คํธํ๋ ๋ณด์ ์ค์บ๋ ๋๊ตฌ ์์ฑ:
- ์๋ํฌ์ธํธ์์ ์ธ์ฆ ๋๋ฝ
- ๊นจ์ง ๊ฐ์ฒด ์์ค ์ธ๊ฐ (BOLA/IDOR)
- ์๋ ์ ํ ๋๋ฝ
- ๋ด๋ถ ์ธ๋ถ ์ ๋ณด๋ฅผ ๋๋ฌ๋ด๋ ์์ธํ ์ค๋ฅ ๋ฉ์์ง
- ๋ณด์ ํค๋ ๋๋ฝ
- CORS ์๋ชป๋ ๊ตฌ์ฑ
- ์ฌ๊ฐ๋ ๋ฑ๊ธ์ด ์๋ ๋ณด๊ณ ์ ์์ฑ
์์ฝ¶
API ๋ณด์ ์ฒดํฌ๋ฆฌ์คํธ¶
| ๋ฒ์ฃผ | ํญ๋ชฉ | ์ฐ์ ์์ |
|---|---|---|
| ์ธ์ฆ | ์ฌ์ฉ์์๊ฒ OAuth 2.0 ๋๋ JWT ์ฌ์ฉ (API ํค๋ง ์ฌ์ฉํ์ง ๋ง ๊ฒ) | Critical |
| ์ธ์ฆ | ๋จ๊ธฐ ์ก์ธ์ค ํ ํฐ (15๋ถ) | Critical |
| ์ธ์ฆ | ๋ฆฌํ๋ ์ ํ ํฐ ์ํ | High |
| ์ธ๊ฐ | ๊ฐ์ฒด ์์ค ๊ถํ ํ์ธ (BOLA ๋ฐฉ์ง) | Critical |
| ์ธ๊ฐ | ๋ชจ๋ ์์ฒญ์์ ๋ฒ์ ๊ฒ์ฆ | Critical |
| ์๋ ์ ํ | IP๋น ๋ฐ ์ฌ์ฉ์๋น ์๋ ์ ํ | High |
| ์๋ ์ ํ | ์ธ์ฆ ์๋ํฌ์ธํธ์ ๋ํ ์๊ฒฉํ ์ ํ | High |
| ์ ๋ ฅ ๊ฒ์ฆ | ๋ชจ๋ ์ ๋ ฅ์ ๋ํ ์คํค๋ง ๊ฒ์ฆ | Critical |
| ์ ๋ ฅ ๊ฒ์ฆ | ์ ์ ์๋ ํ๋ ๊ฑฐ๋ถ | High |
| CORS | ํน์ ์ถ์ฒ ํ์ฉ ๋ชฉ๋ก (์๊ฒฉ ์ฆ๋ช ๊ณผ ํจ๊ป ์์ผ๋์นด๋ ์์) | Critical |
| CORS | Vary: Origin ํค๋ | Medium |
| ์ํธํ | ์ด๋์๋ TLS (HTTPS๋ง) | Critical |
| ๋ก๊น | ๊ณ ์ ํ ์์ฒญ ID๋ก ๋ชจ๋ ์์ฒญ ๋ก๊ทธ | High |
| ๋ฒ์ ๊ด๋ฆฌ | ์ผ๋ชฐ ๋ ์ง๋ก ๊ตฌ๋ฒ์ ํ๊ธฐ | Medium |
| ๊ฒ์ดํธ์จ์ด | ์ค์ ์ง์ค์ ์ธ์ฆ ๋ฐ ์๋ ์ ํ | Recommended |
ํต์ฌ ์์ ¶
- ์ฌ์ธต ๋ฐฉ์ด โ ๋ชจ๋ ๊ณ์ธต(๋คํธ์ํฌ, ๊ฒ์ดํธ์จ์ด, ์ ํ๋ฆฌ์ผ์ด์ , ๋ฐ์ดํฐ๋ฒ ์ด์ค)์์ ๋ณด์ ์ ์ฉ
- ํด๋ผ์ด์ธํธ ์ ๋ ฅ์ ์ ๋ ์ ๋ขฐํ์ง ๋ง ๊ฒ โ ์๋ฒ์์ ๋ชจ๋ ๊ฒ์ ๋งค๋ฒ ๊ฒ์ฆ
- ํ๋ฆฝ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ฌ์ฉ โ ์์ฒด ์ธ์ฆ ๋๋ ์ํธํ๋ฅผ ๋ง๋ค์ง ๋ง ๊ฒ
- ๋ชจ๋ํฐ๋ง ๋ฐ ๊ฒฝ๊ณ โ ๋ชจ๋ํฐ๋ง ์๋ ๋ณด์์ ๋ถ์์ ํจ
- API ๋ณด์ ๋ฌธ์ํ โ ๋ช ํ์ฑ์ ์ํด OpenAPI ๋ณด์ ์คํด ์ฌ์ฉ
์ด์ : 09_Web_Security_Headers.md | ๋ค์: 11_Secrets_Management.md