Project: Building a Vulnerability Scanner

Project: Building a Vulnerability Scanner

Previous: 15. Project: Building a Secure REST API


This project lesson walks through building a comprehensive vulnerability scanner in Python. The scanner performs port scanning, service banner grabbing, HTTP security header checking, SSL/TLS configuration analysis, directory brute forcing, and CVE database lookups. We will build a modular, extensible tool with a CLI interface, structured output, and built-in rate limiting to ensure responsible scanning. This project ties together many concepts from the entire Security topic.

IMPORTANT: Only scan systems you own or have explicit written permission to test. Unauthorized scanning is illegal in most jurisdictions. This tool is for educational purposes and authorized security assessments only.

Learning Objectives

  • Build a modular vulnerability scanner with Python
  • Implement TCP port scanning with service detection
  • Analyze HTTP security headers and SSL/TLS configurations
  • Perform directory and path discovery (educational)
  • Look up known vulnerabilities in CVE databases
  • Generate structured scan reports
  • Apply rate limiting and responsible scanning practices
  • Understand the ethical and legal boundaries of security scanning
  • Design a CLI tool with argparse

1. Project Overview

1.1 Scanner Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                 Vulnerability Scanner Architecture                β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                  β”‚
β”‚  CLI Interface (argparse)                                        β”‚
β”‚       β”‚                                                          β”‚
β”‚       β–Ό                                                          β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                           β”‚
β”‚  β”‚  Scan Controller  β”‚  Orchestrates all scan modules           β”‚
β”‚  β”‚  (main.py)        β”‚  Manages rate limiting                   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  Generates reports                       β”‚
β”‚           β”‚                                                      β”‚
β”‚     β”Œβ”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                       β”‚
β”‚     β”‚     β”‚         β”‚         β”‚          β”‚                       β”‚
β”‚     β–Ό     β–Ό         β–Ό         β–Ό          β–Ό                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                β”‚
β”‚  β”‚Port β”‚β”‚Bannerβ”‚β”‚  HTTP  β”‚β”‚ SSL  β”‚β”‚Directory β”‚                β”‚
β”‚  β”‚Scan β”‚β”‚Grab  β”‚β”‚Headers β”‚β”‚/TLS  β”‚β”‚Discovery β”‚                β”‚
β”‚  β”‚     β”‚β”‚      β”‚β”‚Check   β”‚β”‚Check β”‚β”‚          β”‚                β”‚
β”‚  β””β”€β”€β”¬β”€β”€β”˜β””β”€β”€β”¬β”€β”€β”€β”˜β””β”€β”€β”€β”¬β”€β”€β”€β”€β”˜β””β”€β”€β”¬β”€β”€β”€β”˜β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜                β”‚
β”‚     β”‚      β”‚        β”‚        β”‚         β”‚                        β”‚
β”‚     β””β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                        β”‚
β”‚                      β”‚                                           β”‚
β”‚                      β–Ό                                           β”‚
β”‚             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                   β”‚
β”‚             β”‚  CVE Lookup    β”‚  Match services to known CVEs    β”‚
β”‚             β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜                                   β”‚
β”‚                      β”‚                                           β”‚
β”‚                      β–Ό                                           β”‚
β”‚             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                   β”‚
β”‚             β”‚ Report Engine  β”‚  JSON, text, HTML output         β”‚
β”‚             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                   β”‚
β”‚                                                                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1.2 Project Structure

vuln_scanner/
β”œβ”€β”€ scanner/
β”‚   β”œβ”€β”€ __init__.py
β”‚   β”œβ”€β”€ controller.py       # Scan orchestration
β”‚   β”œβ”€β”€ port_scanner.py     # TCP port scanning
β”‚   β”œβ”€β”€ banner_grabber.py   # Service banner detection
β”‚   β”œβ”€β”€ http_checker.py     # HTTP security header analysis
β”‚   β”œβ”€β”€ ssl_checker.py      # SSL/TLS configuration analysis
β”‚   β”œβ”€β”€ dir_scanner.py      # Directory/path discovery
β”‚   β”œβ”€β”€ cve_lookup.py       # CVE database queries
β”‚   β”œβ”€β”€ report.py           # Report generation
β”‚   └── utils.py            # Shared utilities, rate limiter
β”œβ”€β”€ wordlists/
β”‚   └── common_paths.txt    # Directory wordlist (small, for demo)
β”œβ”€β”€ main.py                 # CLI entry point
β”œβ”€β”€ requirements.txt
└── README.md

1.3 Dependencies

# requirements.txt
requests==2.31.0
cryptography==42.0.4

2.1 Rules of Engagement

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚          ETHICAL AND LEGAL BOUNDARIES                              β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                   β”‚
β”‚  YOU MUST:                                                        β”‚
β”‚  βœ“ Only scan systems you OWN or have WRITTEN permission to test β”‚
β”‚  βœ“ Get explicit, documented authorization before scanning        β”‚
β”‚  βœ“ Define scope clearly (which IPs, ports, tests)               β”‚
β”‚  βœ“ Respect rate limits and bandwidth constraints                 β”‚
β”‚  βœ“ Report findings responsibly to the system owner              β”‚
β”‚  βœ“ Stop immediately if you discover you are scanning the        β”‚
β”‚    wrong target                                                   β”‚
β”‚  βœ“ Keep all findings confidential                                β”‚
β”‚                                                                   β”‚
β”‚  YOU MUST NOT:                                                    β”‚
β”‚  βœ— Scan systems without authorization                            β”‚
β”‚  βœ— Exploit discovered vulnerabilities (scan, don't attack)      β”‚
β”‚  βœ— Perform denial of service (even accidentally via scan volume)β”‚
β”‚  βœ— Access, modify, or exfiltrate data                           β”‚
β”‚  βœ— Share findings publicly without owner's consent              β”‚
β”‚  βœ— Use this tool for competitive intelligence or harassment     β”‚
β”‚                                                                   β”‚
β”‚  LEGAL NOTES:                                                     β”‚
β”‚  - Computer Fraud and Abuse Act (CFAA) - USA                    β”‚
β”‚  - Computer Misuse Act 1990 - UK                                 β”‚
β”‚  - StGB Β§202a-c - Germany                                        β”‚
β”‚  - Similar laws exist in virtually every country                 β”‚
β”‚  - Unauthorized scanning can result in criminal charges          β”‚
β”‚  - "I was just testing" is NOT a legal defense                   β”‚
β”‚                                                                   β”‚
β”‚  SAFE TARGETS FOR PRACTICE:                                       β”‚
β”‚  - Your own machines / VMs                                        β”‚
β”‚  - Intentionally vulnerable apps:                                β”‚
β”‚    * DVWA (Damn Vulnerable Web Application)                      β”‚
β”‚    * HackTheBox, TryHackMe (with permission)                    β”‚
β”‚    * OWASP WebGoat                                               β”‚
β”‚    * Metasploitable                                              β”‚
β”‚  - Bug bounty programs (within scope)                            β”‚
β”‚                                                                   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

3. Shared Utilities and Rate Limiter

3.1 Core Utilities

"""
scanner/utils.py - Shared utilities, rate limiter, and data models.
"""

import time
import socket
import threading
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional


class Severity(str, Enum):
    """Vulnerability severity levels."""
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    INFO = "INFO"


@dataclass
class Finding:
    """A single vulnerability or information finding."""
    title: str
    severity: Severity
    description: str
    module: str            # Which scanner module found this
    details: dict = field(default_factory=dict)
    remediation: str = ""
    reference: str = ""    # URL or CVE reference

    def __str__(self):
        return f"[{self.severity.value}] {self.title}: {self.description}"


@dataclass
class PortInfo:
    """Information about a discovered port."""
    port: int
    state: str             # open, closed, filtered
    service: str = ""      # Detected service name
    version: str = ""      # Service version
    banner: str = ""       # Raw banner text
    protocol: str = "tcp"


@dataclass
class ScanTarget:
    """Scan target specification."""
    host: str
    ip: Optional[str] = None
    ports: list[int] = field(default_factory=list)

    def resolve(self) -> bool:
        """Resolve hostname to IP address."""
        try:
            self.ip = socket.gethostbyname(self.host)
            return True
        except socket.gaierror:
            return False


@dataclass
class ScanResult:
    """Complete scan result for a target."""
    target: ScanTarget
    start_time: str = ""
    end_time: str = ""
    duration_seconds: float = 0.0
    open_ports: list[PortInfo] = field(default_factory=list)
    findings: list[Finding] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)

    def __post_init__(self):
        if not self.start_time:
            self.start_time = datetime.now().isoformat()


class RateLimiter:
    """
    Thread-safe rate limiter using token bucket algorithm.
    Ensures we don't overwhelm the target with requests.
    """

    def __init__(self, requests_per_second: float = 10.0):
        """
        Args:
            requests_per_second: Maximum requests per second.
        """
        self.rate = requests_per_second
        self.tokens = requests_per_second
        self.max_tokens = requests_per_second
        self.last_time = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        """Block until a token is available."""
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.last_time
                self.last_time = now

                # Add tokens based on elapsed time
                self.tokens = min(
                    self.max_tokens,
                    self.tokens + elapsed * self.rate
                )

                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return

            # No tokens available, wait a bit
            time.sleep(1.0 / self.rate)

    def set_rate(self, requests_per_second: float) -> None:
        """Update the rate limit."""
        with self.lock:
            self.rate = requests_per_second
            self.max_tokens = requests_per_second


def is_valid_target(host: str) -> bool:
    """
    Validate that a target is potentially valid and not a
    reserved/dangerous address.
    """
    # Block scanning of localhost (unless explicitly intended)
    dangerous_hosts = {
        'localhost', '127.0.0.1', '::1',
        '0.0.0.0',
    }
    if host.lower() in dangerous_hosts:
        return False

    # Block private IP ranges (comment out for internal scanning)
    # This is a safety measure for educational use
    try:
        ip = socket.gethostbyname(host)
        parts = ip.split('.')
        if len(parts) == 4:
            first_octet = int(parts[0])
            second_octet = int(parts[1])

            # 10.0.0.0/8
            if first_octet == 10:
                print(f"    [!] Warning: {host} ({ip}) is a private address")

            # 172.16.0.0/12
            if first_octet == 172 and 16 <= second_octet <= 31:
                print(f"    [!] Warning: {host} ({ip}) is a private address")

            # 192.168.0.0/16
            if first_octet == 192 and second_octet == 168:
                print(f"    [!] Warning: {host} ({ip}) is a private address")

    except socket.gaierror:
        pass

    return True


# Common port list (top 100 most common)
COMMON_PORTS = [
    20, 21, 22, 23, 25, 53, 67, 68, 69, 80,
    110, 111, 119, 123, 135, 137, 138, 139, 143,
    161, 162, 179, 194, 389, 443, 445, 465, 514,
    515, 520, 521, 587, 631, 636, 873, 993, 995,
    1080, 1194, 1433, 1434, 1521, 1723, 2049, 2082,
    2083, 2086, 2087, 2096, 2100, 3128, 3306, 3389,
    5060, 5432, 5900, 5901, 6379, 6667, 8000, 8008,
    8080, 8443, 8888, 9090, 9200, 9300, 10000, 11211,
    27017, 27018, 28017, 50000,
]

# Well-known service names
SERVICE_NAMES = {
    20: "ftp-data", 21: "ftp", 22: "ssh", 23: "telnet",
    25: "smtp", 53: "dns", 80: "http", 110: "pop3",
    111: "rpcbind", 119: "nntp", 123: "ntp", 135: "msrpc",
    139: "netbios-ssn", 143: "imap", 161: "snmp", 389: "ldap",
    443: "https", 445: "microsoft-ds", 465: "smtps", 514: "syslog",
    587: "submission", 636: "ldaps", 993: "imaps", 995: "pop3s",
    1433: "mssql", 1521: "oracle", 3306: "mysql", 3389: "rdp",
    5432: "postgresql", 5900: "vnc", 6379: "redis",
    8080: "http-proxy", 8443: "https-alt", 9200: "elasticsearch",
    11211: "memcached", 27017: "mongodb",
}

4. Port Scanner Module

4.1 TCP Port Scanner

"""
scanner/port_scanner.py - TCP port scanner module.
"""

import socket
import concurrent.futures
from typing import Optional

from scanner.utils import (
    PortInfo, RateLimiter, ScanTarget, Finding, Severity,
    COMMON_PORTS, SERVICE_NAMES,
)


class PortScanner:
    """
    TCP Connect port scanner.

    This performs a full TCP three-way handshake to determine
    if a port is open. It is reliable but easily detectable.

    For educational purposes only. SYN scanning (half-open)
    requires raw socket privileges and is not implemented here.
    """

    def __init__(
        self,
        timeout: float = 2.0,
        rate_limiter: Optional[RateLimiter] = None,
        max_workers: int = 50,
    ):
        """
        Args:
            timeout: Connection timeout in seconds per port.
            rate_limiter: Rate limiter for responsible scanning.
            max_workers: Maximum concurrent connection attempts.
        """
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(100.0)
        self.max_workers = max_workers

    def scan_port(self, host: str, port: int) -> PortInfo:
        """
        Scan a single port on the target host.

        Performs a TCP connect scan (full three-way handshake).

        Args:
            host: Target hostname or IP.
            port: Port number to scan.

        Returns:
            PortInfo with the scan result.
        """
        self.rate_limiter.acquire()

        port_info = PortInfo(
            port=port,
            state="closed",
            service=SERVICE_NAMES.get(port, "unknown"),
        )

        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            result = sock.connect_ex((host, port))

            if result == 0:
                port_info.state = "open"

                # Try to get service banner
                try:
                    # Send a generic probe for some services
                    if port in (80, 8080, 8443):
                        sock.sendall(
                            f"HEAD / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode()
                        )
                    elif port == 22:
                        pass  # SSH sends banner automatically
                    elif port in (21, 25, 110, 143):
                        pass  # These services send banners

                    sock.settimeout(1.5)
                    banner = sock.recv(1024)
                    port_info.banner = banner.decode('utf-8', errors='replace').strip()
                    port_info.version = self._extract_version(port_info.banner)
                except (socket.timeout, ConnectionResetError, OSError):
                    pass

            sock.close()

        except socket.timeout:
            port_info.state = "filtered"
        except ConnectionRefusedError:
            port_info.state = "closed"
        except OSError as e:
            port_info.state = "error"

        return port_info

    def scan_ports(
        self,
        host: str,
        ports: Optional[list[int]] = None,
    ) -> list[PortInfo]:
        """
        Scan multiple ports concurrently.

        Args:
            host: Target hostname or IP.
            ports: List of ports to scan. Defaults to common ports.

        Returns:
            List of PortInfo for open ports.
        """
        if ports is None:
            ports = COMMON_PORTS

        print(f"[*] Scanning {len(ports)} ports on {host}...")

        open_ports = []

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            # Submit all port scan tasks
            future_to_port = {
                executor.submit(self.scan_port, host, port): port
                for port in ports
            }

            # Collect results
            completed = 0
            for future in concurrent.futures.as_completed(future_to_port):
                completed += 1
                if completed % 100 == 0:
                    print(f"    Progress: {completed}/{len(ports)} ports scanned")

                try:
                    port_info = future.result()
                    if port_info.state == "open":
                        open_ports.append(port_info)
                        print(f"    [+] Port {port_info.port:5d}/tcp  OPEN  "
                              f"{port_info.service}  {port_info.version}")
                except Exception as e:
                    port = future_to_port[future]
                    print(f"    [!] Error scanning port {port}: {e}")

        open_ports.sort(key=lambda p: p.port)
        print(f"[*] Found {len(open_ports)} open ports")

        return open_ports

    def _extract_version(self, banner: str) -> str:
        """Extract service version from banner string."""
        if not banner:
            return ""

        # Common patterns
        # SSH: SSH-2.0-OpenSSH_8.9
        if banner.startswith("SSH-"):
            parts = banner.split("-")
            if len(parts) >= 3:
                return parts[2].split()[0]

        # HTTP: Server: nginx/1.24.0
        if "Server:" in banner:
            for line in banner.split("\r\n"):
                if line.startswith("Server:"):
                    return line.split(":", 1)[1].strip()

        # FTP: 220 vsftpd 3.0.5
        if banner.startswith("220"):
            parts = banner.split()
            if len(parts) >= 3:
                return " ".join(parts[1:3])

        # SMTP: 220 mail.example.com ESMTP Postfix
        if "ESMTP" in banner or "SMTP" in banner:
            return banner.split("\r\n")[0][:60]

        return banner[:60]

    def generate_findings(self, open_ports: list[PortInfo]) -> list[Finding]:
        """Generate security findings from open ports."""
        findings = []

        # Dangerous/insecure services
        dangerous_ports = {
            21: ("FTP", "FTP transmits credentials in plaintext. Use SFTP instead."),
            23: ("Telnet", "Telnet transmits all data in plaintext. Use SSH instead."),
            69: ("TFTP", "TFTP has no authentication. Restrict access."),
            111: ("RPC", "RPCbind can leak service information. Restrict to trusted networks."),
            135: ("MSRPC", "Microsoft RPC can be exploited for remote code execution."),
            139: ("NetBIOS", "NetBIOS can leak system information. Block from external access."),
            445: ("SMB", "SMB has been target of major exploits (EternalBlue). Restrict access."),
            161: ("SNMP", "SNMP v1/v2c use community strings (plaintext). Use v3 with auth."),
            1433: ("MSSQL", "Database port should not be publicly accessible."),
            3306: ("MySQL", "Database port should not be publicly accessible."),
            5432: ("PostgreSQL", "Database port should not be publicly accessible."),
            6379: ("Redis", "Redis often has no authentication. Never expose publicly."),
            11211: ("Memcached", "Memcached has no built-in auth. Used in amplification attacks."),
            27017: ("MongoDB", "MongoDB should not be publicly accessible."),
            5900: ("VNC", "VNC may use weak authentication. Use VPN for remote access."),
        }

        for port_info in open_ports:
            if port_info.port in dangerous_ports:
                service_name, description = dangerous_ports[port_info.port]
                findings.append(Finding(
                    title=f"Potentially Dangerous Service: {service_name}",
                    severity=Severity.HIGH if port_info.port in (6379, 11211, 27017)
                             else Severity.MEDIUM,
                    description=description,
                    module="port_scanner",
                    details={
                        "port": port_info.port,
                        "service": port_info.service,
                        "banner": port_info.banner[:200],
                    },
                    remediation=(
                        f"Restrict port {port_info.port} to trusted networks using "
                        f"firewall rules. Consider disabling if not needed."
                    ),
                ))

        # Check for unencrypted HTTP (when HTTPS is also available)
        has_http = any(p.port == 80 for p in open_ports)
        has_https = any(p.port == 443 for p in open_ports)

        if has_http and not has_https:
            findings.append(Finding(
                title="HTTP Without HTTPS",
                severity=Severity.HIGH,
                description="HTTP (port 80) is open but HTTPS (port 443) is not detected.",
                module="port_scanner",
                remediation="Enable HTTPS and redirect HTTP to HTTPS.",
            ))
        elif has_http and has_https:
            findings.append(Finding(
                title="HTTP Port Open (Redirect Needed)",
                severity=Severity.LOW,
                description="HTTP (port 80) is open alongside HTTPS. Ensure HTTP redirects to HTTPS.",
                module="port_scanner",
                remediation="Configure HTTP to HTTPS 301 redirect.",
            ))

        return findings

5. Banner Grabber Module

5.1 Service Banner Detection

"""
scanner/banner_grabber.py - Service banner grabbing and version detection.
"""

import socket
import ssl
from typing import Optional

from scanner.utils import PortInfo, Finding, Severity, RateLimiter


class BannerGrabber:
    """
    Grabs service banners to identify running software and versions.

    Service banners often reveal software name, version, and OS,
    which can be used to identify known vulnerabilities.
    """

    # Protocol-specific probes
    PROBES = {
        # HTTP probe
        "http": b"HEAD / HTTP/1.1\r\nHost: {host}\r\nUser-Agent: SecurityScanner/1.0\r\nAccept: */*\r\nConnection: close\r\n\r\n",
        # FTP expects no probe (server sends banner)
        "ftp": b"",
        # SMTP expects no probe
        "smtp": b"",
        # SSH expects no probe
        "ssh": b"",
        # POP3 expects no probe
        "pop3": b"",
        # IMAP expects no probe
        "imap": b"",
        # MySQL probe
        "mysql": b"",
        # Redis probe
        "redis": b"INFO\r\n",
    }

    def __init__(
        self,
        timeout: float = 3.0,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(10.0)

    def grab_banner(self, host: str, port_info: PortInfo) -> PortInfo:
        """
        Attempt to grab a service banner from an open port.

        Updates the port_info with banner and version information.
        """
        if port_info.state != "open":
            return port_info

        if port_info.banner:
            # Already have a banner from port scan
            return port_info

        self.rate_limiter.acquire()

        # Determine the appropriate probe
        service = port_info.service.lower()
        probe = self.PROBES.get(service, b"")

        # Special handling for HTTP
        if service in ("http", "http-proxy"):
            probe = self.PROBES["http"].replace(b"{host}", host.encode())

        try:
            # Determine if we should use SSL
            use_ssl = port_info.port in (443, 465, 636, 993, 995, 8443)

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((host, port_info.port))

            if use_ssl:
                context = ssl.create_default_context()
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                sock = context.wrap_socket(sock, server_hostname=host)

            # Send probe if needed
            if probe:
                sock.sendall(probe)

            # Receive response
            banner = sock.recv(4096)
            port_info.banner = banner.decode('utf-8', errors='replace').strip()

            # Extract version from banner
            port_info.version = self._parse_version(
                port_info.banner, port_info.service
            )

            sock.close()

        except (socket.timeout, ConnectionRefusedError, ConnectionResetError,
                ssl.SSLError, OSError):
            pass

        return port_info

    def grab_all_banners(
        self, host: str, open_ports: list[PortInfo]
    ) -> list[PortInfo]:
        """Grab banners from all open ports."""
        print(f"[*] Grabbing banners from {len(open_ports)} open ports...")

        for port_info in open_ports:
            self.grab_banner(host, port_info)
            if port_info.banner:
                version = port_info.version or "(no version)"
                print(f"    [+] Port {port_info.port}: {version}")

        return open_ports

    def _parse_version(self, banner: str, service: str) -> str:
        """Parse version information from service banner."""
        if not banner:
            return ""

        service = service.lower()

        # SSH
        if service == "ssh" or banner.startswith("SSH-"):
            parts = banner.split("-")
            if len(parts) >= 3:
                return parts[2].split("\r")[0].split("\n")[0]

        # HTTP Server header
        if service in ("http", "https", "http-proxy", "https-alt"):
            for line in banner.split("\r\n"):
                if line.lower().startswith("server:"):
                    return line.split(":", 1)[1].strip()
            return ""

        # FTP
        if service == "ftp" and banner.startswith("220"):
            return banner.split("\r")[0].split("\n")[0][4:].strip()

        # SMTP
        if service in ("smtp", "smtps", "submission"):
            return banner.split("\r")[0].split("\n")[0].strip()

        # MySQL
        if service == "mysql":
            # MySQL banner has version in initial handshake packet
            try:
                # Look for version string pattern (e.g., 8.0.35)
                import re
                match = re.search(r'(\d+\.\d+\.\d+)', banner)
                if match:
                    return f"MySQL {match.group(1)}"
            except Exception:
                pass

        # Redis
        if service == "redis" and "redis_version:" in banner:
            for line in banner.split("\r\n"):
                if line.startswith("redis_version:"):
                    return f"Redis {line.split(':')[1]}"

        # Default: first line, truncated
        return banner.split("\r")[0].split("\n")[0][:80]

    def generate_findings(self, open_ports: list[PortInfo]) -> list[Finding]:
        """Generate findings from banner information."""
        findings = []

        for port_info in open_ports:
            if not port_info.banner:
                continue

            banner_lower = port_info.banner.lower()

            # Check for outdated/vulnerable versions
            version_checks = [
                ("apache/2.2", "Apache 2.2 is end-of-life",
                 Severity.HIGH, "Upgrade to Apache 2.4+"),
                ("apache/2.4.49", "Apache 2.4.49 has path traversal vulnerability (CVE-2021-41773)",
                 Severity.CRITICAL, "Upgrade to Apache 2.4.51+"),
                ("nginx/1.1", "Very old nginx version detected",
                 Severity.MEDIUM, "Upgrade to latest stable nginx"),
                ("openssh_7.", "OpenSSH 7.x may have known vulnerabilities",
                 Severity.MEDIUM, "Upgrade to OpenSSH 9.x+"),
                ("openssh_6.", "OpenSSH 6.x has known vulnerabilities",
                 Severity.HIGH, "Upgrade to OpenSSH 9.x+"),
                ("mysql 5.5", "MySQL 5.5 is end-of-life",
                 Severity.HIGH, "Upgrade to MySQL 8.0+"),
                ("mysql 5.6", "MySQL 5.6 is end-of-life",
                 Severity.HIGH, "Upgrade to MySQL 8.0+"),
            ]

            for pattern, description, severity, fix in version_checks:
                if pattern in banner_lower:
                    findings.append(Finding(
                        title=f"Outdated Software: {port_info.version or pattern}",
                        severity=severity,
                        description=description,
                        module="banner_grabber",
                        details={
                            "port": port_info.port,
                            "banner": port_info.banner[:200],
                        },
                        remediation=fix,
                    ))

            # Check for verbose banners (information disclosure)
            if any(keyword in banner_lower for keyword in
                   ['ubuntu', 'debian', 'centos', 'red hat', 'windows']):
                findings.append(Finding(
                    title="OS Information Disclosure",
                    severity=Severity.LOW,
                    description=(
                        f"Service banner reveals operating system information: "
                        f"{port_info.banner[:100]}"
                    ),
                    module="banner_grabber",
                    details={"port": port_info.port, "banner": port_info.banner[:200]},
                    remediation="Configure service to hide OS details from banner.",
                ))

        return findings

6. HTTP Security Header Checker

6.1 Header Analysis

"""
scanner/http_checker.py - HTTP security header analysis.
"""

import requests
from typing import Optional
from urllib.parse import urlparse

from scanner.utils import Finding, Severity, RateLimiter


class HTTPChecker:
    """
    Checks HTTP security headers and common web misconfigurations.
    """

    # Required security headers and their expected values
    SECURITY_HEADERS = {
        "Strict-Transport-Security": {
            "description": "HTTP Strict Transport Security (HSTS)",
            "severity": Severity.HIGH,
            "recommendation": "Add: Strict-Transport-Security: max-age=31536000; includeSubDomains",
            "check_value": lambda v: "max-age=" in v and int(
                v.split("max-age=")[1].split(";")[0].strip()
            ) >= 31536000 if "max-age=" in v else False,
        },
        "Content-Security-Policy": {
            "description": "Content Security Policy (CSP)",
            "severity": Severity.HIGH,
            "recommendation": "Add a Content-Security-Policy header. Start with: default-src 'self'",
            "check_value": lambda v: "default-src" in v or "script-src" in v,
        },
        "X-Content-Type-Options": {
            "description": "MIME type sniffing prevention",
            "severity": Severity.MEDIUM,
            "recommendation": "Add: X-Content-Type-Options: nosniff",
            "check_value": lambda v: v.lower() == "nosniff",
        },
        "X-Frame-Options": {
            "description": "Clickjacking protection",
            "severity": Severity.MEDIUM,
            "recommendation": "Add: X-Frame-Options: DENY (or SAMEORIGIN)",
            "check_value": lambda v: v.upper() in ("DENY", "SAMEORIGIN"),
        },
        "Referrer-Policy": {
            "description": "Referrer information control",
            "severity": Severity.LOW,
            "recommendation": "Add: Referrer-Policy: strict-origin-when-cross-origin",
            "check_value": lambda v: v in (
                "no-referrer", "strict-origin",
                "strict-origin-when-cross-origin", "same-origin"
            ),
        },
        "Permissions-Policy": {
            "description": "Browser feature restrictions",
            "severity": Severity.LOW,
            "recommendation": "Add: Permissions-Policy: camera=(), microphone=(), geolocation=()",
            "check_value": lambda v: len(v) > 0,
        },
    }

    # Headers that should NOT be present (information disclosure)
    BAD_HEADERS = {
        "X-Powered-By": {
            "description": "Reveals server technology",
            "severity": Severity.LOW,
            "recommendation": "Remove X-Powered-By header to reduce information disclosure.",
        },
        "Server": {
            "description": "May reveal server software and version",
            "severity": Severity.LOW,
            "recommendation": "Configure server to send minimal Server header.",
            "check_value": lambda v: any(
                keyword in v.lower()
                for keyword in ["apache", "nginx", "iis", "tomcat", "express"]
            ),
        },
        "X-AspNet-Version": {
            "description": "Reveals ASP.NET version",
            "severity": Severity.LOW,
            "recommendation": "Remove X-AspNet-Version header.",
        },
    }

    def __init__(
        self,
        timeout: float = 10.0,
        rate_limiter: Optional[RateLimiter] = None,
        verify_ssl: bool = True,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(5.0)
        self.verify_ssl = verify_ssl

    def check(self, url: str) -> list[Finding]:
        """
        Check HTTP security headers for a URL.

        Args:
            url: Target URL (http:// or https://).

        Returns:
            List of findings.
        """
        self.rate_limiter.acquire()
        findings = []

        print(f"[*] Checking HTTP security headers for {url}...")

        try:
            response = requests.get(
                url,
                timeout=self.timeout,
                verify=self.verify_ssl,
                allow_redirects=True,
                headers={
                    "User-Agent": "SecurityScanner/1.0 (Authorized Security Scan)"
                },
            )
        except requests.exceptions.SSLError as e:
            findings.append(Finding(
                title="SSL/TLS Error",
                severity=Severity.HIGH,
                description=f"SSL error connecting to {url}: {str(e)[:200]}",
                module="http_checker",
                remediation="Fix SSL/TLS configuration. Ensure valid certificate.",
            ))
            return findings
        except requests.exceptions.ConnectionError as e:
            findings.append(Finding(
                title="Connection Error",
                severity=Severity.INFO,
                description=f"Could not connect to {url}: {str(e)[:200]}",
                module="http_checker",
            ))
            return findings
        except requests.exceptions.Timeout:
            findings.append(Finding(
                title="Connection Timeout",
                severity=Severity.INFO,
                description=f"Connection to {url} timed out after {self.timeout}s",
                module="http_checker",
            ))
            return findings

        headers = response.headers

        # Check for missing security headers
        for header_name, config in self.SECURITY_HEADERS.items():
            value = headers.get(header_name)

            if not value:
                findings.append(Finding(
                    title=f"Missing Security Header: {header_name}",
                    severity=config["severity"],
                    description=f"{config['description']} header is not set.",
                    module="http_checker",
                    details={"header": header_name, "url": url},
                    remediation=config["recommendation"],
                ))
            elif config.get("check_value"):
                try:
                    if not config["check_value"](value):
                        findings.append(Finding(
                            title=f"Weak Security Header: {header_name}",
                            severity=Severity.LOW,
                            description=(
                                f"{header_name} is set but may be insufficiently configured: "
                                f"{value[:100]}"
                            ),
                            module="http_checker",
                            details={"header": header_name, "value": value, "url": url},
                            remediation=config["recommendation"],
                        ))
                except (ValueError, IndexError):
                    pass

        # Check for information disclosure headers
        for header_name, config in self.BAD_HEADERS.items():
            value = headers.get(header_name)
            if value:
                # For Server header, only flag if verbose
                if header_name == "Server" and config.get("check_value"):
                    if not config["check_value"](value):
                        continue  # Minimal server header, OK

                findings.append(Finding(
                    title=f"Information Disclosure: {header_name}",
                    severity=config["severity"],
                    description=f"{config['description']}: {value}",
                    module="http_checker",
                    details={"header": header_name, "value": value},
                    remediation=config["recommendation"],
                ))

        # Check for HTTPS redirect
        if url.startswith("http://"):
            if not response.url.startswith("https://"):
                findings.append(Finding(
                    title="No HTTP to HTTPS Redirect",
                    severity=Severity.HIGH,
                    description="HTTP requests are not redirected to HTTPS.",
                    module="http_checker",
                    remediation="Configure server to redirect all HTTP requests to HTTPS.",
                ))

        # Check cookie security
        for cookie_header in response.headers.getlist("Set-Cookie") if hasattr(
            response.headers, 'getlist'
        ) else []:
            self._check_cookie_security(cookie_header, findings, url)

        # Check for cookies in raw headers
        raw_cookies = headers.get("Set-Cookie", "")
        if raw_cookies:
            self._check_cookie_security(raw_cookies, findings, url)

        print(f"    Found {len(findings)} HTTP security issues")
        return findings

    def _check_cookie_security(self, cookie_str: str, findings: list[Finding],
                                url: str) -> None:
        """Check cookie security attributes."""
        cookie_lower = cookie_str.lower()

        if "secure" not in cookie_lower and url.startswith("https"):
            findings.append(Finding(
                title="Cookie Without Secure Flag",
                severity=Severity.MEDIUM,
                description="A cookie is set without the Secure flag over HTTPS.",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add Secure flag to all cookies on HTTPS sites.",
            ))

        if "httponly" not in cookie_lower:
            findings.append(Finding(
                title="Cookie Without HttpOnly Flag",
                severity=Severity.MEDIUM,
                description="A cookie is set without the HttpOnly flag (accessible by JavaScript).",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add HttpOnly flag to cookies not needed by client-side JavaScript.",
            ))

        if "samesite" not in cookie_lower:
            findings.append(Finding(
                title="Cookie Without SameSite Attribute",
                severity=Severity.LOW,
                description="A cookie is missing the SameSite attribute (CSRF risk).",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add SameSite=Lax or SameSite=Strict to cookies.",
            ))

7. SSL/TLS Configuration Analyzer

7.1 SSL/TLS Checker

"""
scanner/ssl_checker.py - SSL/TLS configuration analyzer.
"""

import socket
import ssl
from datetime import datetime, timezone
from typing import Optional

from scanner.utils import Finding, Severity, RateLimiter


class SSLChecker:
    """
    Analyzes SSL/TLS configuration for security issues.

    Checks:
    - Certificate validity and expiration
    - Protocol version support
    - Cipher suite strength
    - Common misconfigurations
    """

    # Weak protocols
    WEAK_PROTOCOLS = {
        ssl.TLSVersion.SSLv3: "SSLv3 (POODLE vulnerability)",
        ssl.TLSVersion.TLSv1: "TLS 1.0 (deprecated, multiple vulnerabilities)",
        ssl.TLSVersion.TLSv1_1: "TLS 1.1 (deprecated)",
    }

    # Weak cipher keywords
    WEAK_CIPHER_KEYWORDS = [
        "RC4", "DES", "3DES", "MD5", "NULL", "EXPORT",
        "anon", "RC2", "SEED", "IDEA",
    ]

    def __init__(
        self,
        timeout: float = 10.0,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(5.0)

    def check(self, host: str, port: int = 443) -> list[Finding]:
        """
        Check SSL/TLS configuration of a host.

        Args:
            host: Target hostname.
            port: HTTPS port (default 443).

        Returns:
            List of findings.
        """
        self.rate_limiter.acquire()
        findings = []

        print(f"[*] Checking SSL/TLS configuration for {host}:{port}...")

        # 1. Check certificate
        cert_findings = self._check_certificate(host, port)
        findings.extend(cert_findings)

        # 2. Check supported protocols
        protocol_findings = self._check_protocols(host, port)
        findings.extend(protocol_findings)

        # 3. Check cipher suites
        cipher_findings = self._check_ciphers(host, port)
        findings.extend(cipher_findings)

        print(f"    Found {len(findings)} SSL/TLS issues")
        return findings

    def _check_certificate(self, host: str, port: int) -> list[Finding]:
        """Check SSL certificate validity."""
        findings = []

        try:
            context = ssl.create_default_context()
            with socket.create_connection(
                (host, port), timeout=self.timeout
            ) as sock:
                with context.wrap_socket(sock, server_hostname=host) as ssock:
                    cert = ssock.getpeercert()

                    if not cert:
                        findings.append(Finding(
                            title="No SSL Certificate",
                            severity=Severity.CRITICAL,
                            description="No SSL certificate presented by server.",
                            module="ssl_checker",
                            remediation="Install a valid SSL certificate.",
                        ))
                        return findings

                    # Check expiration
                    not_after = datetime.strptime(
                        cert['notAfter'], '%b %d %H:%M:%S %Y %Z'
                    ).replace(tzinfo=timezone.utc)
                    now = datetime.now(timezone.utc)
                    days_until_expiry = (not_after - now).days

                    if days_until_expiry < 0:
                        findings.append(Finding(
                            title="Expired SSL Certificate",
                            severity=Severity.CRITICAL,
                            description=f"Certificate expired {abs(days_until_expiry)} days ago.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_expired": abs(days_until_expiry),
                            },
                            remediation="Renew the SSL certificate immediately.",
                        ))
                    elif days_until_expiry < 30:
                        findings.append(Finding(
                            title="SSL Certificate Expiring Soon",
                            severity=Severity.HIGH,
                            description=f"Certificate expires in {days_until_expiry} days.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_remaining": days_until_expiry,
                            },
                            remediation="Renew the SSL certificate before expiration.",
                        ))
                    elif days_until_expiry < 90:
                        findings.append(Finding(
                            title="SSL Certificate Expiring Within 90 Days",
                            severity=Severity.MEDIUM,
                            description=f"Certificate expires in {days_until_expiry} days.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_remaining": days_until_expiry,
                            },
                            remediation="Plan certificate renewal.",
                        ))

                    # Check subject alternative names
                    sans = []
                    for type_name, value in cert.get('subjectAltName', []):
                        if type_name == 'DNS':
                            sans.append(value)

                    if host not in sans and f"*.{'.'.join(host.split('.')[1:])}" not in sans:
                        findings.append(Finding(
                            title="Certificate Hostname Mismatch",
                            severity=Severity.HIGH,
                            description=(
                                f"Certificate SANs {sans} do not include {host}."
                            ),
                            module="ssl_checker",
                            remediation="Reissue certificate with correct hostname.",
                        ))

                    # Check issuer (self-signed)
                    issuer = dict(x[0] for x in cert.get('issuer', []))
                    subject = dict(x[0] for x in cert.get('subject', []))
                    if issuer == subject:
                        findings.append(Finding(
                            title="Self-Signed Certificate",
                            severity=Severity.MEDIUM,
                            description="The certificate is self-signed (not trusted by browsers).",
                            module="ssl_checker",
                            details={"issuer": str(issuer)},
                            remediation="Use a certificate from a trusted CA (e.g., Let's Encrypt).",
                        ))

        except ssl.SSLCertVerificationError as e:
            findings.append(Finding(
                title="SSL Certificate Verification Failed",
                severity=Severity.HIGH,
                description=f"Certificate verification error: {str(e)[:200]}",
                module="ssl_checker",
                remediation="Fix certificate chain or install valid certificate.",
            ))
        except (socket.timeout, ConnectionRefusedError, OSError) as e:
            findings.append(Finding(
                title="SSL Connection Failed",
                severity=Severity.INFO,
                description=f"Could not establish SSL connection: {str(e)[:200]}",
                module="ssl_checker",
            ))

        return findings

    def _check_protocols(self, host: str, port: int) -> list[Finding]:
        """Check which TLS protocol versions are supported."""
        findings = []

        # Test for weak protocols
        for protocol_version, description in self.WEAK_PROTOCOLS.items():
            try:
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                context.minimum_version = protocol_version
                context.maximum_version = protocol_version

                with socket.create_connection(
                    (host, port), timeout=self.timeout
                ) as sock:
                    with context.wrap_socket(sock) as ssock:
                        # If we get here, the weak protocol is supported
                        findings.append(Finding(
                            title=f"Weak TLS Protocol Supported",
                            severity=Severity.HIGH if "SSLv3" in description
                                     or "TLS 1.0" in description
                                     else Severity.MEDIUM,
                            description=f"Server supports {description}.",
                            module="ssl_checker",
                            details={"protocol": str(protocol_version)},
                            remediation=f"Disable {description}. Only allow TLS 1.2+.",
                        ))

            except (ssl.SSLError, socket.timeout, ConnectionRefusedError, OSError):
                pass  # Protocol not supported (good)

        return findings

    def _check_ciphers(self, host: str, port: int) -> list[Finding]:
        """Check for weak cipher suites."""
        findings = []

        try:
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            with socket.create_connection(
                (host, port), timeout=self.timeout
            ) as sock:
                with context.wrap_socket(sock, server_hostname=host) as ssock:
                    cipher = ssock.cipher()

                    if cipher:
                        cipher_name, protocol, bits = cipher

                        # Check for weak ciphers
                        for weak_keyword in self.WEAK_CIPHER_KEYWORDS:
                            if weak_keyword in cipher_name.upper():
                                findings.append(Finding(
                                    title=f"Weak Cipher Suite: {cipher_name}",
                                    severity=Severity.HIGH,
                                    description=(
                                        f"Server negotiated weak cipher: {cipher_name} "
                                        f"({bits} bits, {protocol})"
                                    ),
                                    module="ssl_checker",
                                    details={
                                        "cipher": cipher_name,
                                        "protocol": protocol,
                                        "bits": bits,
                                    },
                                    remediation="Disable weak cipher suites. Use ECDHE-RSA-AES256-GCM-SHA384 or similar.",
                                ))
                                break

                        # Check key length
                        if bits and bits < 128:
                            findings.append(Finding(
                                title=f"Weak Cipher Key Length: {bits} bits",
                                severity=Severity.HIGH,
                                description=f"Cipher key length is only {bits} bits.",
                                module="ssl_checker",
                                remediation="Use cipher suites with at least 128-bit keys.",
                            ))

        except (ssl.SSLError, socket.timeout, ConnectionRefusedError, OSError):
            pass

        return findings

8. Directory Scanner Module

8.1 Path Discovery

"""
scanner/dir_scanner.py - Directory and path discovery.

This module performs dictionary-based path discovery to find
hidden endpoints, backup files, and sensitive paths.

For educational purposes only. Never brute force paths without permission.
"""

import requests
from typing import Optional
from pathlib import Path

from scanner.utils import Finding, Severity, RateLimiter


# Built-in small wordlist for common paths
DEFAULT_WORDLIST = [
    # Configuration and environment files
    ".env", ".env.backup", ".env.production",
    ".git/config", ".git/HEAD",
    ".svn/entries",
    ".htaccess", ".htpasswd",
    "web.config",
    "robots.txt", "sitemap.xml",

    # Admin panels
    "admin", "admin/login", "administrator",
    "dashboard", "manage", "panel",
    "wp-admin", "wp-login.php",
    "phpmyadmin", "adminer.php",

    # API documentation
    "api", "api/v1", "api/docs",
    "swagger.json", "openapi.json",
    "api-docs", "graphql",

    # Backup and debug files
    "backup", "backup.zip", "backup.sql",
    "database.sql", "dump.sql",
    "debug", "debug.log",
    "error.log", "access.log",

    # Common application paths
    "server-status", "server-info",
    "info.php", "phpinfo.php",
    "test", "test.html", "test.php",
    "status", "health", "healthcheck",
    "metrics", "prometheus",

    # User-related paths
    "login", "register", "signup",
    "reset-password", "forgot-password",
    "profile", "account", "settings",

    # Static/asset paths
    "static", "assets", "uploads",
    "media", "files", "documents",
    "images", "img", "css", "js",

    # Development artifacts
    ".DS_Store",
    "composer.json", "package.json",
    "Gemfile", "requirements.txt",
    "Dockerfile", "docker-compose.yml",
]

# Paths that indicate security issues when accessible
SENSITIVE_PATHS = {
    ".env": "Environment file may contain secrets (API keys, passwords)",
    ".git/config": "Git configuration exposed - entire source code may be downloadable",
    ".git/HEAD": "Git repository exposed",
    ".svn/entries": "SVN repository exposed",
    ".htpasswd": "Apache password file exposed",
    "phpinfo.php": "PHP info page exposes server configuration details",
    "info.php": "PHP info page exposes server configuration details",
    "backup.sql": "Database backup file publicly accessible",
    "database.sql": "Database dump file publicly accessible",
    "dump.sql": "Database dump file publicly accessible",
    "backup.zip": "Backup archive publicly accessible",
    "debug.log": "Debug log may contain sensitive information",
    "server-status": "Apache server-status page exposed",
    "server-info": "Apache server-info page exposed",
    ".DS_Store": "macOS directory metadata may reveal file structure",
    "web.config": "IIS configuration file exposed",
    "composer.json": "PHP dependency file reveals technology stack",
    "package.json": "Node.js dependency file reveals technology stack",
    "requirements.txt": "Python dependency file reveals technology stack",
    "Dockerfile": "Docker configuration may reveal architecture",
    "swagger.json": "API specification publicly accessible",
    "openapi.json": "API specification publicly accessible",
    "graphql": "GraphQL endpoint may allow introspection",
}


class DirectoryScanner:
    """
    Dictionary-based directory and path discovery.
    Uses a wordlist to find accessible endpoints.
    """

    def __init__(
        self,
        timeout: float = 5.0,
        rate_limiter: Optional[RateLimiter] = None,
        wordlist_file: Optional[str] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(10.0)

        # Load wordlist
        if wordlist_file and Path(wordlist_file).exists():
            self.wordlist = Path(wordlist_file).read_text().splitlines()
            self.wordlist = [w.strip() for w in self.wordlist if w.strip() and not w.startswith('#')]
        else:
            self.wordlist = DEFAULT_WORDLIST

    def scan(self, base_url: str) -> list[Finding]:
        """
        Scan for accessible directories and files.

        Args:
            base_url: Target base URL (e.g., https://example.com).

        Returns:
            List of findings.
        """
        findings = []
        discovered = []

        # Clean base URL
        base_url = base_url.rstrip('/')

        print(f"[*] Scanning {len(self.wordlist)} paths on {base_url}...")

        for i, path in enumerate(self.wordlist):
            if i > 0 and i % 50 == 0:
                print(f"    Progress: {i}/{len(self.wordlist)} paths checked, "
                      f"{len(discovered)} found")

            self.rate_limiter.acquire()

            url = f"{base_url}/{path}"

            try:
                response = requests.get(
                    url,
                    timeout=self.timeout,
                    allow_redirects=False,  # Don't follow redirects
                    verify=True,
                    headers={
                        "User-Agent": "SecurityScanner/1.0 (Authorized Scan)"
                    },
                )

                status = response.status_code

                # 200: directly accessible
                # 301/302: redirects (may still be interesting)
                # 403: forbidden (exists but restricted)
                if status == 200:
                    size = len(response.content)
                    discovered.append({
                        "path": path,
                        "status": status,
                        "size": size,
                    })
                    print(f"    [+] FOUND: /{path}  (200 OK, {size} bytes)")

                    # Check if this is a sensitive path
                    if path in SENSITIVE_PATHS:
                        findings.append(Finding(
                            title=f"Sensitive Path Accessible: /{path}",
                            severity=Severity.HIGH,
                            description=SENSITIVE_PATHS[path],
                            module="dir_scanner",
                            details={
                                "path": path,
                                "url": url,
                                "status_code": status,
                                "size": size,
                            },
                            remediation=(
                                f"Restrict access to /{path}. "
                                f"Remove from web root or deny in server configuration."
                            ),
                        ))
                    else:
                        findings.append(Finding(
                            title=f"Discovered Path: /{path}",
                            severity=Severity.INFO,
                            description=f"Path /{path} is accessible (HTTP 200).",
                            module="dir_scanner",
                            details={
                                "path": path,
                                "url": url,
                                "status_code": status,
                                "size": size,
                            },
                        ))

                elif status == 403:
                    # Exists but forbidden - note for manual testing
                    discovered.append({
                        "path": path,
                        "status": status,
                        "size": 0,
                    })
                    if path in SENSITIVE_PATHS:
                        findings.append(Finding(
                            title=f"Restricted Path Detected: /{path}",
                            severity=Severity.LOW,
                            description=(
                                f"Path /{path} exists but returns 403 Forbidden. "
                                f"Verify access controls are correct."
                            ),
                            module="dir_scanner",
                            details={"path": path, "status_code": status},
                        ))

            except requests.exceptions.RequestException:
                pass  # Connection error, skip this path

        print(f"[*] Directory scan complete. Found {len(discovered)} paths, "
              f"{len(findings)} findings.")

        return findings

9. CVE Database Lookup

9.1 CVE Lookup Module

"""
scanner/cve_lookup.py - CVE (Common Vulnerabilities and Exposures) lookup.

Queries public CVE databases to find known vulnerabilities
for discovered services and versions.
"""

import re
import json
import requests
from typing import Optional

from scanner.utils import Finding, Severity, PortInfo, RateLimiter


# Simplified local CVE database for common services
# In production, query NVD API, VulnDB, or similar
LOCAL_CVE_DB = {
    "apache": {
        "2.4.49": [
            {
                "id": "CVE-2021-41773",
                "severity": "CRITICAL",
                "description": "Path traversal and RCE in Apache HTTP Server 2.4.49",
                "fix": "Upgrade to Apache 2.4.51+",
            },
        ],
        "2.4.50": [
            {
                "id": "CVE-2021-42013",
                "severity": "CRITICAL",
                "description": "Path traversal in Apache HTTP Server 2.4.50 (bypass of CVE-2021-41773)",
                "fix": "Upgrade to Apache 2.4.51+",
            },
        ],
    },
    "openssh": {
        "8.5": [
            {
                "id": "CVE-2021-41617",
                "severity": "HIGH",
                "description": "Privilege escalation in OpenSSH 6.2-8.7",
                "fix": "Upgrade to OpenSSH 8.8+",
            },
        ],
    },
    "nginx": {
        "1.20.0": [
            {
                "id": "CVE-2021-23017",
                "severity": "HIGH",
                "description": "DNS resolver vulnerabilities in nginx 0.6.18-1.20.0",
                "fix": "Upgrade to nginx 1.20.1+",
            },
        ],
    },
    "mysql": {
        "5.7": [
            {
                "id": "CVE-2023-21912",
                "severity": "HIGH",
                "description": "MySQL Server 5.7 multiple vulnerabilities",
                "fix": "Upgrade to MySQL 8.0+",
            },
        ],
    },
    "redis": {
        "6.0": [
            {
                "id": "CVE-2022-24735",
                "severity": "HIGH",
                "description": "Redis Lua script sandbox escape",
                "fix": "Upgrade to Redis 6.2.7+ or 7.0+",
            },
        ],
    },
}


class CVELookup:
    """
    Look up known CVEs for discovered services.
    Uses both a local database and optional online API queries.
    """

    NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def __init__(
        self,
        use_online: bool = False,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        """
        Args:
            use_online: Whether to query online NVD API.
            rate_limiter: Rate limiter for API calls.
        """
        self.use_online = use_online
        self.rate_limiter = rate_limiter or RateLimiter(1.0)  # NVD rate limits

    def lookup(self, open_ports: list[PortInfo]) -> list[Finding]:
        """
        Look up CVEs for all discovered services.

        Args:
            open_ports: List of open ports with service information.

        Returns:
            List of CVE findings.
        """
        findings = []

        print(f"[*] Looking up known CVEs for {len(open_ports)} services...")

        for port_info in open_ports:
            if not port_info.version:
                continue

            # Parse service name and version
            service_name, version = self._parse_service_version(
                port_info.service, port_info.version, port_info.banner
            )

            if not service_name or not version:
                continue

            # Check local database
            local_cves = self._lookup_local(service_name, version)
            for cve in local_cves:
                severity_map = {
                    "CRITICAL": Severity.CRITICAL,
                    "HIGH": Severity.HIGH,
                    "MEDIUM": Severity.MEDIUM,
                    "LOW": Severity.LOW,
                }
                findings.append(Finding(
                    title=f"Known CVE: {cve['id']}",
                    severity=severity_map.get(cve['severity'], Severity.MEDIUM),
                    description=cve['description'],
                    module="cve_lookup",
                    details={
                        "cve_id": cve['id'],
                        "service": service_name,
                        "version": version,
                        "port": port_info.port,
                    },
                    remediation=cve.get('fix', 'Upgrade to the latest version.'),
                    reference=f"https://nvd.nist.gov/vuln/detail/{cve['id']}",
                ))
                print(f"    [!] {cve['id']}: {service_name} {version} "
                      f"on port {port_info.port}")

            # Query online NVD if enabled
            if self.use_online:
                online_cves = self._lookup_nvd(service_name, version)
                for cve in online_cves:
                    if not any(f.details.get('cve_id') == cve['id']
                               for f in findings):
                        findings.append(Finding(
                            title=f"Known CVE (NVD): {cve['id']}",
                            severity=Severity.MEDIUM,  # Default, refine from CVSS
                            description=cve['description'][:200],
                            module="cve_lookup",
                            details={
                                "cve_id": cve['id'],
                                "service": service_name,
                                "version": version,
                                "port": port_info.port,
                            },
                            reference=f"https://nvd.nist.gov/vuln/detail/{cve['id']}",
                        ))

        print(f"[*] CVE lookup complete. Found {len(findings)} known vulnerabilities.")
        return findings

    def _parse_service_version(
        self, service: str, version: str, banner: str
    ) -> tuple[str, str]:
        """Extract standardized service name and version."""
        version_lower = version.lower()
        banner_lower = banner.lower()

        # Apache
        if "apache" in version_lower or "apache" in banner_lower:
            match = re.search(r'apache[/ ](\d+\.\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "apache", match.group(1)

        # nginx
        if "nginx" in version_lower or "nginx" in banner_lower:
            match = re.search(r'nginx[/ ](\d+\.\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "nginx", match.group(1)

        # OpenSSH
        if "openssh" in version_lower or "openssh" in banner_lower:
            match = re.search(r'openssh[_/ ](\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "openssh", match.group(1)

        # MySQL
        if "mysql" in version_lower or "mysql" in banner_lower:
            match = re.search(r'mysql[/ ]*(\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "mysql", match.group(1)

        # Redis
        if "redis" in version_lower or "redis" in banner_lower:
            match = re.search(r'redis[/ ]*(\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "redis", match.group(1)

        return "", ""

    def _lookup_local(self, service: str, version: str) -> list[dict]:
        """Look up CVEs in local database."""
        service_db = LOCAL_CVE_DB.get(service.lower(), {})

        results = []
        for db_version, cves in service_db.items():
            # Check if the detected version matches
            if version.startswith(db_version):
                results.extend(cves)

        return results

    def _lookup_nvd(self, service: str, version: str) -> list[dict]:
        """Query NVD API for CVEs (rate-limited)."""
        self.rate_limiter.acquire()

        try:
            response = requests.get(
                self.NVD_API_URL,
                params={
                    "keywordSearch": f"{service} {version}",
                    "resultsPerPage": 5,
                },
                timeout=15,
                headers={"User-Agent": "SecurityScanner/1.0"},
            )

            if response.status_code == 200:
                data = response.json()
                results = []
                for vuln in data.get("vulnerabilities", []):
                    cve = vuln.get("cve", {})
                    desc = ""
                    for d in cve.get("descriptions", []):
                        if d.get("lang") == "en":
                            desc = d.get("value", "")
                            break

                    results.append({
                        "id": cve.get("id", ""),
                        "description": desc,
                    })
                return results

        except (requests.exceptions.RequestException, json.JSONDecodeError):
            pass

        return []

10. Report Generation

10.1 Report Engine

"""
scanner/report.py - Generate scan reports in multiple formats.
"""

import json
from dataclasses import asdict
from datetime import datetime

from scanner.utils import ScanResult, Finding, Severity


class ReportGenerator:
    """Generate vulnerability scan reports."""

    def generate_text(self, result: ScanResult) -> str:
        """Generate a text report."""
        lines = []

        lines.append("=" * 70)
        lines.append("  VULNERABILITY SCAN REPORT")
        lines.append("=" * 70)
        lines.append(f"  Target:    {result.target.host} ({result.target.ip})")
        lines.append(f"  Scan Date: {result.start_time}")
        lines.append(f"  Duration:  {result.duration_seconds:.1f} seconds")

        # Summary
        severity_counts = {}
        for f in result.findings:
            severity_counts[f.severity.value] = \
                severity_counts.get(f.severity.value, 0) + 1

        lines.append(f"\n  Open Ports:   {len(result.open_ports)}")
        lines.append(f"  Total Findings: {len(result.findings)}")

        for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
            count = severity_counts.get(sev, 0)
            if count > 0:
                lines.append(f"    {sev:10s}: {count}")

        # Open Ports
        if result.open_ports:
            lines.append(f"\n{'─' * 70}")
            lines.append("  OPEN PORTS")
            lines.append(f"{'─' * 70}")
            lines.append(f"  {'Port':>7s}  {'State':8s}  {'Service':15s}  {'Version'}")
            lines.append(f"  {'─'*7}  {'─'*8}  {'─'*15}  {'─'*30}")

            for port in result.open_ports:
                lines.append(
                    f"  {port.port:>7d}  {port.state:8s}  "
                    f"{port.service:15s}  {port.version[:30]}"
                )

        # Findings by severity
        for severity in [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM,
                         Severity.LOW, Severity.INFO]:
            findings = [f for f in result.findings if f.severity == severity]
            if not findings:
                continue

            lines.append(f"\n{'─' * 70}")
            lines.append(f"  {severity.value} FINDINGS ({len(findings)})")
            lines.append(f"{'─' * 70}")

            for i, finding in enumerate(findings, 1):
                lines.append(f"\n  {i}. [{finding.module}] {finding.title}")
                lines.append(f"     {finding.description}")
                if finding.remediation:
                    lines.append(f"     Fix: {finding.remediation}")
                if finding.reference:
                    lines.append(f"     Ref: {finding.reference}")
                if finding.details:
                    for key, value in finding.details.items():
                        lines.append(f"     {key}: {value}")

        # Errors
        if result.errors:
            lines.append(f"\n{'─' * 70}")
            lines.append("  ERRORS")
            lines.append(f"{'─' * 70}")
            for error in result.errors:
                lines.append(f"  [!] {error}")

        # Footer
        lines.append(f"\n{'=' * 70}")
        if severity_counts.get("CRITICAL", 0) > 0:
            lines.append("  OVERALL: CRITICAL vulnerabilities found!")
            lines.append("  Immediate remediation required.")
        elif severity_counts.get("HIGH", 0) > 0:
            lines.append("  OVERALL: HIGH severity vulnerabilities found.")
            lines.append("  Remediation recommended within 7 days.")
        elif severity_counts.get("MEDIUM", 0) > 0:
            lines.append("  OVERALL: MEDIUM severity issues found.")
            lines.append("  Remediation recommended within 30 days.")
        else:
            lines.append("  OVERALL: No critical vulnerabilities found.")
        lines.append("=" * 70)

        return "\n".join(lines)

    def generate_json(self, result: ScanResult) -> str:
        """Generate a JSON report."""
        report = {
            "scan_metadata": {
                "target": result.target.host,
                "ip": result.target.ip,
                "start_time": result.start_time,
                "end_time": result.end_time,
                "duration_seconds": result.duration_seconds,
            },
            "summary": {
                "open_ports": len(result.open_ports),
                "total_findings": len(result.findings),
                "by_severity": {},
            },
            "open_ports": [
                {
                    "port": p.port,
                    "state": p.state,
                    "service": p.service,
                    "version": p.version,
                    "banner": p.banner[:200],
                }
                for p in result.open_ports
            ],
            "findings": [
                {
                    "title": f.title,
                    "severity": f.severity.value,
                    "description": f.description,
                    "module": f.module,
                    "details": f.details,
                    "remediation": f.remediation,
                    "reference": f.reference,
                }
                for f in result.findings
            ],
            "errors": result.errors,
        }

        # Add severity counts
        for f in result.findings:
            sev = f.severity.value
            report["summary"]["by_severity"][sev] = \
                report["summary"]["by_severity"].get(sev, 0) + 1

        return json.dumps(report, indent=2)

    def generate_html(self, result: ScanResult) -> str:
        """Generate an HTML report."""
        severity_colors = {
            "CRITICAL": "#dc3545",
            "HIGH": "#fd7e14",
            "MEDIUM": "#ffc107",
            "LOW": "#17a2b8",
            "INFO": "#6c757d",
        }

        html = f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Vulnerability Scan Report - {result.target.host}</title>
    <style>
        body {{ font-family: -apple-system, BlinkMacSystemFont, sans-serif;
               max-width: 900px; margin: 0 auto; padding: 20px;
               background: #f8f9fa; }}
        h1 {{ color: #212529; border-bottom: 2px solid #dee2e6; padding-bottom: 10px; }}
        h2 {{ color: #495057; margin-top: 30px; }}
        .summary {{ background: #fff; padding: 20px; border-radius: 8px;
                    box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
        .finding {{ background: #fff; padding: 15px; margin: 10px 0;
                    border-radius: 8px; border-left: 4px solid #dee2e6;
                    box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
        .severity {{ display: inline-block; padding: 2px 8px; border-radius: 4px;
                     color: white; font-size: 0.85em; font-weight: bold; }}
        table {{ width: 100%; border-collapse: collapse; background: #fff;
                border-radius: 8px; overflow: hidden; }}
        th, td {{ padding: 10px 15px; text-align: left; border-bottom: 1px solid #dee2e6; }}
        th {{ background: #495057; color: white; }}
        .fix {{ color: #28a745; font-style: italic; }}
    </style>
</head>
<body>
    <h1>Vulnerability Scan Report</h1>
    <div class="summary">
        <p><strong>Target:</strong> {result.target.host} ({result.target.ip})</p>
        <p><strong>Date:</strong> {result.start_time}</p>
        <p><strong>Duration:</strong> {result.duration_seconds:.1f} seconds</p>
        <p><strong>Open Ports:</strong> {len(result.open_ports)} |
           <strong>Findings:</strong> {len(result.findings)}</p>
    </div>

    <h2>Open Ports</h2>
    <table>
        <tr><th>Port</th><th>State</th><th>Service</th><th>Version</th></tr>
"""
        for port in result.open_ports:
            html += f"        <tr><td>{port.port}</td><td>{port.state}</td>"
            html += f"<td>{port.service}</td><td>{port.version[:50]}</td></tr>\n"

        html += "    </table>\n\n    <h2>Findings</h2>\n"

        for severity_name in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
            findings = [f for f in result.findings
                        if f.severity.value == severity_name]
            if not findings:
                continue

            color = severity_colors.get(severity_name, "#6c757d")
            for finding in findings:
                html += f"""    <div class="finding" style="border-left-color: {color}">
        <span class="severity" style="background: {color}">{severity_name}</span>
        <strong>{finding.title}</strong>
        <p>{finding.description}</p>
"""
                if finding.remediation:
                    html += f'        <p class="fix">Fix: {finding.remediation}</p>\n'
                if finding.reference:
                    html += f'        <p>Reference: <a href="{finding.reference}">{finding.reference}</a></p>\n'
                html += "    </div>\n"

        html += f"""
    <hr>
    <p style="color: #6c757d; font-size: 0.85em;">
        Generated by Vulnerability Scanner on {datetime.now().isoformat()}
    </p>
</body>
</html>"""

        return html

11. Scan Controller (Orchestrator)

11.1 Main Controller

"""
scanner/controller.py - Scan orchestration controller.
"""

import time
from typing import Optional

from scanner.utils import ScanTarget, ScanResult, RateLimiter, COMMON_PORTS
from scanner.port_scanner import PortScanner
from scanner.banner_grabber import BannerGrabber
from scanner.http_checker import HTTPChecker
from scanner.ssl_checker import SSLChecker
from scanner.dir_scanner import DirectoryScanner
from scanner.cve_lookup import CVELookup
from scanner.report import ReportGenerator


class ScanController:
    """
    Orchestrates all scan modules and generates reports.
    """

    def __init__(
        self,
        rate_limit: float = 50.0,
        timeout: float = 3.0,
        max_workers: int = 50,
    ):
        """
        Args:
            rate_limit: Maximum requests per second.
            timeout: Default timeout for connections.
            max_workers: Maximum concurrent threads for port scanning.
        """
        self.rate_limiter = RateLimiter(rate_limit)
        self.timeout = timeout
        self.max_workers = max_workers
        self.report_generator = ReportGenerator()

    def scan(
        self,
        host: str,
        ports: Optional[list[int]] = None,
        scan_http: bool = True,
        scan_ssl: bool = True,
        scan_dirs: bool = False,
        lookup_cves: bool = True,
        wordlist: Optional[str] = None,
    ) -> ScanResult:
        """
        Perform a comprehensive vulnerability scan.

        Args:
            host: Target hostname or IP.
            ports: Specific ports to scan (default: common ports).
            scan_http: Whether to check HTTP security headers.
            scan_ssl: Whether to check SSL/TLS configuration.
            scan_dirs: Whether to perform directory scanning.
            lookup_cves: Whether to look up CVEs for found services.
            wordlist: Path to custom wordlist for directory scanning.

        Returns:
            Complete scan result.
        """
        # Initialize target
        target = ScanTarget(host=host)
        if not target.resolve():
            print(f"[!] Cannot resolve hostname: {host}")
            result = ScanResult(target=target)
            result.errors.append(f"DNS resolution failed for {host}")
            return result

        print(f"\n{'='*60}")
        print(f"  VULNERABILITY SCAN")
        print(f"  Target: {host} ({target.ip})")
        print(f"  Rate Limit: {self.rate_limiter.rate} req/s")
        print(f"{'='*60}\n")

        result = ScanResult(target=target)
        start_time = time.time()

        # Phase 1: Port Scanning
        print("\n[Phase 1/5] Port Scanning")
        print("-" * 40)
        port_scanner = PortScanner(
            timeout=self.timeout,
            rate_limiter=self.rate_limiter,
            max_workers=self.max_workers,
        )
        result.open_ports = port_scanner.scan_ports(host, ports or COMMON_PORTS)
        result.findings.extend(
            port_scanner.generate_findings(result.open_ports)
        )

        if not result.open_ports:
            print("[!] No open ports found. Scan complete.")
            result.end_time = time.strftime("%Y-%m-%dT%H:%M:%S")
            result.duration_seconds = time.time() - start_time
            return result

        # Phase 2: Banner Grabbing
        print("\n[Phase 2/5] Banner Grabbing")
        print("-" * 40)
        banner_grabber = BannerGrabber(
            timeout=self.timeout,
            rate_limiter=self.rate_limiter,
        )
        banner_grabber.grab_all_banners(host, result.open_ports)
        result.findings.extend(
            banner_grabber.generate_findings(result.open_ports)
        )

        # Phase 3: HTTP Security Headers
        if scan_http:
            print("\n[Phase 3/5] HTTP Security Headers")
            print("-" * 40)
            http_checker = HTTPChecker(
                timeout=self.timeout,
                rate_limiter=self.rate_limiter,
            )

            # Check HTTP and HTTPS
            http_ports = [p for p in result.open_ports
                          if p.port in (80, 8080, 8000)]
            https_ports = [p for p in result.open_ports
                           if p.port in (443, 8443)]

            for port_info in http_ports:
                url = f"http://{host}:{port_info.port}" if port_info.port != 80 \
                      else f"http://{host}"
                result.findings.extend(http_checker.check(url))

            for port_info in https_ports:
                url = f"https://{host}:{port_info.port}" if port_info.port != 443 \
                      else f"https://{host}"
                result.findings.extend(http_checker.check(url))
        else:
            print("\n[Phase 3/5] HTTP Security Headers (skipped)")

        # Phase 4: SSL/TLS Check
        if scan_ssl:
            print("\n[Phase 4/5] SSL/TLS Configuration")
            print("-" * 40)
            ssl_checker = SSLChecker(
                timeout=self.timeout,
                rate_limiter=self.rate_limiter,
            )

            ssl_ports = [p for p in result.open_ports
                         if p.port in (443, 8443, 465, 993, 995)]
            for port_info in ssl_ports:
                result.findings.extend(
                    ssl_checker.check(host, port_info.port)
                )
        else:
            print("\n[Phase 4/5] SSL/TLS Configuration (skipped)")

        # Phase 5: Directory Scanning (optional)
        if scan_dirs:
            print("\n[Phase 5/5] Directory Discovery")
            print("-" * 40)
            dir_scanner = DirectoryScanner(
                timeout=self.timeout,
                rate_limiter=self.rate_limiter,
                wordlist_file=wordlist,
            )

            web_ports = [p for p in result.open_ports
                         if p.port in (80, 443, 8080, 8443)]
            for port_info in web_ports:
                scheme = "https" if port_info.port in (443, 8443) else "http"
                port_suffix = f":{port_info.port}" \
                    if port_info.port not in (80, 443) else ""
                url = f"{scheme}://{host}{port_suffix}"
                result.findings.extend(dir_scanner.scan(url))
        else:
            print("\n[Phase 5/5] Directory Discovery (skipped)")

        # CVE Lookup
        if lookup_cves:
            print("\n[Bonus] CVE Database Lookup")
            print("-" * 40)
            cve_lookup = CVELookup(use_online=False)
            result.findings.extend(cve_lookup.lookup(result.open_ports))

        # Finalize
        result.end_time = time.strftime("%Y-%m-%dT%H:%M:%S")
        result.duration_seconds = time.time() - start_time

        # Sort findings by severity
        severity_order = {
            "CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4
        }
        result.findings.sort(
            key=lambda f: severity_order.get(f.severity.value, 5)
        )

        return result

12. CLI Entry Point

12.1 Main Script

"""
main.py - CLI entry point for the vulnerability scanner.

Usage:
    python main.py example.com
    python main.py example.com --ports 80,443,8080
    python main.py example.com --full --output report.json
    python main.py example.com --dirs --wordlist wordlists/common_paths.txt
"""

import argparse
import sys

from scanner.controller import ScanController
from scanner.report import ReportGenerator
from scanner.utils import is_valid_target, COMMON_PORTS


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Vulnerability Scanner - Authorized security scanning tool",
        epilog=(
            "IMPORTANT: Only scan targets you own or have explicit written "
            "permission to test. Unauthorized scanning is illegal."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Target
    parser.add_argument(
        "target",
        help="Target hostname or IP address to scan",
    )

    # Port options
    port_group = parser.add_mutually_exclusive_group()
    port_group.add_argument(
        "-p", "--ports",
        help="Comma-separated list of ports to scan (e.g., 80,443,8080)",
    )
    port_group.add_argument(
        "--top-ports",
        type=int,
        default=None,
        help="Scan top N most common ports (default: all common ports)",
    )
    port_group.add_argument(
        "--all-ports",
        action="store_true",
        help="Scan all ports (1-65535). VERY SLOW.",
    )

    # Scan options
    parser.add_argument(
        "--full",
        action="store_true",
        help="Full scan: ports + HTTP + SSL + dirs + CVEs",
    )
    parser.add_argument(
        "--no-http",
        action="store_true",
        help="Skip HTTP security header check",
    )
    parser.add_argument(
        "--no-ssl",
        action="store_true",
        help="Skip SSL/TLS configuration check",
    )
    parser.add_argument(
        "--dirs",
        action="store_true",
        help="Enable directory/path discovery",
    )
    parser.add_argument(
        "--no-cve",
        action="store_true",
        help="Skip CVE database lookup",
    )
    parser.add_argument(
        "--wordlist",
        default=None,
        help="Custom wordlist file for directory scanning",
    )

    # Performance options
    parser.add_argument(
        "--rate",
        type=float,
        default=50.0,
        help="Maximum requests per second (default: 50)",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=3.0,
        help="Connection timeout in seconds (default: 3)",
    )
    parser.add_argument(
        "--threads",
        type=int,
        default=50,
        help="Maximum concurrent threads for port scanning (default: 50)",
    )

    # Output options
    parser.add_argument(
        "-o", "--output",
        help="Output file path (format detected from extension: .json, .html, .txt)",
    )
    parser.add_argument(
        "-f", "--format",
        choices=["text", "json", "html"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="Suppress progress output (only show results)",
    )

    # Confirmation
    parser.add_argument(
        "--confirm",
        action="store_true",
        help="Skip authorization confirmation prompt",
    )

    return parser.parse_args()


def parse_ports(ports_str: str) -> list[int]:
    """Parse a comma-separated port list, supporting ranges."""
    ports = set()
    for part in ports_str.split(","):
        part = part.strip()
        if "-" in part:
            start, end = part.split("-", 1)
            ports.update(range(int(start), int(end) + 1))
        else:
            ports.add(int(part))
    return sorted(ports)


def main():
    args = parse_args()

    # Authorization confirmation
    if not args.confirm:
        print("=" * 60)
        print("  VULNERABILITY SCANNER")
        print("=" * 60)
        print(f"\n  Target: {args.target}")
        print("\n  WARNING: Unauthorized scanning is ILLEGAL.")
        print("  Only scan targets you own or have written permission")
        print("  to test.\n")

        try:
            response = input("  Do you have authorization to scan this target? [y/N]: ")
            if response.lower() not in ('y', 'yes'):
                print("\n  Scan cancelled. Get proper authorization first.")
                sys.exit(0)
        except KeyboardInterrupt:
            print("\n\n  Scan cancelled.")
            sys.exit(0)

    # Validate target
    if not is_valid_target(args.target):
        print(f"[!] Invalid or potentially dangerous target: {args.target}")
        sys.exit(1)

    # Parse ports
    if args.ports:
        ports = parse_ports(args.ports)
    elif args.all_ports:
        ports = list(range(1, 65536))
    elif args.top_ports:
        ports = COMMON_PORTS[:args.top_ports]
    else:
        ports = COMMON_PORTS

    # Determine scan options
    scan_http = not args.no_http
    scan_ssl = not args.no_ssl
    scan_dirs = args.dirs or args.full
    lookup_cves = not args.no_cve

    if args.full:
        scan_http = True
        scan_ssl = True
        scan_dirs = True
        lookup_cves = True

    # Run scan
    controller = ScanController(
        rate_limit=args.rate,
        timeout=args.timeout,
        max_workers=args.threads,
    )

    try:
        result = controller.scan(
            host=args.target,
            ports=ports,
            scan_http=scan_http,
            scan_ssl=scan_ssl,
            scan_dirs=scan_dirs,
            lookup_cves=lookup_cves,
            wordlist=args.wordlist,
        )
    except KeyboardInterrupt:
        print("\n\n[!] Scan interrupted by user.")
        sys.exit(130)

    # Generate report
    report_gen = ReportGenerator()

    # Determine output format
    output_format = args.format
    if args.output:
        if args.output.endswith('.json'):
            output_format = 'json'
        elif args.output.endswith('.html'):
            output_format = 'html'
        elif args.output.endswith('.txt'):
            output_format = 'text'

    if output_format == 'json':
        report = report_gen.generate_json(result)
    elif output_format == 'html':
        report = report_gen.generate_html(result)
    else:
        report = report_gen.generate_text(result)

    # Output
    if args.output:
        with open(args.output, 'w') as f:
            f.write(report)
        print(f"\n[*] Report saved to: {args.output}")
    else:
        print("\n" + report)

    # Exit code based on findings
    critical = sum(1 for f in result.findings if f.severity.value == "CRITICAL")
    high = sum(1 for f in result.findings if f.severity.value == "HIGH")

    if critical > 0:
        sys.exit(2)
    elif high > 0:
        sys.exit(1)
    else:
        sys.exit(0)


if __name__ == "__main__":
    main()

13. Usage Examples

13.1 Basic Scan

# Scan common ports on a target
python main.py example.com

# Scan specific ports
python main.py example.com -p 80,443,8080,3306

# Scan top 20 ports only
python main.py example.com --top-ports 20

13.2 Full Scan

# Full scan with all modules
python main.py example.com --full

# Full scan with custom rate limit and output
python main.py example.com --full --rate 20 -o report.html

# Full scan with custom wordlist
python main.py example.com --full --wordlist wordlists/large.txt

13.3 Targeted Scans

# HTTP headers only (web server ports)
python main.py example.com -p 80,443 --no-ssl --no-cve

# SSL/TLS only
python main.py example.com -p 443 --no-http --no-cve

# Directory scan only
python main.py example.com -p 80,443 --dirs --no-ssl --no-cve

13.4 Output Formats

# JSON output (for automation)
python main.py example.com -o scan_results.json

# HTML report (for sharing)
python main.py example.com --full -o report.html

# Text report to file
python main.py example.com -o report.txt

# Quiet mode with JSON to stdout
python main.py example.com -q -f json --confirm

14. Exercises

Exercise 1: Run the Scanner

Set up and run the scanner against a locally hosted test application: 1. Install DVWA or WebGoat in a Docker container 2. Run the scanner with --full against localhost 3. Analyze the report and categorize all findings 4. Determine which findings are true vs. false positives

Exercise 2: Add UDP Scanning

Extend the port scanner to support basic UDP scanning: 1. Implement UDP port scanning using SOCK_DGRAM 2. Handle the differences (no handshake, less reliable) 3. Add support for common UDP services (DNS/53, SNMP/161, NTP/123) 4. Update the CLI to accept --udp flag

Exercise 3: Implement a Web Crawler

Add a web crawler module that: 1. Starts from the base URL and discovers linked pages 2. Extracts all forms and their parameters 3. Identifies input fields that could be attack surfaces 4. Respects robots.txt 5. Limits crawl depth and follows the rate limiter

Exercise 4: Add Authentication Support

Extend the HTTP checker to support authenticated scanning: 1. Accept login credentials via CLI 2. Perform authentication (form-based or cookie-based) 3. Scan authenticated pages that require login 4. Support Bearer token authentication for APIs

Exercise 5: Custom CVE Database

Build a more comprehensive CVE lookup system: 1. Download CVE data from NVD (NIST) 2. Store in a local SQLite database 3. Implement version range matching (not just exact match) 4. Add CVSS score parsing and severity mapping 5. Support automatic database updates

Exercise 6: Parallel Module Execution

Optimize the scanner for speed: 1. Run independent modules in parallel (e.g., HTTP check and SSL check) 2. Implement a progress bar showing overall completion 3. Add support for scanning multiple targets from a file 4. Benchmark and compare sequential vs. parallel execution times


Summary

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚          Vulnerability Scanner Key Takeaways                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                   β”‚
β”‚  1. Ethics first: ALWAYS get authorization before scanning      β”‚
β”‚  2. Rate limiting: Responsible scanning respects target          β”‚
β”‚     resources and network capacity                               β”‚
β”‚  3. Modular design: Each scan type is an independent module     β”‚
β”‚     that can be run separately or combined                       β”‚
β”‚  4. Defense in depth: Port scanning + banner grabbing + HTTP    β”‚
β”‚     headers + SSL + directory scanning + CVE lookup gives       β”‚
β”‚     comprehensive coverage                                       β”‚
β”‚  5. False positives: Scanners produce noise; human analysis     β”‚
β”‚     is needed to verify findings                                 β”‚
β”‚  6. Version detection: Accurate service/version identification  β”‚
β”‚     is critical for CVE matching                                 β”‚
β”‚  7. Reporting: Clear, actionable reports with severity ratings  β”‚
β”‚     and remediation advice are as important as detection         β”‚
β”‚  8. Automation: CLI interface with structured output enables    β”‚
β”‚     integration into CI/CD and regular security assessments     β”‚
β”‚  9. Continuous: Security scanning should be regular, not        β”‚
β”‚     one-time -- integrate into your development workflow         β”‚
β”‚ 10. Limitations: Automated scanners find common issues but      β”‚
β”‚     cannot replace manual penetration testing for logic flaws   β”‚
β”‚                                                                   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Previous: 15. Project: Building a Secure REST API

to navigate between lessons