1"""
2OWASP Top 10 Security Patterns Demo
3====================================
4
5Educational demonstration of secure vs vulnerable code patterns
6based on the OWASP Top 10 Web Application Security Risks.
7
8Topics covered:
9- SQL Injection (A03:2021 - Injection)
10- Input validation
11- Secure error handling
12- Secure session configuration concepts
13- Broken Access Control patterns (A01:2021)
14- Security Misconfiguration (A05:2021)
15
16All examples are DEFENSIVE - showing how to identify and FIX
17vulnerabilities. Uses Python standard library + sqlite3.
18"""
19
20import sqlite3
21import hashlib
22import hmac
23import html
24import json
25import os
26import re
27import secrets
28import traceback
29from urllib.parse import urlparse
30
31print("=" * 65)
32print(" OWASP Top 10 Security Patterns Demo")
33print("=" * 65)
34print()
35
36
37# ============================================================
38# Section 1: A03:2021 - Injection (SQL Injection)
39# ============================================================
40
41print("-" * 65)
42print(" Section 1: A03 - SQL Injection")
43print("-" * 65)
44
45# Setup in-memory database
46conn = sqlite3.connect(":memory:")
47cursor = conn.cursor()
48cursor.execute("""
49 CREATE TABLE users (
50 id INTEGER PRIMARY KEY,
51 username TEXT UNIQUE,
52 email TEXT,
53 password_hash TEXT,
54 is_admin INTEGER DEFAULT 0
55 )
56""")
57cursor.executemany(
58 "INSERT INTO users (username, email, password_hash, is_admin) VALUES (?, ?, ?, ?)",
59 [
60 ("alice", "alice@example.com", hashlib.sha256(b"hashed_pw").hexdigest(), 0),
61 ("bob", "bob@example.com", hashlib.sha256(b"hashed_pw2").hexdigest(), 0),
62 ("admin", "admin@example.com", hashlib.sha256(b"admin_pw").hexdigest(), 1),
63 ],
64)
65conn.commit()
66
67print("""
68 VULNERABLE: String formatting in SQL queries
69 SECURE: Parameterized queries (prepared statements)
70""")
71
72
73# VULNERABLE: String concatenation
74def get_user_vulnerable(username: str) -> list:
75 """VULNERABLE: SQL injection via string formatting."""
76 query = f"SELECT * FROM users WHERE username = '{username}'"
77 print(f" Query: {query}")
78 return cursor.execute(query).fetchall()
79
80
81# SECURE: Parameterized query
82def get_user_secure(username: str) -> list:
83 """SECURE: Uses parameterized query."""
84 query = "SELECT * FROM users WHERE username = ?"
85 print(f" Query: {query} params=[{username}]")
86 return cursor.execute(query, (username,)).fetchall()
87
88
89# Normal usage
90print(" Normal input: 'alice'")
91print(" -- Vulnerable --")
92result_v = get_user_vulnerable("alice")
93print(f" Result: {result_v}")
94print(" -- Secure --")
95result_s = get_user_secure("alice")
96print(f" Result: {result_s}")
97print()
98
99# SQL Injection attack
100malicious_input = "' OR '1'='1"
101print(f" Malicious input: {malicious_input!r}")
102print(" -- Vulnerable --")
103result_v = get_user_vulnerable(malicious_input)
104print(f" Result: {len(result_v)} rows returned (ALL users leaked!)")
105for row in result_v:
106 print(f" {row}")
107print(" -- Secure --")
108result_s = get_user_secure(malicious_input)
109print(f" Result: {len(result_s)} rows (correctly returns nothing)")
110print()
111
112
113# ============================================================
114# Section 2: Input Validation
115# ============================================================
116
117print("-" * 65)
118print(" Section 2: Input Validation")
119print("-" * 65)
120
121print("""
122 Validate ALL user input. Reject invalid data early.
123 Use allowlists (not blocklists) when possible.
124""")
125
126
127class InputValidator:
128 """Collection of input validation methods."""
129
130 @staticmethod
131 def validate_email(email: str) -> tuple[bool, str]:
132 """Validate email format."""
133 if not email or len(email) > 254:
134 return False, "Email too long or empty"
135 # Basic RFC 5322 pattern (simplified)
136 pattern = r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
137 if not re.match(pattern, email):
138 return False, "Invalid email format"
139 return True, "Valid"
140
141 @staticmethod
142 def validate_username(username: str) -> tuple[bool, str]:
143 """Validate username: alphanumeric + underscore, 3-32 chars."""
144 if not username:
145 return False, "Username is required"
146 if len(username) < 3 or len(username) > 32:
147 return False, "Username must be 3-32 characters"
148 if not re.match(r"^[a-zA-Z0-9_]+$", username):
149 return False, "Username can only contain letters, numbers, underscore"
150 return True, "Valid"
151
152 @staticmethod
153 def validate_url(url: str) -> tuple[bool, str]:
154 """Validate URL - prevent open redirect / SSRF."""
155 try:
156 parsed = urlparse(url)
157 except Exception:
158 return False, "Invalid URL format"
159
160 # Must have scheme and netloc
161 if not parsed.scheme or not parsed.netloc:
162 return False, "URL must include scheme and host"
163
164 # Only allow http/https
165 if parsed.scheme not in ("http", "https"):
166 return False, f"Scheme '{parsed.scheme}' not allowed (http/https only)"
167
168 # Block internal IPs (SSRF prevention)
169 blocked_hosts = ["localhost", "127.0.0.1", "0.0.0.0", "169.254.169.254"]
170 if parsed.hostname in blocked_hosts:
171 return False, f"Host '{parsed.hostname}' is blocked (SSRF prevention)"
172
173 # Block internal network ranges
174 if parsed.hostname and parsed.hostname.startswith(("10.", "192.168.", "172.")):
175 return False, "Internal network addresses are blocked"
176
177 return True, "Valid"
178
179 @staticmethod
180 def validate_integer(value: str, min_val: int = None,
181 max_val: int = None) -> tuple[bool, str]:
182 """Validate integer input with range check."""
183 try:
184 num = int(value)
185 except (ValueError, TypeError):
186 return False, "Not a valid integer"
187 if min_val is not None and num < min_val:
188 return False, f"Value must be >= {min_val}"
189 if max_val is not None and num > max_val:
190 return False, f"Value must be <= {max_val}"
191 return True, "Valid"
192
193
194validator = InputValidator()
195
196# Test email validation
197print("\n Email Validation:")
198test_emails = [
199 "user@example.com",
200 "invalid-email",
201 "user@.com",
202 "<script>@evil.com",
203 "a" * 300 + "@example.com",
204]
205for email in test_emails:
206 valid, msg = validator.validate_email(email)
207 status = "PASS" if valid else "FAIL"
208 print(f" [{status}] {email[:35]:<35} -> {msg}")
209
210# Test URL validation (SSRF prevention)
211print("\n URL Validation (SSRF Prevention):")
212test_urls = [
213 "https://example.com/page",
214 "http://localhost/admin",
215 "http://169.254.169.254/latest/meta-data/",
216 "file:///etc/passwd",
217 "https://192.168.1.1/internal",
218]
219for url in test_urls:
220 valid, msg = validator.validate_url(url)
221 status = "PASS" if valid else "BLOCK"
222 print(f" [{status}] {url[:40]:<40} -> {msg}")
223print()
224
225
226# ============================================================
227# Section 3: Secure Error Handling
228# ============================================================
229
230print("-" * 65)
231print(" Section 3: Secure Error Handling")
232print("-" * 65)
233
234print("""
235 VULNERABLE: Exposing stack traces, SQL errors, internal paths
236 SECURE: Generic messages to users, detailed logs internally
237""")
238
239
240class SecureErrorHandler:
241 """Demonstrates secure vs insecure error handling."""
242
243 def __init__(self):
244 self.error_log: list[dict] = [] # Simulates server-side log
245
246 def handle_error_vulnerable(self, error: Exception) -> str:
247 """VULNERABLE: Exposes internal details to user."""
248 return f"Error: {type(error).__name__}: {error}\n{traceback.format_exc()}"
249
250 def handle_error_secure(self, error: Exception, request_id: str = None) -> str:
251 """SECURE: Generic message to user, details to internal log."""
252 if request_id is None:
253 request_id = secrets.token_hex(8)
254
255 # Log full details internally
256 self.error_log.append({
257 "request_id": request_id,
258 "error_type": type(error).__name__,
259 "error_message": str(error),
260 "traceback": traceback.format_exc(),
261 })
262
263 # Return sanitized message to user
264 return (
265 f"An unexpected error occurred. "
266 f"Reference ID: {request_id}. "
267 f"Please contact support if this persists."
268 )
269
270
271handler = SecureErrorHandler()
272
273# Simulate an error
274try:
275 result = 1 / 0
276except Exception as e:
277 print("\n -- VULNERABLE error response (exposes internals) --")
278 vuln_response = handler.handle_error_vulnerable(e)
279 for line in vuln_response.strip().split("\n")[:4]:
280 print(f" {line}")
281 print(" ...")
282
283 print("\n -- SECURE error response (generic to user) --")
284 secure_response = handler.handle_error_secure(e)
285 print(f" {secure_response}")
286
287 print("\n -- Internal log (server-side only) --")
288 log_entry = handler.error_log[-1]
289 print(f" Request ID: {log_entry['request_id']}")
290 print(f" Error Type: {log_entry['error_type']}")
291 print(f" Message: {log_entry['error_message']}")
292print()
293
294
295# ============================================================
296# Section 4: Secure Session Configuration
297# ============================================================
298
299print("-" * 65)
300print(" Section 4: Secure Session Configuration")
301print("-" * 65)
302
303print("""
304 Session security checklist for web applications:
305""")
306
307secure_config = {
308 "session_cookie_name": "__Host-session", # __Host- prefix for extra security
309 "session_cookie_httponly": True, # No JavaScript access
310 "session_cookie_secure": True, # HTTPS only
311 "session_cookie_samesite": "Lax", # CSRF protection
312 "session_cookie_path": "/",
313 "session_lifetime_seconds": 3600, # 1 hour
314 "session_idle_timeout_seconds": 900, # 15 minutes
315 "session_regenerate_on_login": True,
316 "session_regenerate_on_privilege_change": True,
317 "max_concurrent_sessions": 3,
318}
319
320insecure_config = {
321 "session_cookie_name": "session", # No prefix
322 "session_cookie_httponly": False, # JS can steal cookie!
323 "session_cookie_secure": False, # Sent over HTTP!
324 "session_cookie_samesite": "None", # No CSRF protection!
325 "session_cookie_path": "/",
326 "session_lifetime_seconds": 86400 * 30, # 30 days!
327 "session_idle_timeout_seconds": None, # Never expires!
328 "session_regenerate_on_login": False,
329 "session_regenerate_on_privilege_change": False,
330 "max_concurrent_sessions": None, # Unlimited!
331}
332
333print(f" {'Setting':<42} {'INSECURE':<14} {'SECURE':<14}")
334print(f" {'-'*42} {'-'*14} {'-'*14}")
335for key in secure_config:
336 insecure_val = str(insecure_config.get(key, "N/A"))[:12]
337 secure_val = str(secure_config[key])[:12]
338 clean_key = key.replace("session_cookie_", "cookie.").replace("session_", "")
339 print(f" {clean_key:<42} {insecure_val:<14} {secure_val:<14}")
340print()
341
342
343# ============================================================
344# Section 5: A01:2021 - Broken Access Control
345# ============================================================
346
347print("-" * 65)
348print(" Section 5: A01 - Broken Access Control")
349print("-" * 65)
350
351print("""
352 Broken access control allows users to act outside
353 their intended permissions.
354""")
355
356
357# VULNERABLE: Direct object reference without authorization check
358class VulnerableAPI:
359 """VULNERABLE: No authorization checks on resource access."""
360
361 def __init__(self):
362 self.documents = {
363 1: {"owner": "alice", "title": "Alice's Report", "content": "Secret data"},
364 2: {"owner": "bob", "title": "Bob's Notes", "content": "Private notes"},
365 }
366
367 def get_document(self, doc_id: int, requesting_user: str) -> dict:
368 """VULNERABLE: Any user can access any document by ID."""
369 # No authorization check!
370 doc = self.documents.get(doc_id)
371 if doc:
372 return {"status": "ok", "document": doc}
373 return {"status": "error", "message": "Not found"}
374
375
376# SECURE: Authorization check before access
377class SecureAPI:
378 """SECURE: Checks authorization before returning resources."""
379
380 def __init__(self):
381 self.documents = {
382 1: {"owner": "alice", "title": "Alice's Report", "content": "Secret data"},
383 2: {"owner": "bob", "title": "Bob's Notes", "content": "Private notes"},
384 }
385 self.admin_users = {"admin"}
386
387 def get_document(self, doc_id: int, requesting_user: str) -> dict:
388 """SECURE: Checks ownership or admin status before access."""
389 doc = self.documents.get(doc_id)
390 if not doc:
391 return {"status": "error", "message": "Not found"}
392
393 # Authorization check
394 if doc["owner"] != requesting_user and requesting_user not in self.admin_users:
395 return {"status": "error", "message": "Access denied"}
396
397 return {"status": "ok", "document": doc}
398
399
400vuln_api = VulnerableAPI()
401secure_api = SecureAPI()
402
403print("\n Scenario: Bob tries to access Alice's document (doc_id=1)")
404print()
405print(" -- VULNERABLE API --")
406result = vuln_api.get_document(1, "bob")
407print(f" Result: {result['status']} - {result.get('document', {}).get('title', 'N/A')}")
408print(f" Content exposed: {result.get('document', {}).get('content', 'N/A')}")
409
410print("\n -- SECURE API --")
411result = secure_api.get_document(1, "bob")
412print(f" Result: {result['status']} - {result.get('message', 'N/A')}")
413
414result = secure_api.get_document(1, "alice")
415print(f" Alice's own request: {result['status']} - {result.get('document', {}).get('title', 'N/A')}")
416print()
417
418
419# ============================================================
420# Section 6: A05:2021 - Security Misconfiguration
421# ============================================================
422
423print("-" * 65)
424print(" Section 6: A05 - Security Misconfiguration")
425print("-" * 65)
426
427
428def security_headers_check(headers: dict) -> list[dict]:
429 """Check HTTP response headers for security issues."""
430 findings = []
431
432 recommended_headers = {
433 "Strict-Transport-Security": {
434 "expected": "max-age=31536000; includeSubDomains",
435 "severity": "HIGH",
436 "description": "HSTS forces HTTPS, prevents downgrade attacks",
437 },
438 "Content-Security-Policy": {
439 "expected": "default-src 'self'",
440 "severity": "HIGH",
441 "description": "CSP prevents XSS and data injection",
442 },
443 "X-Content-Type-Options": {
444 "expected": "nosniff",
445 "severity": "MEDIUM",
446 "description": "Prevents MIME type sniffing",
447 },
448 "X-Frame-Options": {
449 "expected": "DENY",
450 "severity": "MEDIUM",
451 "description": "Prevents clickjacking via iframes",
452 },
453 "Referrer-Policy": {
454 "expected": "strict-origin-when-cross-origin",
455 "severity": "LOW",
456 "description": "Controls referrer information leakage",
457 },
458 "Permissions-Policy": {
459 "expected": "camera=(), microphone=(), geolocation=()",
460 "severity": "LOW",
461 "description": "Restricts browser feature access",
462 },
463 }
464
465 # Headers that should NOT be present
466 dangerous_headers = {
467 "Server": "Reveals server software version",
468 "X-Powered-By": "Reveals framework/language",
469 }
470
471 for header, info in recommended_headers.items():
472 if header not in headers:
473 findings.append({
474 "type": "MISSING",
475 "header": header,
476 "severity": info["severity"],
477 "description": info["description"],
478 "recommendation": f"Add: {header}: {info['expected']}",
479 })
480
481 for header, desc in dangerous_headers.items():
482 if header in headers:
483 findings.append({
484 "type": "REMOVE",
485 "header": header,
486 "severity": "LOW",
487 "description": desc,
488 "recommendation": f"Remove {header} header",
489 })
490
491 return findings
492
493
494# Example: Check insecure headers
495insecure_headers = {
496 "Server": "Apache/2.4.41 (Ubuntu)",
497 "X-Powered-By": "Express",
498 "Content-Type": "text/html",
499}
500
501print("\n Security Headers Audit:")
502print(f" Checking response headers: {json.dumps(insecure_headers, indent=2)}")
503print()
504
505findings = security_headers_check(insecure_headers)
506for f in findings:
507 icon = "!" if f["severity"] == "HIGH" else "-"
508 print(f" [{f['severity']:<6}] {icon} {f['type']}: {f['header']}")
509 print(f" {f['description']}")
510 print(f" Fix: {f['recommendation']}")
511print()
512
513
514# ============================================================
515# Section 7: Summary
516# ============================================================
517
518print("=" * 65)
519print(" OWASP Top 10 (2021) Summary")
520print("=" * 65)
521print("""
522 Rank | Category | Key Defense
523 -----+----------------------------------+---------------------------
524 A01 | Broken Access Control | Authz checks, deny default
525 A02 | Cryptographic Failures | TLS, strong hashing, KMS
526 A03 | Injection | Parameterized queries
527 A04 | Insecure Design | Threat modeling, secure arch
528 A05 | Security Misconfiguration | Hardening, security headers
529 A06 | Vulnerable Components | Dependency scanning, SCA
530 A07 | Auth & Identity Failures | MFA, rate limiting
531 A08 | Software & Data Integrity | CI/CD security, signatures
532 A09 | Logging & Monitoring Failures | Centralized logging, alerts
533 A10 | Server-Side Request Forgery | Input validation, allowlists
534
535 Remember: Security is a process, not a product.
536 Apply defense in depth - no single control is sufficient.
537""")
538
539# Cleanup
540conn.close()