# owasp_demo.py
"""
OWASP Top 10 Security Patterns Demo
====================================

Educational demonstration of secure vs vulnerable code patterns
based on the OWASP Top 10 Web Application Security Risks.

Topics covered:
- SQL Injection (A03:2021 - Injection)
- Input validation
- Secure error handling
- Secure session configuration concepts
- Broken Access Control patterns (A01:2021)
- Security Misconfiguration (A05:2021)

All examples are DEFENSIVE - showing how to identify and FIX
vulnerabilities. Uses only the Python standard library (including sqlite3).
"""
 19
import hashlib
import hmac
import html
import ipaddress
import json
import os
import re
import secrets
import sqlite3
import traceback
from urllib.parse import urlparse
 30
 31print("=" * 65)
 32print("  OWASP Top 10 Security Patterns Demo")
 33print("=" * 65)
 34print()
 35
 36
 37# ============================================================
 38# Section 1: A03:2021 - Injection (SQL Injection)
 39# ============================================================
 40
 41print("-" * 65)
 42print("  Section 1: A03 - SQL Injection")
 43print("-" * 65)
 44
 45# Setup in-memory database
 46conn = sqlite3.connect(":memory:")
 47cursor = conn.cursor()
 48cursor.execute("""
 49    CREATE TABLE users (
 50        id INTEGER PRIMARY KEY,
 51        username TEXT UNIQUE,
 52        email TEXT,
 53        password_hash TEXT,
 54        is_admin INTEGER DEFAULT 0
 55    )
 56""")
 57cursor.executemany(
 58    "INSERT INTO users (username, email, password_hash, is_admin) VALUES (?, ?, ?, ?)",
 59    [
 60        ("alice", "alice@example.com", hashlib.sha256(b"hashed_pw").hexdigest(), 0),
 61        ("bob", "bob@example.com", hashlib.sha256(b"hashed_pw2").hexdigest(), 0),
 62        ("admin", "admin@example.com", hashlib.sha256(b"admin_pw").hexdigest(), 1),
 63    ],
 64)
 65conn.commit()
 66
 67print("""
 68  VULNERABLE: String formatting in SQL queries
 69  SECURE:     Parameterized queries (prepared statements)
 70""")
 71
 72
 73# VULNERABLE: String concatenation
 74def get_user_vulnerable(username: str) -> list:
 75    """VULNERABLE: SQL injection via string formatting."""
 76    query = f"SELECT * FROM users WHERE username = '{username}'"
 77    print(f"    Query: {query}")
 78    return cursor.execute(query).fetchall()
 79
 80
 81# SECURE: Parameterized query
 82def get_user_secure(username: str) -> list:
 83    """SECURE: Uses parameterized query."""
 84    query = "SELECT * FROM users WHERE username = ?"
 85    print(f"    Query: {query}  params=[{username}]")
 86    return cursor.execute(query, (username,)).fetchall()
 87
 88
 89# Normal usage
 90print("  Normal input: 'alice'")
 91print("  -- Vulnerable --")
 92result_v = get_user_vulnerable("alice")
 93print(f"    Result: {result_v}")
 94print("  -- Secure --")
 95result_s = get_user_secure("alice")
 96print(f"    Result: {result_s}")
 97print()
 98
 99# SQL Injection attack
100malicious_input = "' OR '1'='1"
101print(f"  Malicious input: {malicious_input!r}")
102print("  -- Vulnerable --")
103result_v = get_user_vulnerable(malicious_input)
104print(f"    Result: {len(result_v)} rows returned (ALL users leaked!)")
105for row in result_v:
106    print(f"      {row}")
107print("  -- Secure --")
108result_s = get_user_secure(malicious_input)
109print(f"    Result: {len(result_s)} rows (correctly returns nothing)")
110print()
111
112
113# ============================================================
114# Section 2: Input Validation
115# ============================================================
116
# Section 2 heading followed by the guidance text for input validation.
_section2_rule = "-" * 65
print(_section2_rule, "  Section 2: Input Validation", _section2_rule, sep="\n")

print("""
  Validate ALL user input. Reject invalid data early.
  Use allowlists (not blocklists) when possible.
""")
125
126
class InputValidator:
    """Collection of stateless input validation helpers.

    Every validator returns a ``(is_valid, message)`` tuple; ``message`` is
    ``"Valid"`` on success and a short reason otherwise.
    """

    # Simplified address pattern. It is applied with ``fullmatch`` so the
    # whole string must conform; ``match`` with a trailing ``$`` would also
    # accept a value that ends in a newline.
    _EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
    _USERNAME_RE = re.compile(r"[a-zA-Z0-9_]+")

    # Host names that are always refused, reported with a host-specific message.
    _BLOCKED_HOSTS = frozenset(
        {"localhost", "127.0.0.1", "0.0.0.0", "169.254.169.254"}
    )

    @staticmethod
    def validate_email(email: str) -> tuple[bool, str]:
        """Validate email format.

        Rejects empty values and values longer than 254 characters, then
        requires the whole string to match a simplified address pattern.
        """
        if not email or len(email) > 254:
            return False, "Email too long or empty"
        if not InputValidator._EMAIL_RE.fullmatch(email):
            return False, "Invalid email format"
        return True, "Valid"

    @staticmethod
    def validate_username(username: str) -> tuple[bool, str]:
        """Validate username: alphanumeric + underscore, 3-32 chars."""
        if not username:
            return False, "Username is required"
        if len(username) < 3 or len(username) > 32:
            return False, "Username must be 3-32 characters"
        if not InputValidator._USERNAME_RE.fullmatch(username):
            return False, "Username can only contain letters, numbers, underscore"
        return True, "Valid"

    @staticmethod
    def validate_url(url: str) -> tuple[bool, str]:
        """Validate URL - prevent open redirect / SSRF.

        Accepts only ``http``/``https`` URLs with a non-empty host. Refuses
        the hosts in ``_BLOCKED_HOSTS`` and any IP literal that is private,
        loopback, link-local, unspecified, reserved or multicast.

        Host names are not resolved here, so a DNS name that points at an
        internal address is not detected by this check.
        """
        try:
            parsed = urlparse(url)
            raw_host = parsed.hostname
        except (ValueError, TypeError, AttributeError):
            return False, "Invalid URL format"

        # Must have scheme and netloc
        if not parsed.scheme or not parsed.netloc:
            return False, "URL must include scheme and host"

        # Only allow http/https
        if parsed.scheme not in ("http", "https"):
            return False, f"Scheme '{parsed.scheme}' not allowed (http/https only)"

        # A netloc such as ":80" or "user@" carries no host at all. A
        # trailing dot ("localhost.") names the same host, so drop it
        # before comparing against the blocklist.
        host = (raw_host or "").rstrip(".")
        if not host:
            return False, "URL must include scheme and host"

        if host in InputValidator._BLOCKED_HOSTS:
            return False, f"Host '{host}' is blocked (SSRF prevention)"

        # Classify IP literals by address class instead of by string prefix:
        # a "172." prefix test also rejects public addresses, and prefix
        # tests miss loopback/IPv6 forms such as 127.0.0.2 or ::1.
        try:
            address = ipaddress.ip_address(host)
        except ValueError:
            address = None  # not an IP literal; treated as a DNS name
        if address is not None:
            # Judge an IPv4-mapped IPv6 literal by the IPv4 address it wraps.
            mapped = getattr(address, "ipv4_mapped", None)
            if mapped is not None:
                address = mapped
            if (
                address.is_private
                or address.is_loopback
                or address.is_link_local
                or address.is_unspecified
                or address.is_reserved
                or address.is_multicast
            ):
                return False, "Internal network addresses are blocked"

        return True, "Valid"

    @staticmethod
    def validate_integer(value: str, min_val: int = None,
                         max_val: int = None) -> tuple[bool, str]:
        """Validate integer input with range check.

        ``value`` is converted with ``int()``. ``min_val`` and ``max_val``
        are inclusive bounds; either may be ``None`` to skip that bound.
        """
        try:
            num = int(value)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int() of a float infinity.
            return False, "Not a valid integer"
        if min_val is not None and num < min_val:
            return False, f"Value must be >= {min_val}"
        if max_val is not None and num > max_val:
            return False, f"Value must be <= {max_val}"
        return True, "Valid"
192
193
validator = InputValidator()

# Email samples: one well-formed address, then malformed and oversized ones.
print("\n  Email Validation:")
test_emails = [
    "user@example.com",
    "invalid-email",
    "user@.com",
    "<script>@evil.com",
    "a" * 300 + "@example.com",
]
for email in test_emails:
    valid, msg = validator.validate_email(email)
    if valid:
        status = "PASS"
    else:
        status = "FAIL"
    # Inputs are clipped to the column width so long samples keep the table aligned.
    print(f"    [{status}] {email[:35].ljust(35)} -> {msg}")

# URL samples: one public URL, then loopback, metadata, file and LAN targets.
print("\n  URL Validation (SSRF Prevention):")
test_urls = [
    "https://example.com/page",
    "http://localhost/admin",
    "http://169.254.169.254/latest/meta-data/",
    "file:///etc/passwd",
    "https://192.168.1.1/internal",
]
for url in test_urls:
    valid, msg = validator.validate_url(url)
    if valid:
        status = "PASS"
    else:
        status = "BLOCK"
    print(f"    [{status}] {url[:40].ljust(40)} -> {msg}")
print()
224
225
226# ============================================================
227# Section 3: Secure Error Handling
228# ============================================================
229
# Section 3 heading followed by the vulnerable/secure contrast text.
_section3_rule = "-" * 65
print(_section3_rule, "  Section 3: Secure Error Handling", _section3_rule, sep="\n")

print("""
  VULNERABLE: Exposing stack traces, SQL errors, internal paths
  SECURE:     Generic messages to users, detailed logs internally
""")
238
239
class SecureErrorHandler:
    """Demonstrates secure vs insecure error handling.

    ``error_log`` is an in-memory list standing in for a server-side log;
    ``handle_error_secure`` appends one dict per handled error.
    """

    def __init__(self):
        self.error_log: list[dict] = []  # Simulates server-side log

    @staticmethod
    def _format_traceback(error: Exception) -> str:
        """Render the traceback attached to ``error`` itself.

        ``traceback.format_exc()`` reports whichever exception is currently
        being handled, so it yields "NoneType: None" when a handler is called
        outside an ``except`` block. Formatting from the exception object
        works in both situations.
        """
        return "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )

    def handle_error_vulnerable(self, error: Exception) -> str:
        """VULNERABLE: Exposes internal details to user.

        Returns the exception type, its message and the full traceback in
        the response text. Kept unsafe on purpose for the demonstration.
        """
        return (
            f"Error: {type(error).__name__}: {error}\n"
            f"{self._format_traceback(error)}"
        )

    def handle_error_secure(self, error: Exception, request_id: str = None) -> str:
        """SECURE: Generic message to user, details to internal log.

        Args:
            error: The exception being handled.
            request_id: Correlation id to report; a random 16-hex-character
                id is generated when omitted.

        Returns:
            A generic message that contains only the reference id. The
            exception type, message and traceback go to ``error_log``.
        """
        if request_id is None:
            request_id = secrets.token_hex(8)

        # Log full details internally
        self.error_log.append({
            "request_id": request_id,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "traceback": self._format_traceback(error),
        })

        # Return sanitized message to user
        return (
            f"An unexpected error occurred. "
            f"Reference ID: {request_id}. "
            f"Please contact support if this persists."
        )
269
270
handler = SecureErrorHandler()

# Trigger a real exception so both handlers have something to report.
try:
    result = 1 / 0
except Exception as e:
    print("\n  -- VULNERABLE error response (exposes internals) --")
    vuln_response = handler.handle_error_vulnerable(e)
    # Only the first four lines are shown; the rest is replaced by "...".
    preview_lines = vuln_response.strip().split("\n")[:4]
    for line in preview_lines:
        print(f"    {line}")
    print("    ...")

    print("\n  -- SECURE error response (generic to user) --")
    secure_response = handler.handle_error_secure(e)
    print(f"    {secure_response}")

    print("\n  -- Internal log (server-side only) --")
    log_entry = handler.error_log[-1]
    print(
        f"    Request ID:  {log_entry['request_id']}",
        f"    Error Type:  {log_entry['error_type']}",
        f"    Message:     {log_entry['error_message']}",
        sep="\n",
    )
print()
293
294
295# ============================================================
296# Section 4: Secure Session Configuration
297# ============================================================
298
_section4_rule = "-" * 65
print(_section4_rule, "  Section 4: Secure Session Configuration", _section4_rule, sep="\n")

print("""
  Session security checklist for web applications:
""")

# Hardened session settings (left-hand side of the comparison).
secure_config = dict(
    session_cookie_name="__Host-session",  # __Host- prefix for extra security
    session_cookie_httponly=True,          # No JavaScript access
    session_cookie_secure=True,            # HTTPS only
    session_cookie_samesite="Lax",         # CSRF protection
    session_cookie_path="/",
    session_lifetime_seconds=3600,         # 1 hour
    session_idle_timeout_seconds=900,      # 15 minutes
    session_regenerate_on_login=True,
    session_regenerate_on_privilege_change=True,
    max_concurrent_sessions=3,
)

# The same keys with weak values, for contrast.
insecure_config = dict(
    session_cookie_name="session",         # No prefix
    session_cookie_httponly=False,         # JS can steal cookie!
    session_cookie_secure=False,           # Sent over HTTP!
    session_cookie_samesite="None",        # No CSRF protection!
    session_cookie_path="/",
    session_lifetime_seconds=86400 * 30,   # 30 days!
    session_idle_timeout_seconds=None,     # Never expires!
    session_regenerate_on_login=False,
    session_regenerate_on_privilege_change=False,
    max_concurrent_sessions=None,          # Unlimited!
)

# Side-by-side table; values are clipped to 12 characters per cell.
print(f"  {'Setting':<42} {'INSECURE':<14} {'SECURE':<14}")
print(f"  {'-'*42} {'-'*14} {'-'*14}")
for key in secure_config:
    insecure_val = str(insecure_config.get(key, "N/A"))[:12]
    secure_val = str(secure_config[key])[:12]
    # Shorten the key prefixes so the setting column stays readable.
    clean_key = key.replace("session_cookie_", "cookie.")
    clean_key = clean_key.replace("session_", "")
    print(f"  {clean_key:<42} {insecure_val:<14} {secure_val:<14}")
print()
341
342
343# ============================================================
344# Section 5: A01:2021 - Broken Access Control
345# ============================================================
346
# Section 5 heading followed by a one-sentence definition of the risk.
_section5_rule = "-" * 65
print(_section5_rule, "  Section 5: A01 - Broken Access Control", _section5_rule, sep="\n")

print("""
  Broken access control allows users to act outside
  their intended permissions.
""")
355
356
357# VULNERABLE: Direct object reference without authorization check
class VulnerableAPI:
    """VULNERABLE: hands out documents without any authorization check.

    Intentionally unsafe - this is the "before" half of the access
    control comparison.
    """

    def __init__(self):
        # Two documents with different owners, keyed by numeric id.
        self.documents = {
            1: {"owner": "alice", "title": "Alice's Report", "content": "Secret data"},
            2: {"owner": "bob", "title": "Bob's Notes", "content": "Private notes"},
        }

    def get_document(self, doc_id: int, requesting_user: str) -> dict:
        """VULNERABLE: return the document for ``doc_id`` to any caller.

        ``requesting_user`` is accepted but never consulted, so knowing an
        id is enough to read someone else's document.
        """
        found = self.documents.get(doc_id)
        if not found:
            return {"status": "error", "message": "Not found"}
        # No authorization check!
        return {"status": "ok", "document": found}
374
375
376# SECURE: Authorization check before access
class SecureAPI:
    """SECURE: Checks authorization before returning resources.

    A document is returned only to its owner or to a user listed in
    ``admin_users``.
    """

    def __init__(self):
        self.documents = {
            1: {"owner": "alice", "title": "Alice's Report", "content": "Secret data"},
            2: {"owner": "bob", "title": "Bob's Notes", "content": "Private notes"},
        }
        self.admin_users = {"admin"}

    def get_document(self, doc_id: int, requesting_user: str) -> dict:
        """SECURE: Checks ownership or admin status before access.

        Returns ``{"status": "ok", "document": ...}`` for the owner or an
        admin, ``{"status": "error", "message": "Access denied"}`` for any
        other user, and ``{"status": "error", "message": "Not found"}`` for
        an unknown ``doc_id``.
        """
        doc = self.documents.get(doc_id)
        if not doc:
            return {"status": "error", "message": "Not found"}

        # Authorization check
        if doc["owner"] != requesting_user and requesting_user not in self.admin_users:
            return {"status": "error", "message": "Access denied"}

        # Return a copy: handing out the stored dict would let a reader edit
        # fields such as "owner" in place and so change who passes the check.
        return {"status": "ok", "document": dict(doc)}
398
399
vuln_api = VulnerableAPI()
secure_api = SecureAPI()

print("\n  Scenario: Bob tries to access Alice's document (doc_id=1)")
print()

# Without an authorization check Bob receives Alice's document.
print("  -- VULNERABLE API --")
result = vuln_api.get_document(1, "bob")
exposed = result.get("document", {})
print(f"    Result: {result['status']} - {exposed.get('title', 'N/A')}")
print(f"    Content exposed: {exposed.get('content', 'N/A')}")

# With the check in place Bob is refused, while Alice can still read it.
print("\n  -- SECURE API --")
result = secure_api.get_document(1, "bob")
print(f"    Result: {result['status']} - {result.get('message', 'N/A')}")

result = secure_api.get_document(1, "alice")
own_title = result.get("document", {}).get("title", "N/A")
print(f"    Alice's own request: {result['status']} - {own_title}")
print()
417
418
419# ============================================================
420# Section 6: A05:2021 - Security Misconfiguration
421# ============================================================
422
# Section 6 heading.
_section6_rule = "-" * 65
print(_section6_rule, "  Section 6: A05 - Security Misconfiguration", _section6_rule, sep="\n")
426
427
def security_headers_check(headers: dict) -> list[dict]:
    """Check HTTP response headers for security issues.

    Header names are compared case-insensitively, because HTTP header field
    names are case-insensitive. Only the presence of a header is checked;
    its value is not inspected.

    Args:
        headers: Mapping of response header name to value. ``None`` is
            treated as an empty mapping.

    Returns:
        One finding dict per problem, with the keys ``type`` ("MISSING" or
        "REMOVE"), ``header``, ``severity``, ``description`` and
        ``recommendation``. Missing recommended headers come first, followed
        by headers that should be removed.
    """
    findings = []

    recommended_headers = {
        "Strict-Transport-Security": {
            "expected": "max-age=31536000; includeSubDomains",
            "severity": "HIGH",
            "description": "HSTS forces HTTPS, prevents downgrade attacks",
        },
        "Content-Security-Policy": {
            "expected": "default-src 'self'",
            "severity": "HIGH",
            "description": "CSP prevents XSS and data injection",
        },
        "X-Content-Type-Options": {
            "expected": "nosniff",
            "severity": "MEDIUM",
            "description": "Prevents MIME type sniffing",
        },
        "X-Frame-Options": {
            "expected": "DENY",
            "severity": "MEDIUM",
            "description": "Prevents clickjacking via iframes",
        },
        "Referrer-Policy": {
            "expected": "strict-origin-when-cross-origin",
            "severity": "LOW",
            "description": "Controls referrer information leakage",
        },
        "Permissions-Policy": {
            "expected": "camera=(), microphone=(), geolocation=()",
            "severity": "LOW",
            "description": "Restricts browser feature access",
        },
    }

    # Headers that should NOT be present
    dangerous_headers = {
        "Server": "Reveals server software version",
        "X-Powered-By": "Reveals framework/language",
    }

    # Normalize once so "server" and "SERVER" are recognised like "Server".
    present = {str(name).strip().lower() for name in (headers or {})}

    for header, info in recommended_headers.items():
        if header.lower() not in present:
            findings.append({
                "type": "MISSING",
                "header": header,
                "severity": info["severity"],
                "description": info["description"],
                "recommendation": f"Add: {header}: {info['expected']}",
            })

    for header, desc in dangerous_headers.items():
        if header.lower() in present:
            findings.append({
                "type": "REMOVE",
                "header": header,
                "severity": "LOW",
                "description": desc,
                "recommendation": f"Remove {header} header",
            })

    return findings
492
493
# Example response that discloses server details and sets no security headers.
insecure_headers = {
    "Server": "Apache/2.4.41 (Ubuntu)",
    "X-Powered-By": "Express",
    "Content-Type": "text/html",
}

print("\n  Security Headers Audit:")
_headers_dump = json.dumps(insecure_headers, indent=2)
print(f"  Checking response headers: {_headers_dump}")
print()

findings = security_headers_check(insecure_headers)
for f in findings:
    # HIGH severity findings get a "!" marker; everything else gets "-".
    icon = {"HIGH": "!"}.get(f["severity"], "-")
    print(
        f"    [{f['severity']:<6}] {icon} {f['type']}: {f['header']}",
        f"              {f['description']}",
        f"              Fix: {f['recommendation']}",
        sep="\n",
    )
print()
512
513
514# ============================================================
515# Section 7: Summary
516# ============================================================
517
_summary_rule = "=" * 65
print(_summary_rule, "  OWASP Top 10 (2021) Summary", _summary_rule, sep="\n")
print("""
  Rank | Category                         | Key Defense
  -----+----------------------------------+---------------------------
  A01  | Broken Access Control            | Authz checks, deny default
  A02  | Cryptographic Failures           | TLS, strong hashing, KMS
  A03  | Injection                        | Parameterized queries
  A04  | Insecure Design                  | Threat modeling, secure arch
  A05  | Security Misconfiguration        | Hardening, security headers
  A06  | Vulnerable Components            | Dependency scanning, SCA
  A07  | Auth & Identity Failures         | MFA, rate limiting
  A08  | Software & Data Integrity        | CI/CD security, signatures
  A09  | Logging & Monitoring Failures    | Centralized logging, alerts
  A10  | Server-Side Request Forgery      | Input validation, allowlists

  Remember: Security is a process, not a product.
  Apply defense in depth - no single control is sufficient.
""")

# Release the in-memory database opened for the SQL injection section.
conn.close()