diff --git a/.github/workflows/performance.yml b/.github/workflows/performance.yml new file mode 100644 index 0000000..ce34f29 --- /dev/null +++ b/.github/workflows/performance.yml @@ -0,0 +1,68 @@ +name: Performance + +on: + workflow_dispatch: + schedule: + - cron: "10 4 * * 1" + +permissions: + contents: read + +jobs: + benchmark: + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - name: Checkout + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + + - name: Set up Python + uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.lock + + - name: Start local origin + run: | + python -m http.server 18080 >/tmp/origin.log 2>&1 & + echo $! > /tmp/origin.pid + + - name: Start EvilWAF + run: | + python evilwaf.py -t http://127.0.0.1:18080 --no-tui --listen-host 127.0.0.1 --listen-port 8080 >/tmp/evilwaf.log 2>&1 & + echo $! 
> /tmp/evilwaf.pid + sleep 5 + + - name: Run benchmark + run: | + python benchmarks/proxy_benchmark.py \ + --proxy http://127.0.0.1:8080 \ + --target http://127.0.0.1:18080 \ + --requests 200 \ + --concurrency 20 > perf.txt + cat perf.txt + + - name: Check budgets + run: | + python benchmarks/check_budgets.py perf.txt benchmarks/perf_budgets.json + + - name: Upload benchmark artifacts + if: always() + uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4 + with: + name: perf-results + path: | + perf.txt + /tmp/origin.log + /tmp/evilwaf.log + + - name: Cleanup background services + if: always() + run: | + kill "$(cat /tmp/evilwaf.pid)" || true + kill "$(cat /tmp/origin.pid)" || true diff --git a/benchmarks/check_budgets.py b/benchmarks/check_budgets.py new file mode 100644 index 0000000..86f5c2a --- /dev/null +++ b/benchmarks/check_budgets.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import sys +from pathlib import Path + + +def _parse_result(path: Path) -> dict: + out = {} + for line in path.read_text().splitlines(): + if "=" not in line: + continue + k, v = line.strip().split("=", 1) + try: + out[k] = float(v) + except ValueError: + pass + return out + + +def main() -> int: + if len(sys.argv) != 3: + print("usage: check_budgets.py <result_file> <budget_file>") + return 2 + result = _parse_result(Path(sys.argv[1])) + budget = json.loads(Path(sys.argv[2]).read_text()) + + failures = [] + if result.get("latency_p95_ms", 0.0) > budget["latency_p95_ms_max"]: + failures.append("latency_p95_ms") + if result.get("success_rate", 0.0) < budget["success_rate_min"]: + failures.append("success_rate") + if result.get("rps", 0.0) < budget["rps_min"]: + failures.append("rps") + + if failures: + print("budget check failed:", ", ".join(failures)) + print(result) + return 1 + print("budget check passed") + print(result) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/benchmarks/perf_budgets.json 
b/benchmarks/perf_budgets.json new file mode 100644 index 0000000..ac9b406 --- /dev/null +++ b/benchmarks/perf_budgets.json @@ -0,0 +1,5 @@ +{ + "latency_p95_ms_max": 1200.0, + "success_rate_min": 0.9, + "rps_min": 5.0 +} diff --git a/benchmarks/proxy_benchmark.py b/benchmarks/proxy_benchmark.py new file mode 100644 index 0000000..8893821 --- /dev/null +++ b/benchmarks/proxy_benchmark.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import statistics +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from typing import Dict, List, Tuple +from urllib.parse import urlparse + +import requests + + +@dataclass +class Sample: + status_code: int + latency_ms: float + ok: bool + + +def _single_request(proxy_url: str, target_url: str, timeout: float) -> Sample: + start = time.perf_counter() + try: + resp = requests.get( + target_url, + timeout=timeout, + proxies={"http": proxy_url, "https": proxy_url}, + verify=False, + ) + latency_ms = (time.perf_counter() - start) * 1000.0 + return Sample(status_code=resp.status_code, latency_ms=latency_ms, ok=True) + except Exception: + latency_ms = (time.perf_counter() - start) * 1000.0 + return Sample(status_code=0, latency_ms=latency_ms, ok=False) + + +def percentile(values: List[float], pct: float) -> float: + if not values: + return 0.0 + ordered = sorted(values) + idx = int((pct / 100.0) * (len(ordered) - 1)) + return ordered[idx] + + +def run_benchmark( + proxy_url: str, + target_url: str, + requests_count: int, + concurrency: int, + timeout: float, +) -> Dict[str, float]: + latencies: List[float] = [] + success = 0 + started = time.perf_counter() + with ThreadPoolExecutor(max_workers=concurrency) as pool: + futures = [ + pool.submit(_single_request, proxy_url, target_url, timeout) + for _ in range(requests_count) + ] + for f in as_completed(futures): + sample = f.result() + latencies.append(sample.latency_ms) + if 
sample.ok: + success += 1 + elapsed = time.perf_counter() - started + rps = (requests_count / elapsed) if elapsed > 0 else 0.0 + return { + "requests": float(requests_count), + "success_rate": (success / requests_count) if requests_count else 0.0, + "rps": rps, + "latency_p50_ms": percentile(latencies, 50.0), + "latency_p95_ms": percentile(latencies, 95.0), + "latency_mean_ms": statistics.mean(latencies) if latencies else 0.0, + "duration_s": elapsed, + } + + +def _args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Proxy benchmark harness") + parser.add_argument("--proxy", default="http://127.0.0.1:8080") + parser.add_argument("--target", default="https://example.com") + parser.add_argument("--requests", type=int, default=200) + parser.add_argument("--concurrency", type=int, default=20) + parser.add_argument("--timeout", type=float, default=8.0) + return parser.parse_args() + + +def main() -> int: + args = _args() + parsed = urlparse(args.proxy) + if not parsed.scheme or not parsed.netloc: + raise SystemExit(f"Invalid proxy URL: {args.proxy}") + result = run_benchmark( + proxy_url=args.proxy, + target_url=args.target, + requests_count=args.requests, + concurrency=args.concurrency, + timeout=args.timeout, + ) + print("benchmark_result") + for k, v in result.items(): + print(f"{k}={v}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/core/interceptor.py b/core/interceptor.py index 8f56e6e..bf68dd8 100644 --- a/core/interceptor.py +++ b/core/interceptor.py @@ -4,20 +4,17 @@ import datetime import ipaddress import os -import queue -import random import re import select import socket import ssl -import struct import tempfile import threading import time -from dataclasses import dataclass, field +from collections import deque from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, 
Callable, Deque, Dict, List, Optional, Tuple from urllib.parse import parse_qs, urlparse from cryptography import x509 @@ -33,80 +30,40 @@ import h2.events import h2.exceptions H2_AVAILABLE = True -except ImportError: - H2_AVAILABLE = False +except ImportError: # pragma: no cover - optional dependency + H2_AVAILABLE = False # pragma: no cover try: - import aioquic.quic.configuration - import aioquic.quic.connection - import aioquic.quic.events - import asyncio + import aioquic # noqa: F401 # pragma: no cover AIOQUIC_AVAILABLE = True -except ImportError: - AIOQUIC_AVAILABLE = False +except Exception: # pragma: no cover - optional dependency + AIOQUIC_AVAILABLE = False # pragma: no cover from chemistry.tcp_options import TCPOptionsManipulator from chemistry.tls_rotator import TLSFingerprinter from chemistry.tor_rotator import TorRotator from chemistry.proxy_rotator import ProxyRotator - - -@dataclass -class InterceptedRequest: - method: str = "GET" - url: str = "" - path: str = "" - host: str = "" - port: int = 80 - headers: dict = field(default_factory=dict) - body: bytes = b"" - cookies: dict = field(default_factory=dict) - query_params: dict = field(default_factory=dict) - timestamp: float = 0.0 - source_pid: Optional[int] = None - source_tool: Optional[str] = None - is_tunnel: bool = False - is_https: bool = False - sni_hostname: Optional[str] = None - tls_version: Optional[str] = None - http_version: str = "1.1" - - -@dataclass -class InterceptedResponse: - status_code: int = 0 - status_text: str = "" - headers: dict = field(default_factory=dict) - body: bytes = b"" - cookies: dict = field(default_factory=dict) - response_time: float = 0.0 - timestamp: float = 0.0 - is_https: bool = False - tls_version: Optional[str] = None - http_version: str = "1.1" - - -@dataclass -class ProxyRecord: - request: InterceptedRequest = field(default_factory=InterceptedRequest) - response: InterceptedResponse = field(default_factory=InterceptedResponse) - technique_applied: str = 
"" - passed: bool = False - blocked: bool = False - total_time: float = 0.0 - intercepted_https: bool = False - decryption_successful: bool = False - - -@dataclass -class AdvisorDecision: - action: str = "forward" - technique: str = "" - delay: float = 0.0 - rotate_ip: bool = False - reason: str = "" - forward_response: bool = True - next_protocol: Optional[str] = None +from core.models import AdvisorDecision, InterceptedRequest, InterceptedResponse, ProxyRecord +from core.pipeline import Forwarder, Magic, ResponseAdvisor + +__all__ = [ + "AdvisorDecision", + "InterceptedRequest", + "InterceptedResponse", + "ProxyRecord", + "CertificateAuthority", + "H2Connection", + "H1Parser", + "TLSContextFactory", + "MITMHandshaker", + "H2SessionHandler", + "ResponseAdvisor", + "Magic", + "Forwarder", + "ThreadedHTTPServer", + "Interceptor", + "create_interceptor", +] class CertificateAuthority: @@ -141,7 +98,7 @@ def _create_ca(self): x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF MITM Proxy"), x509.NameAttribute(NameOID.COMMON_NAME, "evilwaf-ca"), ]) - now = datetime.datetime.utcnow() + now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) ca_cert = ( x509.CertificateBuilder() .subject_name(subject) @@ -254,7 +211,7 @@ def _generate_host_certificate(self, hostname: str) -> Tuple[str, str]: except Exception: pass subject_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF Proxy")) - now = datetime.datetime.utcnow() + now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) cert = ( x509.CertificateBuilder() .subject_name(x509.Name(subject_attrs)) @@ -557,10 +514,8 @@ class TLSContextFactory: ) @classmethod - def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext: - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE + def client_context(cls, alpn: Optional[List[str]] = None) -> ssl.SSLContext: + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) ctx.minimum_version 
= ssl.TLSVersion.TLSv1_2 try: ctx.set_ciphers(cls.CIPHERS) @@ -571,11 +526,14 @@ def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext: return ctx @classmethod - def server_context(cls, cert_path: str, key_path: str, alpn: List[str] = None) -> ssl.SSLContext: + def server_context( + cls, + cert_path: str, + key_path: str, + alpn: Optional[List[str]] = None, + ) -> ssl.SSLContext: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ctx.load_cert_chain(certfile=cert_path, keyfile=key_path) - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE ctx.minimum_version = ssl.TLSVersion.TLSv1_2 try: ctx.set_ciphers(cls.CIPHERS) @@ -1012,127 +970,6 @@ def _relay_raw(self) -> List[ProxyRecord]: return [] -class ResponseAdvisor: - ROTATE_ON = (429, 503, 509) - RETRY_TECH = (403, 406, 418) - PASS = (200, 201, 204, 301, 302, 304, 404) - - def __init__(self, magic: "Magic", max_retries: int = 3, retry_delay: float = 1.5): - self._magic = magic - self._max = max_retries - self._delay = retry_delay - self._counts: Dict[str, int] = {} - self._lock = threading.Lock() - - def advise(self, response: InterceptedResponse, request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision: - code = response.status_code - if code in self.PASS: - self._reset(request.host) - return AdvisorDecision(action="forward", reason=f"{code} pass") - if code in self.RETRY_TECH: - return self._retry(request, record, reason=f"{code} waf block") - if code in self.ROTATE_ON: - return self._rotate_and_retry(response, request, record) - return AdvisorDecision(action="forward", reason=f"{code} default") - - def _retry(self, request: InterceptedRequest, record: ProxyRecord, reason: str) -> AdvisorDecision: - if not self._has_left(request.host): - return AdvisorDecision(action="forward", reason="max retries") - self._inc(request.host) - return AdvisorDecision(action="retry", delay=self._delay, reason=reason, forward_response=False) - - def _rotate_and_retry(self, response: InterceptedResponse, 
request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision: - if not self._has_left(request.host): - return AdvisorDecision(action="forward", reason="max retries") - delay = self._get_delay(response) - self._inc(request.host) - return AdvisorDecision(action="rotate_and_retry", delay=delay, rotate_ip=True, reason="rate limited", forward_response=False) - - def _has_left(self, host: str) -> bool: - with self._lock: - return self._counts.get(host, 0) < self._max - - def _inc(self, host: str): - with self._lock: - self._counts[host] = self._counts.get(host, 0) + 1 - - def _reset(self, host: str): - with self._lock: - self._counts.pop(host, None) - - def _get_delay(self, response: InterceptedResponse) -> float: - ra = response.headers.get("retry-after", "").strip() - if ra.isdigit(): - return min(float(ra), 60.0) - return self._delay - - -class Magic: - def __init__(self, tcp: Optional[TCPOptionsManipulator] = None, tls: Optional[TLSFingerprinter] = None, tor: Optional[TorRotator] = None): - self._tcp = tcp or TCPOptionsManipulator() - self._tls = tls or TLSFingerprinter() - self._tor = tor or TorRotator() - self._lock = threading.Lock() - self._request_count = 0 - - def apply(self, technique: str = "") -> Dict[str, Any]: - with self._lock: - self._request_count += 1 - tcp_opts = self._tcp.per_request_options() - tls_sess, tls_id = self._tls.paired_with_tcp(tcp_opts.get("profile", "")) - result = { - "tcp": tcp_opts, - "tls": {"session": tls_sess, "identifier": tls_id}, - "tor": {}, - } - if technique == "ip_rotation" or self._tor.should_rotate(self._request_count): - if self._tor.is_tor_alive(): - ok, ip = self._tor.rotate_and_verify() - result["tor"] = {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()} - return result - - def _bind_to_tor(self) -> Dict[str, Any]: - if not self._tor.is_tor_alive(): - return {"active": False} - ok, ip = self._tor.rotate_and_verify() - return {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()} - - def 
error_solver(self, error: Exception, context: str = "") -> bool: - if isinstance(error, ssl.SSLError): - try: - self._tls.rotate() - except Exception: - pass - if isinstance(error, (ConnectionResetError, BrokenPipeError, TimeoutError)): - try: - self._tcp.rotate() - except Exception: - pass - return True - - -class Forwarder: - def forward(self, response: InterceptedResponse, handler: BaseHTTPRequestHandler) -> bool: - try: - if response.status_code == 0: - response.status_code = 502 - response.status_text = "Bad Gateway" - handler.send_response(response.status_code, response.status_text) - skip = {"transfer-encoding", "connection", "keep-alive"} - for k, v in response.headers.items(): - if k.lower() not in skip: - handler.send_header(k, v) - handler.send_header("Connection", "close") - if response.body: - handler.send_header("Content-Length", str(len(response.body))) - handler.end_headers() - if response.body and handler.command != "HEAD": - handler.wfile.write(response.body) - return True - except Exception: - return False - - class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): daemon_threads = True allow_reuse_address = True @@ -1150,6 +987,7 @@ def __init__( override_ip: Optional[str] = None, target_host: Optional[str] = None, upstream_proxies: Optional[List[str]] = None, + record_limit: int = 20000, ): self._host = listen_host self._port = listen_port @@ -1157,13 +995,17 @@ def __init__( self._running = False self._server: Optional[ThreadedHTTPServer] = None self._thread: Optional[threading.Thread] = None - self._records: List[ProxyRecord] = [] + self._records: Deque[ProxyRecord] = deque(maxlen=max(1000, record_limit)) self._records_lock = threading.Lock() self._proxy_rotator = ProxyRotator(proxy_urls=upstream_proxies) if upstream_proxies else None self.ca = CertificateAuthority() - self._handshaker = MITMHandshaker(self.ca, proxy_rotator=self._proxy_rotator) + self._handshaker = MITMHandshaker( + self.ca, + override_ip=self._override_ip, + 
proxy_rotator=self._proxy_rotator, + ) self._forwarder = Forwarder() self._tor = TorRotator( @@ -1173,7 +1015,12 @@ def __init__( ) self._tcp_manip = TCPOptionsManipulator() self._tls_fp = TLSFingerprinter() - self._magic = Magic(tcp=self._tcp_manip, tls=self._tls_fp, tor=self._tor) + self._magic = Magic( + tcp=self._tcp_manip, + tls=self._tls_fp, + tor=self._tor, + rotate_every=tor_rotate_every, + ) self._advisor = ResponseAdvisor(self._magic) self.intercept_https = intercept_https @@ -1216,7 +1063,8 @@ def _process_http_request(self, req: InterceptedRequest) -> InterceptedResponse: req.host = host req.port = port - sock = self._create_upstream_connection(host, port, timeout=30) + connect_host = self._override_ip or host + sock = self._create_upstream_connection(connect_host, port, timeout=30) if parsed.scheme == "https": ctx = TLSContextFactory.client_context(alpn=["http/1.1"]) @@ -1378,8 +1226,6 @@ def do_CONNECT(self): parts = self.path.split(":") remote_host = parts[0] remote_port = int(parts[1]) if len(parts) > 1 else 443 - start_time = time.time() - if interceptor_ref.intercept_https: self.send_response(200, "Connection Established") self.send_header("Proxy-Agent", "EvilWAF") @@ -1496,6 +1342,7 @@ def create_interceptor( override_ip: Optional[str] = None, target_host: Optional[str] = None, upstream_proxies: Optional[List[str]] = None, + record_limit: int = 20000, ) -> Interceptor: return Interceptor( listen_host=listen_host, @@ -1507,5 +1354,6 @@ def create_interceptor( override_ip=override_ip, target_host=target_host, upstream_proxies=upstream_proxies, + record_limit=record_limit, ) - \ No newline at end of file + diff --git a/docs/DEVELOPMENT_PLAN.md b/docs/DEVELOPMENT_PLAN.md new file mode 100644 index 0000000..b697617 --- /dev/null +++ b/docs/DEVELOPMENT_PLAN.md @@ -0,0 +1,46 @@ +# Project Development Plan + +## Scope +This plan targets performance, maintainability, and reliability while keeping existing proxy and recon behavior intact. 
+ +## Priority Areas +1. `core/interceptor.py` complexity and coupling. +2. `chemistry/origin_server_ip.py` scanner orchestration cost and synchronous bottlenecks. +3. Long-run memory usage from in-memory traffic records. +4. Limited observability for latency, throughput, and retries. +5. Gradual typing debt in legacy modules. + +## Delivery Plan + +### Milestone 1: Baseline Instrumentation +- Add benchmark harness and budget checks. +- Add CPU/memory profiling scripts for repeatable diagnostics. +- Track p50/p95 latency, success rate, and RPS. + +### Milestone 2: Hot Path Decomposition +- Split interceptor path into dedicated components: + - connection lifecycle + - HTTP parsing/building + - retry/advisor flow + - record sink +- Keep current public API stable. + +### Milestone 3: Recon Pipeline Optimization +- Move scanner scheduling to bounded async execution. +- Add per-source timeout policy and cancellation. +- Cache stable lookups (DNS/ASN/CT) with TTL and de-duplication. + +### Milestone 4: Memory and Stability +- Keep bounded in-memory records (already added via `record_limit`). +- Add optional file-backed record sink for long sessions. +- Add payload-size guardrails for non-critical processing paths. + +### Milestone 5: Quality Hardening +- Increase mypy strict coverage module-by-module. +- Keep 100% line coverage in `core` and `chemistry`. +- Add performance non-regression acceptance criteria in CI. + +## Success Metrics +- p95 latency improvement >= 20% under benchmark profile. +- Peak memory reduction >= 30% in long-running sessions. +- Zero functional regressions in existing tests and workflows. 
diff --git a/scripts/profile_cpu.sh b/scripts/profile_cpu.sh new file mode 100644 index 0000000..1a8540a --- /dev/null +++ b/scripts/profile_cpu.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ "${1:-}" == "" ]]; then + echo "usage: scripts/profile_cpu.sh <target_url>" + exit 2 +fi + +TARGET_URL="$1" +PROXY_HOST="${PROXY_HOST:-127.0.0.1}" +PROXY_PORT="${PROXY_PORT:-8080}" + +python3 -m cProfile -o cpu.prof evilwaf.py -t "${TARGET_URL}" --no-tui --listen-host "${PROXY_HOST}" --listen-port "${PROXY_PORT}" & +PID=$! +sleep 3 +python3 benchmarks/proxy_benchmark.py --proxy "http://${PROXY_HOST}:${PROXY_PORT}" --target "${TARGET_URL}" --requests 100 --concurrency 10 || true +kill "${PID}" || true +wait "${PID}" || true +python3 - <<'PY' +import pstats +s = pstats.Stats("cpu.prof") +s.sort_stats("cumtime").print_stats(30) +PY diff --git a/scripts/profile_memory.sh b/scripts/profile_memory.sh new file mode 100644 index 0000000..97db4db --- /dev/null +++ b/scripts/profile_memory.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ "${1:-}" == "" ]]; then + echo "usage: scripts/profile_memory.sh <target_url>" + exit 2 +fi + +TARGET_URL="$1" +PROXY_HOST="${PROXY_HOST:-127.0.0.1}" +PROXY_PORT="${PROXY_PORT:-8080}" + +python3 -m pip install -q memory_profiler +python3 -m memory_profiler evilwaf.py -t "${TARGET_URL}" --no-tui --listen-host "${PROXY_HOST}" --listen-port "${PROXY_PORT}" & +PID=$! +sleep 3 +python3 benchmarks/proxy_benchmark.py --proxy "http://${PROXY_HOST}:${PROXY_PORT}" --target "${TARGET_URL}" --requests 100 --concurrency 10 || true
kill "${PID}" || true +wait "${PID}" || true