API Security
API Security¶
Previous: 09_Web_Security_Headers.md | Next: 11_Secrets_Management.md
APIs are the backbone of modern software systems. They connect microservices, power mobile applications, and enable third-party integrations. Because APIs expose application logic and data directly, they are a prime target for attackers. A single misconfigured endpoint can expose millions of records. This lesson covers the essential security practices for building, deploying, and maintaining secure APIs β from authentication and rate limiting to input validation and CORS configuration.
Learning Objectives¶
- Implement robust API authentication using API keys, OAuth 2.0, and JWT
- Design and deploy rate limiting strategies to prevent abuse
- Validate and sanitize all API inputs to prevent injection attacks
- Configure CORS correctly to control cross-origin access
- Secure GraphQL endpoints against common attack patterns
- Define security schemes in OpenAPI/Swagger specifications
- Apply API gateway security patterns for production environments
1. API Threat Landscape¶
1.1 OWASP API Security Top 10 (2023)¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β OWASP API Security Top 10 (2023) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β API1 Broken Object Level Authorization (BOLA) β
β Accessing resources belonging to other users β
β GET /api/users/OTHER_USER_ID/orders β
β β
β API2 Broken Authentication β
β Weak authentication mechanisms, token leakage β
β β
β API3 Broken Object Property Level Authorization β
β Mass assignment, exposing sensitive properties β
β β
β API4 Unrestricted Resource Consumption β
β No rate limiting, large payloads, expensive queries β
β β
β API5 Broken Function Level Authorization β
β Accessing admin functions as regular user β
β POST /api/admin/users/delete β
β β
β API6 Unrestricted Access to Sensitive Business Flows β
β Automated abuse of business features β
β β
β API7 Server-Side Request Forgery (SSRF) β
β Fetching URLs from user input without validation β
β β
β API8 Security Misconfiguration β
β Missing headers, verbose errors, default creds β
β β
β API9 Improper Inventory Management β
β Unmanaged API versions, shadow APIs β
β β
β API10 Unsafe Consumption of APIs β
β Trusting third-party API responses blindly β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1.2 API Attack Surface¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β API Attack Surface β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ β
β β Client βββββΆβ Network βββββΆβ API βββββΆβ Database β β
β β β β β β Server β β β β
β ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ β
β β
β Attack vectors at each layer: β
β β
β Client: Tampered requests, stolen tokens, replay attacks β
β Network: Man-in-the-middle, eavesdropping, DNS hijacking β
β API Server: Injection, broken auth, BOLA, mass assignment β
β Database: SQL injection, data exfiltration, unauthorized access β
β β
β Cross-cutting concerns: β
β βββ Authentication (Who are you?) β
β βββ Authorization (What can you access?) β
β βββ Input validation (Is this request safe?) β
β βββ Rate limiting (Are you abusing the API?) β
β βββ Encryption (Is data protected in transit?) β
β βββ Logging/Monitoring (Can we detect attacks?) β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2. API Authentication Patterns¶
2.1 API Keys¶
"""
API Key authentication β simple but limited.
Suitable for server-to-server communication and public APIs
with usage tracking.
"""
import secrets
import hashlib
from flask import Flask, request, jsonify, abort
from functools import wraps
from datetime import datetime
app = Flask(__name__)
# ββ API Key Generation βββββββββββββββββββββββββββββββββββββββββββ
def generate_api_key() -> tuple[str, str]:
"""Generate an API key and its hash for storage."""
# Generate a cryptographically secure key
# Prefix helps identify the key type and version
raw_key = f"sk_live_{secrets.token_urlsafe(32)}"
# Store only the hash β never store the raw key
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
return raw_key, key_hash
# raw_key: sk_live_Abc123... (sent to client once, never stored)
# key_hash: e3b0c4... (stored in database)
# ββ Simulated key storage (use a real database in production) ββββ
API_KEYS = {
# hash -> metadata
hashlib.sha256(b"sk_live_test_key_12345").hexdigest(): {
"client_id": "client_001",
"name": "Test Client",
"rate_limit": 100, # requests per minute
"scopes": ["read", "write"],
"created_at": "2025-01-01",
"last_used": None,
}
}
def require_api_key(f):
"""Decorator to require a valid API key."""
@wraps(f)
def decorated(*args, **kwargs):
# Check multiple locations for the API key
api_key = (
request.headers.get('X-API-Key') or
request.headers.get('Authorization', '').replace('Bearer ', '') or
request.args.get('api_key') # Less secure, avoid if possible
)
if not api_key:
return jsonify({
"error": "missing_api_key",
"message": "API key is required. "
"Pass it in the X-API-Key header."
}), 401
# Hash the provided key and look it up
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
key_data = API_KEYS.get(key_hash)
if not key_data:
return jsonify({
"error": "invalid_api_key",
"message": "The provided API key is not valid."
}), 401
# Update last used timestamp
key_data["last_used"] = datetime.utcnow().isoformat()
# Store key metadata in request context
request.api_client = key_data
return f(*args, **kwargs)
return decorated
@app.route('/api/data')
@require_api_key
def get_data():
"""Protected endpoint requiring API key."""
client = request.api_client
return jsonify({
"client": client["name"],
"data": [1, 2, 3]
})
# ββ API Key Security Best Practices βββββββββββββββββββββββββββββ
"""
1. Transmission:
- Always use HTTPS
- Prefer headers over query parameters (query params appear in logs)
- Use X-API-Key header or Authorization: Bearer <key>
2. Storage:
- Hash keys before storing (SHA-256 minimum)
- Never log full API keys
- Show only last 4 characters in UI: sk_live_****5678
3. Rotation:
- Support multiple active keys per client
- Allow key rotation without downtime
- Set expiration dates on keys
4. Scoping:
- Assign scopes/permissions to each key
- Use separate keys for read vs write operations
- Use separate keys for test vs production
5. Limitations:
- API keys identify applications, not users
- They lack built-in expiration (unlike tokens)
- They cannot be easily scoped per-request
- Use OAuth 2.0 for user-context authorization
"""
2.2 OAuth 2.0¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β OAuth 2.0 Authorization Code Flow β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. User clicks "Login with Provider" β
β Client βββββββββββββββββββββββββββΆ Auth Server β
β GET /authorize?response_type=code β
β &client_id=CLIENT_ID β
β &redirect_uri=CALLBACK_URL β
β &scope=read+write β
β &state=RANDOM_STATE β
β β
β 2. User authenticates and approves scopes β
β Auth Server ββββββββββββββββββββββΆ Client Callback β
β GET /callback?code=AUTH_CODE&state=RANDOM_STATE β
β β
β 3. Client exchanges code for tokens β
β Client βββββββββββββββββββββββββββΆ Auth Server β
β POST /token β
β grant_type=authorization_code β
β &code=AUTH_CODE β
β &client_id=CLIENT_ID β
β &client_secret=CLIENT_SECRET β
β &redirect_uri=CALLBACK_URL β
β β
β 4. Auth server returns tokens β
β Auth Server ββββββββββββββββββββββΆ Client β
β { "access_token": "...", β
β "refresh_token": "...", β
β "token_type": "Bearer", β
β "expires_in": 3600 } β
β β
β 5. Client uses access token β
β Client βββββββββββββββββββββββββββΆ Resource Server β
β Authorization: Bearer ACCESS_TOKEN β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
OAuth 2.0 implementation with Flask (server-side).
"""
import requests
import secrets
from flask import Flask, redirect, request, session, jsonify
from urllib.parse import urlencode
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)
# OAuth 2.0 configuration (example: GitHub)
OAUTH_CONFIG = {
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"authorize_url": "https://github.com/login/oauth/authorize",
"token_url": "https://github.com/login/oauth/access_token",
"api_url": "https://api.github.com/user",
"redirect_uri": "http://localhost:5000/callback",
"scope": "read:user user:email",
}
@app.route('/login')
def login():
"""Initiate OAuth 2.0 Authorization Code flow."""
# Generate state parameter to prevent CSRF
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
# Build authorization URL
params = {
"client_id": OAUTH_CONFIG["client_id"],
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
"scope": OAUTH_CONFIG["scope"],
"state": state,
"response_type": "code",
}
auth_url = f"{OAUTH_CONFIG['authorize_url']}?{urlencode(params)}"
return redirect(auth_url)
@app.route('/callback')
def callback():
"""Handle OAuth 2.0 callback."""
# ββ Verify state parameter βββββββββββββββββββββββββββββββββββ
state = request.args.get('state')
if state != session.pop('oauth_state', None):
return jsonify({"error": "Invalid state parameter"}), 400
# ββ Check for error response βββββββββββββββββββββββββββββββββ
error = request.args.get('error')
if error:
return jsonify({
"error": error,
"description": request.args.get('error_description', '')
}), 400
# ββ Exchange authorization code for tokens βββββββββββββββββββ
code = request.args.get('code')
token_response = requests.post(
OAUTH_CONFIG["token_url"],
data={
"client_id": OAUTH_CONFIG["client_id"],
"client_secret": OAUTH_CONFIG["client_secret"],
"code": code,
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
"grant_type": "authorization_code",
},
headers={"Accept": "application/json"},
timeout=10,
)
token_data = token_response.json()
if "error" in token_data:
return jsonify(token_data), 400
access_token = token_data["access_token"]
# ββ Fetch user information βββββββββββββββββββββββββββββββββββ
user_response = requests.get(
OAUTH_CONFIG["api_url"],
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
},
timeout=10,
)
user_data = user_response.json()
# Store in session (or create/update user in database)
session['user'] = {
"id": user_data["id"],
"name": user_data.get("name", user_data["login"]),
"email": user_data.get("email"),
}
return redirect('/dashboard')
# ββ OAuth 2.0 Security Checklist ββββββββββββββββββββββββββββββββ
"""
1. Always use the state parameter (prevents CSRF)
2. Validate redirect_uri exactly (no open redirect)
3. Use Authorization Code flow (not Implicit for server apps)
4. Store tokens securely (encrypted, server-side)
5. Use PKCE for public clients (SPAs, mobile apps)
6. Validate token scopes on every request
7. Use short-lived access tokens + refresh tokens
8. Revoke tokens on logout
"""
2.3 JWT (JSON Web Tokens)¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β JWT Structure β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β eyJhbGci... . eyJzdWIi... . SflKxwRJ... β
β βββββββββ βββββββββ ββββββββββ β
β Header Payload Signature β
β β
β Header (base64url): β
β { β
β "alg": "RS256", β
β "typ": "JWT", β
β "kid": "key-id-123" β
β } β
β β
β Payload (base64url): β
β { β
β "sub": "user_123", // Subject (user ID) β
β "iss": "api.example.com", // Issuer β
β "aud": "app.example.com", // Audience β
β "exp": 1700000000, // Expiration time β
β "iat": 1699996400, // Issued at β
β "nbf": 1699996400, // Not before β
β "jti": "unique-token-id", // JWT ID (for revocation) β
β "scope": "read write", // Custom claims β
β "role": "admin" β
β } β
β β
β Signature: β
β RSASHA256(base64url(header) + "." + base64url(payload), key) β
β β
β IMPORTANT: Payload is NOT encrypted β it is only base64url β
β encoded. Anyone can read it. Never store secrets in JWT. β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
JWT authentication with PyJWT β secure implementation.
"""
import jwt
import time
import uuid
from datetime import datetime, timedelta, timezone
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
# ββ Key configuration ββββββββββββββββββββββββββββββββββββββββββββ
# Use RS256 (asymmetric) for production
# Private key signs tokens; public key verifies them
# This allows microservices to verify without the private key
# For this example, we use HS256 (symmetric) for simplicity
JWT_SECRET = "your-256-bit-secret-change-this" # Use env var in production
JWT_ALGORITHM = "HS256"
JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
# ββ Token blacklist (use Redis in production) ββββββββββββββββββββ
revoked_tokens = set()
def create_access_token(user_id: str, role: str = "user",
scopes: list[str] = None) -> str:
"""Create a short-lived access token."""
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"iss": "api.example.com",
"aud": "app.example.com",
"iat": now,
"exp": now + JWT_ACCESS_TOKEN_EXPIRES,
"nbf": now,
"jti": str(uuid.uuid4()), # Unique ID for revocation
"type": "access",
"role": role,
"scopes": scopes or ["read"],
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def create_refresh_token(user_id: str) -> str:
"""Create a long-lived refresh token."""
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"iss": "api.example.com",
"iat": now,
"exp": now + JWT_REFRESH_TOKEN_EXPIRES,
"jti": str(uuid.uuid4()),
"type": "refresh",
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decode_token(token: str) -> dict:
"""Decode and validate a JWT token."""
try:
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
issuer="api.example.com",
audience="app.example.com",
options={
"require": ["exp", "iat", "sub", "iss", "aud", "jti"],
"verify_exp": True,
"verify_iss": True,
"verify_aud": True,
"verify_nbf": True,
}
)
# Check if token has been revoked
if payload["jti"] in revoked_tokens:
raise jwt.InvalidTokenError("Token has been revoked")
return payload
except jwt.ExpiredSignatureError:
raise ValueError("Token has expired")
except jwt.InvalidAudienceError:
raise ValueError("Invalid token audience")
except jwt.InvalidIssuerError:
raise ValueError("Invalid token issuer")
except jwt.InvalidTokenError as e:
raise ValueError(f"Invalid token: {e}")
def require_auth(scopes: list[str] = None):
"""Decorator to require JWT authentication with optional scope check."""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Extract token from Authorization header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({
"error": "missing_token",
"message": "Authorization header with Bearer token required"
}), 401
token = auth_header.split(' ', 1)[1]
try:
payload = decode_token(token)
except ValueError as e:
return jsonify({
"error": "invalid_token",
"message": str(e)
}), 401
# Verify token type
if payload.get("type") != "access":
return jsonify({
"error": "wrong_token_type",
"message": "Access token required"
}), 401
# Check scopes
if scopes:
token_scopes = set(payload.get("scopes", []))
required_scopes = set(scopes)
if not required_scopes.issubset(token_scopes):
missing = required_scopes - token_scopes
return jsonify({
"error": "insufficient_scope",
"message": f"Missing scopes: {', '.join(missing)}"
}), 403
request.user = payload
return f(*args, **kwargs)
return decorated
return decorator
# ββ Routes βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.route('/api/login', methods=['POST'])
def login():
"""Authenticate and return JWT tokens."""
data = request.get_json()
username = data.get('username')
password = data.get('password')
# Validate credentials (use bcrypt/argon2 hash comparison)
# This is simplified β see Authentication lesson for details
user = authenticate_user(username, password)
if not user:
# Use consistent timing to prevent user enumeration
return jsonify({
"error": "invalid_credentials",
"message": "Invalid username or password"
}), 401
# Generate token pair
access_token = create_access_token(
user_id=user["id"],
role=user["role"],
scopes=user["scopes"]
)
refresh_token = create_refresh_token(user_id=user["id"])
return jsonify({
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": int(JWT_ACCESS_TOKEN_EXPIRES.total_seconds()),
})
@app.route('/api/refresh', methods=['POST'])
def refresh():
"""Exchange a refresh token for a new access token."""
data = request.get_json()
refresh_token = data.get('refresh_token')
if not refresh_token:
return jsonify({"error": "missing_refresh_token"}), 400
try:
# Decode without audience check (refresh tokens may differ)
payload = jwt.decode(
refresh_token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
issuer="api.example.com",
options={"require": ["exp", "sub", "jti", "iss"]}
)
if payload.get("type") != "refresh":
raise ValueError("Not a refresh token")
if payload["jti"] in revoked_tokens:
raise ValueError("Refresh token has been revoked")
except (jwt.InvalidTokenError, ValueError) as e:
return jsonify({"error": "invalid_refresh_token",
"message": str(e)}), 401
# Revoke old refresh token (rotation)
revoked_tokens.add(payload["jti"])
# Issue new token pair
user_id = payload["sub"]
# Look up current user roles/scopes from database
user = get_user_by_id(user_id)
new_access = create_access_token(
user_id=user_id,
role=user["role"],
scopes=user["scopes"]
)
new_refresh = create_refresh_token(user_id=user_id)
return jsonify({
"access_token": new_access,
"refresh_token": new_refresh,
"token_type": "Bearer",
"expires_in": int(JWT_ACCESS_TOKEN_EXPIRES.total_seconds()),
})
@app.route('/api/logout', methods=['POST'])
@require_auth()
def logout():
"""Revoke the current access token."""
revoked_tokens.add(request.user["jti"])
return jsonify({"message": "Successfully logged out"}), 200
@app.route('/api/protected')
@require_auth(scopes=["read"])
def protected_resource():
"""Protected endpoint requiring 'read' scope."""
return jsonify({
"user_id": request.user["sub"],
"message": "You have access to this protected resource"
})
# Placeholder functions for completeness
def authenticate_user(username, password):
"""Placeholder β implement with proper password hashing."""
return None
def get_user_by_id(user_id):
"""Placeholder β implement with database lookup."""
return {"id": user_id, "role": "user", "scopes": ["read"]}
2.4 JWT Security Best Practices¶
"""
JWT security best practices and common pitfalls.
"""
# ββ PITFALL 1: Using 'none' algorithm βββββββββββββββββββββββββββ
# Attack: Change header to {"alg": "none"} and remove signature
# Defense: Always specify allowed algorithms explicitly
payload = jwt.decode(
token, key,
algorithms=["RS256"], # NEVER include "none" or allow all
)
# ββ PITFALL 2: Confusing HS256 and RS256 ββββββββββββββββββββββββ
# Attack: If server uses RS256, attacker might:
# 1. Get the public key (it is public)
# 2. Sign a new token with HS256 using the public key as secret
# 3. Server treats public key as HMAC secret
# Defense: Strictly validate algorithm in header
payload = jwt.decode(
token, rsa_public_key,
algorithms=["RS256"], # Only allow expected algorithm
)
# ββ PITFALL 3: Missing expiration βββββββββββββββββββββββββββββββ
# Tokens without exp claim live forever
# Defense: Always set short expiration
payload = {
"sub": "user_123",
"exp": datetime.now(timezone.utc) + timedelta(minutes=15),
}
# ββ PITFALL 4: Storing sensitive data in payload βββββββββββββββββ
# JWT payload is base64url-encoded, NOT encrypted
# Anyone can decode it without the key
import base64
header, payload_b64, signature = token.split('.')
decoded = base64.urlsafe_b64decode(payload_b64 + '==')
# Entire payload is now readable!
# NEVER include: passwords, SSNs, credit cards, PII in JWT
# Only include: user ID, role, scopes, expiration
# ββ PITFALL 5: No token revocation mechanism βββββββββββββββββββββ
# JWTs are self-contained β the server cannot invalidate them
# Solutions:
# 1. Short-lived access tokens (15 min) + refresh token rotation
# 2. Token blacklist (Redis with TTL = token exp)
# 3. Token versioning (store token version in DB, check on each request)
# 4. Change signing key (invalidates ALL tokens β nuclear option)
# ββ RS256 Setup (Production Recommended) ββββββββββββββββββββββββ
"""
# Generate RSA key pair
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
# Private key: Used by auth server to SIGN tokens
# Public key: Used by all services to VERIFY tokens
# Share public key freely; guard private key
"""
from cryptography.hazmat.primitives import serialization
# Load keys
with open('private.pem', 'rb') as f:
private_key = serialization.load_pem_private_key(f.read(), password=None)
with open('public.pem', 'rb') as f:
public_key = serialization.load_pem_public_key(f.read())
# Sign with private key
token = jwt.encode(payload, private_key, algorithm="RS256")
# Verify with public key (any service can do this)
decoded = jwt.decode(token, public_key, algorithms=["RS256"])
3. Rate Limiting¶
3.1 Rate Limiting Algorithms¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Rate Limiting Algorithms β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. Fixed Window β
β ββββββββββββββββββββββββββββββββββββ β
β β Window 1 ββ Window 2 ββ Window 3 β β
β β β β β β β ββ β β β ββ β β β β β β β β β
β β 5/10 ββ 3/10 ββ 7/10 β (limit: 10 per window) β
β ββββββββββββββββββββββββββββββββββββ β
β Pro: Simple. Con: Burst at window boundary (2x limit) β
β β
β 2. Sliding Window Log β
β Time: βββββββββ[====current window====]ββββββββ β
β Track exact timestamp of each request β
β Count requests within sliding window β
β Pro: Accurate. Con: Memory-intensive (stores all timestamps) β
β β
β 3. Sliding Window Counter β
β Combine fixed window counts with weighted overlap β
β weight = (window_size - elapsed) / window_size β
β count = prev_count * weight + current_count β
β Pro: Memory-efficient. Con: Approximate β
β β
β 4. Token Bucket β
β βββββββββββ β
β β β β β β β β Bucket (capacity: 10 tokens) β
β β β β β β β
β βββββββββββ β
β β β
β Refill: 1 token/second β
β Each request consumes 1 token β
β Pro: Allows bursts. Con: More state to manage β
β β
β 5. Leaky Bucket β
β Requests enter a queue (bucket) β
β Processed at a fixed rate (leak rate) β
β Overflow is rejected β
β Pro: Smooth output rate. Con: Delays even when capacity exists β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
3.2 Flask Rate Limiting with Flask-Limiter¶
"""
Rate limiting implementation with Flask-Limiter.
pip install Flask-Limiter
"""
from flask import Flask, jsonify, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
# ββ Basic setup ββββββββββββββββββββββββββββββββββββββββββββββββββ
limiter = Limiter(
app=app,
key_func=get_remote_address, # Rate limit by IP address
default_limits=["200 per day", "50 per hour"],
storage_uri="redis://localhost:6379", # Use Redis for distributed
# storage_uri="memory://", # In-memory for development
strategy="fixed-window-elastic-expiry",
)
# ββ Global rate limit (applies to all routes) βββββββββββββββββββ
# Already set via default_limits above
# ββ Per-route rate limits ββββββββββββββββββββββββββββββββββββββββ
@app.route('/api/search')
@limiter.limit("10 per minute")
def search():
"""Search endpoint β stricter limit to prevent scraping."""
query = request.args.get('q', '')
return jsonify({"query": query, "results": []})
@app.route('/api/login', methods=['POST'])
@limiter.limit("5 per minute") # Prevent brute force
def login():
"""Login endpoint with aggressive rate limiting."""
return jsonify({"message": "login"})
@app.route('/api/data')
@limiter.limit("100 per hour")
@limiter.limit("10 per minute") # Multiple limits
def get_data():
"""Data endpoint with tiered rate limits."""
return jsonify({"data": []})
# ββ Dynamic rate limit based on API key tier ββββββββββββββββββββ
def get_rate_limit_by_tier():
"""Return rate limit string based on the client's API tier."""
api_key = request.headers.get('X-API-Key', '')
# Look up tier from database (simplified)
tiers = {
"free": "100 per hour",
"pro": "1000 per hour",
"enterprise": "10000 per hour",
}
tier = get_tier_for_key(api_key)
return tiers.get(tier, "50 per hour") # Default to free
@app.route('/api/premium')
@limiter.limit(get_rate_limit_by_tier)
def premium_endpoint():
"""Endpoint with tier-based rate limiting."""
return jsonify({"data": "premium"})
# ββ Rate limit headers ββββββββββββββββββββββββββββββββββββββββββ
# Flask-Limiter automatically adds these headers:
# X-RateLimit-Limit: 100
# X-RateLimit-Remaining: 95
# X-RateLimit-Reset: 1699999999
# Retry-After: 60 (when limit exceeded)
# ββ Custom error handler ββββββββββββββββββββββββββββββββββββββββ
@app.errorhandler(429)
def ratelimit_handler(e):
"""Custom response when rate limit is exceeded."""
return jsonify({
"error": "rate_limit_exceeded",
"message": "Too many requests. Please try again later.",
"retry_after": e.description,
}), 429
# Placeholder
def get_tier_for_key(api_key):
return "free"
3.3 Custom Token Bucket Implementation¶
"""
Token bucket rate limiter implementation from scratch.
"""
import time
import threading
from dataclasses import dataclass, field
@dataclass
class TokenBucket:
"""Token bucket rate limiter.
Args:
capacity: Maximum number of tokens in the bucket
refill_rate: Tokens added per second
"""
capacity: int
refill_rate: float
tokens: float = field(init=False)
last_refill: float = field(init=False)
lock: threading.Lock = field(default_factory=threading.Lock, init=False)
def __post_init__(self):
self.tokens = float(self.capacity)
self.last_refill = time.monotonic()
def _refill(self):
"""Add tokens based on elapsed time."""
now = time.monotonic()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now
def consume(self, tokens: int = 1) -> bool:
"""Try to consume tokens. Returns True if allowed."""
with self.lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_time(self) -> float:
"""Returns seconds until at least 1 token is available."""
with self.lock:
self._refill()
if self.tokens >= 1:
return 0.0
return (1 - self.tokens) / self.refill_rate
class RateLimiterStore:
"""Manage rate limiters per client key."""
def __init__(self, capacity: int = 100, refill_rate: float = 10.0):
self.capacity = capacity
self.refill_rate = refill_rate
self._buckets: dict[str, TokenBucket] = {}
self._lock = threading.Lock()
def get_bucket(self, key: str) -> TokenBucket:
"""Get or create a token bucket for the given key."""
if key not in self._buckets:
with self._lock:
if key not in self._buckets:
self._buckets[key] = TokenBucket(
capacity=self.capacity,
refill_rate=self.refill_rate
)
return self._buckets[key]
def is_allowed(self, key: str, tokens: int = 1) -> bool:
"""Check if a request is allowed for the given key."""
bucket = self.get_bucket(key)
return bucket.consume(tokens)
# ββ Usage with Flask βββββββββββββββββββββββββββββββββββββββββββββ
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
rate_limiter = RateLimiterStore(capacity=100, refill_rate=10.0)
def rate_limit(capacity=100, refill_rate=10.0):
"""Custom rate limit decorator."""
store = RateLimiterStore(capacity=capacity, refill_rate=refill_rate)
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Use IP + endpoint as the rate limit key
client_ip = request.remote_addr
key = f"{client_ip}:{request.endpoint}"
bucket = store.get_bucket(key)
if not bucket.consume():
wait = bucket.wait_time()
return jsonify({
"error": "rate_limit_exceeded",
"retry_after": round(wait, 2)
}), 429
response = f(*args, **kwargs)
return response
return decorated
return decorator
@app.route('/api/resource')
@rate_limit(capacity=10, refill_rate=1.0) # 10 burst, 1/sec sustained
def get_resource():
return jsonify({"data": "resource"})
4. Input Validation and Sanitization¶
4.1 Validation Architecture¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Input Validation Layers β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Request ββ¬ββ Layer 1: Schema Validation β
β β (structure, types, required fields) β
β β β
β βββ Layer 2: Business Validation β
β β (ranges, formats, consistency) β
β β β
β βββ Layer 3: Sanitization β
β β (trim whitespace, normalize, encode) β
β β β
β βββ Layer 4: Parameterized Operations β
β (SQL parameterization, template escaping) β
β β
β Principle: Validate early, fail fast, never trust client data. β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
4.2 Schema Validation with Marshmallow¶
"""
API input validation using Marshmallow schemas.
pip install marshmallow
"""
from marshmallow import (
Schema, fields, validate, validates, validates_schema,
ValidationError, pre_load, RAISE
)
from flask import Flask, request, jsonify
app = Flask(__name__)
# ββ Schema Definition ββββββββββββββββββββββββββββββββββββββββββββ
class UserCreateSchema(Schema):
"""Schema for creating a new user."""
class Meta:
# Raise error on unknown fields (prevents mass assignment)
unknown = RAISE
username = fields.String(
required=True,
validate=[
validate.Length(min=3, max=30),
validate.Regexp(
r'^[a-zA-Z0-9_]+$',
error="Username must contain only letters, numbers, underscores"
),
]
)
email = fields.Email(required=True)
password = fields.String(
required=True,
load_only=True, # Never include in serialized output
validate=validate.Length(min=12, max=128),
)
age = fields.Integer(
validate=validate.Range(min=13, max=150),
load_default=None,
)
role = fields.String(
validate=validate.OneOf(["user", "moderator"]),
load_default="user",
# Note: "admin" is not allowed via API β only via database
)
@validates('password')
def validate_password_complexity(self, value):
"""Enforce password complexity requirements."""
errors = []
if not any(c.isupper() for c in value):
errors.append("Must contain at least one uppercase letter")
if not any(c.islower() for c in value):
errors.append("Must contain at least one lowercase letter")
if not any(c.isdigit() for c in value):
errors.append("Must contain at least one digit")
if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in value):
errors.append("Must contain at least one special character")
if errors:
raise ValidationError(errors)
@pre_load
def normalize_input(self, data, **kwargs):
"""Normalize input data before validation."""
if 'email' in data:
data['email'] = data['email'].strip().lower()
if 'username' in data:
data['username'] = data['username'].strip()
return data
class SearchQuerySchema(Schema):
"""Schema for search queries."""
class Meta:
unknown = RAISE
q = fields.String(
required=True,
validate=validate.Length(min=1, max=200),
)
page = fields.Integer(
validate=validate.Range(min=1, max=1000),
load_default=1,
)
per_page = fields.Integer(
validate=validate.Range(min=1, max=100),
load_default=20,
)
sort = fields.String(
validate=validate.OneOf(["relevance", "date", "name"]),
load_default="relevance",
)
order = fields.String(
validate=validate.OneOf(["asc", "desc"]),
load_default="desc",
)
# ββ Validation decorator ββββββββββββββββββββββββββββββββββββββββ
def validate_input(schema_class, location="json"):
"""Decorator to validate request input against a schema."""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
schema = schema_class()
if location == "json":
data = request.get_json(silent=True)
if data is None:
return jsonify({
"error": "invalid_request",
"message": "Request body must be valid JSON"
}), 400
elif location == "args":
data = request.args.to_dict()
elif location == "form":
data = request.form.to_dict()
else:
raise ValueError(f"Unknown location: {location}")
try:
validated = schema.load(data)
except ValidationError as err:
return jsonify({
"error": "validation_error",
"messages": err.messages,
}), 422
request.validated_data = validated
return f(*args, **kwargs)
return decorated
return decorator
# ββ Using the validation decorator βββββββββββββββββββββββββββββββ
@app.route('/api/users', methods=['POST'])
@validate_input(UserCreateSchema, location="json")
def create_user():
"""Create a new user with validated input."""
data = request.validated_data
# data is guaranteed to be valid at this point
return jsonify({
"message": "User created",
"username": data["username"],
"email": data["email"],
}), 201
@app.route('/api/search')
@validate_input(SearchQuerySchema, location="args")
def search():
"""Search with validated query parameters."""
data = request.validated_data
return jsonify({
"query": data["q"],
"page": data["page"],
"per_page": data["per_page"],
})
4.3 Pydantic Validation (Alternative)¶
"""
API validation using Pydantic v2.
pip install pydantic
"""
from pydantic import (
BaseModel, Field, field_validator, model_validator,
EmailStr, ConfigDict
)
from typing import Optional
import re
class UserCreate(BaseModel):
"""Pydantic model for user creation."""
model_config = ConfigDict(
str_strip_whitespace=True,
extra='forbid', # Reject unknown fields
)
username: str = Field(
min_length=3,
max_length=30,
pattern=r'^[a-zA-Z0-9_]+$',
)
email: EmailStr
password: str = Field(min_length=12, max_length=128)
age: Optional[int] = Field(default=None, ge=13, le=150)
role: str = Field(default="user", pattern=r'^(user|moderator)$')
@field_validator('password')
@classmethod
def validate_password(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Must contain digit')
if not re.search(r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]', v):
raise ValueError('Must contain special character')
return v
@field_validator('email')
@classmethod
def normalize_email(cls, v):
return v.lower()
# ββ Usage in Flask βββββββββββββββββββββββββββββββββββββββββββββββ
from flask import Flask, request, jsonify
from pydantic import ValidationError as PydanticValidationError
app = Flask(__name__)
@app.route('/api/users', methods=['POST'])
def create_user():
try:
user = UserCreate(**request.get_json())
except PydanticValidationError as e:
return jsonify({
"error": "validation_error",
"details": e.errors(),
}), 422
return jsonify({"username": user.username}), 201
5. CORS (Cross-Origin Resource Sharing)¶
5.1 How CORS Works¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β CORS Flow β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Simple Request (GET, HEAD, POST with simple headers): β
β β
β Browser ββββGET /api/dataβββββββΆ Server β
β (origin: https://app.com) β
β β
β Server ββββResponseββββββββββββββΆ Browser β
β Access-Control-Allow-Origin: https://app.com β
β βββ Browser allows response β β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Preflight Request (PUT, DELETE, custom headers, JSON): β
β β
β Step 1: Browser sends OPTIONS preflight β
β Browser ββββOPTIONS /api/dataββββΆ Server β
β Origin: https://app.com β
β Access-Control-Request-Method: PUT β
β Access-Control-Request-Headers: Content-Type, Authorization β
β β
β Step 2: Server responds with allowed methods/headers β
β Server ββββ204 No ContentββββββββΆ Browser β
β Access-Control-Allow-Origin: https://app.com β
β Access-Control-Allow-Methods: GET, POST, PUT, DELETE β
β Access-Control-Allow-Headers: Content-Type, Authorization β
β Access-Control-Max-Age: 86400 β
β β
β Step 3: Browser sends actual request β
β Browser ββββPUT /api/dataββββββββΆ Server β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
5.2 Flask CORS Configuration¶
"""
CORS configuration in Flask.
pip install flask-cors
"""
from flask import Flask, jsonify
from flask_cors import CORS
app = Flask(__name__)
# ββ Option 1: Allow specific origins (RECOMMENDED) ββββββββββββββ
CORS(app, resources={
r"/api/*": {
"origins": [
"https://app.example.com",
"https://admin.example.com",
],
"methods": ["GET", "POST", "PUT", "DELETE"],
"allow_headers": ["Content-Type", "Authorization"],
"expose_headers": ["X-Request-Id", "X-RateLimit-Remaining"],
"supports_credentials": True,
"max_age": 86400,
}
})
# ββ Option 2: Different CORS for different routes ββββββββββββββββ
app2 = Flask(__name__)
# Public API: allow any origin (no credentials)
CORS(app2, resources={
r"/api/public/*": {
"origins": "*",
"methods": ["GET"],
"allow_headers": ["Content-Type"],
"supports_credentials": False, # MUST be False with origin: *
}
})
# Private API: specific origins with credentials
CORS(app2, resources={
r"/api/private/*": {
"origins": ["https://app.example.com"],
"methods": ["GET", "POST", "PUT", "DELETE"],
"allow_headers": ["Content-Type", "Authorization"],
"supports_credentials": True,
"max_age": 86400,
}
})
# ββ Option 3: Manual CORS implementation ββββββββββββββββββββββββ
from flask import Flask, request, make_response
app3 = Flask(__name__)
ALLOWED_ORIGINS = {
"https://app.example.com",
"https://admin.example.com",
}
@app3.after_request
def add_cors_headers(response):
origin = request.headers.get('Origin')
if origin in ALLOWED_ORIGINS:
response.headers['Access-Control-Allow-Origin'] = origin
response.headers['Access-Control-Allow-Credentials'] = 'true'
response.headers['Access-Control-Allow-Methods'] = (
'GET, POST, PUT, DELETE, OPTIONS'
)
response.headers['Access-Control-Allow-Headers'] = (
'Content-Type, Authorization, X-Requested-With'
)
response.headers['Access-Control-Expose-Headers'] = (
'X-Request-Id, X-RateLimit-Remaining'
)
response.headers['Access-Control-Max-Age'] = '86400'
# Vary: Origin tells caches that response varies by origin
response.headers.add('Vary', 'Origin')
return response
@app3.route('/api/data', methods=['OPTIONS'])
def preflight():
"""Handle CORS preflight requests."""
return make_response('', 204)
# ββ CORS Security Rules βββββββββββββββββββββββββββββββββββββββββ
"""
1. NEVER use Access-Control-Allow-Origin: * with credentials
- This is actually blocked by browsers
- If you need credentials, list specific origins
2. NEVER reflect the Origin header as Allow-Origin without checking
- This is equivalent to allowing all origins
- BAD: response.headers['ACAO'] = request.headers['Origin']
- GOOD: Check against an allowlist first
3. Limit Access-Control-Allow-Methods to what is actually needed
- Don't allow DELETE if the route doesn't support it
4. Set Access-Control-Max-Age to reduce preflight requests
- 86400 (24 hours) is reasonable
5. Use Access-Control-Expose-Headers for custom headers
- By default, only simple headers are readable by JavaScript
6. Always add Vary: Origin when ACAO changes per request
- Prevents cache poisoning
"""
6. GraphQL Security¶
6.1 GraphQL-Specific Threats¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β GraphQL Security Threats β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. Query Depth Attack β
β query { user { posts { comments { author { posts { ... } } } } }β
β βββΆ Deeply nested queries cause exponential DB load β
β β
β 2. Query Breadth Attack β
β query { user1: user(id:1) {...} user2: user(id:2) {...} ... } β
β βββΆ Many aliases multiplies query cost β
β β
β 3. Introspection Abuse β
β query { __schema { types { name fields { name } } } } β
β βββΆ Exposes entire API schema to attackers β
β β
β 4. Batching Attack β
β [{"query": "..."}, {"query": "..."}, ... x 1000] β
β βββΆ Multiple queries in a single request β
β β
β 5. Injection via Variables β
β query ($id: String!) { user(id: $id) { ... } } β
β variables: { "id": "1 OR 1=1" } β
β βββΆ SQL injection through unvalidated variables β
β β
β 6. Information Disclosure β
β Verbose error messages revealing internal details β
β Field suggestions: "Did you mean 'secretAdminField'?" β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
6.2 GraphQL Security Mitigations¶
"""
GraphQL security measures with graphene (Python).
pip install graphene flask-graphql
"""
# ββ 1. Query Depth Limiting βββββββββββββββββββββββββββββββββββββ
class DepthAnalyzer:
"""Analyze and limit GraphQL query depth."""
def __init__(self, max_depth: int = 10):
self.max_depth = max_depth
def analyze(self, query_ast, depth: int = 0) -> int:
"""Calculate the maximum depth of a query."""
if depth > self.max_depth:
raise ValueError(
f"Query depth {depth} exceeds maximum allowed ({self.max_depth})"
)
max_child_depth = depth
if hasattr(query_ast, 'selection_set') and query_ast.selection_set:
for selection in query_ast.selection_set.selections:
child_depth = self.analyze(selection, depth + 1)
max_child_depth = max(max_child_depth, child_depth)
return max_child_depth
# ββ 2. Query Cost Analysis ββββββββββββββββββββββββββββββββββββββ
class QueryCostAnalyzer:
"""Estimate the cost of a GraphQL query."""
# Define cost per field type
FIELD_COSTS = {
"user": 1,
"posts": 5, # List field, potentially expensive
"comments": 3,
"search": 10, # Full-text search is expensive
}
def __init__(self, max_cost: int = 1000):
self.max_cost = max_cost
def calculate_cost(self, query_ast, multiplier: int = 1) -> int:
"""Calculate estimated query cost."""
total_cost = 0
if hasattr(query_ast, 'selection_set') and query_ast.selection_set:
for selection in query_ast.selection_set.selections:
field_name = selection.name.value
field_cost = self.FIELD_COSTS.get(field_name, 1)
# Check for pagination arguments that multiply cost
args = {
arg.name.value: arg.value.value
for arg in (selection.arguments or [])
if hasattr(arg.value, 'value')
}
limit = int(args.get('first', args.get('limit', 1)))
cost = field_cost * multiplier * max(limit, 1)
total_cost += cost
# Recurse into child selections
total_cost += self.calculate_cost(selection, limit)
if total_cost > self.max_cost:
raise ValueError(
f"Query cost {total_cost} exceeds maximum ({self.max_cost})"
)
return total_cost
# ββ 3. Disable Introspection in Production ββββββββββββββββββββββ
"""
Introspection reveals your entire API schema.
Disable it in production.
"""
from graphql import GraphQLError
class DisableIntrospection:
"""Middleware to disable introspection queries in production."""
def resolve(self, next, root, info, **kwargs):
# Block __schema and __type queries
if info.field_name in ('__schema', '__type'):
raise GraphQLError("Introspection is disabled")
return next(root, info, **kwargs)
# ββ 4. Rate Limit + Batch Limiting ββββββββββββββββββββββββββββββ
from flask import Flask, request, jsonify
app = Flask(__name__)
MAX_BATCH_SIZE = 5 # Maximum queries per batch request
@app.before_request
def limit_batch_queries():
"""Limit the number of queries in a batch request."""
if request.is_json:
data = request.get_json(silent=True)
if isinstance(data, list):
if len(data) > MAX_BATCH_SIZE:
return jsonify({
"error": "batch_limit_exceeded",
"message": f"Maximum {MAX_BATCH_SIZE} queries per batch"
}), 400
# ββ 5. Persisted Queries ββββββββββββββββββββββββββββββββββββββββ
"""
Instead of accepting arbitrary queries, only accept pre-registered
query hashes. This prevents injection and query manipulation.
"""
import hashlib
# Pre-registered queries (build-time generated)
PERSISTED_QUERIES = {
"abc123": "query { users { id name } }",
"def456": "query GetUser($id: ID!) { user(id: $id) { id name email } }",
}
@app.route('/graphql', methods=['POST'])
def graphql_endpoint():
data = request.get_json()
# Only accept persisted queries in production
query_hash = data.get('extensions', {}).get('persistedQuery', {}).get('sha256Hash')
if query_hash:
query = PERSISTED_QUERIES.get(query_hash)
if not query:
return jsonify({"error": "query_not_found"}), 404
else:
# In development, allow arbitrary queries
# In production, reject:
return jsonify({
"error": "persisted_queries_only",
"message": "Only persisted queries are accepted"
}), 400
# Execute the query...
return jsonify({"data": {}})
7. API Gateway Security¶
7.1 Gateway Architecture¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β API Gateway Security Architecture β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Client β
β β β
β βΌ β
β ββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β API Gateway β β
β β β β
β β ββββββββββββ ββββββββββββ ββββββββββββ β β
β β β TLS β β Auth β β Rate β β β
β β βTerminationβ βValidationβ β Limiting β β β
β β ββββββββββββ ββββββββββββ ββββββββββββ β β
β β β β
β β ββββββββββββ ββββββββββββ ββββββββββββ β β
β β β Input β β Request β β CORS β β β
β β βValidationβ β Logging β β Handling β β β
β β ββββββββββββ ββββββββββββ ββββββββββββ β β
β β β β
β β ββββββββββββ ββββββββββββ ββββββββββββ β β
β β β WAF β β IP Allow/β β Response β β β
β β β Rules β β Blocklistβ β Filteringβ β β
β β ββββββββββββ ββββββββββββ ββββββββββββ β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β β
β βΌ βΌ βΌ β
β ββββββββββββ ββββββββββββ ββββββββββββ β
β β Service Aβ β Service Bβ β Service Cβ β
β β (Users) β β (Orders) β β (Search) β β
β ββββββββββββ ββββββββββββ ββββββββββββ β
β β
β Security benefits of centralized gateway: β
β β’ Single point for auth enforcement β
β β’ Consistent rate limiting across services β
β β’ Centralized logging and monitoring β
β β’ Backend services don't handle TLS β
β β’ Simplified security policy management β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
7.2 Request/Response Filtering¶
"""
API gateway request/response filtering middleware.
"""
from flask import Flask, request, jsonify, g
import re
import uuid
import time
import logging
app = Flask(__name__)
logger = logging.getLogger('api_gateway')
# ββ Request ID tracking βββββββββββββββββββββββββββββββββββββββββ
@app.before_request
def add_request_id():
"""Add a unique request ID for tracing."""
g.request_id = request.headers.get(
'X-Request-Id',
str(uuid.uuid4())
)
g.request_start = time.monotonic()
@app.after_request
def add_response_headers(response):
"""Add security and tracking headers to response."""
response.headers['X-Request-Id'] = g.request_id
# Add timing
elapsed = time.monotonic() - g.request_start
response.headers['X-Response-Time'] = f"{elapsed:.3f}s"
return response
# ββ Request size limiting βββββββββββββββββββββββββββββββββββββββ
MAX_CONTENT_LENGTH = 1 * 1024 * 1024 # 1 MB
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
@app.before_request
def check_content_length():
"""Reject oversized requests."""
content_length = request.content_length
if content_length and content_length > MAX_CONTENT_LENGTH:
return jsonify({
"error": "payload_too_large",
"max_bytes": MAX_CONTENT_LENGTH,
}), 413
# ββ IP allowlist/blocklist ββββββββββββββββββββββββββββββββββββββ
BLOCKED_IPS = {"192.168.1.100", "10.0.0.50"}
ADMIN_ALLOWED_IPS = {"10.0.0.1", "10.0.0.2"}
@app.before_request
def check_ip():
"""Block requests from banned IPs."""
client_ip = request.remote_addr
if client_ip in BLOCKED_IPS:
logger.warning(f"Blocked request from banned IP: {client_ip}")
return jsonify({"error": "forbidden"}), 403
# Admin routes require specific IPs
if request.path.startswith('/admin/'):
if client_ip not in ADMIN_ALLOWED_IPS:
return jsonify({"error": "forbidden"}), 403
# ββ Response data filtering βββββββββββββββββββββββββββββββββββββ
SENSITIVE_FIELDS = {'password', 'ssn', 'credit_card', 'secret_key',
'token', 'api_key'}
def filter_sensitive_data(data):
"""Recursively remove sensitive fields from response data."""
if isinstance(data, dict):
return {
k: filter_sensitive_data(v)
for k, v in data.items()
if k.lower() not in SENSITIVE_FIELDS
}
elif isinstance(data, list):
return [filter_sensitive_data(item) for item in data]
return data
# ββ Audit logging βββββββββββββββββββββββββββββββββββββββββββββββ
@app.after_request
def audit_log(response):
"""Log every API request for audit trail."""
logger.info(
"API Request: method=%s path=%s status=%s "
"ip=%s user_agent=%s request_id=%s duration=%s",
request.method,
request.path,
response.status_code,
request.remote_addr,
request.user_agent.string[:100],
g.request_id,
response.headers.get('X-Response-Time', 'N/A'),
)
return response
8. OpenAPI Security Definitions¶
8.1 Security Schemes in OpenAPI 3.0¶
# openapi.yaml - Security scheme definitions
openapi: 3.0.3
info:
title: Secure API
version: 1.0.0
# ββ Security Scheme Definitions ββββββββββββββββββββββββββββββββββ
components:
securitySchemes:
# API Key authentication
ApiKeyAuth:
type: apiKey
in: header
name: X-API-Key
description: API key for server-to-server communication
# JWT Bearer token
BearerAuth:
type: http
scheme: bearer
bearerFormat: JWT
description: JWT access token
# OAuth 2.0
OAuth2:
type: oauth2
flows:
authorizationCode:
authorizationUrl: https://auth.example.com/authorize
tokenUrl: https://auth.example.com/token
refreshUrl: https://auth.example.com/refresh
scopes:
read: Read access to resources
write: Write access to resources
admin: Administrative access
# OpenID Connect
OpenIdConnect:
type: openIdConnect
openIdConnectUrl: https://auth.example.com/.well-known/openid-configuration
# ββ Global security (applies to all endpoints) ββββββββββββββββββ
security:
- BearerAuth: []
# ββ Per-endpoint security ββββββββββββββββββββββββββββββββββββββββ
paths:
/api/public/status:
get:
summary: Health check (no auth required)
security: [] # Override: no authentication
responses:
'200':
description: OK
/api/users:
get:
summary: List users
security:
- BearerAuth: []
- ApiKeyAuth: [] # Alternative auth (OR)
responses:
'200':
description: User list
/api/admin/settings:
put:
summary: Update settings (admin only)
security:
- OAuth2: [admin] # Requires 'admin' scope
responses:
'200':
description: Settings updated
/api/data:
post:
summary: Create data (requires read + write)
security:
- OAuth2: [read, write] # Requires BOTH scopes
responses:
'201':
description: Data created
8.2 API Versioning Security¶
"""
API versioning considerations for security.
"""
# ββ URL path versioning ββββββββββββββββββββββββββββββββββββββββββ
# /api/v1/users (old, may have known vulnerabilities)
# /api/v2/users (current, patched)
# ββ Header versioning βββββββββββββββββββββββββββββββββββββββββββ
# Accept: application/vnd.example.v2+json
# ββ Security concerns with versioning βββββββββββββββββββββββββββ
"""
1. Old API versions may have known vulnerabilities
- Set deprecation dates and enforce them
- Return Deprecation and Sunset headers
2. Don't maintain security patches for deprecated versions
- Force migration to latest version
3. Monitor usage of deprecated versions
- Alert when old versions are still in use
"""
from flask import Flask, request, jsonify
from datetime import datetime
app = Flask(__name__)
API_VERSIONS = {
"v1": {
"status": "deprecated",
"sunset": "2025-06-01",
"successor": "v2",
},
"v2": {
"status": "current",
"sunset": None,
"successor": None,
},
}
@app.before_request
def check_api_version():
"""Warn or block deprecated API versions."""
# Extract version from URL path
path_parts = request.path.strip('/').split('/')
if len(path_parts) >= 2 and path_parts[0] == 'api':
version = path_parts[1]
version_info = API_VERSIONS.get(version)
if not version_info:
return jsonify({
"error": "invalid_version",
"supported": list(API_VERSIONS.keys()),
}), 400
if version_info["status"] == "deprecated":
sunset_date = version_info["sunset"]
if sunset_date:
# Check if past sunset date
if datetime.now() > datetime.fromisoformat(sunset_date):
return jsonify({
"error": "version_retired",
"message": f"API {version} was retired on {sunset_date}",
"upgrade_to": version_info["successor"],
}), 410 # 410 Gone
@app.after_request
def add_deprecation_headers(response):
"""Add deprecation warnings to response headers."""
path_parts = request.path.strip('/').split('/')
if len(path_parts) >= 2 and path_parts[0] == 'api':
version = path_parts[1]
version_info = API_VERSIONS.get(version, {})
if version_info.get("status") == "deprecated":
response.headers['Deprecation'] = 'true'
if version_info.get("sunset"):
response.headers['Sunset'] = version_info["sunset"]
response.headers['Link'] = (
f'</api/{version_info["successor"]}>; rel="successor-version"'
)
return response
9. Request/Response Encryption¶
9.1 Transport Layer Security¶
"""
TLS configuration and certificate pinning.
"""
# ββ Flask with TLS (development) ββββββββββββββββββββββββββββββββ
# Generate self-signed cert for development:
# openssl req -x509 -newkey rsa:4096 -nodes \
# -out cert.pem -keyout key.pem -days 365
from flask import Flask
app = Flask(__name__)
# Run with TLS (development only)
# In production, TLS is handled by reverse proxy (nginx, load balancer)
if __name__ == '__main__':
app.run(
ssl_context=('cert.pem', 'key.pem'),
host='0.0.0.0',
port=443,
)
# ββ Certificate Pinning (for API clients) βββββββββββββββββββββββ
import requests
import hashlib
import ssl
def verify_certificate_pin(host: str, expected_pin: str) -> bool:
"""Verify server certificate matches expected pin."""
import socket
context = ssl.create_default_context()
with socket.create_connection((host, 443)) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
cert_der = ssock.getpeercert(True)
cert_hash = hashlib.sha256(cert_der).hexdigest()
return cert_hash == expected_pin
# ββ Payload-level encryption (for sensitive fields) βββββββββββββ
from cryptography.fernet import Fernet
# Generate key (store securely, not in code)
encryption_key = Fernet.generate_key()
cipher = Fernet(encryption_key)
def encrypt_sensitive_fields(data: dict, fields: list[str]) -> dict:
"""Encrypt specific fields in a response payload."""
result = data.copy()
for field in fields:
if field in result:
value = str(result[field]).encode()
result[field] = cipher.encrypt(value).decode()
return result
def decrypt_sensitive_fields(data: dict, fields: list[str]) -> dict:
"""Decrypt specific fields in a request payload."""
result = data.copy()
for field in fields:
if field in result:
value = result[field].encode()
result[field] = cipher.decrypt(value).decode()
return result
10. Exercises¶
Exercise 1: Secure JWT Authentication Service¶
Build a complete JWT authentication service with Flask that includes:
- User registration with password hashing (argon2 or bcrypt)
- Login endpoint that returns access + refresh tokens
- Token refresh endpoint with refresh token rotation
- Logout endpoint with token blacklisting (use Redis or in-memory set)
- A protected endpoint that requires the "admin" scope
- Proper error handling for expired, malformed, and revoked tokens
- Rate limiting on the login endpoint (5 attempts per minute per IP)
Exercise 2: CORS Security Audit¶
Write a Python script that:
- Takes a list of API URLs
- Sends requests with various Origin headers
- Tests if the API reflects arbitrary origins (vulnerability)
- Checks if credentials are allowed with wildcard origins
- Tests preflight handling for common methods and headers
- Generates a security report for each endpoint
Exercise 3: GraphQL Security Middleware¶
Implement a GraphQL security middleware that:
- Limits query depth to a configurable maximum (default: 10)
- Calculates query cost and rejects expensive queries
- Disables introspection in production
- Limits batch query size
- Logs all queries with their cost and execution time
- Implements persisted queries with a hash allowlist
Exercise 4: Rate Limiter with Sliding Window¶
Implement a sliding window log rate limiter that:
- Uses Redis as the backing store (or an in-memory simulation)
- Tracks exact timestamps of each request
- Supports configurable windows (per second, minute, hour)
- Supports different limits for different API key tiers
- Returns proper rate limit headers (X-RateLimit-Limit, Remaining, Reset)
- Handles distributed deployment (multiple server instances)
Exercise 5: API Input Validation Framework¶
Build a reusable validation framework that:
- Validates JSON request bodies against schemas
- Validates query parameters with type coercion
- Validates path parameters
- Supports nested object validation
- Returns consistent error response format (RFC 7807)
- Protects against mass assignment (reject unknown fields)
- Sanitizes string inputs (trim, normalize Unicode)
Exercise 6: API Security Scanner¶
Create a security scanning tool that tests an API for:
- Missing authentication on endpoints
- Broken object-level authorization (BOLA/IDOR)
- Missing rate limiting
- Verbose error messages revealing internal details
- Missing security headers
- CORS misconfiguration
- Generate a report with severity ratings
Summary¶
API Security Checklist¶
| Category | Item | Priority |
|---|---|---|
| Authentication | Use OAuth 2.0 or JWT (not API keys alone for users) | Critical |
| Authentication | Short-lived access tokens (15 min) | Critical |
| Authentication | Refresh token rotation | High |
| Authorization | Check object-level permissions (prevent BOLA) | Critical |
| Authorization | Validate scopes on every request | Critical |
| Rate Limiting | Per-IP and per-user rate limits | High |
| Rate Limiting | Stricter limits on auth endpoints | High |
| Input Validation | Schema validation on all inputs | Critical |
| Input Validation | Reject unknown fields | High |
| CORS | Specific origin allowlist (no wildcard with credentials) | Critical |
| CORS | Vary: Origin header | Medium |
| Encryption | TLS everywhere (HTTPS only) | Critical |
| Logging | Log all requests with unique request IDs | High |
| Versioning | Deprecate old versions with sunset dates | Medium |
| Gateway | Centralized auth and rate limiting | Recommended |
Key Takeaways¶
- Defense in depth β apply security at every layer (network, gateway, application, database)
- Never trust client input β validate everything, on the server, every time
- Use established libraries β do not roll your own authentication or encryption
- Monitor and alert β security without monitoring is incomplete
- Document your API security β use OpenAPI security schemes for clarity
Previous: 09_Web_Security_Headers.md | Next: 11_Secrets_Management.md