Project: Building a Vulnerability Scanner
This project lesson walks through building a vulnerability scanner in Python. The scanner performs port scanning, service banner grabbing, HTTP security header checks, SSL/TLS configuration analysis, directory brute-forcing, and CVE database lookups. We will build a modular, extensible tool with a command-line interface, structured output, and built-in rate limiting for responsible scanning. The project ties together concepts from across the Security topic.
IMPORTANT: Only scan systems you own or have explicit written permission to test. Unauthorized scanning is illegal in most jurisdictions. This tool is for educational purposes and authorized security assessments only.
Learning Objectives
- Build a modular vulnerability scanner with Python
- Implement TCP port scanning with service detection
- Analyze HTTP security headers and SSL/TLS configurations
- Perform directory and path discovery (educational)
- Look up known vulnerabilities in CVE databases
- Generate structured scan reports
- Apply rate limiting and responsible scanning practices
- Understand the ethical and legal boundaries of security scanning
- Design a CLI tool with argparse
1. Project Overview
1.1 Scanner Architecture
┌─────────────────────────────────────────────────────────────────┐
│               Vulnerability Scanner Architecture                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  CLI Interface (argparse)                                       │
│             │                                                   │
│             ▼                                                   │
│  ┌──────────────────┐                                           │
│  │ Scan Controller  │  Orchestrates all scan modules            │
│  │ (main.py)        │  Manages rate limiting                    │
│  └──────────┬───────┘  Generates reports                        │
│             │                                                   │
│     ┌───────┼────────┬────────┬─────────┐                       │
│     │       │        │        │         │                       │
│     ▼       ▼        ▼        ▼         ▼                       │
│  ┌──────┐┌──────┐┌────────┐┌──────┐┌──────────┐                 │
│  │Port  ││Banner││ HTTP   ││ SSL  ││Directory │                 │
│  │Scan  ││Grab  ││Headers ││/TLS  ││Discovery │                 │
│  │      ││      ││Check   ││Check ││          │                 │
│  └──┬───┘└──┬───┘└───┬────┘└──┬───┘└────┬─────┘                 │
│     │       │        │        │         │                       │
│     └───────┴────────┼────────┴─────────┘                       │
│                      │                                          │
│                      ▼                                          │
│             ┌────────────────┐                                  │
│             │  CVE Lookup    │  Match services to known CVEs    │
│             └────────┬───────┘                                  │
│                      │                                          │
│                      ▼                                          │
│             ┌────────────────┐                                  │
│             │ Report Engine  │  JSON, text, HTML output         │
│             └────────────────┘                                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
1.2 Project Structure
vuln_scanner/
├── scanner/
│   ├── __init__.py
│   ├── controller.py       # Scan orchestration
│   ├── port_scanner.py     # TCP port scanning
│   ├── banner_grabber.py   # Service banner detection
│   ├── http_checker.py     # HTTP security header analysis
│   ├── ssl_checker.py      # SSL/TLS configuration analysis
│   ├── dir_scanner.py      # Directory/path discovery
│   ├── cve_lookup.py       # CVE database queries
│   ├── report.py           # Report generation
│   └── utils.py            # Shared utilities, rate limiter
├── wordlists/
│   └── common_paths.txt    # Directory wordlist (small, for demo)
├── main.py                 # CLI entry point
├── requirements.txt
└── README.md
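The tree above lists main.py as the CLI entry point. As a rough sketch of what its argparse layer might look like: the flag names (`--ports`, `--rate`, `--format`) and the helper names `parse_ports` and `build_parser` are assumptions made for illustration, not taken from the lesson's own main.py.

```python
import argparse


def parse_ports(spec: str) -> list[int]:
    """Parse a port spec such as '22,80,8000-8002' into a sorted list."""
    ports: set[int] = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start, end = (int(p) for p in part.split("-", 1))
            if start > end:
                raise argparse.ArgumentTypeError(f"invalid range: {part}")
            ports.update(range(start, end + 1))
        else:
            ports.add(int(part))
    if any(p < 1 or p > 65535 for p in ports):
        raise argparse.ArgumentTypeError("ports must be in 1-65535")
    return sorted(ports)


def build_parser() -> argparse.ArgumentParser:
    """Build the (hypothetical) command-line interface."""
    parser = argparse.ArgumentParser(
        prog="vuln_scanner",
        description="Educational vulnerability scanner (authorized use only).",
    )
    parser.add_argument("target", help="hostname or IP you are authorized to scan")
    parser.add_argument("--ports", type=parse_ports, default=None,
                        help="comma-separated ports/ranges, e.g. 22,80,8000-8002")
    parser.add_argument("--rate", type=float, default=10.0,
                        help="maximum requests per second")
    parser.add_argument("--format", choices=["json", "text", "html"],
                        default="text", help="report output format")
    return parser


# Parse an explicit argument list so the sketch does not depend on sys.argv.
args = build_parser().parse_args(["example.test", "--ports", "22,80,8000-8002"])
print(args.target, args.ports, args.rate, args.format)
```

Passing `parse_ports` as the `type=` callable lets argparse report malformed port specs as ordinary usage errors instead of tracebacks.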
1.3 Dependencies
# requirements.txt
requests==2.31.0
cryptography==42.0.4
2. Ethical Considerations and Legal Boundaries
2.1 Rules of Engagement
┌─────────────────────────────────────────────────────────────────┐
│                  ETHICAL AND LEGAL BOUNDARIES                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  YOU MUST:                                                      │
│  ✓ Only scan systems you OWN or have WRITTEN permission to test │
│  ✓ Get explicit, documented authorization before scanning       │
│  ✓ Define scope clearly (which IPs, ports, tests)               │
│  ✓ Respect rate limits and bandwidth constraints                │
│  ✓ Report findings responsibly to the system owner              │
│  ✓ Stop immediately if you discover you are scanning the        │
│    wrong target                                                 │
│  ✓ Keep all findings confidential                               │
│                                                                 │
│  YOU MUST NOT:                                                  │
│  ✗ Scan systems without authorization                           │
│  ✗ Exploit discovered vulnerabilities (scan, don't attack)      │
│  ✗ Perform denial of service (even accidentally via scan volume)│
│  ✗ Access, modify, or exfiltrate data                           │
│  ✗ Share findings publicly without owner's consent              │
│  ✗ Use this tool for competitive intelligence or harassment     │
│                                                                 │
│  LEGAL NOTES:                                                   │
│  - Computer Fraud and Abuse Act (CFAA) - USA                    │
│  - Computer Misuse Act 1990 - UK                                │
│  - StGB §202a-c - Germany                                       │
│  - Similar laws exist in virtually every country                │
│  - Unauthorized scanning can result in criminal charges         │
│  - "I was just testing" is NOT a legal defense                  │
│                                                                 │
│  SAFE TARGETS FOR PRACTICE:                                     │
│  - Your own machines / VMs                                      │
│  - Intentionally vulnerable apps:                               │
│    * DVWA (Damn Vulnerable Web Application)                     │
│    * HackTheBox, TryHackMe (with permission)                    │
│    * OWASP WebGoat                                              │
│    * Metasploitable                                             │
│  - Bug bounty programs (within scope)                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
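The "define scope clearly" rule can also be enforced in code before any packet is sent. The following is a minimal sketch, assuming the authorized scope is expressed as a list of CIDR networks; `AUTHORIZED_NETWORKS` and `in_scope` are hypothetical names, and the addresses are reserved documentation ranges rather than real targets.

```python
import ipaddress

# Hypothetical scope taken from a written authorization.
# These are reserved documentation ranges, not real targets.
AUTHORIZED_NETWORKS = [
    ipaddress.ip_network("192.0.2.0/24"),
    ipaddress.ip_network("198.51.100.16/28"),
]


def in_scope(address: str, networks=AUTHORIZED_NETWORKS) -> bool:
    """Return True only if the address falls inside an authorized network."""
    try:
        ip = ipaddress.ip_address(address)
    except ValueError:
        # Not a literal IP address: refuse rather than guess.
        return False
    return any(ip in network for network in networks)


for candidate in ("192.0.2.55", "198.51.100.40", "not-an-ip"):
    verdict = "in scope" if in_scope(candidate) else "OUT OF SCOPE, do not scan"
    print(f"{candidate}: {verdict}")
```

A check like this fails closed: anything that cannot be parsed or matched is treated as out of scope.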
3. Shared Utilities and Rate Limiter
3.1 Core Utilities
"""
scanner/utils.py - Shared utilities, rate limiter, and data models.
"""

import socket
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional


class Severity(str, Enum):
    """Vulnerability severity levels."""
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    INFO = "INFO"


@dataclass
class Finding:
    """A single vulnerability or information finding."""
    title: str
    severity: Severity
    description: str
    module: str  # Which scanner module found this
    details: dict = field(default_factory=dict)
    remediation: str = ""
    reference: str = ""  # URL or CVE reference

    def __str__(self):
        return f"[{self.severity.value}] {self.title}: {self.description}"


@dataclass
class PortInfo:
    """Information about a discovered port."""
    port: int
    state: str  # open, closed, filtered
    service: str = ""  # Detected service name
    version: str = ""  # Service version
    banner: str = ""  # Raw banner text
    protocol: str = "tcp"


@dataclass
class ScanTarget:
    """Scan target specification."""
    host: str
    ip: Optional[str] = None
    ports: list[int] = field(default_factory=list)

    def resolve(self) -> bool:
        """Resolve hostname to IP address."""
        try:
            self.ip = socket.gethostbyname(self.host)
            return True
        except socket.gaierror:
            return False


@dataclass
class ScanResult:
    """Complete scan result for a target."""
    target: ScanTarget
    start_time: str = ""
    end_time: str = ""
    duration_seconds: float = 0.0
    open_ports: list[PortInfo] = field(default_factory=list)
    findings: list[Finding] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)

    def __post_init__(self):
        if not self.start_time:
            self.start_time = datetime.now().isoformat()


class RateLimiter:
    """
    Thread-safe rate limiter using the token bucket algorithm.
    Ensures we don't overwhelm the target with requests.
    """

    def __init__(self, requests_per_second: float = 10.0):
        """
        Args:
            requests_per_second: Maximum requests per second (must be > 0).
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self.rate = requests_per_second
        # The bucket must hold at least one token; otherwise a rate below
        # 1 request/second could never accumulate a full token and
        # acquire() would block forever.
        self.max_tokens = max(1.0, requests_per_second)
        self.tokens = self.max_tokens
        self.last_time = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        """Block until a token is available."""
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.last_time
                self.last_time = now
                # Add tokens based on elapsed time
                self.tokens = min(
                    self.max_tokens,
                    self.tokens + elapsed * self.rate
                )
                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return
            # No tokens available: release the lock and wait roughly
            # one token's worth of time before retrying
            time.sleep(1.0 / self.rate)

    def set_rate(self, requests_per_second: float) -> None:
        """Update the rate limit."""
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        with self.lock:
            self.rate = requests_per_second
            self.max_tokens = max(1.0, requests_per_second)


def is_valid_target(host: str) -> bool:
    """
    Validate that a target is potentially valid and not a
    reserved/dangerous address.
    """
    # Block scanning of localhost (unless explicitly intended)
    dangerous_hosts = {
        'localhost', '127.0.0.1', '::1',
        '0.0.0.0',
    }
    if host.lower() in dangerous_hosts:
        return False

    # Warn about private IPv4 ranges. This is a safety reminder for
    # educational use; the target is still accepted so that internal
    # scanning remains possible.
    try:
        ip = socket.gethostbyname(host)
        parts = ip.split('.')
        if len(parts) == 4:
            first_octet = int(parts[0])
            second_octet = int(parts[1])
            is_private = (
                first_octet == 10                                      # 10.0.0.0/8
                or (first_octet == 172 and 16 <= second_octet <= 31)   # 172.16.0.0/12
                or (first_octet == 192 and second_octet == 168)        # 192.168.0.0/16
            )
            if is_private:
                print(f" [!] Warning: {host} ({ip}) is a private address")
    except socket.gaierror:
        pass
    return True


# Commonly used ports (a curated list of about 70)
COMMON_PORTS = [
    20, 21, 22, 23, 25, 53, 67, 68, 69, 80,
    110, 111, 119, 123, 135, 137, 138, 139, 143,
    161, 162, 179, 194, 389, 443, 445, 465, 514,
    515, 520, 521, 587, 631, 636, 873, 993, 995,
    1080, 1194, 1433, 1434, 1521, 1723, 2049, 2082,
    2083, 2086, 2087, 2096, 2100, 3128, 3306, 3389,
    5060, 5432, 5900, 5901, 6379, 6667, 8000, 8008,
    8080, 8443, 8888, 9090, 9200, 9300, 10000, 11211,
    27017, 27018, 28017, 50000,
]

# Well-known service names
SERVICE_NAMES = {
    20: "ftp-data", 21: "ftp", 22: "ssh", 23: "telnet",
    25: "smtp", 53: "dns", 80: "http", 110: "pop3",
    111: "rpcbind", 119: "nntp", 123: "ntp", 135: "msrpc",
    139: "netbios-ssn", 143: "imap", 161: "snmp", 389: "ldap",
    443: "https", 445: "microsoft-ds", 465: "smtps", 514: "syslog",
    587: "submission", 636: "ldaps", 993: "imaps", 995: "pop3s",
    1433: "mssql", 1521: "oracle", 3306: "mysql", 3389: "rdp",
    5432: "postgresql", 5900: "vnc", 6379: "redis",
    8080: "http-proxy", 8443: "https-alt", 9200: "elasticsearch",
    11211: "memcached", 27017: "mongodb",
}
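To make the token-bucket arithmetic behind the rate limiter concrete, here is a stripped-down, single-threaded variant with an injectable clock so the refill can be observed deterministically. `TokenBucket`, `try_acquire`, and the fake clock are illustrative names for this sketch and are not part of scanner/utils.py.

```python
class TokenBucket:
    """Single-threaded token bucket with an injectable clock (sketch only)."""

    def __init__(self, rate: float, capacity: float, clock):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity      # start with a full bucket
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# A fake clock we can advance by hand instead of sleeping.
fake_now = [0.0]
bucket = TokenBucket(rate=2.0, capacity=2.0, clock=lambda: fake_now[0])

burst = [bucket.try_acquire() for _ in range(3)]
print(burst)            # [True, True, False]: the bucket held two tokens

fake_now[0] += 0.5      # half a second at 2 tokens/s refills one token
after_wait = bucket.try_acquire()
print(after_wait)       # True
```

The capacity bounds the burst size and the rate bounds the sustained throughput, which is why a scanner can start quickly yet still settle at the configured requests per second.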
4. Port Scanner Module
4.1 TCP Port Scanner
"""
scanner/port_scanner.py - TCP port scanner module.
"""

import concurrent.futures
import errno
import socket
from typing import Optional

from scanner.utils import (
    PortInfo, RateLimiter, ScanTarget, Finding, Severity,
    COMMON_PORTS, SERVICE_NAMES,
)


class PortScanner:
    """
    TCP Connect port scanner.

    This performs a full TCP three-way handshake to determine
    if a port is open. It is reliable but easily detectable.
    For educational purposes only. SYN scanning (half-open)
    requires raw socket privileges and is not implemented here.
    """

    def __init__(
        self,
        timeout: float = 2.0,
        rate_limiter: Optional[RateLimiter] = None,
        max_workers: int = 50,
    ):
        """
        Args:
            timeout: Connection timeout in seconds per port.
            rate_limiter: Rate limiter for responsible scanning.
            max_workers: Maximum concurrent connection attempts.
        """
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(100.0)
        self.max_workers = max_workers

    def scan_port(self, host: str, port: int) -> PortInfo:
        """
        Scan a single port on the target host.

        Performs a TCP connect scan (full three-way handshake).

        Args:
            host: Target hostname or IP.
            port: Port number to scan.

        Returns:
            PortInfo with the scan result.
        """
        self.rate_limiter.acquire()
        port_info = PortInfo(
            port=port,
            state="closed",
            service=SERVICE_NAMES.get(port, "unknown"),
        )
        try:
            # The context manager closes the socket even if an error occurs.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                # connect_ex() returns an errno value instead of raising
                # for connection-level failures, including timeouts.
                result = sock.connect_ex((host, port))
                if result == 0:
                    port_info.state = "open"
                    # Try to get service banner
                    try:
                        # HTTP servers wait for a request, so send a generic
                        # probe. SSH, FTP, SMTP, POP3, and IMAP send their
                        # banners unprompted.
                        if port in (80, 8080, 8443):
                            sock.sendall(
                                f"HEAD / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode()
                            )
                        sock.settimeout(1.5)
                        banner = sock.recv(1024)
                        port_info.banner = banner.decode('utf-8', errors='replace').strip()
                        port_info.version = self._extract_version(port_info.banner)
                    except (socket.timeout, ConnectionResetError, OSError):
                        pass
                elif result in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ETIMEDOUT):
                    # No answer within the timeout: likely dropped by a firewall
                    port_info.state = "filtered"
        except socket.timeout:
            port_info.state = "filtered"
        except ConnectionRefusedError:
            port_info.state = "closed"
        except OSError:
            # For example, the hostname could not be resolved
            port_info.state = "error"
        return port_info

    def scan_ports(
        self,
        host: str,
        ports: Optional[list[int]] = None,
    ) -> list[PortInfo]:
        """
        Scan multiple ports concurrently.

        Args:
            host: Target hostname or IP.
            ports: List of ports to scan. Defaults to common ports.

        Returns:
            List of PortInfo for open ports.
        """
        if ports is None:
            ports = COMMON_PORTS
        print(f"[*] Scanning {len(ports)} ports on {host}...")
        open_ports = []
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            # Submit all port scan tasks
            future_to_port = {
                executor.submit(self.scan_port, host, port): port
                for port in ports
            }
            # Collect results
            completed = 0
            for future in concurrent.futures.as_completed(future_to_port):
                completed += 1
                if completed % 100 == 0:
                    print(f" Progress: {completed}/{len(ports)} ports scanned")
                try:
                    port_info = future.result()
                    if port_info.state == "open":
                        open_ports.append(port_info)
                        print(f" [+] Port {port_info.port:5d}/tcp OPEN "
                              f"{port_info.service} {port_info.version}")
                except Exception as e:
                    port = future_to_port[future]
                    print(f" [!] Error scanning port {port}: {e}")
        open_ports.sort(key=lambda p: p.port)
        print(f"[*] Found {len(open_ports)} open ports")
        return open_ports

    def _extract_version(self, banner: str) -> str:
        """Extract service version from banner string."""
        if not banner:
            return ""
        # Common patterns
        # SSH: SSH-2.0-OpenSSH_8.9
        if banner.startswith("SSH-"):
            parts = banner.split("-", 2)
            if len(parts) >= 3:
                return parts[2].split()[0]
        # HTTP: Server: nginx/1.24.0
        if "Server:" in banner:
            for line in banner.split("\r\n"):
                if line.startswith("Server:"):
                    return line.split(":", 1)[1].strip()
        # SMTP: 220 mail.example.com ESMTP Postfix
        # Checked before the generic "220" case, because SMTP greetings
        # also start with 220.
        if "ESMTP" in banner or "SMTP" in banner:
            return banner.split("\r\n")[0][:60]
        # FTP: 220 vsftpd 3.0.5
        if banner.startswith("220"):
            parts = banner.split()
            if len(parts) >= 3:
                return " ".join(parts[1:3])
        return banner[:60]

    def generate_findings(self, open_ports: list[PortInfo]) -> list[Finding]:
        """Generate security findings from open ports."""
        findings = []
        # Dangerous/insecure services
        dangerous_ports = {
            21: ("FTP", "FTP transmits credentials in plaintext. Use SFTP instead."),
            23: ("Telnet", "Telnet transmits all data in plaintext. Use SSH instead."),
            69: ("TFTP", "TFTP has no authentication. Restrict access."),
            111: ("RPC", "RPCbind can leak service information. Restrict to trusted networks."),
            135: ("MSRPC", "Microsoft RPC can be exploited for remote code execution."),
            139: ("NetBIOS", "NetBIOS can leak system information. Block from external access."),
            445: ("SMB", "SMB has been target of major exploits (EternalBlue). Restrict access."),
            161: ("SNMP", "SNMP v1/v2c use community strings (plaintext). Use v3 with auth."),
            1433: ("MSSQL", "Database port should not be publicly accessible."),
            3306: ("MySQL", "Database port should not be publicly accessible."),
            5432: ("PostgreSQL", "Database port should not be publicly accessible."),
            6379: ("Redis", "Redis often has no authentication. Never expose publicly."),
            11211: ("Memcached", "Memcached has no built-in auth. Used in amplification attacks."),
            27017: ("MongoDB", "MongoDB should not be publicly accessible."),
            5900: ("VNC", "VNC may use weak authentication. Use VPN for remote access."),
        }
        for port_info in open_ports:
            if port_info.port in dangerous_ports:
                service_name, description = dangerous_ports[port_info.port]
                findings.append(Finding(
                    title=f"Potentially Dangerous Service: {service_name}",
                    severity=Severity.HIGH if port_info.port in (6379, 11211, 27017)
                    else Severity.MEDIUM,
                    description=description,
                    module="port_scanner",
                    details={
                        "port": port_info.port,
                        "service": port_info.service,
                        "banner": port_info.banner[:200],
                    },
                    remediation=(
                        f"Restrict port {port_info.port} to trusted networks using "
                        f"firewall rules. Consider disabling if not needed."
                    ),
                ))
        # Check for unencrypted HTTP, with or without HTTPS alongside it
        has_http = any(p.port == 80 for p in open_ports)
        has_https = any(p.port == 443 for p in open_ports)
        if has_http and not has_https:
            findings.append(Finding(
                title="HTTP Without HTTPS",
                severity=Severity.HIGH,
                description="HTTP (port 80) is open but HTTPS (port 443) is not detected.",
                module="port_scanner",
                remediation="Enable HTTPS and redirect HTTP to HTTPS.",
            ))
        elif has_http and has_https:
            findings.append(Finding(
                title="HTTP Port Open (Redirect Needed)",
                severity=Severity.LOW,
                description="HTTP (port 80) is open alongside HTTPS. Ensure HTTP redirects to HTTPS.",
                module="port_scanner",
                remediation="Configure HTTP to HTTPS 301 redirect.",
            ))
        return findings
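The core of a TCP connect scan is a single `connect_ex()` call, which returns 0 when the three-way handshake completes and an errno value otherwise. The sketch below exercises that behaviour against a throwaway listener on the loopback interface, so nothing outside your own machine is touched; `tcp_connect_state` is a hypothetical helper name, not part of the scanner module.

```python
import socket


def tcp_connect_state(host: str, port: int, timeout: float = 1.0) -> str:
    """Return 'open' if a full TCP handshake succeeds, else 'not open'."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex() reports failure through its return value
        # rather than by raising an exception.
        return "open" if sock.connect_ex((host, port)) == 0 else "not open"


# Start a temporary listener on an OS-assigned loopback port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]

state_while_listening = tcp_connect_state("127.0.0.1", port)

# Once the listener is gone, the same port should refuse connections.
listener.close()
state_after_close = tcp_connect_state("127.0.0.1", port)

print(f"port {port}: {state_while_listening} -> {state_after_close}")
```

Because the handshake completes in full, this kind of probe shows up in the target's connection logs, which is one reason connect scans are described as easily detectable.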
5. Banner Grabber Module
5.1 Service Banner Detection
"""
scanner/banner_grabber.py - Service banner grabbing and version detection.
"""

import re
import socket
import ssl
from typing import Optional

from scanner.utils import PortInfo, Finding, Severity, RateLimiter


class BannerGrabber:
    """
    Grabs service banners to identify running software and versions.

    Service banners often reveal software name, version, and OS,
    which can be used to identify known vulnerabilities.
    """

    # Protocol-specific probes
    PROBES = {
        # HTTP probe ({host} is substituted at request time)
        "http": b"HEAD / HTTP/1.1\r\nHost: {host}\r\nUser-Agent: SecurityScanner/1.0\r\nAccept: */*\r\nConnection: close\r\n\r\n",
        # FTP expects no probe (server sends banner)
        "ftp": b"",
        # SMTP expects no probe
        "smtp": b"",
        # SSH expects no probe
        "ssh": b"",
        # POP3 expects no probe
        "pop3": b"",
        # IMAP expects no probe
        "imap": b"",
        # MySQL expects no probe (server sends its handshake packet first)
        "mysql": b"",
        # Redis probe
        "redis": b"INFO\r\n",
    }

    def __init__(
        self,
        timeout: float = 3.0,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(10.0)

    def grab_banner(self, host: str, port_info: PortInfo) -> PortInfo:
        """
        Attempt to grab a service banner from an open port.
        Updates the port_info with banner and version information.
        """
        if port_info.state != "open":
            return port_info
        if port_info.banner:
            # Already have a banner from port scan
            return port_info
        self.rate_limiter.acquire()
        # Determine the appropriate probe
        service = port_info.service.lower()
        probe = self.PROBES.get(service, b"")
        # Special handling for HTTP. HTTPS services need the same request,
        # sent over the TLS-wrapped socket; without it the server stays silent.
        if service in ("http", "https", "http-proxy", "https-alt"):
            probe = self.PROBES["http"].replace(b"{host}", host.encode())
        sock = None
        try:
            # Determine if we should use SSL
            use_ssl = port_info.port in (443, 465, 636, 993, 995, 8443)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((host, port_info.port))
            if use_ssl:
                # Certificate validation is deliberately disabled: here we
                # only want the banner, not a trust decision.
                context = ssl.create_default_context()
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                sock = context.wrap_socket(sock, server_hostname=host)
            # Send probe if needed
            if probe:
                sock.sendall(probe)
            # Receive response
            banner = sock.recv(4096)
            port_info.banner = banner.decode('utf-8', errors='replace').strip()
            # Extract version from banner
            port_info.version = self._parse_version(
                port_info.banner, port_info.service
            )
        except (socket.timeout, ConnectionRefusedError, ConnectionResetError,
                ssl.SSLError, OSError):
            pass
        finally:
            # Always release the socket, even when the probe fails
            if sock is not None:
                sock.close()
        return port_info

    def grab_all_banners(
        self, host: str, open_ports: list[PortInfo]
    ) -> list[PortInfo]:
        """Grab banners from all open ports."""
        print(f"[*] Grabbing banners from {len(open_ports)} open ports...")
        for port_info in open_ports:
            self.grab_banner(host, port_info)
            if port_info.banner:
                version = port_info.version or "(no version)"
                print(f" [+] Port {port_info.port}: {version}")
        return open_ports

    def _parse_version(self, banner: str, service: str) -> str:
        """Parse version information from service banner."""
        if not banner:
            return ""
        service = service.lower()
        # SSH: split at most twice so hyphens inside the software
        # version or comment (e.g. "Ubuntu-3ubuntu0.1") are preserved
        if service == "ssh" or banner.startswith("SSH-"):
            parts = banner.split("-", 2)
            if len(parts) >= 3:
                return parts[2].split("\r")[0].split("\n")[0]
        # HTTP Server header
        if service in ("http", "https", "http-proxy", "https-alt"):
            for line in banner.split("\r\n"):
                if line.lower().startswith("server:"):
                    return line.split(":", 1)[1].strip()
            return ""
        # FTP
        if service == "ftp" and banner.startswith("220"):
            return banner.split("\r")[0].split("\n")[0][4:].strip()
        # SMTP
        if service in ("smtp", "smtps", "submission"):
            return banner.split("\r")[0].split("\n")[0].strip()
        # MySQL: the initial handshake packet contains the version string
        if service == "mysql":
            # Look for version string pattern (e.g., 8.0.35)
            match = re.search(r'(\d+\.\d+\.\d+)', banner)
            if match:
                return f"MySQL {match.group(1)}"
        # Redis
        if service == "redis" and "redis_version:" in banner:
            for line in banner.split("\r\n"):
                if line.startswith("redis_version:"):
                    return f"Redis {line.split(':')[1]}"
        # Default: first line, truncated
        return banner.split("\r")[0].split("\n")[0][:80]

    def generate_findings(self, open_ports: list[PortInfo]) -> list[Finding]:
        """Generate findings from banner information."""
        findings = []
        for port_info in open_ports:
            if not port_info.banner:
                continue
            banner_lower = port_info.banner.lower()
            # Check for outdated/vulnerable versions.
            # Patterns are matched as plain substrings of the lowercased banner.
            version_checks = [
                ("apache/2.2", "Apache 2.2 is end-of-life",
                 Severity.HIGH, "Upgrade to Apache 2.4+"),
                ("apache/2.4.49", "Apache 2.4.49 has path traversal vulnerability (CVE-2021-41773)",
                 Severity.CRITICAL, "Upgrade to Apache 2.4.51+"),
                ("nginx/1.1", "Very old nginx version detected",
                 Severity.MEDIUM, "Upgrade to latest stable nginx"),
                ("openssh_7.", "OpenSSH 7.x may have known vulnerabilities",
                 Severity.MEDIUM, "Upgrade to OpenSSH 9.x+"),
                ("openssh_6.", "OpenSSH 6.x has known vulnerabilities",
                 Severity.HIGH, "Upgrade to OpenSSH 9.x+"),
                ("mysql 5.5", "MySQL 5.5 is end-of-life",
                 Severity.HIGH, "Upgrade to MySQL 8.0+"),
                ("mysql 5.6", "MySQL 5.6 is end-of-life",
                 Severity.HIGH, "Upgrade to MySQL 8.0+"),
            ]
            for pattern, description, severity, fix in version_checks:
                if pattern in banner_lower:
                    findings.append(Finding(
                        title=f"Outdated Software: {port_info.version or pattern}",
                        severity=severity,
                        description=description,
                        module="banner_grabber",
                        details={
                            "port": port_info.port,
                            "banner": port_info.banner[:200],
                        },
                        remediation=fix,
                    ))
            # Check for verbose banners (information disclosure)
            if any(keyword in banner_lower for keyword in
                   ['ubuntu', 'debian', 'centos', 'red hat', 'windows']):
                findings.append(Finding(
                    title="OS Information Disclosure",
                    severity=Severity.LOW,
                    description=(
                        f"Service banner reveals operating system information: "
                        f"{port_info.banner[:100]}"
                    ),
                    module="banner_grabber",
                    details={"port": port_info.port, "banner": port_info.banner[:200]},
                    remediation="Configure service to hide OS details from banner.",
                ))
        return findings
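SSH identification strings follow a fixed shape, `SSH-protoversion-softwareversion` optionally followed by a space and a comment, so they can be parsed with a regular expression instead of repeated `split()` calls. The following is a small sketch of that alternative; `SSHBanner`, `SSH_BANNER_RE`, and `parse_ssh_banner` are illustrative names, not part of banner_grabber.py.

```python
import re
from typing import NamedTuple, Optional


class SSHBanner(NamedTuple):
    protocol: str   # e.g. "2.0"
    software: str   # e.g. "OpenSSH_8.9p1"
    comment: str    # optional trailing text, often a distro package version


# Neither the protocol nor the software field may contain whitespace;
# everything after the first space is treated as the comment.
SSH_BANNER_RE = re.compile(
    r"^SSH-(?P<proto>[^-\s]+)-(?P<software>\S+)(?: (?P<comment>.*))?$"
)


def parse_ssh_banner(banner: str) -> Optional[SSHBanner]:
    """Parse the first line of an SSH banner, or return None if it is not SSH."""
    first_line = banner.splitlines()[0].strip() if banner else ""
    match = SSH_BANNER_RE.match(first_line)
    if match is None:
        return None
    return SSHBanner(match["proto"], match["software"], match["comment"] or "")


parsed = parse_ssh_banner("SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.1\r\n")
print(parsed)
# SSHBanner(protocol='2.0', software='OpenSSH_8.9p1', comment='Ubuntu-3ubuntu0.1')
```

Keeping the comment separate is useful because distributions often backport security fixes without changing the upstream version, so the software field alone can overstate how vulnerable a host is.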
6. HTTP Security Header Checker
6.1 Header Analysis
"""
scanner/http_checker.py - HTTP security header analysis.
"""

from typing import Optional

import requests

from scanner.utils import Finding, Severity, RateLimiter


class HTTPChecker:
    """
    Checks HTTP security headers and common web misconfigurations.
    """

    # Required security headers and their expected values
    SECURITY_HEADERS = {
        "Strict-Transport-Security": {
            "description": "HTTP Strict Transport Security (HSTS)",
            "severity": Severity.HIGH,
            "recommendation": "Add: Strict-Transport-Security: max-age=31536000; includeSubDomains",
            # Require a max-age of at least one year
            "check_value": lambda v: "max-age=" in v and int(
                v.split("max-age=")[1].split(";")[0].strip()
            ) >= 31536000,
        },
        "Content-Security-Policy": {
            "description": "Content Security Policy (CSP)",
            "severity": Severity.HIGH,
            "recommendation": "Add a Content-Security-Policy header. Start with: default-src 'self'",
            "check_value": lambda v: "default-src" in v or "script-src" in v,
        },
        "X-Content-Type-Options": {
            "description": "MIME type sniffing prevention",
            "severity": Severity.MEDIUM,
            "recommendation": "Add: X-Content-Type-Options: nosniff",
            "check_value": lambda v: v.lower() == "nosniff",
        },
        "X-Frame-Options": {
            "description": "Clickjacking protection",
            "severity": Severity.MEDIUM,
            "recommendation": "Add: X-Frame-Options: DENY (or SAMEORIGIN)",
            "check_value": lambda v: v.upper() in ("DENY", "SAMEORIGIN"),
        },
        "Referrer-Policy": {
            "description": "Referrer information control",
            "severity": Severity.LOW,
            "recommendation": "Add: Referrer-Policy: strict-origin-when-cross-origin",
            "check_value": lambda v: v in (
                "no-referrer", "strict-origin",
                "strict-origin-when-cross-origin", "same-origin"
            ),
        },
        "Permissions-Policy": {
            "description": "Browser feature restrictions",
            "severity": Severity.LOW,
            "recommendation": "Add: Permissions-Policy: camera=(), microphone=(), geolocation=()",
            "check_value": lambda v: len(v) > 0,
        },
    }

    # Headers that should NOT be present (information disclosure)
    BAD_HEADERS = {
        "X-Powered-By": {
            "description": "Reveals server technology",
            "severity": Severity.LOW,
            "recommendation": "Remove X-Powered-By header to reduce information disclosure.",
        },
        "Server": {
            "description": "May reveal server software and version",
            "severity": Severity.LOW,
            "recommendation": "Configure server to send minimal Server header.",
            "check_value": lambda v: any(
                keyword in v.lower()
                for keyword in ["apache", "nginx", "iis", "tomcat", "express"]
            ),
        },
        "X-AspNet-Version": {
            "description": "Reveals ASP.NET version",
            "severity": Severity.LOW,
            "recommendation": "Remove X-AspNet-Version header.",
        },
    }

    def __init__(
        self,
        timeout: float = 10.0,
        rate_limiter: Optional[RateLimiter] = None,
        verify_ssl: bool = True,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(5.0)
        self.verify_ssl = verify_ssl

    def check(self, url: str) -> list[Finding]:
        """
        Check HTTP security headers for a URL.

        Args:
            url: Target URL (http:// or https://).

        Returns:
            List of findings.
        """
        self.rate_limiter.acquire()
        findings = []
        print(f"[*] Checking HTTP security headers for {url}...")
        try:
            response = requests.get(
                url,
                timeout=self.timeout,
                verify=self.verify_ssl,
                allow_redirects=True,
                headers={
                    "User-Agent": "SecurityScanner/1.0 (Authorized Security Scan)"
                },
            )
        except requests.exceptions.SSLError as e:
            findings.append(Finding(
                title="SSL/TLS Error",
                severity=Severity.HIGH,
                description=f"SSL error connecting to {url}: {str(e)[:200]}",
                module="http_checker",
                remediation="Fix SSL/TLS configuration. Ensure valid certificate.",
            ))
            return findings
        except requests.exceptions.ConnectionError as e:
            findings.append(Finding(
                title="Connection Error",
                severity=Severity.INFO,
                description=f"Could not connect to {url}: {str(e)[:200]}",
                module="http_checker",
            ))
            return findings
        except requests.exceptions.Timeout:
            findings.append(Finding(
                title="Connection Timeout",
                severity=Severity.INFO,
                description=f"Connection to {url} timed out after {self.timeout}s",
                module="http_checker",
            ))
            return findings

        headers = response.headers
        # Check for missing security headers
        for header_name, config in self.SECURITY_HEADERS.items():
            value = headers.get(header_name)
            if not value:
                findings.append(Finding(
                    title=f"Missing Security Header: {header_name}",
                    severity=config["severity"],
                    description=f"{config['description']} header is not set.",
                    module="http_checker",
                    details={"header": header_name, "url": url},
                    remediation=config["recommendation"],
                ))
            elif config.get("check_value"):
                try:
                    if not config["check_value"](value):
                        findings.append(Finding(
                            title=f"Weak Security Header: {header_name}",
                            severity=Severity.LOW,
                            description=(
                                f"{header_name} is set but may be insufficiently configured: "
                                f"{value[:100]}"
                            ),
                            module="http_checker",
                            details={"header": header_name, "value": value, "url": url},
                            remediation=config["recommendation"],
                        ))
                except (ValueError, IndexError):
                    # Unparseable header value: skip the strength check
                    pass

        # Check for information disclosure headers
        for header_name, config in self.BAD_HEADERS.items():
            value = headers.get(header_name)
            if value:
                # For Server header, only flag if verbose
                if header_name == "Server" and config.get("check_value"):
                    if not config["check_value"](value):
                        continue  # Minimal server header, OK
                findings.append(Finding(
                    title=f"Information Disclosure: {header_name}",
                    severity=config["severity"],
                    description=f"{config['description']}: {value}",
                    module="http_checker",
                    details={"header": header_name, "value": value},
                    remediation=config["recommendation"],
                ))

        # Check for HTTPS redirect
        if url.startswith("http://"):
            if not response.url.startswith("https://"):
                findings.append(Finding(
                    title="No HTTP to HTTPS Redirect",
                    severity=Severity.HIGH,
                    description="HTTP requests are not redirected to HTTPS.",
                    module="http_checker",
                    remediation="Configure server to redirect all HTTP requests to HTTPS.",
                ))

        # Check cookie security. response.headers folds repeated Set-Cookie
        # headers into a single comma-joined string, so read them one by one
        # from the underlying urllib3 response instead. The final URL is
        # used because cookies belong to the response after any redirects.
        for cookie_header in response.raw.headers.getlist("Set-Cookie"):
            self._check_cookie_security(cookie_header, findings, response.url)

        print(f" Found {len(findings)} HTTP security issues")
        return findings

    def _check_cookie_security(self, cookie_str: str, findings: list[Finding],
                               url: str) -> None:
        """Check cookie security attributes."""
        # Collect attribute names only (everything after the first ';'),
        # so a cookie value such as "secure123" is not mistaken for a flag.
        attributes = {
            part.strip().split("=", 1)[0].lower()
            for part in cookie_str.split(";")[1:]
            if part.strip()
        }
        if "secure" not in attributes and url.startswith("https"):
            findings.append(Finding(
                title="Cookie Without Secure Flag",
                severity=Severity.MEDIUM,
                description="A cookie is set without the Secure flag over HTTPS.",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add Secure flag to all cookies on HTTPS sites.",
            ))
        if "httponly" not in attributes:
            findings.append(Finding(
                title="Cookie Without HttpOnly Flag",
                severity=Severity.MEDIUM,
                description="A cookie is set without the HttpOnly flag (accessible by JavaScript).",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add HttpOnly flag to cookies not needed by client-side JavaScript.",
            ))
        if "samesite" not in attributes:
            findings.append(Finding(
                title="Cookie Without SameSite Attribute",
                severity=Severity.LOW,
                description="A cookie is missing the SameSite attribute (CSRF risk).",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add SameSite=Lax or SameSite=Strict to cookies.",
            ))
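Header checks of this kind are easiest to reason about as pure functions that run on header strings without any network access. The sketch below shows two such helpers, one extracting the HSTS `max-age` and one listing a cookie's attribute names; `hsts_max_age`, `cookie_attributes`, and `ONE_YEAR_SECONDS` are hypothetical names chosen for illustration.

```python
import re
from typing import Optional

ONE_YEAR_SECONDS = 31536000


def hsts_max_age(value: str) -> Optional[int]:
    """Extract max-age (in seconds) from a Strict-Transport-Security value."""
    match = re.search(r'max-age\s*=\s*"?(\d+)"?', value, re.IGNORECASE)
    return int(match.group(1)) if match else None


def cookie_attributes(set_cookie: str) -> set[str]:
    """Return the lowercased attribute names of one Set-Cookie header.

    The first ';'-separated part is the name=value pair, so it is skipped.
    """
    return {
        part.strip().split("=", 1)[0].lower()
        for part in set_cookie.split(";")[1:]
        if part.strip()
    }


hsts = "max-age=63072000; includeSubDomains; preload"
age = hsts_max_age(hsts)
print(age, age is not None and age >= ONE_YEAR_SECONDS)

# The cookie *value* contains the word "secure", but the Secure flag is absent.
attrs = cookie_attributes("session=secure123; Path=/; HttpOnly; SameSite=Lax")
print(sorted(attrs), "secure" in attrs)
```

Working on attribute names rather than searching the whole cookie string avoids false negatives when a cookie's name or value happens to contain a word such as "secure".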
7. SSL/TLS Configuration Analyzer
7.1 SSL/TLS Checker
"""
scanner/ssl_checker.py - SSL/TLS configuration analyzer.
"""

import socket
import ssl
from datetime import datetime, timezone
from typing import Optional

from scanner.utils import Finding, Severity, RateLimiter


class SSLChecker:
    """
    Analyzes SSL/TLS configuration for security issues.

    Checks:
    - Certificate validity and expiration
    - Protocol version support
    - Cipher suite strength
    - Common misconfigurations
    """

    # Weak protocols
    WEAK_PROTOCOLS = {
        ssl.TLSVersion.SSLv3: "SSLv3 (POODLE vulnerability)",
        ssl.TLSVersion.TLSv1: "TLS 1.0 (deprecated, multiple vulnerabilities)",
        ssl.TLSVersion.TLSv1_1: "TLS 1.1 (deprecated)",
    }

    # Weak cipher keywords
    WEAK_CIPHER_KEYWORDS = [
        "RC4", "DES", "3DES", "MD5", "NULL", "EXPORT",
        "anon", "RC2", "SEED", "IDEA",
    ]

    def __init__(
        self,
        timeout: float = 10.0,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(5.0)

    def check(self, host: str, port: int = 443) -> list[Finding]:
        """
        Check SSL/TLS configuration of a host.

        Args:
            host: Target hostname.
            port: HTTPS port (default 443).

        Returns:
            List of findings.
        """
        self.rate_limiter.acquire()
        findings = []
        print(f"[*] Checking SSL/TLS configuration for {host}:{port}...")
        # 1. Check certificate
        cert_findings = self._check_certificate(host, port)
        findings.extend(cert_findings)
        # 2. Check supported protocols
        protocol_findings = self._check_protocols(host, port)
        findings.extend(protocol_findings)
        # 3. Check cipher suites
        cipher_findings = self._check_ciphers(host, port)
        findings.extend(cipher_findings)
        print(f" Found {len(findings)} SSL/TLS issues")
        return findings

    def _check_certificate(self, host: str, port: int) -> list[Finding]:
        """Check SSL certificate validity."""
        findings = []
        try:
            context = ssl.create_default_context()
            with socket.create_connection(
                (host, port), timeout=self.timeout
            ) as sock:
                with context.wrap_socket(sock, server_hostname=host) as ssock:
                    cert = ssock.getpeercert()
                    if not cert:
                        findings.append(Finding(
                            title="No SSL Certificate",
                            severity=Severity.CRITICAL,
                            description="No SSL certificate presented by server.",
                            module="ssl_checker",
                            remediation="Install a valid SSL certificate.",
                        ))
                        return findings
                    # Check expiration
                    not_after = datetime.strptime(
                        cert['notAfter'], '%b %d %H:%M:%S %Y %Z'
                    ).replace(tzinfo=timezone.utc)
                    now = datetime.now(timezone.utc)
                    days_until_expiry = (not_after - now).days
                    if days_until_expiry < 0:
                        findings.append(Finding(
                            title="Expired SSL Certificate",
                            severity=Severity.CRITICAL,
                            description=f"Certificate expired {abs(days_until_expiry)} days ago.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_expired": abs(days_until_expiry),
                            },
                            remediation="Renew the SSL certificate immediately.",
                        ))
                    elif days_until_expiry < 30:
                        findings.append(Finding(
                            title="SSL Certificate Expiring Soon",
                            severity=Severity.HIGH,
                            description=f"Certificate expires in {days_until_expiry} days.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_remaining": days_until_expiry,
                            },
                            remediation="Renew the SSL certificate before expiration.",
                        ))
                    elif days_until_expiry < 90:
                        findings.append(Finding(
                            title="SSL Certificate Expiring Within 90 Days",
                            severity=Severity.MEDIUM,
                            description=f"Certificate expires in {days_until_expiry} days.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_remaining": days_until_expiry,
                            },
                            remediation="Plan certificate renewal.",
                        ))
                    # Check subject alternative names
                    sans = []
                    for type_name, value in cert.get('subjectAltName', []):
                        if type_name == 'DNS':
                            sans.append(value)
                    if host not in sans and f"*.{'.'.join(host.split('.')[1:])}" not in sans:
                        findings.append(Finding(
                            title="Certificate Hostname Mismatch",
                            severity=Severity.HIGH,
                            description=(
                                f"Certificate SANs {sans} do not include {host}."
                            ),
                            module="ssl_checker",
                            remediation="Reissue certificate with correct hostname.",
                        ))
                    # Check issuer (self-signed)
                    issuer = dict(x[0] for x in cert.get('issuer', []))
                    subject = dict(x[0] for x in cert.get('subject', []))
                    if issuer == subject:
                        findings.append(Finding(
                            title="Self-Signed Certificate",
                            severity=Severity.MEDIUM,
                            description="The certificate is self-signed (not trusted by browsers).",
                            module="ssl_checker",
                            details={"issuer": str(issuer)},
                            remediation="Use a certificate from a trusted CA (e.g., Let's Encrypt).",
                        ))
        except ssl.SSLCertVerificationError as e:
findings.append(Finding(
title="SSL Certificate Verification Failed",
severity=Severity.HIGH,
description=f"Certificate verification error: {str(e)[:200]}",
module="ssl_checker",
remediation="Fix certificate chain or install valid certificate.",
))
except (socket.timeout, ConnectionRefusedError, OSError) as e:
findings.append(Finding(
title="SSL Connection Failed",
severity=Severity.INFO,
description=f"Could not establish SSL connection: {str(e)[:200]}",
module="ssl_checker",
))
return findings
def _check_protocols(self, host: str, port: int) -> list[Finding]:
"""Check which TLS protocol versions are supported."""
findings = []
# Test for weak protocols
for protocol_version, description in self.WEAK_PROTOCOLS.items():
try:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.minimum_version = protocol_version
context.maximum_version = protocol_version
with socket.create_connection(
(host, port), timeout=self.timeout
) as sock:
# Send SNI so name-based virtual hosts answer with the right site
with context.wrap_socket(sock, server_hostname=host) as ssock:
# If we get here, the weak protocol is supported
findings.append(Finding(
title="Weak TLS Protocol Supported",
severity=Severity.HIGH if "SSLv3" in description
or "TLS 1.0" in description
else Severity.MEDIUM,
description=f"Server supports {description}.",
module="ssl_checker",
details={"protocol": str(protocol_version)},
remediation=f"Disable {description}. Only allow TLS 1.2+.",
))
except (ssl.SSLError, socket.timeout, ConnectionRefusedError, OSError, ValueError):
# Either the server refused the protocol (good), or the local OpenSSL
# build cannot offer it (ValueError or a handshake error), in which
# case the result for this version is inconclusive.
pass
return findings
def _check_ciphers(self, host: str, port: int) -> list[Finding]:
"""Check for weak cipher suites."""
findings = []
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection(
(host, port), timeout=self.timeout
) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
cipher = ssock.cipher()
if cipher:
cipher_name, protocol, bits = cipher
# Check for weak ciphers (compare case-insensitively so that the
# lowercase keyword "anon" can match as well)
for weak_keyword in self.WEAK_CIPHER_KEYWORDS:
if weak_keyword.upper() in cipher_name.upper():
findings.append(Finding(
title=f"Weak Cipher Suite: {cipher_name}",
severity=Severity.HIGH,
description=(
f"Server negotiated weak cipher: {cipher_name} "
f"({bits} bits, {protocol})"
),
module="ssl_checker",
details={
"cipher": cipher_name,
"protocol": protocol,
"bits": bits,
},
remediation="Disable weak cipher suites. Use ECDHE-RSA-AES256-GCM-SHA384 or similar.",
))
break
# Check key length
if bits and bits < 128:
findings.append(Finding(
title=f"Weak Cipher Key Length: {bits} bits",
severity=Severity.HIGH,
description=f"Cipher key length is only {bits} bits.",
module="ssl_checker",
remediation="Use cipher suites with at least 128-bit keys.",
))
except (ssl.SSLError, socket.timeout, ConnectionRefusedError, OSError):
pass
return findings
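The hostname check in `_check_certificate` compares the host against the SAN list with exact, case-sensitive string matching. A slightly more careful comparison can be sketched as below. `hostname_matches_san` is a hypothetical helper, not part of the scanner, and it deliberately covers only the common case of a wildcard that forms the entire left-most label.

```python
def hostname_matches_san(host: str, sans: list[str]) -> bool:
    """Return True if host matches any DNS SAN entry.

    Hypothetical helper for illustration. DNS names are compared
    case-insensitively, and a wildcard is honoured only as the entire
    left-most label ("*.example.com"), where it stands for exactly one label.
    """
    host = host.lower().rstrip(".")
    host_labels = host.split(".")
    for san in sans:
        san = san.lower().rstrip(".")
        if san == host:
            return True
        san_labels = san.split(".")
        if (
            san_labels[0] == "*"
            and len(san_labels) == len(host_labels)
            and len(san_labels) > 2  # refuse overly broad names like "*.com"
            and san_labels[1:] == host_labels[1:]
        ):
            return True
    return False
```

With this helper, `www.example.com` matches `*.example.com`, while `a.b.example.com` and the bare `example.com` do not.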
8. Directory Scanner Module
8.1 Path Discovery
"""
scanner/dir_scanner.py - Directory and path discovery.
This module performs dictionary-based path discovery to find
hidden endpoints, backup files, and sensitive paths.
For educational purposes only. Never brute force paths without permission.
"""
import requests
from typing import Optional
from pathlib import Path
from scanner.utils import Finding, Severity, RateLimiter
# Built-in small wordlist for common paths
DEFAULT_WORDLIST = [
# Configuration and environment files
".env", ".env.backup", ".env.production",
".git/config", ".git/HEAD",
".svn/entries",
".htaccess", ".htpasswd",
"web.config",
"robots.txt", "sitemap.xml",
# Admin panels
"admin", "admin/login", "administrator",
"dashboard", "manage", "panel",
"wp-admin", "wp-login.php",
"phpmyadmin", "adminer.php",
# API documentation
"api", "api/v1", "api/docs",
"swagger.json", "openapi.json",
"api-docs", "graphql",
# Backup and debug files
"backup", "backup.zip", "backup.sql",
"database.sql", "dump.sql",
"debug", "debug.log",
"error.log", "access.log",
# Common application paths
"server-status", "server-info",
"info.php", "phpinfo.php",
"test", "test.html", "test.php",
"status", "health", "healthcheck",
"metrics", "prometheus",
# User-related paths
"login", "register", "signup",
"reset-password", "forgot-password",
"profile", "account", "settings",
# Static/asset paths
"static", "assets", "uploads",
"media", "files", "documents",
"images", "img", "css", "js",
# Development artifacts
".DS_Store",
"composer.json", "package.json",
"Gemfile", "requirements.txt",
"Dockerfile", "docker-compose.yml",
]
# Paths that indicate security issues when accessible
SENSITIVE_PATHS = {
".env": "Environment file may contain secrets (API keys, passwords)",
".git/config": "Git configuration exposed - entire source code may be downloadable",
".git/HEAD": "Git repository exposed",
".svn/entries": "SVN repository exposed",
".htpasswd": "Apache password file exposed",
"phpinfo.php": "PHP info page exposes server configuration details",
"info.php": "PHP info page exposes server configuration details",
"backup.sql": "Database backup file publicly accessible",
"database.sql": "Database dump file publicly accessible",
"dump.sql": "Database dump file publicly accessible",
"backup.zip": "Backup archive publicly accessible",
"debug.log": "Debug log may contain sensitive information",
"server-status": "Apache server-status page exposed",
"server-info": "Apache server-info page exposed",
".DS_Store": "macOS directory metadata may reveal file structure",
"web.config": "IIS configuration file exposed",
"composer.json": "PHP dependency file reveals technology stack",
"package.json": "Node.js dependency file reveals technology stack",
"requirements.txt": "Python dependency file reveals technology stack",
"Dockerfile": "Docker configuration may reveal architecture",
"swagger.json": "API specification publicly accessible",
"openapi.json": "API specification publicly accessible",
"graphql": "GraphQL endpoint may allow introspection",
}
class DirectoryScanner:
"""
Dictionary-based directory and path discovery.
Uses a wordlist to find accessible endpoints.
"""
def __init__(
self,
timeout: float = 5.0,
rate_limiter: Optional[RateLimiter] = None,
wordlist_file: Optional[str] = None,
):
self.timeout = timeout
self.rate_limiter = rate_limiter or RateLimiter(10.0)
# Load wordlist
if wordlist_file and Path(wordlist_file).exists():
self.wordlist = Path(wordlist_file).read_text().splitlines()
# Drop blank lines and comments (including indented "#" lines)
self.wordlist = [w.strip() for w in self.wordlist if w.strip() and not w.strip().startswith('#')]
else:
self.wordlist = DEFAULT_WORDLIST
def scan(self, base_url: str) -> list[Finding]:
"""
Scan for accessible directories and files.
Args:
base_url: Target base URL (e.g., https://example.com).
Returns:
List of findings.
"""
findings = []
discovered = []
# Clean base URL
base_url = base_url.rstrip('/')
print(f"[*] Scanning {len(self.wordlist)} paths on {base_url}...")
for i, path in enumerate(self.wordlist):
if i > 0 and i % 50 == 0:
print(f" Progress: {i}/{len(self.wordlist)} paths checked, "
f"{len(discovered)} found")
self.rate_limiter.acquire()
url = f"{base_url}/{path}"
try:
response = requests.get(
url,
timeout=self.timeout,
allow_redirects=False, # Don't follow redirects
verify=True,
headers={
"User-Agent": "SecurityScanner/1.0 (Authorized Scan)"
},
)
status = response.status_code
# 200: directly accessible
# 301/302: redirects (may still be interesting)
# 403: forbidden (exists but restricted)
if status == 200:
size = len(response.content)
discovered.append({
"path": path,
"status": status,
"size": size,
})
print(f" [+] FOUND: /{path} (200 OK, {size} bytes)")
# Check if this is a sensitive path
if path in SENSITIVE_PATHS:
findings.append(Finding(
title=f"Sensitive Path Accessible: /{path}",
severity=Severity.HIGH,
description=SENSITIVE_PATHS[path],
module="dir_scanner",
details={
"path": path,
"url": url,
"status_code": status,
"size": size,
},
remediation=(
f"Restrict access to /{path}. "
f"Remove from web root or deny in server configuration."
),
))
else:
findings.append(Finding(
title=f"Discovered Path: /{path}",
severity=Severity.INFO,
description=f"Path /{path} is accessible (HTTP 200).",
module="dir_scanner",
details={
"path": path,
"url": url,
"status_code": status,
"size": size,
},
))
elif status == 403:
# Exists but forbidden - note for manual testing
discovered.append({
"path": path,
"status": status,
"size": 0,
})
if path in SENSITIVE_PATHS:
findings.append(Finding(
title=f"Restricted Path Detected: /{path}",
severity=Severity.LOW,
description=(
f"Path /{path} exists but returns 403 Forbidden. "
f"Verify access controls are correct."
),
module="dir_scanner",
details={"path": path, "status_code": status},
))
except requests.exceptions.RequestException:
pass # Connection error, skip this path
print(f"[*] Directory scan complete. Found {len(discovered)} paths, "
f"{len(findings)} findings.")
return findings
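Treating every 200 response as a discovered path produces false positives on servers that answer unknown URLs with a 200 "not found" page. One common mitigation, sketched here under assumptions, is to request a random path first and compare later responses against that baseline. The names `random_probe_path` and `looks_like_soft_404`, and the 5% size tolerance, are illustrative choices rather than part of the scanner.

```python
import secrets


def random_probe_path(length: int = 24) -> str:
    """Return a path that is very unlikely to exist on the target."""
    return secrets.token_hex(length // 2)


def looks_like_soft_404(
    status: int,
    body: bytes,
    baseline_status: int,
    baseline_body: bytes,
    tolerance: float = 0.05,
) -> bool:
    """Heuristic: does this response resemble the 'not found' baseline?

    The baseline is the response to a random, nonexistent path. A candidate
    is treated as a soft 404 when it has the same status code and either an
    identical body or a body length within `tolerance` of the baseline.
    """
    if status != baseline_status:
        return False
    if body == baseline_body:
        return True
    if not baseline_body:
        return False
    difference = abs(len(body) - len(baseline_body))
    return difference / len(baseline_body) <= tolerance
```

In `scan`, the baseline would be fetched once per base URL before the wordlist loop, and 200 responses for which `looks_like_soft_404` returns `True` would be skipped instead of reported.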
9. CVE Database Lookup
9.1 CVE Lookup Module
"""
scanner/cve_lookup.py - CVE (Common Vulnerabilities and Exposures) lookup.
Queries public CVE databases to find known vulnerabilities
for discovered services and versions.
"""
import re
import json
import requests
from typing import Optional
from scanner.utils import Finding, Severity, PortInfo, RateLimiter
# Simplified local CVE database for common services
# In production, query NVD API, VulnDB, or similar
LOCAL_CVE_DB = {
"apache": {
"2.4.49": [
{
"id": "CVE-2021-41773",
"severity": "CRITICAL",
"description": "Path traversal and RCE in Apache HTTP Server 2.4.49",
"fix": "Upgrade to Apache 2.4.51+",
},
],
"2.4.50": [
{
"id": "CVE-2021-42013",
"severity": "CRITICAL",
"description": "Path traversal in Apache HTTP Server 2.4.50 (bypass of CVE-2021-41773)",
"fix": "Upgrade to Apache 2.4.51+",
},
],
},
"openssh": {
"8.5": [
{
"id": "CVE-2021-41617",
"severity": "HIGH",
"description": "Privilege escalation in OpenSSH 6.2-8.7",
"fix": "Upgrade to OpenSSH 8.8+",
},
],
},
"nginx": {
"1.20.0": [
{
"id": "CVE-2021-23017",
"severity": "HIGH",
"description": "DNS resolver vulnerabilities in nginx 0.6.18-1.20.0",
"fix": "Upgrade to nginx 1.20.1+",
},
],
},
"mysql": {
"5.7": [
{
"id": "CVE-2023-21912",
"severity": "HIGH",
"description": "MySQL Server 5.7 multiple vulnerabilities",
"fix": "Upgrade to MySQL 8.0+",
},
],
},
"redis": {
"6.0": [
{
"id": "CVE-2022-24735",
"severity": "HIGH",
"description": "Redis Lua script sandbox escape",
"fix": "Upgrade to Redis 6.2.7+ or 7.0+",
},
],
},
}
class CVELookup:
"""
Look up known CVEs for discovered services.
Uses both a local database and optional online API queries.
"""
NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
def __init__(
self,
use_online: bool = False,
rate_limiter: Optional[RateLimiter] = None,
):
"""
Args:
use_online: Whether to query online NVD API.
rate_limiter: Rate limiter for API calls.
"""
self.use_online = use_online
self.rate_limiter = rate_limiter or RateLimiter(1.0) # NVD rate limits
def lookup(self, open_ports: list[PortInfo]) -> list[Finding]:
"""
Look up CVEs for all discovered services.
Args:
open_ports: List of open ports with service information.
Returns:
List of CVE findings.
"""
findings = []
print(f"[*] Looking up known CVEs for {len(open_ports)} services...")
for port_info in open_ports:
if not port_info.version:
continue
# Parse service name and version
service_name, version = self._parse_service_version(
port_info.service, port_info.version, port_info.banner
)
if not service_name or not version:
continue
# Check local database
local_cves = self._lookup_local(service_name, version)
for cve in local_cves:
severity_map = {
"CRITICAL": Severity.CRITICAL,
"HIGH": Severity.HIGH,
"MEDIUM": Severity.MEDIUM,
"LOW": Severity.LOW,
}
findings.append(Finding(
title=f"Known CVE: {cve['id']}",
severity=severity_map.get(cve['severity'], Severity.MEDIUM),
description=cve['description'],
module="cve_lookup",
details={
"cve_id": cve['id'],
"service": service_name,
"version": version,
"port": port_info.port,
},
remediation=cve.get('fix', 'Upgrade to the latest version.'),
reference=f"https://nvd.nist.gov/vuln/detail/{cve['id']}",
))
print(f" [!] {cve['id']}: {service_name} {version} "
f"on port {port_info.port}")
# Query online NVD if enabled
if self.use_online:
online_cves = self._lookup_nvd(service_name, version)
for cve in online_cves:
if not any(f.details.get('cve_id') == cve['id']
for f in findings):
findings.append(Finding(
title=f"Known CVE (NVD): {cve['id']}",
severity=Severity.MEDIUM, # Default, refine from CVSS
description=cve['description'][:200],
module="cve_lookup",
details={
"cve_id": cve['id'],
"service": service_name,
"version": version,
"port": port_info.port,
},
reference=f"https://nvd.nist.gov/vuln/detail/{cve['id']}",
))
print(f"[*] CVE lookup complete. Found {len(findings)} known vulnerabilities.")
return findings
def _parse_service_version(
self, service: str, version: str, banner: str
) -> tuple[str, str]:
"""Extract standardized service name and version."""
version_lower = version.lower()
banner_lower = banner.lower()
# Apache
if "apache" in version_lower or "apache" in banner_lower:
match = re.search(r'apache[/ ](\d+\.\d+\.\d+)', version_lower + " " + banner_lower)
if match:
return "apache", match.group(1)
# nginx
if "nginx" in version_lower or "nginx" in banner_lower:
match = re.search(r'nginx[/ ](\d+\.\d+\.\d+)', version_lower + " " + banner_lower)
if match:
return "nginx", match.group(1)
# OpenSSH
if "openssh" in version_lower or "openssh" in banner_lower:
match = re.search(r'openssh[_/ ](\d+\.\d+)', version_lower + " " + banner_lower)
if match:
return "openssh", match.group(1)
# MySQL
if "mysql" in version_lower or "mysql" in banner_lower:
match = re.search(r'mysql[/ ]*(\d+\.\d+)', version_lower + " " + banner_lower)
if match:
return "mysql", match.group(1)
# Redis
if "redis" in version_lower or "redis" in banner_lower:
match = re.search(r'redis[/ ]*(\d+\.\d+)', version_lower + " " + banner_lower)
if match:
return "redis", match.group(1)
return "", ""
def _lookup_local(self, service: str, version: str) -> list[dict]:
"""Look up CVEs in local database."""
service_db = LOCAL_CVE_DB.get(service.lower(), {})
results = []
for db_version, cves in service_db.items():
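# Match the exact version or a more specific one (e.g. "5.7" matches
# "5.7.41"), but not a different number that merely shares the prefix
# (e.g. "5.70").
if version == db_version or version.startswith(db_version + "."):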
results.extend(cves)
return results
def _lookup_nvd(self, service: str, version: str) -> list[dict]:
"""Query NVD API for CVEs (rate-limited)."""
self.rate_limiter.acquire()
try:
response = requests.get(
self.NVD_API_URL,
params={
"keywordSearch": f"{service} {version}",
"resultsPerPage": 5,
},
timeout=15,
headers={"User-Agent": "SecurityScanner/1.0"},
)
if response.status_code == 200:
data = response.json()
results = []
for vuln in data.get("vulnerabilities", []):
cve = vuln.get("cve", {})
desc = ""
for d in cve.get("descriptions", []):
if d.get("lang") == "en":
desc = d.get("value", "")
break
results.append({
"id": cve.get("id", ""),
"description": desc,
})
return results
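# ValueError covers JSON decoding failures regardless of which JSON
# backend requests uses (json.JSONDecodeError is a ValueError subclass).
except (requests.exceptions.RequestException, ValueError):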
pass
return []
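`lookup` assigns `Severity.MEDIUM` to every NVD result and notes that this should be refined from CVSS. A minimal sketch of that refinement follows. It assumes the NVD 2.0 response layout, in which each `cve` object carries a `metrics` mapping with lists such as `cvssMetricV31`, and each entry holds `cvssData.baseSeverity`; check the field names against the NVD API documentation before relying on them. `extract_base_severity` is a hypothetical helper.

```python
def extract_base_severity(cve: dict, default: str = "MEDIUM") -> str:
    """Return the CVSS base severity label for one NVD 'cve' object.

    Assumed layout (NVD API 2.0): cve["metrics"][<key>] is a list of
    entries, each with a "cvssData" dict containing "baseSeverity".
    Newer CVSS versions are preferred over older ones.
    """
    metrics = cve.get("metrics", {})
    for key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
        for entry in metrics.get(key, []):
            # CVSS v2 entries are assumed to keep baseSeverity next to
            # cvssData rather than inside it, so check both places.
            severity = (
                entry.get("cvssData", {}).get("baseSeverity")
                or entry.get("baseSeverity")
            )
            if severity:
                return severity.upper()
    return default
```

The returned label can be passed through the same `severity_map` that `lookup` already applies to entries from the local database.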
10. Report Generation
10.1 Report Engine
"""
scanner/report.py - Generate scan reports in multiple formats.
"""
import json
from dataclasses import asdict
from datetime import datetime
from scanner.utils import ScanResult, Finding, Severity
class ReportGenerator:
"""Generate vulnerability scan reports."""
def generate_text(self, result: ScanResult) -> str:
"""Generate a text report."""
lines = []
lines.append("=" * 70)
lines.append(" VULNERABILITY SCAN REPORT")
lines.append("=" * 70)
lines.append(f" Target: {result.target.host} ({result.target.ip})")
lines.append(f" Scan Date: {result.start_time}")
lines.append(f" Duration: {result.duration_seconds:.1f} seconds")
# Summary
severity_counts = {}
for f in result.findings:
severity_counts[f.severity.value] = \
severity_counts.get(f.severity.value, 0) + 1
lines.append(f"\n Open Ports: {len(result.open_ports)}")
lines.append(f" Total Findings: {len(result.findings)}")
for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
count = severity_counts.get(sev, 0)
if count > 0:
lines.append(f" {sev:10s}: {count}")
# Open Ports
if result.open_ports:
lines.append(f"\n{'─' * 70}")
lines.append(" OPEN PORTS")
lines.append(f"{'─' * 70}")
lines.append(f" {'Port':>7s} {'State':8s} {'Service':15s} {'Version'}")
lines.append(f" {'─'*7} {'─'*8} {'─'*15} {'─'*30}")
for port in result.open_ports:
lines.append(
f" {port.port:>7d} {port.state:8s} "
f"{port.service:15s} {port.version[:30]}"
)
# Findings by severity
for severity in [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM,
Severity.LOW, Severity.INFO]:
findings = [f for f in result.findings if f.severity == severity]
if not findings:
continue
lines.append(f"\n{'─' * 70}")
lines.append(f" {severity.value} FINDINGS ({len(findings)})")
lines.append(f"{'─' * 70}")
for i, finding in enumerate(findings, 1):
lines.append(f"\n {i}. [{finding.module}] {finding.title}")
lines.append(f" {finding.description}")
if finding.remediation:
lines.append(f" Fix: {finding.remediation}")
if finding.reference:
lines.append(f" Ref: {finding.reference}")
if finding.details:
for key, value in finding.details.items():
lines.append(f" {key}: {value}")
# Errors
if result.errors:
lines.append(f"\n{'─' * 70}")
lines.append(" ERRORS")
lines.append(f"{'─' * 70}")
for error in result.errors:
lines.append(f" [!] {error}")
# Footer
lines.append(f"\n{'=' * 70}")
if severity_counts.get("CRITICAL", 0) > 0:
lines.append(" OVERALL: CRITICAL vulnerabilities found!")
lines.append(" Immediate remediation required.")
elif severity_counts.get("HIGH", 0) > 0:
lines.append(" OVERALL: HIGH severity vulnerabilities found.")
lines.append(" Remediation recommended within 7 days.")
elif severity_counts.get("MEDIUM", 0) > 0:
lines.append(" OVERALL: MEDIUM severity issues found.")
lines.append(" Remediation recommended within 30 days.")
else:
lines.append(" OVERALL: No critical vulnerabilities found.")
lines.append("=" * 70)
return "\n".join(lines)
def generate_json(self, result: ScanResult) -> str:
"""Generate a JSON report."""
report = {
"scan_metadata": {
"target": result.target.host,
"ip": result.target.ip,
"start_time": result.start_time,
"end_time": result.end_time,
"duration_seconds": result.duration_seconds,
},
"summary": {
"open_ports": len(result.open_ports),
"total_findings": len(result.findings),
"by_severity": {},
},
"open_ports": [
{
"port": p.port,
"state": p.state,
"service": p.service,
"version": p.version,
"banner": p.banner[:200],
}
for p in result.open_ports
],
"findings": [
{
"title": f.title,
"severity": f.severity.value,
"description": f.description,
"module": f.module,
"details": f.details,
"remediation": f.remediation,
"reference": f.reference,
}
for f in result.findings
],
"errors": result.errors,
}
# Add severity counts
for f in result.findings:
sev = f.severity.value
report["summary"]["by_severity"][sev] = \
report["summary"]["by_severity"].get(sev, 0) + 1
return json.dumps(report, indent=2)
def generate_html(self, result: ScanResult) -> str:
"""Generate an HTML report."""
severity_colors = {
"CRITICAL": "#dc3545",
"HIGH": "#fd7e14",
"MEDIUM": "#ffc107",
"LOW": "#17a2b8",
"INFO": "#6c757d",
}
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Vulnerability Scan Report - {result.target.host}</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, sans-serif;
max-width: 900px; margin: 0 auto; padding: 20px;
background: #f8f9fa; }}
h1 {{ color: #212529; border-bottom: 2px solid #dee2e6; padding-bottom: 10px; }}
h2 {{ color: #495057; margin-top: 30px; }}
.summary {{ background: #fff; padding: 20px; border-radius: 8px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
.finding {{ background: #fff; padding: 15px; margin: 10px 0;
border-radius: 8px; border-left: 4px solid #dee2e6;
box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
.severity {{ display: inline-block; padding: 2px 8px; border-radius: 4px;
color: white; font-size: 0.85em; font-weight: bold; }}
table {{ width: 100%; border-collapse: collapse; background: #fff;
border-radius: 8px; overflow: hidden; }}
th, td {{ padding: 10px 15px; text-align: left; border-bottom: 1px solid #dee2e6; }}
th {{ background: #495057; color: white; }}
.fix {{ color: #28a745; font-style: italic; }}
</style>
</head>
<body>
<h1>Vulnerability Scan Report</h1>
<div class="summary">
<p><strong>Target:</strong> {result.target.host} ({result.target.ip})</p>
<p><strong>Date:</strong> {result.start_time}</p>
<p><strong>Duration:</strong> {result.duration_seconds:.1f} seconds</p>
<p><strong>Open Ports:</strong> {len(result.open_ports)} |
<strong>Findings:</strong> {len(result.findings)}</p>
</div>
<h2>Open Ports</h2>
<table>
<tr><th>Port</th><th>State</th><th>Service</th><th>Version</th></tr>
"""
# Banners, versions, and finding text originate from the scanned host,
# so escape them before embedding them in the HTML report.
# (Local import: the name `html` is already used for the output string.)
from html import escape
for port in result.open_ports:
html += f" <tr><td>{port.port}</td><td>{escape(port.state)}</td>"
html += f"<td>{escape(port.service)}</td><td>{escape(port.version[:50])}</td></tr>\n"
html += " </table>\n\n <h2>Findings</h2>\n"
for severity_name in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
findings = [f for f in result.findings
if f.severity.value == severity_name]
if not findings:
continue
color = severity_colors.get(severity_name, "#6c757d")
for finding in findings:
html += f""" <div class="finding" style="border-left-color: {color}">
<span class="severity" style="background: {color}">{severity_name}</span>
<strong>{escape(finding.title)}</strong>
<p>{escape(finding.description)}</p>
"""
if finding.remediation:
html += f' <p class="fix">Fix: {escape(finding.remediation)}</p>\n'
if finding.reference:
html += f' <p>Reference: <a href="{escape(finding.reference)}">{escape(finding.reference)}</a></p>\n'
html += " </div>\n"
html += f"""
<hr>
<p style="color: #6c757d; font-size: 0.85em;">
Generated by Vulnerability Scanner on {datetime.now().isoformat()}
</p>
</body>
</html>"""
return html
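Banners, version strings, and finding descriptions are produced by the host being scanned, so a hostile server can place markup in them. If such values are interpolated into an HTML report unescaped, opening the report runs attacker-controlled script in the analyst's browser. The sketch below shows the effect of `html.escape` from the standard library on such a value; `render_port_row` is a hypothetical helper, not a method of `ReportGenerator`.

```python
from html import escape


def render_port_row(port: int, state: str, service: str, version: str) -> str:
    """Build one row of the open-ports table with every cell escaped."""
    cells = [str(port), state, service, version[:50]]
    return "<tr>" + "".join(f"<td>{escape(c)}</td>" for c in cells) + "</tr>"


# A banner-derived version string controlled by the scanned server
hostile_version = '<script>alert("xss")</script>'
row = render_port_row(80, "open", "http", hostile_version)
print(row)
```

The printed row contains `&lt;script&gt;` instead of a live `<script>` tag, so the browser displays the text rather than executing it.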
11. Scan Controller (Orchestrator)
11.1 Main Controller
"""
scanner/controller.py - Scan orchestration controller.
"""
import time
from typing import Optional
from scanner.utils import ScanTarget, ScanResult, RateLimiter, COMMON_PORTS
from scanner.port_scanner import PortScanner
from scanner.banner_grabber import BannerGrabber
from scanner.http_checker import HTTPChecker
from scanner.ssl_checker import SSLChecker
from scanner.dir_scanner import DirectoryScanner
from scanner.cve_lookup import CVELookup
from scanner.report import ReportGenerator
class ScanController:
"""
Orchestrates all scan modules and generates reports.
"""
def __init__(
self,
rate_limit: float = 50.0,
timeout: float = 3.0,
max_workers: int = 50,
):
"""
Args:
rate_limit: Maximum requests per second.
timeout: Default timeout for connections.
max_workers: Maximum concurrent threads for port scanning.
"""
self.rate_limiter = RateLimiter(rate_limit)
self.timeout = timeout
self.max_workers = max_workers
self.report_generator = ReportGenerator()
def scan(
self,
host: str,
ports: Optional[list[int]] = None,
scan_http: bool = True,
scan_ssl: bool = True,
scan_dirs: bool = False,
lookup_cves: bool = True,
wordlist: Optional[str] = None,
) -> ScanResult:
"""
Perform a comprehensive vulnerability scan.
Args:
host: Target hostname or IP.
ports: Specific ports to scan (default: common ports).
scan_http: Whether to check HTTP security headers.
scan_ssl: Whether to check SSL/TLS configuration.
scan_dirs: Whether to perform directory scanning.
lookup_cves: Whether to look up CVEs for found services.
wordlist: Path to custom wordlist for directory scanning.
Returns:
Complete scan result.
"""
# Initialize target
target = ScanTarget(host=host)
if not target.resolve():
print(f"[!] Cannot resolve hostname: {host}")
result = ScanResult(target=target)
result.errors.append(f"DNS resolution failed for {host}")
return result
print(f"\n{'='*60}")
print(f" VULNERABILITY SCAN")
print(f" Target: {host} ({target.ip})")
print(f" Rate Limit: {self.rate_limiter.rate} req/s")
print(f"{'='*60}\n")
result = ScanResult(target=target)
start_time = time.time()
# Phase 1: Port Scanning
print("\n[Phase 1/5] Port Scanning")
print("-" * 40)
port_scanner = PortScanner(
timeout=self.timeout,
rate_limiter=self.rate_limiter,
max_workers=self.max_workers,
)
result.open_ports = port_scanner.scan_ports(host, ports or COMMON_PORTS)
result.findings.extend(
port_scanner.generate_findings(result.open_ports)
)
if not result.open_ports:
print("[!] No open ports found. Scan complete.")
result.end_time = time.strftime("%Y-%m-%dT%H:%M:%S")
result.duration_seconds = time.time() - start_time
return result
# Phase 2: Banner Grabbing
print("\n[Phase 2/5] Banner Grabbing")
print("-" * 40)
banner_grabber = BannerGrabber(
timeout=self.timeout,
rate_limiter=self.rate_limiter,
)
banner_grabber.grab_all_banners(host, result.open_ports)
result.findings.extend(
banner_grabber.generate_findings(result.open_ports)
)
# Phase 3: HTTP Security Headers
if scan_http:
print("\n[Phase 3/5] HTTP Security Headers")
print("-" * 40)
http_checker = HTTPChecker(
timeout=self.timeout,
rate_limiter=self.rate_limiter,
)
# Check HTTP and HTTPS
http_ports = [p for p in result.open_ports
if p.port in (80, 8080, 8000)]
https_ports = [p for p in result.open_ports
if p.port in (443, 8443)]
for port_info in http_ports:
url = f"http://{host}:{port_info.port}" if port_info.port != 80 \
else f"http://{host}"
result.findings.extend(http_checker.check(url))
for port_info in https_ports:
url = f"https://{host}:{port_info.port}" if port_info.port != 443 \
else f"https://{host}"
result.findings.extend(http_checker.check(url))
else:
print("\n[Phase 3/5] HTTP Security Headers (skipped)")
# Phase 4: SSL/TLS Check
if scan_ssl:
print("\n[Phase 4/5] SSL/TLS Configuration")
print("-" * 40)
ssl_checker = SSLChecker(
timeout=self.timeout,
rate_limiter=self.rate_limiter,
)
ssl_ports = [p for p in result.open_ports
if p.port in (443, 8443, 465, 993, 995)]
for port_info in ssl_ports:
result.findings.extend(
ssl_checker.check(host, port_info.port)
)
else:
print("\n[Phase 4/5] SSL/TLS Configuration (skipped)")
# Phase 5: Directory Scanning (optional)
if scan_dirs:
print("\n[Phase 5/5] Directory Discovery")
print("-" * 40)
dir_scanner = DirectoryScanner(
timeout=self.timeout,
rate_limiter=self.rate_limiter,
wordlist_file=wordlist,
)
web_ports = [p for p in result.open_ports
if p.port in (80, 443, 8080, 8443)]
for port_info in web_ports:
scheme = "https" if port_info.port in (443, 8443) else "http"
port_suffix = f":{port_info.port}" \
if port_info.port not in (80, 443) else ""
url = f"{scheme}://{host}{port_suffix}"
result.findings.extend(dir_scanner.scan(url))
else:
print("\n[Phase 5/5] Directory Discovery (skipped)")
# CVE Lookup
if lookup_cves:
print("\n[Bonus] CVE Database Lookup")
print("-" * 40)
cve_lookup = CVELookup(use_online=False)
result.findings.extend(cve_lookup.lookup(result.open_ports))
# Finalize
result.end_time = time.strftime("%Y-%m-%dT%H:%M:%S")
result.duration_seconds = time.time() - start_time
# Sort findings by severity
severity_order = {
"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4
}
result.findings.sort(
key=lambda f: severity_order.get(f.severity.value, 5)
)
return result
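The controller builds target URLs in two places (Phase 3 and Phase 5) with slightly different port lists. One way to keep that logic in a single spot is a small helper. `build_base_url` and the `HTTPS_PORTS` set below are hypothetical names, and the port-to-scheme mapping is the same heuristic the controller uses, not real protocol detection.

```python
HTTPS_PORTS = {443, 8443}  # Assumed TLS ports, mirroring the controller's lists


def build_base_url(host: str, port: int) -> str:
    """Return scheme://host[:port], omitting the scheme's default port."""
    scheme = "https" if port in HTTPS_PORTS else "http"
    default_port = 443 if scheme == "https" else 80
    # IPv6 literals must be bracketed inside a URL
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"
    suffix = "" if port == default_port else f":{port}"
    return f"{scheme}://{host}{suffix}"
```

For example, `build_base_url("example.com", 8443)` gives `https://example.com:8443`, and port 80 yields a URL with no explicit port.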
12. CLI Entry Point
12.1 Main Script
"""
main.py - CLI entry point for the vulnerability scanner.
Usage:
python main.py example.com
python main.py example.com --ports 80,443,8080
python main.py example.com --full --output report.json
python main.py example.com --dirs --wordlist wordlists/common_paths.txt
"""
import argparse
import sys
from scanner.controller import ScanController
from scanner.report import ReportGenerator
from scanner.utils import is_valid_target, COMMON_PORTS
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Vulnerability Scanner - Authorized security scanning tool",
epilog=(
"IMPORTANT: Only scan targets you own or have explicit written "
"permission to test. Unauthorized scanning is illegal."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
# Target
parser.add_argument(
"target",
help="Target hostname or IP address to scan",
)
# Port options
port_group = parser.add_mutually_exclusive_group()
port_group.add_argument(
"-p", "--ports",
help="Comma-separated list of ports to scan (e.g., 80,443,8080)",
)
port_group.add_argument(
"--top-ports",
type=int,
default=None,
help="Scan top N most common ports (default: all common ports)",
)
port_group.add_argument(
"--all-ports",
action="store_true",
help="Scan all ports (1-65535). VERY SLOW.",
)
# Scan options
parser.add_argument(
"--full",
action="store_true",
help="Full scan: ports + HTTP + SSL + dirs + CVEs",
)
parser.add_argument(
"--no-http",
action="store_true",
help="Skip HTTP security header check",
)
parser.add_argument(
"--no-ssl",
action="store_true",
help="Skip SSL/TLS configuration check",
)
parser.add_argument(
"--dirs",
action="store_true",
help="Enable directory/path discovery",
)
parser.add_argument(
"--no-cve",
action="store_true",
help="Skip CVE database lookup",
)
parser.add_argument(
"--wordlist",
default=None,
help="Custom wordlist file for directory scanning",
)
# Performance options
parser.add_argument(
"--rate",
type=float,
default=50.0,
help="Maximum requests per second (default: 50)",
)
parser.add_argument(
"--timeout",
type=float,
default=3.0,
help="Connection timeout in seconds (default: 3)",
)
parser.add_argument(
"--threads",
type=int,
default=50,
help="Maximum concurrent threads for port scanning (default: 50)",
)
# Output options
parser.add_argument(
"-o", "--output",
help="Output file path (format detected from extension: .json, .html, .txt)",
)
parser.add_argument(
"-f", "--format",
choices=["text", "json", "html"],
default="text",
help="Output format (default: text)",
)
parser.add_argument(
"-q", "--quiet",
action="store_true",
help="Suppress progress output (only show results)",
)
# Confirmation
parser.add_argument(
"--confirm",
action="store_true",
help="Skip authorization confirmation prompt",
)
return parser.parse_args()
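def parse_ports(ports_str: str) -> list[int]:
"""Parse a comma-separated port list, supporting ranges (e.g., 80,443,8000-8010)."""
ports = set()
for part in ports_str.split(","):
part = part.strip()
if not part:
continue  # Tolerate stray commas such as "80,,443"
if "-" in part:
start, end = (int(x) for x in part.split("-", 1))
else:
start = end = int(part)
# Reject reversed ranges and values outside the valid TCP port range
if not 1 <= start <= end <= 65535:
raise ValueError(f"Invalid port or range: {part!r}")
ports.update(range(start, end + 1))
return sorted(ports)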

def main():
    args = parse_args()

    # Authorization confirmation
    if not args.confirm:
        print("=" * 60)
        print(" VULNERABILITY SCANNER")
        print("=" * 60)
        print(f"\n Target: {args.target}")
        print("\n WARNING: Unauthorized scanning is ILLEGAL.")
        print(" Only scan targets you own or have written permission")
        print(" to test.\n")
        try:
            response = input(" Do you have authorization to scan this target? [y/N]: ")
        except (KeyboardInterrupt, EOFError):
            # Ctrl+C or a closed stdin (non-interactive shell) counts as "no"
            print("\n\n Scan cancelled.")
            sys.exit(0)
        if response.strip().lower() not in ('y', 'yes'):
            print("\n Scan cancelled. Get proper authorization first.")
            sys.exit(0)

    # Validate target
    if not is_valid_target(args.target):
        print(f"[!] Invalid or potentially dangerous target: {args.target}")
        sys.exit(1)

    # Parse ports
    if args.ports:
        try:
            ports = parse_ports(args.ports)
        except ValueError as exc:
            # Non-numeric input such as "80,http" ends up here
            print(f"[!] Invalid port specification: {exc}")
            sys.exit(1)
    elif args.all_ports:
        ports = list(range(1, 65536))
    elif args.top_ports:
        ports = COMMON_PORTS[:args.top_ports]
    else:
        ports = COMMON_PORTS

    # Determine scan options (--full overrides every --no-* flag)
    scan_http = args.full or not args.no_http
    scan_ssl = args.full or not args.no_ssl
    scan_dirs = args.full or args.dirs
    lookup_cves = args.full or not args.no_cve

    # Run scan
    controller = ScanController(
        rate_limit=args.rate,
        timeout=args.timeout,
        max_workers=args.threads,
    )
    try:
        result = controller.scan(
            host=args.target,
            ports=ports,
            scan_http=scan_http,
            scan_ssl=scan_ssl,
            scan_dirs=scan_dirs,
            lookup_cves=lookup_cves,
            wordlist=args.wordlist,
        )
    except KeyboardInterrupt:
        print("\n\n[!] Scan interrupted by user.")
        sys.exit(130)

    # Generate report
    report_gen = ReportGenerator()

    # Determine output format (a recognized file extension overrides --format)
    output_format = args.format
    if args.output:
        if args.output.endswith('.json'):
            output_format = 'json'
        elif args.output.endswith('.html'):
            output_format = 'html'
        elif args.output.endswith('.txt'):
            output_format = 'text'

    if output_format == 'json':
        report = report_gen.generate_json(result)
    elif output_format == 'html':
        report = report_gen.generate_html(result)
    else:
        report = report_gen.generate_text(result)

    # Output
    if args.output:
        # Explicit encoding keeps the report portable across platforms
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"\n[*] Report saved to: {args.output}")
    else:
        print("\n" + report)

    # Exit code based on findings: 2 = critical, 1 = high, 0 = neither.
    # Note that exit code 1 is also used above for invalid input.
    critical = sum(1 for f in result.findings if f.severity.value == "CRITICAL")
    high = sum(1 for f in result.findings if f.severity.value == "HIGH")
    if critical > 0:
        sys.exit(2)
    elif high > 0:
        sys.exit(1)
    else:
        sys.exit(0)


if __name__ == "__main__":
    main()
13. Usage Examples
13.1 Basic Scan
# Scan common ports on a target
python main.py example.com
# Scan specific ports
python main.py example.com -p 80,443,8080,3306
# Scan top 20 ports only
python main.py example.com --top-ports 20
13.2 Full Scan
# Full scan with all modules
python main.py example.com --full
# Full scan with custom rate limit and output
python main.py example.com --full --rate 20 -o report.html
# Full scan with custom wordlist
python main.py example.com --full --wordlist wordlists/large.txt
13.3 Targeted Scans
# HTTP headers only (web server ports)
python main.py example.com -p 80,443 --no-ssl --no-cve
# SSL/TLS only
python main.py example.com -p 443 --no-http --no-cve
# Directory discovery (the HTTP header check still runs unless --no-http is added)
python main.py example.com -p 80,443 --dirs --no-ssl --no-cve
13.4 Output Formats
# JSON output (for automation)
python main.py example.com -o scan_results.json
# HTML report (for sharing)
python main.py example.com --full -o report.html
# Text report to file
python main.py example.com -o report.txt
# Quiet mode with JSON to stdout
python main.py example.com -q -f json --confirm
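The exit codes set at the end of `main()` (2 when any CRITICAL finding exists, 1 for HIGH, 0 otherwise) let a pipeline gate on the scan result without parsing the report. The following is a minimal sketch of such a wrapper, assuming the scanner is started as `main.py` from the current directory. The helper names `classify_exit_code` and `run_scan` are hypothetical and not part of the project.

```python
import subprocess
import sys

# Meaning of the exit codes produced by main().
# main() also exits with 1 for an invalid target and 130 on Ctrl+C.
EXIT_MEANINGS = {
    0: "no critical or high findings",
    1: "high-severity findings (or invalid input)",
    2: "critical findings",
    130: "scan interrupted",
}


def classify_exit_code(code: int) -> str:
    """Translate a scanner exit code into a human-readable verdict."""
    return EXIT_MEANINGS.get(code, f"unexpected exit code {code}")


def run_scan(target: str, report_path: str) -> int:
    """Run the scanner non-interactively and return its exit code.

    --confirm skips the authorization prompt, so call this only for
    targets you are authorized to scan.
    """
    cmd = [sys.executable, "main.py", target, "-q", "--confirm", "-o", report_path]
    completed = subprocess.run(cmd, check=False)
    return completed.returncode
```

A CI job could call `run_scan("example.com", "scan_results.json")`, log `classify_exit_code(...)` for the returned value, and fail the build on any nonzero code. Because exit code 1 is shared between "high findings" and "invalid target", a stricter pipeline may want to inspect the report as well.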
14. Exercises
Exercise 1: Run the Scanner
Set up and run the scanner against a locally hosted test application:
1. Install DVWA or WebGoat in a Docker container
2. Run the scanner with --full against localhost
3. Analyze the report and categorize all findings
4. Determine which findings are true vs. false positives
Exercise 2: Add UDP Scanning
Extend the port scanner to support basic UDP scanning:
1. Implement UDP port scanning using SOCK_DGRAM
2. Handle the differences (no handshake, less reliable)
3. Add support for common UDP services (DNS/53, SNMP/161, NTP/123)
4. Update the CLI to accept a --udp flag
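As a possible starting point for this exercise, the sketch below probes a single UDP port. It is not the project's implementation: the function name `probe_udp_port`, the one-byte default payload, and the three result labels are assumptions made for illustration. Because UDP has no handshake, the only evidence is a reply datagram (open), an ICMP "port unreachable" error surfaced by the OS (closed), or silence (ambiguous).

```python
import socket


def probe_udp_port(host: str, port: int, timeout: float = 2.0,
                   payload: bytes = b"\x00") -> str:
    """Classify a UDP port as 'open', 'closed', or 'open|filtered'."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        try:
            # connect() on a UDP socket sends nothing; it only fixes the peer
            # so that ICMP errors are reported back on this socket.
            sock.connect((host, port))
            sock.send(payload)
            sock.recv(1024)
            return "open"
        except socket.timeout:
            # No reply and no ICMP error: open but silent, or filtered
            return "open|filtered"
        except (ConnectionRefusedError, ConnectionResetError):
            # The OS received an ICMP "port unreachable" for our datagram
            return "closed"
```

Many real services ignore datagrams they cannot parse, so a generic payload mostly yields `open|filtered`. Reliable detection of DNS, SNMP, or NTP needs a protocol-specific probe passed in as `payload`, and every probe should still go through the scanner's rate limiter.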
Exercise 3: Implement a Web Crawler
Add a web crawler module that:
1. Starts from the base URL and discovers linked pages
2. Extracts all forms and their parameters
3. Identifies input fields that could be attack surfaces
4. Respects robots.txt
5. Limits crawl depth and follows the rate limiter
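Two of these requirements, link discovery and robots.txt handling, can be sketched with the standard library alone. The example below is a hedged starting point rather than a complete crawler: `LinkExtractor` and `allowed_same_host_links` are hypothetical names, the HTML is passed in as a string (fetching, depth limits, and rate limiting are left out), and only `<a href>` links are collected.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser


class LinkExtractor(HTMLParser):
    """Collect absolute URLs from <a href="..."> tags."""

    def __init__(self, base_url: str):
        super().__init__()
        self.base_url = base_url
        self.links: set[str] = set()

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                # Resolve relative links against the page URL
                self.links.add(urljoin(self.base_url, href))


def allowed_same_host_links(html: str, base_url: str, robots: RobotFileParser,
                            user_agent: str = "*") -> list[str]:
    """Return links on the same host that robots.txt permits fetching."""
    parser = LinkExtractor(base_url)
    parser.feed(html)
    host = urlparse(base_url).netloc
    return sorted(
        link for link in parser.links
        if urlparse(link).netloc == host and robots.can_fetch(user_agent, link)
    )
```

In a real crawler the `RobotFileParser` would be populated with `set_url(...)` and `read()` against the target's robots.txt. Restricting results to the starting host keeps the crawl inside the authorized scope.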
Exercise 4: Add Authentication Support
Extend the HTTP checker to support authenticated scanning:
1. Accept login credentials via CLI
2. Perform authentication (form-based or cookie-based)
3. Scan authenticated pages that require login
4. Support Bearer token authentication for APIs
Exercise 5: Custom CVE Database
Build a more comprehensive CVE lookup system:
1. Download CVE data from NVD (NIST)
2. Store in a local SQLite database
3. Implement version range matching (not just exact match)
4. Add CVSS score parsing and severity mapping
5. Support automatic database updates
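Version range matching is the step most likely to go wrong, because comparing version strings directly treats "1.4.10" as smaller than "1.4.9". The sketch below shows one way to approach it with SQLite and tuple comparison. The table layout, the function names `parse_version` and `find_cves`, and the sample rows are all assumptions for illustration; the CVE IDs are placeholders, not real entries.

```python
import sqlite3


def parse_version(version: str) -> tuple[int, ...]:
    """Convert '2.4.49' to (2, 4, 49) so versions compare numerically."""
    return tuple(int(piece) for piece in version.split("."))


def find_cves(conn: sqlite3.Connection, product: str, version: str) -> list[str]:
    """Return CVE IDs whose [min_version, max_version] range contains version."""
    target = parse_version(version)
    rows = conn.execute(
        "SELECT cve_id, min_version, max_version FROM cves WHERE product = ?",
        (product,),
    ).fetchall()
    return [
        cve_id
        for cve_id, low, high in rows
        if parse_version(low) <= target <= parse_version(high)
    ]


# Demo with an in-memory database and placeholder rows (not real CVE data)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cves (cve_id TEXT, product TEXT, min_version TEXT, max_version TEXT)"
)
conn.executemany(
    "INSERT INTO cves VALUES (?, ?, ?, ?)",
    [
        ("CVE-0000-0001", "exampled", "1.0.0", "1.4.9"),
        ("CVE-0000-0002", "exampled", "1.4.0", "2.0.0"),
    ],
)
print(find_cves(conn, "exampled", "1.4.10"))  # ['CVE-0000-0002']
```

This sketch only handles purely numeric dotted versions. Banners with suffixes such as "p1" or "-beta" would raise `ValueError` in `parse_version` and need a more tolerant parser.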
Exercise 6: Parallel Module Execution
Optimize the scanner for speed:
1. Run independent modules in parallel (e.g., HTTP check and SSL check)
2. Implement a progress bar showing overall completion
3. Add support for scanning multiple targets from a file
4. Benchmark and compare sequential vs. parallel execution times
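For the first step, `concurrent.futures` is one reasonable way to run independent modules side by side. The sketch below uses two stand-in functions that only sleep, since the real module interfaces are not assumed here. `check_http`, `check_ssl`, and `run_parallel` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def check_http(host: str) -> str:
    """Stand-in for the HTTP header module (simulated with a short sleep)."""
    time.sleep(0.2)
    return f"http:{host}"


def check_ssl(host: str) -> str:
    """Stand-in for the SSL/TLS module (simulated with a short sleep)."""
    time.sleep(0.2)
    return f"ssl:{host}"


def run_parallel(host: str) -> dict[str, str]:
    """Run independent modules concurrently and collect results by name."""
    modules = {"http": check_http, "ssl": check_ssl}
    with ThreadPoolExecutor(max_workers=len(modules)) as pool:
        # Submit everything first so the modules overlap, then gather results
        futures = {name: pool.submit(func, host) for name, func in modules.items()}
        return {name: future.result() for name, future in futures.items()}
```

Timing `run_parallel` against sequential calls with `time.perf_counter()` gives the comparison asked for in step 4. Threads suit this workload because the modules spend most of their time waiting on the network. If the modules share one rate limiter, it has to be thread-safe so that parallel execution does not exceed the configured request rate.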
Summary
┌──────────────────────────────────────────────────────────────────┐
│               Vulnerability Scanner Key Takeaways                │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. Ethics first: ALWAYS get authorization before scanning       │
│  2. Rate limiting: Responsible scanning respects target          │
│     resources and network capacity                               │
│  3. Modular design: Each scan type is an independent module      │
│     that can be run separately or combined                       │
│  4. Defense in depth: Port scanning + banner grabbing + HTTP     │
│     headers + SSL + directory scanning + CVE lookup gives        │
│     comprehensive coverage                                       │
│  5. False positives: Scanners produce noise; human analysis      │
│     is needed to verify findings                                 │
│  6. Version detection: Accurate service/version identification   │
│     is critical for CVE matching                                 │
│  7. Reporting: Clear, actionable reports with severity ratings   │
│     and remediation advice are as important as detection         │
│  8. Automation: CLI interface with structured output enables     │
│     integration into CI/CD and regular security assessments      │
│  9. Continuous: Security scanning should be regular, not         │
│     one-time -- integrate into your development workflow         │
│ 10. Limitations: Automated scanners find common issues but       │
│     cannot replace manual penetration testing for logic flaws    │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘