Project: Building a Vulnerability Scanner


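This project lesson walks through building a comprehensive vulnerability scanner in Python. The scanner performs port scanning, service banner grabbing, HTTP security header checks, SSL/TLS configuration analysis, directory brute-forcing, and CVE database lookups. You will build a modular, extensible tool with a CLI, structured output, and rate limiting for responsible scanning. The project integrates several concepts from across the Security topic.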

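Important: Scan only systems that you own or have explicit written permission to test. Unauthorized scanning is illegal in most jurisdictions. This tool is intended solely for education and authorized security assessments.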

Learning Objectives

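  • Build a modular vulnerability scanner in Python
  • Implement TCP port scanning with service detection
  • Analyze HTTP security headers and SSL/TLS configuration
  • Perform directory and path discovery (for educational purposes)
  • Look up known vulnerabilities in a CVE database
  • Generate structured scan reports
  • Apply rate limiting and responsible scanning practices
  • Understand the ethical and legal boundaries of security scanning
  • Design a CLI tool with argparse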

1. Project Overview

1.1 Scanner Architecture

┌─────────────────────────────────────────────────────────────────┐
│                 Vulnerability Scanner Architecture                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  CLI Interface (argparse)                                        │
│       │                                                          │
│       ▼                                                          │
│  ┌──────────────────┐                                           │
│  │  Scan Controller  │  Orchestrates all scan modules           │
│  │  (main.py)        │  Manages rate limiting                   │
│  └────────┬─────────┘  Generates reports                       │
│           │                                                      │
│     ┌─────┼─────────────────────────────┐                       │
│     │     │         │         │          │                       │
│     ▼     ▼         ▼         ▼          ▼                       │
│  ┌─────┐┌──────┐┌────────┐┌──────┐┌──────────┐                │
│  │Port ││Banner││  HTTP  ││ SSL  ││Directory │                │
│  │Scan ││Grab  ││Headers ││/TLS  ││Discovery │                │
│  │     ││      ││Check   ││Check ││          │                │
│  └──┬──┘└──┬───┘└───┬────┘└──┬───┘└────┬─────┘                │
│     │      │        │        │         │                        │
│     └──────┴────────┴────────┴─────────┘                        │
│                      │                                           │
│                      ▼                                           │
│             ┌────────────────┐                                   │
│             │  CVE Lookup    │  Match services to known CVEs    │
│             └────────┬───────┘                                   │
│                      │                                           │
│                      ▼                                           │
│             ┌────────────────┐                                   │
│             │ Report Engine  │  JSON, text, HTML output         │
│             └────────────────┘                                   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

1.2 Project Structure

vuln_scanner/
├── scanner/
│   ├── __init__.py
│   ├── controller.py       # Scan orchestration
│   ├── port_scanner.py     # TCP port scanning
│   ├── banner_grabber.py   # Service banner detection
│   ├── http_checker.py     # HTTP security header analysis
│   ├── ssl_checker.py      # SSL/TLS configuration analysis
│   ├── dir_scanner.py      # Directory/path discovery
│   ├── cve_lookup.py       # CVE database queries
│   ├── report.py           # Report generation
│   └── utils.py            # Shared utilities, rate limiter
├── wordlists/
│   └── common_paths.txt    # Directory wordlist (small, for demo)
├── main.py                 # CLI entry point
├── requirements.txt
└── README.md
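
The tree above lists main.py as the CLI entry point, and the learning objectives name argparse. A minimal sketch of what such an entry point could look like follows. The flag names (`--ports`, `--rate`, `--format`), their defaults, and the `parse_ports` helper are assumptions made for illustration, not the lesson's actual interface.

```python
import argparse


def parse_ports(spec: str) -> list[int]:
    """Parse a port spec such as "22,80,8000-8002" into a sorted list.

    Hypothetical helper: raises ArgumentTypeError for out-of-range values
    so argparse can report a clean usage error.
    """
    ports: set[int] = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start, end = (int(x) for x in part.split("-", 1))
        else:
            start = end = int(part)
        if not (1 <= start <= end <= 65535):
            raise argparse.ArgumentTypeError(f"invalid port range: {part!r}")
        ports.update(range(start, end + 1))
    return sorted(ports)


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser (all options are illustrative)."""
    parser = argparse.ArgumentParser(
        prog="vuln_scanner",
        description="Educational vulnerability scanner (authorized targets only)",
    )
    parser.add_argument("target", help="Hostname or IP address to scan")
    parser.add_argument(
        "-p", "--ports", type=parse_ports, default=None,
        help='Ports to scan, e.g. "22,80,8000-8002" (default: common ports)',
    )
    parser.add_argument(
        "--rate", type=float, default=10.0,
        help="Maximum requests per second",
    )
    parser.add_argument(
        "--format", choices=("json", "text", "html"), default="text",
        help="Report output format",
    )
    return parser


# Parse an explicit argument list instead of sys.argv for demonstration.
args = build_parser().parse_args(["example.com", "-p", "22,80,8000-8002"])
print(args.target, args.ports, args.rate, args.format)
```

Parsing the port specification in a `type=` callable keeps validation inside argparse, so a malformed value produces a usage error rather than a traceback.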

1.3 Dependencies

# requirements.txt
requests==2.31.0
cryptography==42.0.4

2. Ethical Considerations and Legal Boundaries

2.1 Rules of Engagement

┌──────────────────────────────────────────────────────────────────┐
│                   ETHICAL AND LEGAL BOUNDARIES                   │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  YOU MUST:                                                       │
│  - Only scan systems you OWN or have WRITTEN permission to test  │
│  - Get explicit, documented authorization before scanning        │
│  - Define scope clearly (which IPs, ports, tests)                │
│  - Respect rate limits and bandwidth constraints                 │
│  - Report findings responsibly to the system owner               │
│  - Stop immediately if you discover you are scanning the         │
│    wrong target                                                  │
│  - Keep all findings confidential                                │
│                                                                  │
│  YOU MUST NOT:                                                   │
│  - Scan systems without authorization                            │
│  - Exploit discovered vulnerabilities (scan, don't attack)       │
│  - Perform denial of service (even accidentally via scan volume) │
│  - Access, modify, or exfiltrate data                            │
│  - Share findings publicly without owner's consent               │
│  - Use this tool for competitive intelligence or harassment      │
│                                                                  │
│  LEGAL NOTES:                                                    │
│  - Computer Fraud and Abuse Act (CFAA) - USA                     │
│  - Computer Misuse Act 1990 - UK                                 │
│  - StGB §202a-c - Germany                                        │
│  - Similar laws exist in virtually every country                 │
│  - Unauthorized scanning can result in criminal charges          │
│  - "I was just testing" is NOT a legal defense                   │
│                                                                  │
│  SAFE TARGETS FOR PRACTICE:                                      │
│  - Your own machines / VMs                                       │
│  - Intentionally vulnerable apps:                                │
│    * DVWA (Damn Vulnerable Web Application)                      │
│    * HackTheBox, TryHackMe (with permission)                     │
│    * OWASP WebGoat                                               │
│    * Metasploitable                                              │
│  - Bug bounty programs (within scope)                            │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
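
The rule "define scope clearly (which IPs, ports, tests)" can be enforced in code before any packet is sent. The following is a minimal sketch, assuming the authorized scope is written down as a list of networks and a port range. The names `AUTHORIZED_NETWORKS`, `AUTHORIZED_PORTS`, and `in_scope` are hypothetical, and the network shown is the reserved documentation range 192.0.2.0/24, not a real target.

```python
import ipaddress

# Hypothetical engagement scope taken from a written authorization.
# 192.0.2.0/24 is TEST-NET-1, a range reserved for documentation.
AUTHORIZED_NETWORKS = [ipaddress.ip_network("192.0.2.0/24")]
AUTHORIZED_PORTS = range(1, 1025)


def in_scope(ip: str, port: int) -> bool:
    """Return True only if both the address and the port are authorized."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        # Not a literal IP address: refuse rather than guess.
        return False
    return port in AUTHORIZED_PORTS and any(
        addr in network for network in AUTHORIZED_NETWORKS
    )


print(in_scope("192.0.2.10", 443))    # inside the authorized network
print(in_scope("198.51.100.1", 443))  # outside the authorized network
```

Checking the resolved IP address rather than the hostname avoids scanning the wrong machine when DNS records change between authorization and scan time.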

3. Shared Utilities and Rate Limiter

3.1 Core Utilities

"""
scanner/utils.py - Shared utilities, rate limiter, and data models.
"""

import time
import socket
import threading
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional


class Severity(str, Enum):
    """Vulnerability severity levels."""
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    INFO = "INFO"


@dataclass
class Finding:
    """A single vulnerability or information finding."""
    title: str
    severity: Severity
    description: str
    module: str            # Which scanner module found this
    details: dict = field(default_factory=dict)
    remediation: str = ""
    reference: str = ""    # URL or CVE reference

    def __str__(self):
        return f"[{self.severity.value}] {self.title}: {self.description}"


@dataclass
class PortInfo:
    """Information about a discovered port."""
    port: int
    state: str             # open, closed, filtered
    service: str = ""      # Detected service name
    version: str = ""      # Service version
    banner: str = ""       # Raw banner text
    protocol: str = "tcp"


@dataclass
class ScanTarget:
    """Scan target specification."""
    host: str
    ip: Optional[str] = None
    ports: list[int] = field(default_factory=list)

    def resolve(self) -> bool:
        """Resolve hostname to IP address."""
        try:
            self.ip = socket.gethostbyname(self.host)
            return True
        except socket.gaierror:
            return False


@dataclass
class ScanResult:
    """Complete scan result for a target."""
    target: ScanTarget
    start_time: str = ""
    end_time: str = ""
    duration_seconds: float = 0.0
    open_ports: list[PortInfo] = field(default_factory=list)
    findings: list[Finding] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)

    def __post_init__(self):
        if not self.start_time:
            self.start_time = datetime.now().isoformat()


class RateLimiter:
    """
    Thread-safe rate limiter using token bucket algorithm.
    Ensures we don't overwhelm the target with requests.
    """

    def __init__(self, requests_per_second: float = 10.0):
        """
        Args:
            requests_per_second: Maximum requests per second.
        """
        self.rate = requests_per_second
        self.tokens = requests_per_second
        self.max_tokens = requests_per_second
        self.last_time = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        """Block until a token is available."""
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.last_time
                self.last_time = now

                # Add tokens based on elapsed time
                self.tokens = min(
                    self.max_tokens,
                    self.tokens + elapsed * self.rate
                )

                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return

            # No tokens available, wait a bit
            time.sleep(1.0 / self.rate)

    def set_rate(self, requests_per_second: float) -> None:
        """Update the rate limit."""
        with self.lock:
            self.rate = requests_per_second
            self.max_tokens = requests_per_second


def is_valid_target(host: str) -> bool:
    """
    Validate that a target is potentially valid and not a
    reserved/dangerous address.
    """
    # Block scanning of localhost (unless explicitly intended)
    dangerous_hosts = {
        'localhost', '127.0.0.1', '::1',
        '0.0.0.0',
    }
    if host.lower() in dangerous_hosts:
        return False

    # Warn about private IP ranges without blocking them, so internal
    # scans still work. Replace a warning with `return False` to block.
    try:
        ip = socket.gethostbyname(host)
        parts = ip.split('.')
        if len(parts) == 4:
            first_octet = int(parts[0])
            second_octet = int(parts[1])

            # 10.0.0.0/8
            if first_octet == 10:
                print(f"    [!] Warning: {host} ({ip}) is a private address")

            # 172.16.0.0/12
            if first_octet == 172 and 16 <= second_octet <= 31:
                print(f"    [!] Warning: {host} ({ip}) is a private address")

            # 192.168.0.0/16
            if first_octet == 192 and second_octet == 168:
                print(f"    [!] Warning: {host} ({ip}) is a private address")

    except socket.gaierror:
        pass

    return True


# Common port list (a selection of frequently used ports, not a full top 100)
COMMON_PORTS = [
    20, 21, 22, 23, 25, 53, 67, 68, 69, 80,
    110, 111, 119, 123, 135, 137, 138, 139, 143,
    161, 162, 179, 194, 389, 443, 445, 465, 514,
    515, 520, 521, 587, 631, 636, 873, 993, 995,
    1080, 1194, 1433, 1434, 1521, 1723, 2049, 2082,
    2083, 2086, 2087, 2096, 2100, 3128, 3306, 3389,
    5060, 5432, 5900, 5901, 6379, 6667, 8000, 8008,
    8080, 8443, 8888, 9090, 9200, 9300, 10000, 11211,
    27017, 27018, 28017, 50000,
]

# Well-known service names
SERVICE_NAMES = {
    20: "ftp-data", 21: "ftp", 22: "ssh", 23: "telnet",
    25: "smtp", 53: "dns", 80: "http", 110: "pop3",
    111: "rpcbind", 119: "nntp", 123: "ntp", 135: "msrpc",
    139: "netbios-ssn", 143: "imap", 161: "snmp", 389: "ldap",
    443: "https", 445: "microsoft-ds", 465: "smtps", 514: "syslog",
    587: "submission", 636: "ldaps", 993: "imaps", 995: "pop3s",
    1433: "mssql", 1521: "oracle", 3306: "mysql", 3389: "rdp",
    5432: "postgresql", 5900: "vnc", 6379: "redis",
    8080: "http-proxy", 8443: "https-alt", 9200: "elasticsearch",
    11211: "memcached", 27017: "mongodb",
}
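
The address checks in is_valid_target compare octets by hand and match only a few literal strings, so an address such as 127.0.0.2 is not recognized as loopback. As an alternative sketch, the standard library's ipaddress module can classify an address directly. The function name `classify_address` and its category labels are assumptions for illustration.

```python
import ipaddress


def classify_address(ip: str) -> str:
    """Classify a literal IPv4 or IPv6 address into a coarse category.

    Raises ValueError if `ip` is not a valid address literal.
    """
    addr = ipaddress.ip_address(ip)
    if addr.is_loopback:
        return "loopback"      # all of 127.0.0.0/8, and ::1
    if addr.is_unspecified:
        return "unspecified"   # 0.0.0.0 and ::
    if addr.is_link_local:
        return "link-local"    # 169.254.0.0/16 and fe80::/10
    if addr.is_private:
        return "private"       # includes 10/8, 172.16/12, 192.168/16
    return "public"


for candidate in ("127.0.0.2", "10.1.2.3", "172.31.255.255", "8.8.8.8", "::1"):
    print(candidate, classify_address(candidate))
```

A validator could resolve the hostname first and then reject the "loopback" and "unspecified" categories while only warning on "private", which mirrors the behavior described in the listing above.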

4. Port Scanner Module

4.1 TCP Port Scanner

"""
scanner/port_scanner.py - TCP port scanner module.
"""

import socket
import concurrent.futures
from typing import Optional

from scanner.utils import (
    PortInfo, RateLimiter, ScanTarget, Finding, Severity,
    COMMON_PORTS, SERVICE_NAMES,
)


class PortScanner:
    """
    TCP Connect port scanner.

    This performs a full TCP three-way handshake to determine
    if a port is open. It is reliable but easily detectable.

    For educational purposes only. SYN scanning (half-open)
    requires raw socket privileges and is not implemented here.
    """

    def __init__(
        self,
        timeout: float = 2.0,
        rate_limiter: Optional[RateLimiter] = None,
        max_workers: int = 50,
    ):
        """
        Args:
            timeout: Connection timeout in seconds per port.
            rate_limiter: Rate limiter for responsible scanning.
            max_workers: Maximum concurrent connection attempts.
        """
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(100.0)
        self.max_workers = max_workers

    def scan_port(self, host: str, port: int) -> PortInfo:
        """
        Scan a single port on the target host.

        Performs a TCP connect scan (full three-way handshake).

        Args:
            host: Target hostname or IP.
            port: Port number to scan.

        Returns:
            PortInfo with the scan result.
        """
        self.rate_limiter.acquire()

        port_info = PortInfo(
            port=port,
            state="closed",
            service=SERVICE_NAMES.get(port, "unknown"),
        )

        try:
            # The context manager closes the socket even if an error occurs
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                # connect() raises on failure (connect_ex() only returns an
                # error code), so the except clauses below can tell a
                # timeout (filtered) from a refusal (closed).
                sock.connect((host, port))
                port_info.state = "open"

                # Try to get service banner
                try:
                    # Plain-HTTP servers wait for a request. SSH, FTP, SMTP,
                    # POP3 and IMAP send a banner on their own. TLS ports
                    # are left to the banner grabber module.
                    if port in (80, 8080):
                        sock.sendall(
                            f"HEAD / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode()
                        )

                    sock.settimeout(1.5)
                    banner = sock.recv(1024)
                    port_info.banner = banner.decode('utf-8', errors='replace').strip()
                    port_info.version = self._extract_version(port_info.banner)
                except OSError:
                    pass  # No banner received; the port is still open

        except socket.timeout:
            port_info.state = "filtered"
        except ConnectionRefusedError:
            port_info.state = "closed"
        except OSError:
            port_info.state = "error"

        return port_info

    def scan_ports(
        self,
        host: str,
        ports: Optional[list[int]] = None,
    ) -> list[PortInfo]:
        """
        Scan multiple ports concurrently.

        Args:
            host: Target hostname or IP.
            ports: List of ports to scan. Defaults to common ports.

        Returns:
            List of PortInfo for open ports.
        """
        if ports is None:
            ports = COMMON_PORTS

        print(f"[*] Scanning {len(ports)} ports on {host}...")

        open_ports = []

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            # Submit all port scan tasks
            future_to_port = {
                executor.submit(self.scan_port, host, port): port
                for port in ports
            }

            # Collect results
            completed = 0
            for future in concurrent.futures.as_completed(future_to_port):
                completed += 1
                if completed % 100 == 0:
                    print(f"    Progress: {completed}/{len(ports)} ports scanned")

                try:
                    port_info = future.result()
                    if port_info.state == "open":
                        open_ports.append(port_info)
                        print(f"    [+] Port {port_info.port:5d}/tcp  OPEN  "
                              f"{port_info.service}  {port_info.version}")
                except Exception as e:
                    port = future_to_port[future]
                    print(f"    [!] Error scanning port {port}: {e}")

        open_ports.sort(key=lambda p: p.port)
        print(f"[*] Found {len(open_ports)} open ports")

        return open_ports

    def _extract_version(self, banner: str) -> str:
        """Extract service version from banner string."""
        if not banner:
            return ""

        # Common patterns
        # SSH: SSH-2.0-OpenSSH_8.9
        if banner.startswith("SSH-"):
            parts = banner.split("-")
            if len(parts) >= 3:
                return parts[2].split()[0]

        # HTTP: Server: nginx/1.24.0
        if "Server:" in banner:
            for line in banner.split("\r\n"):
                if line.startswith("Server:"):
                    return line.split(":", 1)[1].strip()

        # SMTP: 220 mail.example.com ESMTP Postfix
        # Checked before FTP because both greetings start with "220".
        if "SMTP" in banner:
            return banner.split("\r\n")[0][:60]

        # FTP: 220 vsftpd 3.0.5
        if banner.startswith("220"):
            parts = banner.split()
            if len(parts) >= 3:
                return " ".join(parts[1:3])

        return banner[:60]

    def generate_findings(self, open_ports: list[PortInfo]) -> list[Finding]:
        """Generate security findings from open ports."""
        findings = []

        # Dangerous/insecure services
        dangerous_ports = {
            21: ("FTP", "FTP transmits credentials in plaintext. Use SFTP instead."),
            23: ("Telnet", "Telnet transmits all data in plaintext. Use SSH instead."),
            69: ("TFTP", "TFTP has no authentication. Restrict access."),
            111: ("RPC", "RPCbind can leak service information. Restrict to trusted networks."),
            135: ("MSRPC", "Microsoft RPC can be exploited for remote code execution."),
            139: ("NetBIOS", "NetBIOS can leak system information. Block from external access."),
            445: ("SMB", "SMB has been target of major exploits (EternalBlue). Restrict access."),
            161: ("SNMP", "SNMP v1/v2c use community strings (plaintext). Use v3 with auth."),
            1433: ("MSSQL", "Database port should not be publicly accessible."),
            3306: ("MySQL", "Database port should not be publicly accessible."),
            5432: ("PostgreSQL", "Database port should not be publicly accessible."),
            6379: ("Redis", "Redis often has no authentication. Never expose publicly."),
            11211: ("Memcached", "Memcached has no built-in auth. Used in amplification attacks."),
            27017: ("MongoDB", "MongoDB should not be publicly accessible."),
            5900: ("VNC", "VNC may use weak authentication. Use VPN for remote access."),
        }

        for port_info in open_ports:
            if port_info.port in dangerous_ports:
                service_name, description = dangerous_ports[port_info.port]
                findings.append(Finding(
                    title=f"Potentially Dangerous Service: {service_name}",
                    severity=Severity.HIGH if port_info.port in (6379, 11211, 27017)
                             else Severity.MEDIUM,
                    description=description,
                    module="port_scanner",
                    details={
                        "port": port_info.port,
                        "service": port_info.service,
                        "banner": port_info.banner[:200],
                    },
                    remediation=(
                        f"Restrict port {port_info.port} to trusted networks using "
                        f"firewall rules. Consider disabling if not needed."
                    ),
                ))

        # Check for unencrypted HTTP, with or without HTTPS alongside it
        has_http = any(p.port == 80 for p in open_ports)
        has_https = any(p.port == 443 for p in open_ports)

        if has_http and not has_https:
            findings.append(Finding(
                title="HTTP Without HTTPS",
                severity=Severity.HIGH,
                description="HTTP (port 80) is open but HTTPS (port 443) is not detected.",
                module="port_scanner",
                remediation="Enable HTTPS and redirect HTTP to HTTPS.",
            ))
        elif has_http and has_https:
            findings.append(Finding(
                title="HTTP Port Open (Redirect Needed)",
                severity=Severity.LOW,
                description="HTTP (port 80) is open alongside HTTPS. Ensure HTTP redirects to HTTPS.",
                module="port_scanner",
                remediation="Configure HTTP to HTTPS 301 redirect.",
            ))

        return findings
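
The open/closed/filtered classification can be tried safely against a throwaway listener on the loopback interface, without touching any external system. The following is a minimal sketch. The `probe` function is a hypothetical stand-alone helper, not part of the PortScanner class.

```python
import socket


def probe(host: str, port: int, timeout: float = 1.0) -> str:
    """Attempt a full TCP handshake and classify the outcome."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((host, port))
        return "open"
    except socket.timeout:
        return "filtered"   # no answer within the timeout
    except ConnectionRefusedError:
        return "closed"     # the host actively refused the connection
    except OSError:
        return "error"


# Bind to port 0 so the OS picks a free ephemeral port for the listener.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
listening_port = server.getsockname()[1]

state_while_listening = probe("127.0.0.1", listening_port)
server.close()
state_after_close = probe("127.0.0.1", listening_port)

print(state_while_listening, state_after_close)
```

While the listener is up, the handshake completes and the port is reported as open. Once it is closed, the same port should no longer be reported as open. The exact non-open state can vary by operating system.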

5. Banner Grabber Module

5.1 Service Banner Detection

"""
scanner/banner_grabber.py - Service banner grabbing and version detection.
"""

import socket
import ssl
from typing import Optional

from scanner.utils import PortInfo, Finding, Severity, RateLimiter


class BannerGrabber:
    """
    Grabs service banners to identify running software and versions.

    Service banners often reveal software name, version, and OS,
    which can be used to identify known vulnerabilities.
    """

    # Protocol-specific probes
    PROBES = {
        # HTTP probe
        "http": b"HEAD / HTTP/1.1\r\nHost: {host}\r\nUser-Agent: SecurityScanner/1.0\r\nAccept: */*\r\nConnection: close\r\n\r\n",
        # FTP expects no probe (server sends banner)
        "ftp": b"",
        # SMTP expects no probe
        "smtp": b"",
        # SSH expects no probe
        "ssh": b"",
        # POP3 expects no probe
        "pop3": b"",
        # IMAP expects no probe
        "imap": b"",
        # MySQL expects no probe (server sends its handshake packet first)
        "mysql": b"",
        # Redis probe
        "redis": b"INFO\r\n",
    }

    def __init__(
        self,
        timeout: float = 3.0,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(10.0)

    def grab_banner(self, host: str, port_info: PortInfo) -> PortInfo:
        """
        Attempt to grab a service banner from an open port.

        Updates the port_info with banner and version information.
        """
        if port_info.state != "open":
            return port_info

        if port_info.banner:
            # Already have a banner from port scan
            return port_info

        self.rate_limiter.acquire()

        # Determine the appropriate probe
        service = port_info.service.lower()
        probe = self.PROBES.get(service, b"")

        # HTTP(S) servers wait for a request, so send a HEAD probe.
        # Without it, HTTPS ports would time out with no banner.
        if service in ("http", "https", "http-proxy", "https-alt"):
            probe = self.PROBES["http"].replace(b"{host}", host.encode())

        # Determine if we should use SSL
        use_ssl = port_info.port in (443, 465, 636, 993, 995, 8443)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect((host, port_info.port))

            if use_ssl:
                # Certificate validation is deliberately disabled: this
                # module only reads the banner and must also work against
                # self-signed or expired certificates.
                context = ssl.create_default_context()
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                sock = context.wrap_socket(sock, server_hostname=host)

            # Send probe if needed
            if probe:
                sock.sendall(probe)

            # Receive response
            banner = sock.recv(4096)
            port_info.banner = banner.decode('utf-8', errors='replace').strip()

            # Extract version from banner
            port_info.version = self._parse_version(
                port_info.banner, port_info.service
            )

        except OSError:
            # Covers timeouts, refused/reset connections, and TLS errors
            pass
        finally:
            sock.close()

        return port_info

    def grab_all_banners(
        self, host: str, open_ports: list[PortInfo]
    ) -> list[PortInfo]:
        """Grab banners from all open ports."""
        print(f"[*] Grabbing banners from {len(open_ports)} open ports...")

        for port_info in open_ports:
            self.grab_banner(host, port_info)
            if port_info.banner:
                version = port_info.version or "(no version)"
                print(f"    [+] Port {port_info.port}: {version}")

        return open_ports

    def _parse_version(self, banner: str, service: str) -> str:
        """Parse version information from service banner."""
        if not banner:
            return ""

        service = service.lower()

        # SSH
        if service == "ssh" or banner.startswith("SSH-"):
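            # Split at most twice so hyphens inside the software version
            # (e.g. "OpenSSH_8.9p1 Ubuntu-3ubuntu0.1") are preserved
            parts = banner.split("-", 2)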
            if len(parts) >= 3:
                return parts[2].split("\r")[0].split("\n")[0]

        # HTTP Server header
        if service in ("http", "https", "http-proxy", "https-alt"):
            for line in banner.split("\r\n"):
                if line.lower().startswith("server:"):
                    return line.split(":", 1)[1].strip()
            return ""

        # FTP
        if service == "ftp" and banner.startswith("220"):
            return banner.split("\r")[0].split("\n")[0][4:].strip()

        # SMTP
        if service in ("smtp", "smtps", "submission"):
            return banner.split("\r")[0].split("\n")[0].strip()

        # MySQL
        if service == "mysql":
            # MySQL banner has version in initial handshake packet
            try:
                # Look for version string pattern (e.g., 8.0.35)
                import re
                match = re.search(r'(\d+\.\d+\.\d+)', banner)
                if match:
                    return f"MySQL {match.group(1)}"
            except Exception:
                pass

        # Redis
        if service == "redis" and "redis_version:" in banner:
            for line in banner.split("\r\n"):
                if line.startswith("redis_version:"):
                    return f"Redis {line.split(':')[1]}"

        # Default: first line, truncated
        return banner.split("\r")[0].split("\n")[0][:80]

    def generate_findings(self, open_ports: list[PortInfo]) -> list[Finding]:
        """Generate findings from banner information."""
        findings = []

        for port_info in open_ports:
            if not port_info.banner:
                continue

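            # Match against the raw banner plus the parsed version, because
            # some patterns (e.g. "mysql 5.5") only occur in the parsed form
            banner_lower = f"{port_info.banner} {port_info.version}".lower()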

            # Check for outdated/vulnerable versions
            version_checks = [
                ("apache/2.2", "Apache 2.2 is end-of-life",
                 Severity.HIGH, "Upgrade to Apache 2.4+"),
                ("apache/2.4.49", "Apache 2.4.49 has path traversal vulnerability (CVE-2021-41773)",
                 Severity.CRITICAL, "Upgrade to Apache 2.4.51+"),
                ("nginx/1.1", "Very old nginx version detected",
                 Severity.MEDIUM, "Upgrade to latest stable nginx"),
                ("openssh_7.", "OpenSSH 7.x may have known vulnerabilities",
                 Severity.MEDIUM, "Upgrade to OpenSSH 9.x+"),
                ("openssh_6.", "OpenSSH 6.x has known vulnerabilities",
                 Severity.HIGH, "Upgrade to OpenSSH 9.x+"),
                ("mysql 5.5", "MySQL 5.5 is end-of-life",
                 Severity.HIGH, "Upgrade to MySQL 8.0+"),
                ("mysql 5.6", "MySQL 5.6 is end-of-life",
                 Severity.HIGH, "Upgrade to MySQL 8.0+"),
            ]

            for pattern, description, severity, fix in version_checks:
                if pattern in banner_lower:
                    findings.append(Finding(
                        title=f"Outdated Software: {port_info.version or pattern}",
                        severity=severity,
                        description=description,
                        module="banner_grabber",
                        details={
                            "port": port_info.port,
                            "banner": port_info.banner[:200],
                        },
                        remediation=fix,
                    ))

            # Check for verbose banners (information disclosure)
            if any(keyword in banner_lower for keyword in
                   ['ubuntu', 'debian', 'centos', 'red hat', 'windows']):
                findings.append(Finding(
                    title="OS Information Disclosure",
                    severity=Severity.LOW,
                    description=(
                        f"Service banner reveals operating system information: "
                        f"{port_info.banner[:100]}"
                    ),
                    module="banner_grabber",
                    details={"port": port_info.port, "banner": port_info.banner[:200]},
                    remediation="Configure service to hide OS details from banner.",
                ))

        return findings
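
Splitting banners on fixed characters is fragile, as the SSH case shows: the software version may itself contain hyphens. As an alternative sketch, a regular expression can follow the SSH identification string layout `SSH-protoversion-softwareversion SP comments`. The function name `parse_ssh_banner` and the returned dictionary keys are assumptions for illustration.

```python
import re
from typing import Optional

# SSH identification string: SSH-<protocol>-<software>[ <comment>]
SSH_BANNER = re.compile(
    r"^SSH-(?P<proto>\d+\.\d+)-(?P<software>\S+)(?: (?P<comment>.*))?$"
)


def parse_ssh_banner(banner: str) -> Optional[dict]:
    """Return protocol, software and comment fields, or None if not SSH."""
    lines = banner.strip().splitlines()
    if not lines:
        return None
    match = SSH_BANNER.match(lines[0].strip())
    return match.groupdict() if match else None


print(parse_ssh_banner("SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.1\r\n"))
print(parse_ssh_banner("220 vsftpd 3.0.5"))
```

Keeping the optional comment as a separate field makes it easier to report OS disclosure (for example a distribution name in the comment) independently of the software version.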

6. HTTP Security Header Checker

6.1 Header Analysis

"""
scanner/http_checker.py - HTTP security header analysis.
"""

import requests
from typing import Optional
from urllib.parse import urlparse

from scanner.utils import Finding, Severity, RateLimiter


class HTTPChecker:
    """
    Checks HTTP security headers and common web misconfigurations.
    """

    # Required security headers and their expected values
    SECURITY_HEADERS = {
        "Strict-Transport-Security": {
            "description": "HTTP Strict Transport Security (HSTS)",
            "severity": Severity.HIGH,
            "recommendation": "Add: Strict-Transport-Security: max-age=31536000; includeSubDomains",
            # Passes only when max-age is a plain integer of at least one
            # year; malformed values fail instead of raising ValueError.
            "check_value": lambda v: (
                "max-age=" in v.lower()
                and (age := v.lower().split("max-age=")[1].split(";")[0].strip()).isdigit()
                and int(age) >= 31536000
            ),
        },
        "Content-Security-Policy": {
            "description": "Content Security Policy (CSP)",
            "severity": Severity.HIGH,
            "recommendation": "Add a Content-Security-Policy header. Start with: default-src 'self'",
            "check_value": lambda v: "default-src" in v or "script-src" in v,
        },
        "X-Content-Type-Options": {
            "description": "MIME type sniffing prevention",
            "severity": Severity.MEDIUM,
            "recommendation": "Add: X-Content-Type-Options: nosniff",
            "check_value": lambda v: v.lower() == "nosniff",
        },
        "X-Frame-Options": {
            "description": "Clickjacking protection",
            "severity": Severity.MEDIUM,
            "recommendation": "Add: X-Frame-Options: DENY (or SAMEORIGIN)",
            "check_value": lambda v: v.upper() in ("DENY", "SAMEORIGIN"),
        },
        "Referrer-Policy": {
            "description": "Referrer information control",
            "severity": Severity.LOW,
            "recommendation": "Add: Referrer-Policy: strict-origin-when-cross-origin",
            "check_value": lambda v: v in (
                "no-referrer", "strict-origin",
                "strict-origin-when-cross-origin", "same-origin"
            ),
        },
        "Permissions-Policy": {
            "description": "Browser feature restrictions",
            "severity": Severity.LOW,
            "recommendation": "Add: Permissions-Policy: camera=(), microphone=(), geolocation=()",
            "check_value": lambda v: len(v) > 0,
        },
    }

    # Headers that should NOT be present (information disclosure)
    BAD_HEADERS = {
        "X-Powered-By": {
            "description": "Reveals server technology",
            "severity": Severity.LOW,
            "recommendation": "Remove X-Powered-By header to reduce information disclosure.",
        },
        "Server": {
            "description": "May reveal server software and version",
            "severity": Severity.LOW,
            "recommendation": "Configure server to send minimal Server header.",
            "check_value": lambda v: any(
                keyword in v.lower()
                for keyword in ["apache", "nginx", "iis", "tomcat", "express"]
            ),
        },
        "X-AspNet-Version": {
            "description": "Reveals ASP.NET version",
            "severity": Severity.LOW,
            "recommendation": "Remove X-AspNet-Version header.",
        },
    }

    def __init__(
        self,
        timeout: float = 10.0,
        rate_limiter: Optional[RateLimiter] = None,
        verify_ssl: bool = True,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(5.0)
        self.verify_ssl = verify_ssl

    def check(self, url: str) -> list[Finding]:
        """
        Check HTTP security headers for a URL.

        Args:
            url: Target URL (http:// or https://).

        Returns:
            List of findings.
        """
        self.rate_limiter.acquire()
        findings = []

        print(f"[*] Checking HTTP security headers for {url}...")

        try:
            response = requests.get(
                url,
                timeout=self.timeout,
                verify=self.verify_ssl,
                allow_redirects=True,
                headers={
                    "User-Agent": "SecurityScanner/1.0 (Authorized Security Scan)"
                },
            )
        except requests.exceptions.SSLError as e:
            findings.append(Finding(
                title="SSL/TLS Error",
                severity=Severity.HIGH,
                description=f"SSL error connecting to {url}: {str(e)[:200]}",
                module="http_checker",
                remediation="Fix SSL/TLS configuration. Ensure valid certificate.",
            ))
            return findings
        except requests.exceptions.ConnectionError as e:
            findings.append(Finding(
                title="Connection Error",
                severity=Severity.INFO,
                description=f"Could not connect to {url}: {str(e)[:200]}",
                module="http_checker",
            ))
            return findings
        except requests.exceptions.Timeout:
            findings.append(Finding(
                title="Connection Timeout",
                severity=Severity.INFO,
                description=f"Connection to {url} timed out after {self.timeout}s",
                module="http_checker",
            ))
            return findings

        headers = response.headers

        # Check for missing security headers
        for header_name, config in self.SECURITY_HEADERS.items():
            value = headers.get(header_name)

            if not value:
                findings.append(Finding(
                    title=f"Missing Security Header: {header_name}",
                    severity=config["severity"],
                    description=f"{config['description']} header is not set.",
                    module="http_checker",
                    details={"header": header_name, "url": url},
                    remediation=config["recommendation"],
                ))
            elif config.get("check_value"):
                try:
                    if not config["check_value"](value):
                        findings.append(Finding(
                            title=f"Weak Security Header: {header_name}",
                            severity=Severity.LOW,
                            description=(
                                f"{header_name} is set but may be insufficiently configured: "
                                f"{value[:100]}"
                            ),
                            module="http_checker",
                            details={"header": header_name, "value": value, "url": url},
                            remediation=config["recommendation"],
                        ))
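                except (ValueError, IndexError):
                    # The value could not be parsed (e.g. a non-numeric
                    # max-age); skip it rather than guess at its strength.
                    pass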

        # Check for information disclosure headers
        for header_name, config in self.BAD_HEADERS.items():
            value = headers.get(header_name)
            if value:
                # For Server header, only flag if verbose
                if header_name == "Server" and config.get("check_value"):
                    if not config["check_value"](value):
                        continue  # Minimal server header, OK

                findings.append(Finding(
                    title=f"Information Disclosure: {header_name}",
                    severity=config["severity"],
                    description=f"{config['description']}: {value}",
                    module="http_checker",
                    details={"header": header_name, "value": value},
                    remediation=config["recommendation"],
                ))

        # Check for HTTPS redirect
        if url.startswith("http://"):
            if not response.url.startswith("https://"):
                findings.append(Finding(
                    title="No HTTP to HTTPS Redirect",
                    severity=Severity.HIGH,
                    description="HTTP requests are not redirected to HTTPS.",
                    module="http_checker",
                    remediation="Configure server to redirect all HTTP requests to HTTPS.",
                ))

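        # Check cookie security. requests folds repeated Set-Cookie headers
        # into one comma-joined string, so read each cookie separately from
        # the underlying urllib3 response when it is available.
        raw_headers = getattr(response.raw, "headers", None)
        if raw_headers is not None and hasattr(raw_headers, "getlist"):
            set_cookie_values = raw_headers.getlist("Set-Cookie")
        else:
            combined = headers.get("Set-Cookie", "")
            set_cookie_values = [combined] if combined else []

        for cookie_header in set_cookie_values:
            self._check_cookie_security(cookie_header, findings, url)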

        print(f"    Found {len(findings)} HTTP security issues")
        return findings

    def _check_cookie_security(self, cookie_str: str, findings: list[Finding],
                                url: str) -> None:
        """Check cookie security attributes."""
        cookie_lower = cookie_str.lower()

        if "secure" not in cookie_lower and url.startswith("https"):
            findings.append(Finding(
                title="Cookie Without Secure Flag",
                severity=Severity.MEDIUM,
                description="A cookie is set without the Secure flag over HTTPS.",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add Secure flag to all cookies on HTTPS sites.",
            ))

        if "httponly" not in cookie_lower:
            findings.append(Finding(
                title="Cookie Without HttpOnly Flag",
                severity=Severity.MEDIUM,
                description="A cookie is set without the HttpOnly flag (accessible by JavaScript).",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add HttpOnly flag to cookies not needed by client-side JavaScript.",
            ))

        if "samesite" not in cookie_lower:
            findings.append(Finding(
                title="Cookie Without SameSite Attribute",
                severity=Severity.LOW,
                description="A cookie is missing the SameSite attribute (CSRF risk).",
                module="http_checker",
                details={"cookie": cookie_str[:100]},
                remediation="Add SameSite=Lax or SameSite=Strict to cookies.",
            ))
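
The substring checks in `_check_cookie_security` can misfire: a cookie named `secure_token` contains the text "secure" and would pass the Secure check without carrying the flag. The following is a minimal sketch of a stricter approach, assuming two hypothetical helpers (`parse_cookie_attributes` and `missing_cookie_flags`, neither part of the scanner above) that split a single Set-Cookie value on `;` and compare attribute names exactly.

```python
def parse_cookie_attributes(set_cookie_value: str) -> dict[str, str]:
    """Return the attributes of one Set-Cookie value, keyed by lowercase name.

    Hypothetical helper for illustration. The first segment (name=value)
    is skipped so that a cookie *named* "secure" is not mistaken for the flag.
    """
    attributes: dict[str, str] = {}
    segments = set_cookie_value.split(";")
    for segment in segments[1:]:
        name, _, value = segment.strip().partition("=")
        if name:
            attributes[name.lower()] = value.strip()
    return attributes


def missing_cookie_flags(set_cookie_value: str, is_https: bool) -> list[str]:
    """List which of Secure / HttpOnly / SameSite are absent from a cookie."""
    attributes = parse_cookie_attributes(set_cookie_value)
    missing = []
    # Secure only matters when the cookie was delivered over HTTPS.
    if is_https and "secure" not in attributes:
        missing.append("Secure")
    if "httponly" not in attributes:
        missing.append("HttpOnly")
    if "samesite" not in attributes:
        missing.append("SameSite")
    return missing
```

Each name returned by `missing_cookie_flags` could map onto one of the three `Finding` objects the checker already creates, so the reporting logic would not need to change.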

7. SSL/TLS Configuration Analyzer

7.1 SSL/TLS Checker

"""
scanner/ssl_checker.py - SSL/TLS configuration analyzer.
"""

import socket
import ssl
from datetime import datetime, timezone
from typing import Optional

from scanner.utils import Finding, Severity, RateLimiter


class SSLChecker:
    """
    Analyzes SSL/TLS configuration for security issues.

    Checks:
    - Certificate validity and expiration
    - Protocol version support
    - Cipher suite strength
    - Common misconfigurations
    """

    # Weak protocols
    WEAK_PROTOCOLS = {
        ssl.TLSVersion.SSLv3: "SSLv3 (POODLE vulnerability)",
        ssl.TLSVersion.TLSv1: "TLS 1.0 (deprecated, multiple vulnerabilities)",
        ssl.TLSVersion.TLSv1_1: "TLS 1.1 (deprecated)",
    }

    # Weak cipher keywords
    WEAK_CIPHER_KEYWORDS = [
        "RC4", "DES", "3DES", "MD5", "NULL", "EXPORT",
        "anon", "RC2", "SEED", "IDEA",
    ]

    def __init__(
        self,
        timeout: float = 10.0,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(5.0)

    def check(self, host: str, port: int = 443) -> list[Finding]:
        """
        Check SSL/TLS configuration of a host.

        Args:
            host: Target hostname.
            port: HTTPS port (default 443).

        Returns:
            List of findings.
        """
        self.rate_limiter.acquire()
        findings = []

        print(f"[*] Checking SSL/TLS configuration for {host}:{port}...")

        # 1. Check certificate
        cert_findings = self._check_certificate(host, port)
        findings.extend(cert_findings)

        # 2. Check supported protocols
        protocol_findings = self._check_protocols(host, port)
        findings.extend(protocol_findings)

        # 3. Check cipher suites
        cipher_findings = self._check_ciphers(host, port)
        findings.extend(cipher_findings)

        print(f"    Found {len(findings)} SSL/TLS issues")
        return findings

    def _check_certificate(self, host: str, port: int) -> list[Finding]:
        """Check SSL certificate validity."""
        findings = []

        try:
            context = ssl.create_default_context()
            with socket.create_connection(
                (host, port), timeout=self.timeout
            ) as sock:
                with context.wrap_socket(sock, server_hostname=host) as ssock:
                    cert = ssock.getpeercert()

                    if not cert:
                        findings.append(Finding(
                            title="No SSL Certificate",
                            severity=Severity.CRITICAL,
                            description="No SSL certificate presented by server.",
                            module="ssl_checker",
                            remediation="Install a valid SSL certificate.",
                        ))
                        return findings

                    # Check expiration
                    not_after = datetime.strptime(
                        cert['notAfter'], '%b %d %H:%M:%S %Y %Z'
                    ).replace(tzinfo=timezone.utc)
                    now = datetime.now(timezone.utc)
                    days_until_expiry = (not_after - now).days

                    if days_until_expiry < 0:
                        findings.append(Finding(
                            title="Expired SSL Certificate",
                            severity=Severity.CRITICAL,
                            description=f"Certificate expired {abs(days_until_expiry)} days ago.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_expired": abs(days_until_expiry),
                            },
                            remediation="Renew the SSL certificate immediately.",
                        ))
                    elif days_until_expiry < 30:
                        findings.append(Finding(
                            title="SSL Certificate Expiring Soon",
                            severity=Severity.HIGH,
                            description=f"Certificate expires in {days_until_expiry} days.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_remaining": days_until_expiry,
                            },
                            remediation="Renew the SSL certificate before expiration.",
                        ))
                    elif days_until_expiry < 90:
                        findings.append(Finding(
                            title="SSL Certificate Expiring Within 90 Days",
                            severity=Severity.MEDIUM,
                            description=f"Certificate expires in {days_until_expiry} days.",
                            module="ssl_checker",
                            details={
                                "expiry_date": cert['notAfter'],
                                "days_remaining": days_until_expiry,
                            },
                            remediation="Plan certificate renewal.",
                        ))

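                    # Check subject alternative names (case-insensitive).
                    # With the default context a mismatch normally fails the
                    # handshake first (see SSLCertVerificationError below),
                    # so this acts as a secondary check.
                    sans = []
                    for type_name, value in cert.get('subjectAltName', []):
                        if type_name == 'DNS':
                            sans.append(value.lower())

                    wildcard = f"*.{'.'.join(host.lower().split('.')[1:])}"
                    if host.lower() not in sans and wildcard not in sans: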
                        findings.append(Finding(
                            title="Certificate Hostname Mismatch",
                            severity=Severity.HIGH,
                            description=(
                                f"Certificate SANs {sans} do not include {host}."
                            ),
                            module="ssl_checker",
                            remediation="Reissue certificate with correct hostname.",
                        ))

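                    # Check issuer (self-signed). Only reachable when the
                    # certificate is in the local trust store; an untrusted
                    # self-signed certificate fails verification instead.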
                    issuer = dict(x[0] for x in cert.get('issuer', []))
                    subject = dict(x[0] for x in cert.get('subject', []))
                    if issuer == subject:
                        findings.append(Finding(
                            title="Self-Signed Certificate",
                            severity=Severity.MEDIUM,
                            description="The certificate is self-signed (not trusted by browsers).",
                            module="ssl_checker",
                            details={"issuer": str(issuer)},
                            remediation="Use a certificate from a trusted CA (e.g., Let's Encrypt).",
                        ))

        except ssl.SSLCertVerificationError as e:
            findings.append(Finding(
                title="SSL Certificate Verification Failed",
                severity=Severity.HIGH,
                description=f"Certificate verification error: {str(e)[:200]}",
                module="ssl_checker",
                remediation="Fix certificate chain or install valid certificate.",
            ))
        except (socket.timeout, ConnectionRefusedError, OSError) as e:
            findings.append(Finding(
                title="SSL Connection Failed",
                severity=Severity.INFO,
                description=f"Could not establish SSL connection: {str(e)[:200]}",
                module="ssl_checker",
            ))

        return findings

    def _check_protocols(self, host: str, port: int) -> list[Finding]:
        """Check which TLS protocol versions are supported."""
        findings = []

        # Test for weak protocols
        for protocol_version, description in self.WEAK_PROTOCOLS.items():
            try:
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                context.minimum_version = protocol_version
                context.maximum_version = protocol_version

                with socket.create_connection(
                    (host, port), timeout=self.timeout
                ) as sock:
                    with context.wrap_socket(sock, server_hostname=host) as ssock:
                        # If we get here, the weak protocol is supported
                        findings.append(Finding(
                            title="Weak TLS Protocol Supported",
                            severity=Severity.HIGH if "SSLv3" in description
                                     or "TLS 1.0" in description
                                     else Severity.MEDIUM,
                            description=f"Server supports {description}.",
                            module="ssl_checker",
                            details={"protocol": str(protocol_version)},
                            remediation=f"Disable {description}. Only allow TLS 1.2+.",
                        ))

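            except (ssl.SSLError, socket.timeout, ConnectionRefusedError,
                    OSError, ValueError):
                # The handshake failed, or the local OpenSSL build refuses
                # this version (ValueError). Either way, support for the
                # weak protocol could not be confirmed.
                pass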

        return findings

    def _check_ciphers(self, host: str, port: int) -> list[Finding]:
        """Check for weak cipher suites."""
        findings = []

        try:
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            with socket.create_connection(
                (host, port), timeout=self.timeout
            ) as sock:
                with context.wrap_socket(sock, server_hostname=host) as ssock:
                    cipher = ssock.cipher()

                    if cipher:
                        cipher_name, protocol, bits = cipher

                        # Check for weak ciphers (case-insensitive, so the
                        # lowercase "anon" keyword can match as well)
                        for weak_keyword in self.WEAK_CIPHER_KEYWORDS:
                            if weak_keyword.upper() in cipher_name.upper():
                                findings.append(Finding(
                                    title=f"Weak Cipher Suite: {cipher_name}",
                                    severity=Severity.HIGH,
                                    description=(
                                        f"Server negotiated weak cipher: {cipher_name} "
                                        f"({bits} bits, {protocol})"
                                    ),
                                    module="ssl_checker",
                                    details={
                                        "cipher": cipher_name,
                                        "protocol": protocol,
                                        "bits": bits,
                                    },
                                    remediation="Disable weak cipher suites. Use ECDHE-RSA-AES256-GCM-SHA384 or similar.",
                                ))
                                break

                        # Check key length
                        if bits and bits < 128:
                            findings.append(Finding(
                                title=f"Weak Cipher Key Length: {bits} bits",
                                severity=Severity.HIGH,
                                description=f"Cipher key length is only {bits} bits.",
                                module="ssl_checker",
                                remediation="Use cipher suites with at least 128-bit keys.",
                            ))

        except (ssl.SSLError, socket.timeout, ConnectionRefusedError, OSError):
            pass

        return findings
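
The expiry check in `_check_certificate` parses `notAfter` with `strptime` and `%b`, which depends on the process locale. The standard library also provides `ssl.cert_time_to_seconds`, which parses the same string in the C locale. Below is a minimal sketch of the expiry logic as two pure functions. The names `days_until_expiry` and `expiry_severity` are hypothetical, and the thresholds simply mirror the ones used above.

```python
import ssl
from datetime import datetime, timezone
from typing import Optional


def days_until_expiry(not_after: str, now: datetime) -> int:
    """Whole days from `now` until the certificate's notAfter time.

    `not_after` uses the format returned by SSLSocket.getpeercert(),
    for example "Jan  5 09:34:43 2018 GMT". A negative result means
    the certificate has already expired.
    """
    expiry_ts = ssl.cert_time_to_seconds(not_after)
    expiry = datetime.fromtimestamp(expiry_ts, tz=timezone.utc)
    return (expiry - now).days


def expiry_severity(days: int) -> Optional[str]:
    """Map remaining days to a severity label, or None if no finding."""
    if days < 0:
        return "CRITICAL"
    if days < 30:
        return "HIGH"
    if days < 90:
        return "MEDIUM"
    return None
```

Passing `now` as an argument keeps the functions deterministic, which makes the thresholds easy to unit test without a live TLS connection.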

8. Directory Scanner Module

8.1 Path Discovery

"""
scanner/dir_scanner.py - Directory and path discovery.

This module performs dictionary-based path discovery to find
hidden endpoints, backup files, and sensitive paths.

For educational purposes only. Never brute force paths without permission.
"""

import requests
from typing import Optional
from pathlib import Path

from scanner.utils import Finding, Severity, RateLimiter


# Built-in small wordlist for common paths
DEFAULT_WORDLIST = [
    # Configuration and environment files
    ".env", ".env.backup", ".env.production",
    ".git/config", ".git/HEAD",
    ".svn/entries",
    ".htaccess", ".htpasswd",
    "web.config",
    "robots.txt", "sitemap.xml",

    # Admin panels
    "admin", "admin/login", "administrator",
    "dashboard", "manage", "panel",
    "wp-admin", "wp-login.php",
    "phpmyadmin", "adminer.php",

    # API documentation
    "api", "api/v1", "api/docs",
    "swagger.json", "openapi.json",
    "api-docs", "graphql",

    # Backup and debug files
    "backup", "backup.zip", "backup.sql",
    "database.sql", "dump.sql",
    "debug", "debug.log",
    "error.log", "access.log",

    # Common application paths
    "server-status", "server-info",
    "info.php", "phpinfo.php",
    "test", "test.html", "test.php",
    "status", "health", "healthcheck",
    "metrics", "prometheus",

    # User-related paths
    "login", "register", "signup",
    "reset-password", "forgot-password",
    "profile", "account", "settings",

    # Static/asset paths
    "static", "assets", "uploads",
    "media", "files", "documents",
    "images", "img", "css", "js",

    # Development artifacts
    ".DS_Store",
    "composer.json", "package.json",
    "Gemfile", "requirements.txt",
    "Dockerfile", "docker-compose.yml",
]

# Paths that indicate security issues when accessible
SENSITIVE_PATHS = {
    ".env": "Environment file may contain secrets (API keys, passwords)",
    ".git/config": "Git configuration exposed - entire source code may be downloadable",
    ".git/HEAD": "Git repository exposed",
    ".svn/entries": "SVN repository exposed",
    ".htpasswd": "Apache password file exposed",
    "phpinfo.php": "PHP info page exposes server configuration details",
    "info.php": "PHP info page exposes server configuration details",
    "backup.sql": "Database backup file publicly accessible",
    "database.sql": "Database dump file publicly accessible",
    "dump.sql": "Database dump file publicly accessible",
    "backup.zip": "Backup archive publicly accessible",
    "debug.log": "Debug log may contain sensitive information",
    "server-status": "Apache server-status page exposed",
    "server-info": "Apache server-info page exposed",
    ".DS_Store": "macOS directory metadata may reveal file structure",
    "web.config": "IIS configuration file exposed",
    "composer.json": "PHP dependency file reveals technology stack",
    "package.json": "Node.js dependency file reveals technology stack",
    "requirements.txt": "Python dependency file reveals technology stack",
    "Dockerfile": "Docker configuration may reveal architecture",
    "swagger.json": "API specification publicly accessible",
    "openapi.json": "API specification publicly accessible",
    "graphql": "GraphQL endpoint may allow introspection",
}


class DirectoryScanner:
    """
    Dictionary-based directory and path discovery.
    Uses a wordlist to find accessible endpoints.
    """

    def __init__(
        self,
        timeout: float = 5.0,
        rate_limiter: Optional[RateLimiter] = None,
        wordlist_file: Optional[str] = None,
    ):
        self.timeout = timeout
        self.rate_limiter = rate_limiter or RateLimiter(10.0)

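        # Load wordlist: skip blank lines and '#' comments, and drop leading
        # slashes so entries join cleanly with the base URL
        if wordlist_file and Path(wordlist_file).exists():
            lines = Path(wordlist_file).read_text().splitlines()
            stripped = (w.strip() for w in lines)
            self.wordlist = [
                w.lstrip('/') for w in stripped
                if w.lstrip('/') and not w.startswith('#')
            ]
        else:
            self.wordlist = DEFAULT_WORDLIST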

    def scan(self, base_url: str) -> list[Finding]:
        """
        Scan for accessible directories and files.

        Args:
            base_url: Target base URL (e.g., https://example.com).

        Returns:
            List of findings.
        """
        findings = []
        discovered = []

        # Clean base URL
        base_url = base_url.rstrip('/')

        print(f"[*] Scanning {len(self.wordlist)} paths on {base_url}...")

        for i, path in enumerate(self.wordlist):
            if i > 0 and i % 50 == 0:
                print(f"    Progress: {i}/{len(self.wordlist)} paths checked, "
                      f"{len(discovered)} found")

            self.rate_limiter.acquire()

            url = f"{base_url}/{path}"

            try:
                response = requests.get(
                    url,
                    timeout=self.timeout,
                    allow_redirects=False,  # Don't follow redirects
                    verify=True,
                    headers={
                        "User-Agent": "SecurityScanner/1.0 (Authorized Scan)"
                    },
                )

                status = response.status_code

                # 200: directly accessible
                # 403: forbidden (exists but restricted)
                # Redirects (301/302) are not followed and not reported here
                if status == 200:
                    size = len(response.content)
                    discovered.append({
                        "path": path,
                        "status": status,
                        "size": size,
                    })
                    print(f"    [+] FOUND: /{path}  (200 OK, {size} bytes)")

                    # Check if this is a sensitive path
                    if path in SENSITIVE_PATHS:
                        findings.append(Finding(
                            title=f"Sensitive Path Accessible: /{path}",
                            severity=Severity.HIGH,
                            description=SENSITIVE_PATHS[path],
                            module="dir_scanner",
                            details={
                                "path": path,
                                "url": url,
                                "status_code": status,
                                "size": size,
                            },
                            remediation=(
                                f"Restrict access to /{path}. "
                                f"Remove from web root or deny in server configuration."
                            ),
                        ))
                    else:
                        findings.append(Finding(
                            title=f"Discovered Path: /{path}",
                            severity=Severity.INFO,
                            description=f"Path /{path} is accessible (HTTP 200).",
                            module="dir_scanner",
                            details={
                                "path": path,
                                "url": url,
                                "status_code": status,
                                "size": size,
                            },
                        ))

                elif status == 403:
                    # Exists but forbidden - note for manual testing
                    discovered.append({
                        "path": path,
                        "status": status,
                        "size": 0,
                    })
                    if path in SENSITIVE_PATHS:
                        findings.append(Finding(
                            title=f"Restricted Path Detected: /{path}",
                            severity=Severity.LOW,
                            description=(
                                f"Path /{path} exists but returns 403 Forbidden. "
                                f"Verify access controls are correct."
                            ),
                            module="dir_scanner",
                            details={"path": path, "status_code": status},
                        ))

            except requests.exceptions.RequestException:
                pass  # Connection error, skip this path

        print(f"[*] Directory scan complete. Found {len(discovered)} paths, "
              f"{len(findings)} findings.")

        return findings
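
The scan above treats every HTTP 200 as a discovered path, but some servers answer 200 with the same generic page for any URL (a "soft 404"), which would flood the report with false positives. One common mitigation is to request a random path first and compare later responses against that baseline. The sketch below assumes hypothetical names (`Baseline`, `random_probe_path`, `is_soft_404`) and a 5% size tolerance chosen purely for illustration.

```python
import secrets
from dataclasses import dataclass


@dataclass(frozen=True)
class Baseline:
    """Status and body size observed for a path that should not exist."""
    status: int
    size: int


def random_probe_path() -> str:
    """Build a path that is very unlikely to exist on the target."""
    return f"nonexistent-{secrets.token_hex(8)}"


def is_soft_404(baseline: Baseline, status: int, size: int,
                tolerance: float = 0.05) -> bool:
    """Guess whether a response is the server's catch-all page.

    A response counts as a soft 404 when the probe for a random path
    returned the same status and a body of nearly the same size.
    """
    if baseline.status != status:
        return False
    if baseline.size == 0:
        return size == 0
    return abs(size - baseline.size) / baseline.size <= tolerance
```

In `scan`, the baseline would be fetched once before the loop, and any 200 response for which `is_soft_404` returns true would be skipped. Size comparison is only a heuristic; pages that embed the requested path or a timestamp may need a looser tolerance or a content-based comparison.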

9. CVE Database Lookup

9.1 CVE Lookup Module

"""
scanner/cve_lookup.py - CVE (Common Vulnerabilities and Exposures) lookup.

Queries public CVE databases to find known vulnerabilities
for discovered services and versions.
"""

import re
import json
import requests
from typing import Optional

from scanner.utils import Finding, Severity, PortInfo, RateLimiter


# Simplified local CVE database for common services
# In production, query NVD API, VulnDB, or similar
LOCAL_CVE_DB = {
    "apache": {
        "2.4.49": [
            {
                "id": "CVE-2021-41773",
                "severity": "CRITICAL",
                "description": "Path traversal and RCE in Apache HTTP Server 2.4.49",
                "fix": "Upgrade to Apache 2.4.51+",
            },
        ],
        "2.4.50": [
            {
                "id": "CVE-2021-42013",
                "severity": "CRITICAL",
                "description": "Path traversal in Apache HTTP Server 2.4.50 (bypass of CVE-2021-41773)",
                "fix": "Upgrade to Apache 2.4.51+",
            },
        ],
    },
    "openssh": {
        "8.5": [
            {
                "id": "CVE-2021-41617",
                "severity": "HIGH",
                "description": "Privilege escalation in OpenSSH 6.2-8.7",
                "fix": "Upgrade to OpenSSH 8.8+",
            },
        ],
    },
    "nginx": {
        "1.20.0": [
            {
                "id": "CVE-2021-23017",
                "severity": "HIGH",
                "description": "DNS resolver vulnerabilities in nginx 0.6.18-1.20.0",
                "fix": "Upgrade to nginx 1.20.1+",
            },
        ],
    },
    "mysql": {
        "5.7": [
            {
                "id": "CVE-2023-21912",
                "severity": "HIGH",
                "description": "Denial of service in MySQL Server (5.7.41 and prior, 8.0.30 and prior)",
                "fix": "Upgrade to MySQL 5.7.42+ or 8.0.31+",
            },
        ],
    },
    "redis": {
        "6.0": [
            {
                "id": "CVE-2022-24735",
                "severity": "HIGH",
                "description": "Redis Lua script sandbox escape",
                "fix": "Upgrade to Redis 6.2.7+ or 7.0+",
            },
        ],
    },
}


class CVELookup:
    """
    Look up known CVEs for discovered services.
    Uses both a local database and optional online API queries.
    """

    NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def __init__(
        self,
        use_online: bool = False,
        rate_limiter: Optional[RateLimiter] = None,
    ):
        """
        Args:
            use_online: Whether to query online NVD API.
            rate_limiter: Rate limiter for API calls.
        """
        self.use_online = use_online
        self.rate_limiter = rate_limiter or RateLimiter(1.0)  # NVD rate limits

    def lookup(self, open_ports: list[PortInfo]) -> list[Finding]:
        """
        Look up CVEs for all discovered services.

        Args:
            open_ports: List of open ports with service information.

        Returns:
            List of CVE findings.
        """
        findings = []

        print(f"[*] Looking up known CVEs for {len(open_ports)} services...")

        for port_info in open_ports:
            if not port_info.version:
                continue

            # Parse service name and version
            service_name, version = self._parse_service_version(
                port_info.service, port_info.version, port_info.banner
            )

            if not service_name or not version:
                continue

            # Check local database
            local_cves = self._lookup_local(service_name, version)
            for cve in local_cves:
                severity_map = {
                    "CRITICAL": Severity.CRITICAL,
                    "HIGH": Severity.HIGH,
                    "MEDIUM": Severity.MEDIUM,
                    "LOW": Severity.LOW,
                }
                findings.append(Finding(
                    title=f"Known CVE: {cve['id']}",
                    severity=severity_map.get(cve['severity'], Severity.MEDIUM),
                    description=cve['description'],
                    module="cve_lookup",
                    details={
                        "cve_id": cve['id'],
                        "service": service_name,
                        "version": version,
                        "port": port_info.port,
                    },
                    remediation=cve.get('fix', 'Upgrade to the latest version.'),
                    reference=f"https://nvd.nist.gov/vuln/detail/{cve['id']}",
                ))
                print(f"    [!] {cve['id']}: {service_name} {version} "
                      f"on port {port_info.port}")

            # Query online NVD if enabled
            if self.use_online:
                online_cves = self._lookup_nvd(service_name, version)
                for cve in online_cves:
                    if not any(f.details.get('cve_id') == cve['id']
                               for f in findings):
                        findings.append(Finding(
                            title=f"Known CVE (NVD): {cve['id']}",
                            severity=Severity.MEDIUM,  # Default, refine from CVSS
                            description=cve['description'][:200],
                            module="cve_lookup",
                            details={
                                "cve_id": cve['id'],
                                "service": service_name,
                                "version": version,
                                "port": port_info.port,
                            },
                            reference=f"https://nvd.nist.gov/vuln/detail/{cve['id']}",
                        ))

        print(f"[*] CVE lookup complete. Found {len(findings)} known vulnerabilities.")
        return findings

    def _parse_service_version(
        self, service: str, version: str, banner: str
    ) -> tuple[str, str]:
        """Extract standardized service name and version."""
        version_lower = version.lower()
        banner_lower = banner.lower()

        # Apache
        if "apache" in version_lower or "apache" in banner_lower:
            match = re.search(r'apache[/ ](\d+\.\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "apache", match.group(1)

        # nginx
        if "nginx" in version_lower or "nginx" in banner_lower:
            match = re.search(r'nginx[/ ](\d+\.\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "nginx", match.group(1)

        # OpenSSH
        if "openssh" in version_lower or "openssh" in banner_lower:
            match = re.search(r'openssh[_/ ](\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "openssh", match.group(1)

        # MySQL
        if "mysql" in version_lower or "mysql" in banner_lower:
            match = re.search(r'mysql[/ ]*(\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "mysql", match.group(1)

        # Redis
        if "redis" in version_lower or "redis" in banner_lower:
            match = re.search(r'redis[/ ]*(\d+\.\d+)', version_lower + " " + banner_lower)
            if match:
                return "redis", match.group(1)

        return "", ""

    def _lookup_local(self, service: str, version: str) -> list[dict]:
        """Look up CVEs in local database."""
        service_db = LOCAL_CVE_DB.get(service.lower(), {})

        results = []
        for db_version, cves in service_db.items():
            # Match on whole version components so that a "2.4" entry
            # matches "2.4" and "2.4.49" but not "2.40".
            if version == db_version or version.startswith(db_version + "."):
                results.extend(cves)

        return results

    def _lookup_nvd(self, service: str, version: str) -> list[dict]:
        """Query NVD API for CVEs (rate-limited)."""
        self.rate_limiter.acquire()

        try:
            response = requests.get(
                self.NVD_API_URL,
                params={
                    "keywordSearch": f"{service} {version}",
                    "resultsPerPage": 5,
                },
                timeout=15,
                headers={"User-Agent": "SecurityScanner/1.0"},
            )

            if response.status_code == 200:
                data = response.json()
                results = []
                for vuln in data.get("vulnerabilities", []):
                    cve = vuln.get("cve", {})
                    desc = ""
                    for d in cve.get("descriptions", []):
                        if d.get("lang") == "en":
                            desc = d.get("value", "")
                            break

                    results.append({
                        "id": cve.get("id", ""),
                        "description": desc,
                    })
                return results

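        except (requests.exceptions.RequestException, ValueError) as exc:
            # ValueError also covers a response body that is not valid JSON
            print(f"    [!] NVD lookup failed for {service} {version}: {exc}")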

        return []
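
The online lookup above assigns every NVD result a default MEDIUM severity and leaves a note to refine it from CVSS. A minimal sketch of that refinement follows, using the CVSS v3.x qualitative rating bands (0.1-3.9 Low, 4.0-6.9 Medium, 7.0-8.9 High, 9.0-10.0 Critical). The helper names `severity_from_cvss` and `extract_base_score` are hypothetical, and the `metrics` / `cvssMetricV31` / `cvssData` / `baseScore` layout is an assumption about the NVD 2.0 response that should be checked against the current API documentation. The sketch returns plain strings to stay self-contained; in the scanner these would map onto the `Severity` enum.

```python
from typing import Optional


def severity_from_cvss(score: Optional[float]) -> str:
    """Map a CVSS v3.x base score to a qualitative severity label."""
    if score is None:
        return "MEDIUM"  # same default the scanner uses when no score is known
    if score >= 9.0:
        return "CRITICAL"
    if score >= 7.0:
        return "HIGH"
    if score >= 4.0:
        return "MEDIUM"
    if score > 0.0:
        return "LOW"
    return "INFO"  # CVSS rates a score of 0.0 as "None"


def extract_base_score(cve: dict) -> Optional[float]:
    """Return the first CVSS v3.x base score found in a CVE record, if any.

    Assumed layout (verify against the NVD API docs):
    cve["metrics"]["cvssMetricV31"][0]["cvssData"]["baseScore"]
    """
    metrics = cve.get("metrics", {})
    for key in ("cvssMetricV31", "cvssMetricV30"):
        for entry in metrics.get(key, []):
            score = entry.get("cvssData", {}).get("baseScore")
            if isinstance(score, (int, float)):
                return float(score)
    return None


# Hand-written sample record (not real NVD output)
sample = {"metrics": {"cvssMetricV31": [{"cvssData": {"baseScore": 9.8}}]}}
print(severity_from_cvss(extract_base_score(sample)))  # CRITICAL
```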

10. Report Generation

10.1 Report Engine

"""
scanner/report.py - Generate scan reports in multiple formats.
"""

import json
from dataclasses import asdict
from datetime import datetime

from scanner.utils import ScanResult, Finding, Severity


class ReportGenerator:
    """Generate vulnerability scan reports."""

    def generate_text(self, result: ScanResult) -> str:
        """Generate a text report."""
        lines = []

        lines.append("=" * 70)
        lines.append("  VULNERABILITY SCAN REPORT")
        lines.append("=" * 70)
        lines.append(f"  Target:    {result.target.host} ({result.target.ip})")
        lines.append(f"  Scan Date: {result.start_time}")
        lines.append(f"  Duration:  {result.duration_seconds:.1f} seconds")

        # Summary
        severity_counts = {}
        for f in result.findings:
            severity_counts[f.severity.value] = \
                severity_counts.get(f.severity.value, 0) + 1

        lines.append(f"\n  Open Ports:   {len(result.open_ports)}")
        lines.append(f"  Total Findings: {len(result.findings)}")

        for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
            count = severity_counts.get(sev, 0)
            if count > 0:
                lines.append(f"    {sev:10s}: {count}")

        # Open Ports
        if result.open_ports:
            lines.append(f"\n{'─' * 70}")
            lines.append("  OPEN PORTS")
            lines.append(f"{'─' * 70}")
            lines.append(f"  {'Port':>7s}  {'State':8s}  {'Service':15s}  {'Version'}")
            lines.append(f"  {'─'*7}  {'─'*8}  {'─'*15}  {'─'*30}")

            for port in result.open_ports:
                lines.append(
                    f"  {port.port:>7d}  {port.state:8s}  "
                    f"{port.service:15s}  {port.version[:30]}"
                )

        # Findings by severity
        for severity in [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM,
                         Severity.LOW, Severity.INFO]:
            findings = [f for f in result.findings if f.severity == severity]
            if not findings:
                continue

            lines.append(f"\n{'─' * 70}")
            lines.append(f"  {severity.value} FINDINGS ({len(findings)})")
            lines.append(f"{'─' * 70}")

            for i, finding in enumerate(findings, 1):
                lines.append(f"\n  {i}. [{finding.module}] {finding.title}")
                lines.append(f"     {finding.description}")
                if finding.remediation:
                    lines.append(f"     Fix: {finding.remediation}")
                if finding.reference:
                    lines.append(f"     Ref: {finding.reference}")
                if finding.details:
                    for key, value in finding.details.items():
                        lines.append(f"     {key}: {value}")

        # Errors
        if result.errors:
            lines.append(f"\n{'─' * 70}")
            lines.append("  ERRORS")
            lines.append(f"{'─' * 70}")
            for error in result.errors:
                lines.append(f"  [!] {error}")

        # Footer
        lines.append(f"\n{'=' * 70}")
        if severity_counts.get("CRITICAL", 0) > 0:
            lines.append("  OVERALL: CRITICAL vulnerabilities found!")
            lines.append("  Immediate remediation required.")
        elif severity_counts.get("HIGH", 0) > 0:
            lines.append("  OVERALL: HIGH severity vulnerabilities found.")
            lines.append("  Remediation recommended within 7 days.")
        elif severity_counts.get("MEDIUM", 0) > 0:
            lines.append("  OVERALL: MEDIUM severity issues found.")
            lines.append("  Remediation recommended within 30 days.")
        else:
            lines.append("  OVERALL: No critical vulnerabilities found.")
        lines.append("=" * 70)

        return "\n".join(lines)

    def generate_json(self, result: ScanResult) -> str:
        """Generate a JSON report."""
        report = {
            "scan_metadata": {
                "target": result.target.host,
                "ip": result.target.ip,
                "start_time": result.start_time,
                "end_time": result.end_time,
                "duration_seconds": result.duration_seconds,
            },
            "summary": {
                "open_ports": len(result.open_ports),
                "total_findings": len(result.findings),
                "by_severity": {},
            },
            "open_ports": [
                {
                    "port": p.port,
                    "state": p.state,
                    "service": p.service,
                    "version": p.version,
                    "banner": p.banner[:200],
                }
                for p in result.open_ports
            ],
            "findings": [
                {
                    "title": f.title,
                    "severity": f.severity.value,
                    "description": f.description,
                    "module": f.module,
                    "details": f.details,
                    "remediation": f.remediation,
                    "reference": f.reference,
                }
                for f in result.findings
            ],
            "errors": result.errors,
        }

        # Add severity counts
        for f in result.findings:
            sev = f.severity.value
            report["summary"]["by_severity"][sev] = \
                report["summary"]["by_severity"].get(sev, 0) + 1

        return json.dumps(report, indent=2)

    def generate_html(self, result: ScanResult) -> str:
        """Generate an HTML report."""
        severity_colors = {
            "CRITICAL": "#dc3545",
            "HIGH": "#fd7e14",
            "MEDIUM": "#ffc107",
            "LOW": "#17a2b8",
            "INFO": "#6c757d",
        }

        html = f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Vulnerability Scan Report - {result.target.host}</title>
    <style>
        body {{ font-family: -apple-system, BlinkMacSystemFont, sans-serif;
               max-width: 900px; margin: 0 auto; padding: 20px;
               background: #f8f9fa; }}
        h1 {{ color: #212529; border-bottom: 2px solid #dee2e6; padding-bottom: 10px; }}
        h2 {{ color: #495057; margin-top: 30px; }}
        .summary {{ background: #fff; padding: 20px; border-radius: 8px;
                    box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
        .finding {{ background: #fff; padding: 15px; margin: 10px 0;
                    border-radius: 8px; border-left: 4px solid #dee2e6;
                    box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
        .severity {{ display: inline-block; padding: 2px 8px; border-radius: 4px;
                     color: white; font-size: 0.85em; font-weight: bold; }}
        table {{ width: 100%; border-collapse: collapse; background: #fff;
                border-radius: 8px; overflow: hidden; }}
        th, td {{ padding: 10px 15px; text-align: left; border-bottom: 1px solid #dee2e6; }}
        th {{ background: #495057; color: white; }}
        .fix {{ color: #28a745; font-style: italic; }}
    </style>
</head>
<body>
    <h1>Vulnerability Scan Report</h1>
    <div class="summary">
        <p><strong>Target:</strong> {result.target.host} ({result.target.ip})</p>
        <p><strong>Date:</strong> {result.start_time}</p>
        <p><strong>Duration:</strong> {result.duration_seconds:.1f} seconds</p>
        <p><strong>Open Ports:</strong> {len(result.open_ports)} |
           <strong>Findings:</strong> {len(result.findings)}</p>
    </div>

    <h2>Open Ports</h2>
    <table>
        <tr><th>Port</th><th>State</th><th>Service</th><th>Version</th></tr>
"""
        # Banners, versions, and finding text originate from the scanned host,
        # so escape them before embedding them in the report.
        from html import escape  # local import: "html" is a variable name here

        for port in result.open_ports:
            html += f"        <tr><td>{port.port}</td><td>{escape(port.state)}</td>"
            html += (f"<td>{escape(port.service)}</td>"
                     f"<td>{escape(port.version[:50])}</td></tr>\n")

        html += "    </table>\n\n    <h2>Findings</h2>\n"

        for severity_name in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]:
            findings = [f for f in result.findings
                        if f.severity.value == severity_name]
            if not findings:
                continue

            color = severity_colors.get(severity_name, "#6c757d")
            for finding in findings:
                html += f"""    <div class="finding" style="border-left-color: {color}">
        <span class="severity" style="background: {color}">{severity_name}</span>
        <strong>{escape(finding.title)}</strong>
        <p>{escape(finding.description)}</p>
"""
                if finding.remediation:
                    html += f'        <p class="fix">Fix: {escape(finding.remediation)}</p>\n'
                if finding.reference:
                    ref = escape(finding.reference)
                    html += f'        <p>Reference: <a href="{ref}">{ref}</a></p>\n'
                html += "    </div>\n"

        html += f"""
    <hr>
    <p style="color: #6c757d; font-size: 0.85em;">
        Generated by Vulnerability Scanner on {datetime.now().isoformat()}
    </p>
</body>
</html>"""

        return html
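
Because banners and version strings come from the scanned host, anything interpolated into the HTML report is untrusted input. As a small illustration of why such values should pass through the standard library's `html.escape` before they reach the page, consider a hostile banner. The banner text and the `render_cell` helper are made up for this example.

```python
from html import escape


def render_cell(value: str) -> str:
    """Render one table cell with the value HTML-escaped."""
    return f"<td>{escape(value)}</td>"


# A banner that tries to smuggle a script tag into the report
hostile_banner = 'Apache/2.4.49 <script>alert("x")</script>'
print(render_cell(hostile_banner))
# <td>Apache/2.4.49 &lt;script&gt;alert(&quot;x&quot;)&lt;/script&gt;</td>
```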

11. Scan Controller (Orchestrator)

11.1 Main Controller

"""
scanner/controller.py - Scan orchestration controller.
"""

import time
from typing import Optional

from scanner.utils import ScanTarget, ScanResult, RateLimiter, COMMON_PORTS
from scanner.port_scanner import PortScanner
from scanner.banner_grabber import BannerGrabber
from scanner.http_checker import HTTPChecker
from scanner.ssl_checker import SSLChecker
from scanner.dir_scanner import DirectoryScanner
from scanner.cve_lookup import CVELookup
from scanner.report import ReportGenerator


class ScanController:
    """
    Orchestrates all scan modules and generates reports.
    """

    def __init__(
        self,
        rate_limit: float = 50.0,
        timeout: float = 3.0,
        max_workers: int = 50,
    ):
        """
        Args:
            rate_limit: Maximum requests per second.
            timeout: Default timeout for connections.
            max_workers: Maximum concurrent threads for port scanning.
        """
        self.rate_limiter = RateLimiter(rate_limit)
        self.timeout = timeout
        self.max_workers = max_workers
        self.report_generator = ReportGenerator()

    def scan(
        self,
        host: str,
        ports: Optional[list[int]] = None,
        scan_http: bool = True,
        scan_ssl: bool = True,
        scan_dirs: bool = False,
        lookup_cves: bool = True,
        wordlist: Optional[str] = None,
    ) -> ScanResult:
        """
        Perform a comprehensive vulnerability scan.

        Args:
            host: Target hostname or IP.
            ports: Specific ports to scan (default: common ports).
            scan_http: Whether to check HTTP security headers.
            scan_ssl: Whether to check SSL/TLS configuration.
            scan_dirs: Whether to perform directory scanning.
            lookup_cves: Whether to look up CVEs for found services.
            wordlist: Path to custom wordlist for directory scanning.

        Returns:
            Complete scan result.
        """
        # Initialize target
        target = ScanTarget(host=host)
        if not target.resolve():
            print(f"[!] Cannot resolve hostname: {host}")
            result = ScanResult(target=target)
            result.errors.append(f"DNS resolution failed for {host}")
            return result

        print(f"\n{'='*60}")
        print(f"  VULNERABILITY SCAN")
        print(f"  Target: {host} ({target.ip})")
        print(f"  Rate Limit: {self.rate_limiter.rate} req/s")
        print(f"{'='*60}\n")

        result = ScanResult(target=target)
        start_time = time.time()

        # Phase 1: Port Scanning
        print("\n[Phase 1/5] Port Scanning")
        print("-" * 40)
        port_scanner = PortScanner(
            timeout=self.timeout,
            rate_limiter=self.rate_limiter,
            max_workers=self.max_workers,
        )
        result.open_ports = port_scanner.scan_ports(host, ports or COMMON_PORTS)
        result.findings.extend(
            port_scanner.generate_findings(result.open_ports)
        )

        if not result.open_ports:
            print("[!] No open ports found. Scan complete.")
            result.end_time = time.strftime("%Y-%m-%dT%H:%M:%S")
            result.duration_seconds = time.time() - start_time
            return result

        # Phase 2: Banner Grabbing
        print("\n[Phase 2/5] Banner Grabbing")
        print("-" * 40)
        banner_grabber = BannerGrabber(
            timeout=self.timeout,
            rate_limiter=self.rate_limiter,
        )
        banner_grabber.grab_all_banners(host, result.open_ports)
        result.findings.extend(
            banner_grabber.generate_findings(result.open_ports)
        )

        # Phase 3: HTTP Security Headers
        if scan_http:
            print("\n[Phase 3/5] HTTP Security Headers")
            print("-" * 40)
            http_checker = HTTPChecker(
                timeout=self.timeout,
                rate_limiter=self.rate_limiter,
            )

            # Check HTTP and HTTPS
            http_ports = [p for p in result.open_ports
                          if p.port in (80, 8080, 8000)]
            https_ports = [p for p in result.open_ports
                           if p.port in (443, 8443)]

            for port_info in http_ports:
                url = f"http://{host}:{port_info.port}" if port_info.port != 80 \
                      else f"http://{host}"
                result.findings.extend(http_checker.check(url))

            for port_info in https_ports:
                url = f"https://{host}:{port_info.port}" if port_info.port != 443 \
                      else f"https://{host}"
                result.findings.extend(http_checker.check(url))
        else:
            print("\n[Phase 3/5] HTTP Security Headers (skipped)")

        # Phase 4: SSL/TLS Check
        if scan_ssl:
            print("\n[Phase 4/5] SSL/TLS Configuration")
            print("-" * 40)
            ssl_checker = SSLChecker(
                timeout=self.timeout,
                rate_limiter=self.rate_limiter,
            )

            ssl_ports = [p for p in result.open_ports
                         if p.port in (443, 8443, 465, 993, 995)]
            for port_info in ssl_ports:
                result.findings.extend(
                    ssl_checker.check(host, port_info.port)
                )
        else:
            print("\n[Phase 4/5] SSL/TLS Configuration (skipped)")

        # Phase 5: Directory Scanning (optional)
        if scan_dirs:
            print("\n[Phase 5/5] Directory Discovery")
            print("-" * 40)
            dir_scanner = DirectoryScanner(
                timeout=self.timeout,
                rate_limiter=self.rate_limiter,
                wordlist_file=wordlist,
            )

            web_ports = [p for p in result.open_ports
                         if p.port in (80, 443, 8080, 8443)]
            for port_info in web_ports:
                scheme = "https" if port_info.port in (443, 8443) else "http"
                port_suffix = f":{port_info.port}" \
                    if port_info.port not in (80, 443) else ""
                url = f"{scheme}://{host}{port_suffix}"
                result.findings.extend(dir_scanner.scan(url))
        else:
            print("\n[Phase 5/5] Directory Discovery (skipped)")

        # CVE Lookup
        if lookup_cves:
            print("\n[Bonus] CVE Database Lookup")
            print("-" * 40)
            cve_lookup = CVELookup(use_online=False)
            result.findings.extend(cve_lookup.lookup(result.open_ports))

        # Finalize
        result.end_time = time.strftime("%Y-%m-%dT%H:%M:%S")
        result.duration_seconds = time.time() - start_time

        # Sort findings by severity
        severity_order = {
            "CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4
        }
        result.findings.sort(
            key=lambda f: severity_order.get(f.severity.value, 5)
        )

        return result

12. CLI Entry Point

12.1 Main Script

"""
main.py - CLI entry point for the vulnerability scanner.

Usage:
    python main.py example.com
    python main.py example.com --ports 80,443,8080
    python main.py example.com --full --output report.json
    python main.py example.com --dirs --wordlist wordlists/common_paths.txt
"""

import argparse
import sys

from scanner.controller import ScanController
from scanner.report import ReportGenerator
from scanner.utils import is_valid_target, COMMON_PORTS


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Vulnerability Scanner - Authorized security scanning tool",
        epilog=(
            "IMPORTANT: Only scan targets you own or have explicit written "
            "permission to test. Unauthorized scanning is illegal."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Target
    parser.add_argument(
        "target",
        help="Target hostname or IP address to scan",
    )

    # Port options
    port_group = parser.add_mutually_exclusive_group()
    port_group.add_argument(
        "-p", "--ports",
        help="Comma-separated list of ports to scan (e.g., 80,443,8080)",
    )
    port_group.add_argument(
        "--top-ports",
        type=int,
        default=None,
        help="Scan top N most common ports (default: all common ports)",
    )
    port_group.add_argument(
        "--all-ports",
        action="store_true",
        help="Scan all ports (1-65535). VERY SLOW.",
    )

    # Scan options
    parser.add_argument(
        "--full",
        action="store_true",
        help="Full scan: ports + HTTP + SSL + dirs + CVEs",
    )
    parser.add_argument(
        "--no-http",
        action="store_true",
        help="Skip HTTP security header check",
    )
    parser.add_argument(
        "--no-ssl",
        action="store_true",
        help="Skip SSL/TLS configuration check",
    )
    parser.add_argument(
        "--dirs",
        action="store_true",
        help="Enable directory/path discovery",
    )
    parser.add_argument(
        "--no-cve",
        action="store_true",
        help="Skip CVE database lookup",
    )
    parser.add_argument(
        "--wordlist",
        default=None,
        help="Custom wordlist file for directory scanning",
    )

    # Performance options
    parser.add_argument(
        "--rate",
        type=float,
        default=50.0,
        help="Maximum requests per second (default: 50)",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=3.0,
        help="Connection timeout in seconds (default: 3)",
    )
    parser.add_argument(
        "--threads",
        type=int,
        default=50,
        help="Maximum concurrent threads for port scanning (default: 50)",
    )

    # Output options
    parser.add_argument(
        "-o", "--output",
        help="Output file path (format detected from extension: .json, .html, .txt)",
    )
    parser.add_argument(
        "-f", "--format",
        choices=["text", "json", "html"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="Suppress progress output (only show results)",
    )

    # Confirmation
    parser.add_argument(
        "--confirm",
        action="store_true",
        help="Skip authorization confirmation prompt",
    )

    return parser.parse_args()


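def parse_ports(ports_str: str) -> list[int]:
    """Parse a comma-separated port list, supporting ranges (e.g. 80,8000-8010).

    Raises:
        ValueError: If a port is not a number, is outside 1-65535,
            or a range is reversed.
    """
    ports = set()
    for part in ports_str.split(","):
        part = part.strip()
        if "-" in part:
            start_str, end_str = part.split("-", 1)
            start, end = int(start_str), int(end_str)
        else:
            start = end = int(part)
        if not (1 <= start <= end <= 65535):
            raise ValueError(f"invalid port or range: {part!r}")
        ports.update(range(start, end + 1))
    return sorted(ports)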


def main():
    args = parse_args()

    # Authorization confirmation
    if not args.confirm:
        print("=" * 60)
        print("  VULNERABILITY SCANNER")
        print("=" * 60)
        print(f"\n  Target: {args.target}")
        print("\n  WARNING: Unauthorized scanning is ILLEGAL.")
        print("  Only scan targets you own or have written permission")
        print("  to test.\n")

        try:
            response = input("  Do you have authorization to scan this target? [y/N]: ")
            if response.lower() not in ('y', 'yes'):
                print("\n  Scan cancelled. Get proper authorization first.")
                sys.exit(0)
        except (KeyboardInterrupt, EOFError):
            print("\n\n  Scan cancelled.")
            sys.exit(0)

    # Validate target
    if not is_valid_target(args.target):
        print(f"[!] Invalid or potentially dangerous target: {args.target}")
        sys.exit(1)

    # Parse ports
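    if args.ports:
        try:
            ports = parse_ports(args.ports)
        except ValueError as exc:
            print(f"[!] Invalid port specification '{args.ports}': {exc}")
            sys.exit(1)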
    elif args.all_ports:
        ports = list(range(1, 65536))
    elif args.top_ports:
        ports = COMMON_PORTS[:args.top_ports]
    else:
        ports = COMMON_PORTS

    # Determine scan options
    scan_http = not args.no_http
    scan_ssl = not args.no_ssl
    scan_dirs = args.dirs or args.full
    lookup_cves = not args.no_cve

    if args.full:
        scan_http = True
        scan_ssl = True
        scan_dirs = True
        lookup_cves = True

    # Run scan
    controller = ScanController(
        rate_limit=args.rate,
        timeout=args.timeout,
        max_workers=args.threads,
    )

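    # Honor --quiet: discard progress messages printed during the scan so
    # that only the final report reaches stdout.
    import contextlib
    import io
    progress_output = (
        contextlib.redirect_stdout(io.StringIO())
        if args.quiet else contextlib.nullcontext()
    )

    try:
        with progress_output:
            result = controller.scan(
                host=args.target,
                ports=ports,
                scan_http=scan_http,
                scan_ssl=scan_ssl,
                scan_dirs=scan_dirs,
                lookup_cves=lookup_cves,
                wordlist=args.wordlist,
            )
    except KeyboardInterrupt:
        print("\n\n[!] Scan interrupted by user.")
        sys.exit(130)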

    # Generate report
    report_gen = ReportGenerator()

    # Determine output format
    output_format = args.format
    if args.output:
        if args.output.endswith('.json'):
            output_format = 'json'
        elif args.output.endswith('.html'):
            output_format = 'html'
        elif args.output.endswith('.txt'):
            output_format = 'text'

    if output_format == 'json':
        report = report_gen.generate_json(result)
    elif output_format == 'html':
        report = report_gen.generate_html(result)
    else:
        report = report_gen.generate_text(result)

    # Output
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"\n[*] Report saved to: {args.output}")
    else:
        print("\n" + report)

    # Exit code based on findings
    critical = sum(1 for f in result.findings if f.severity.value == "CRITICAL")
    high = sum(1 for f in result.findings if f.severity.value == "HIGH")

    if critical > 0:
        sys.exit(2)
    elif high > 0:
        sys.exit(1)
    else:
        sys.exit(0)


if __name__ == "__main__":
    main()

13. Usage Examples

13.1 Basic Scans

# Scan the target's common ports
python main.py example.com

# Scan specific ports
python main.py example.com -p 80,443,8080,3306

# Scan only the top 20 ports
python main.py example.com --top-ports 20

13.2 Full Scans

# Full scan with all modules
python main.py example.com --full

# Full scan with a custom rate limit and output file
python main.py example.com --full --rate 20 -o report.html

# Full scan with a custom wordlist
python main.py example.com --full --wordlist wordlists/large.txt

13.3 Targeted Scans

# HTTP headers only (web server ports)
python main.py example.com -p 80,443 --no-ssl --no-cve

# SSL/TLS only
python main.py example.com -p 443 --no-http --no-cve

# Directory scan only
python main.py example.com -p 80,443 --dirs --no-http --no-ssl --no-cve

13.4 Output Formats

# JSON output (for automation)
python main.py example.com -o scan_results.json

# HTML report (for sharing)
python main.py example.com --full -o report.html

# Text report to a file
python main.py example.com -o report.txt

# Print JSON to stdout quietly
python main.py example.com -q -f json --confirm
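
The JSON report is meant for automation. As one possible sketch of a CI gate, the snippet below reads a report with the structure produced by `generate_json` (`summary.by_severity`) and decides whether a build should fail. The function name `should_fail_build`, the `fail_on` threshold parameter, and the inline sample report are assumptions for illustration.

```python
import json

SEVERITY_ORDER = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]


def should_fail_build(report_json: str, fail_on: str = "HIGH") -> bool:
    """Return True if the report has any finding at or above `fail_on`."""
    report = json.loads(report_json)
    counts = report.get("summary", {}).get("by_severity", {})
    # Every severity from CRITICAL down to the threshold blocks the build
    blocking = SEVERITY_ORDER[: SEVERITY_ORDER.index(fail_on) + 1]
    return any(counts.get(sev, 0) > 0 for sev in blocking)


# Hand-written sample in the shape of the JSON report (not real scan output)
sample_report = json.dumps({
    "summary": {
        "open_ports": 2,
        "total_findings": 3,
        "by_severity": {"HIGH": 1, "LOW": 2},
    }
})

print(should_fail_build(sample_report))              # True
print(should_fail_build(sample_report, "CRITICAL"))  # False
```

In a pipeline the report would typically be read from a file written with `-o scan_results.json`. The scanner's own exit codes (2 for critical findings, 1 for high findings) offer a coarser version of the same signal.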

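14. Exercises

Exercise 1: Run the Scanner

Set up and run the scanner against a locally hosted test application:
1. Install DVWA or WebGoat in a Docker container
2. Run the scanner against localhost with the --full option
3. Analyze the report and categorize every finding
4. Determine which findings are true positives and which are false positives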

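Exercise 2: Add UDP Scanning

Extend the port scanner to support basic UDP scanning:
1. Implement UDP port scanning using SOCK_DGRAM
2. Handle the differences from TCP (no handshake, lower reliability)
3. Add support for common UDP services (DNS/53, SNMP/161, NTP/123)
4. Add a --udp flag to the CLI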

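Exercise 3: Implement a Web Crawler

Add a web crawler module that:
1. Starts from a base URL and discovers linked pages
2. Extracts all forms and parameters
3. Identifies input fields that could be part of the attack surface
4. Respects robots.txt
5. Limits crawl depth and honors the rate limiter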

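Exercise 4: Add Authentication Support

Extend the HTTP checker to support authenticated scanning:
1. Accept login credentials via the CLI
2. Perform authentication (form-based or cookie-based)
3. Scan pages that require login
4. Support Bearer token authentication for APIs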

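Exercise 5: Custom CVE Database

Build a more comprehensive CVE lookup system:
1. Download CVE data from the NVD (NIST)
2. Store it in a local SQLite database
3. Implement version range matching (not just exact matches)
4. Add CVSS score parsing and severity mapping
5. Support automatic database updates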
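
For the version range matching step, one way to start is to compare versions as tuples of integers rather than as strings. This is only a starting sketch: `parse_version` and `in_range` are hypothetical helpers, the range uses an inclusive start and an exclusive end, and suffixes such as `p1` or `-beta` are simply dropped, which a real implementation would need to handle more carefully.

```python
import re
from typing import Optional


def parse_version(version: str) -> tuple[int, ...]:
    """Turn '2.4.49' into (2, 4, 49).

    Keeps the leading digits of each dot-separated part, stops at the
    first part with no leading digits, and drops trailing zeros so that
    '2.4' and '2.4.0' compare equal.
    """
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if not match:
            break
        parts.append(int(match.group()))
    while parts and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def in_range(
    version: str,
    start_incl: Optional[str] = None,
    end_excl: Optional[str] = None,
) -> bool:
    """Check start_incl <= version < end_excl; None leaves a side open."""
    v = parse_version(version)
    if start_incl is not None and v < parse_version(start_incl):
        return False
    if end_excl is not None and v >= parse_version(end_excl):
        return False
    return True


print(in_range("2.4.49", "2.4.0", "2.4.51"))  # True
print(in_range("2.4.51", "2.4.0", "2.4.51"))  # False (end is exclusive)
print(in_range("2.40.1", "2.4.0", "2.5.0"))   # False (2.40 is newer than 2.5)
```

The last call shows why tuple comparison is preferable to string prefixes: as a string, "2.40.1" starts with "2.4", but numerically it lies outside the 2.4.x series.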

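Exercise 6: Parallel Module Execution

Optimize the scanner for speed:
1. Run independent modules in parallel (for example, the HTTP and SSL checks)
2. Implement a progress bar showing overall completion
3. Add support for reading multiple targets from a file
4. Benchmark sequential versus parallel execution and compare the timings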


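Summary

┌──────────────────────────────────────────────────────────────────┐
│          Vulnerability Scanner Key Takeaways                     │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. Ethics first: always obtain authorization before scanning    │
│  2. Rate limiting: responsible scanning respects the target's    │
│     resources and network capacity                               │
│  3. Modular design: each scan type is an independent module      │
│     that can run on its own or be combined                       │
│  4. Defense in depth: port scan + banner grabbing + HTTP         │
│     headers + SSL + directory scan + CVE lookup for              │
│     comprehensive coverage                                       │
│  5. False positives: scanners generate noise; validating         │
│     findings requires human analysis                             │
│  6. Version detection: accurate service/version                  │
│     identification is critical for CVE matching                  │
│  7. Reporting: clear, actionable reports with severity           │
│     ratings and remediation advice matter as much as             │
│     detection                                                    │
│  8. Automation: a CLI with structured output can be              │
│     integrated into CI/CD and regular security assessments       │
│  9. Continuity: security scanning should be recurring, not       │
│     one-off; integrate it into your development workflow         │
│ 10. Limitations: automated scanners find common issues but       │
│     cannot replace manual penetration testing for logic          │
│     flaws                                                        │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘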
