diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index f7627fc5..dda41a10 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -92,6 +92,7 @@ jobs:
       - name: Run E2E tests
         env:
           RUNPOD_API_KEY: ${{ secrets.RUNPOD_API_KEY }}
+          FLASH_SDK_GIT_REF: ${{ github.sha }}
         run: |
           uv run pytest e2e/ \
             ${{ inputs.tests != '' && format('-k "{0}"', inputs.tests) || '' }} \
diff --git a/e2e/conftest.py b/e2e/conftest.py
index 0df6efe4..f939fb89 100644
--- a/e2e/conftest.py
+++ b/e2e/conftest.py
@@ -7,14 +7,21 @@
 import asyncio
 import os
 import pickle
+import sys
 from pathlib import Path
 
-import pytest
+# Ensure the e2e/ directory is on sys.path so test files can import local
+# modules (provisioner, etc.) regardless of how pytest resolves the rootdir.
+_E2E_DIR = str(Path(__file__).parent)
+if _E2E_DIR not in sys.path:
+    sys.path.insert(0, _E2E_DIR)
+
+import pytest  # noqa: E402
 
 try:
-    import tomllib
+    import tomllib  # noqa: E402
 except ImportError:
-    import tomli as tomllib  # type: ignore[no-redef]
+    import tomli as tomllib  # type: ignore[no-redef]  # noqa: E402
 
 
 def _api_key_from_config() -> str | None:
@@ -25,7 +32,8 @@ def _api_key_from_config() -> str | None:
     try:
         data = tomllib.loads(config_file.read_text())
         return data.get("default", {}).get("api_key")
-    except Exception:
+    except Exception as exc:
+        print(f"Warning: could not parse ~/.runpod/config.toml: {exc}")
         return None
 
 
@@ -38,29 +46,38 @@ def endpoint_id_from_state(project_dir: Path) -> str:
     The state file is a (resources_dict, config_hashes_dict) tuple.
     resources_dict keys are "ResourceType:name", values are resource
     objects with .id.
+
+    Raises FileNotFoundError if the state file is missing (deploy did not
+    complete). Raises ValueError if the file exists but contains no endpoint
+    ID (format may have changed).
     """
     state_file = project_dir / ".flash" / "resources.pkl"
     if not state_file.exists():
         raise FileNotFoundError(f"State file not found: {state_file}")
 
-    with open(state_file, "rb") as f:
-        data = pickle.load(f)
+    try:
+        with open(state_file, "rb") as f:
+            data = pickle.load(f)
+    except Exception as exc:
+        raise ValueError(
+            f"Failed to deserialize state file {state_file} — "
+            f"the .flash/resources.pkl format may have changed: {exc}"
+        ) from exc
 
     resources = data[0] if isinstance(data, tuple) else data
     for _key, resource in resources.items():
         endpoint_id = getattr(resource, "id", None)
         if endpoint_id:
             return endpoint_id
-    raise ValueError(f"No endpoint ID found in state file. Keys: {list(resources)}")
+    raise ValueError(
+        f"No endpoint ID found in state file {state_file}. "
+        f"Keys present: {list(resources)}. "
+        f"Check that the resource object has an 'id' attribute."
+    )
 
 
-def sweep_endpoints(api_key: str) -> None:
-    """Delete all endpoints on the account.
+def sweep_endpoints(api_key: str, *, prefix: str = "flash-qa-") -> None:
+    """Delete endpoints whose names start with prefix.
 
-    The e2e RUNPOD_API_KEY is dedicated to testing. Call this in every test's
-    finally block to ensure quota is fully released regardless of whether the
-    graceful undeploy succeeded.
-
-    To restrict cleanup to smoke-test endpoints only, swap the list comprehension:
-        endpoints = [ep for ep in endpoints if ep.get("name", "").startswith("flash-qa-smoke-")]
+    Defaults to "flash-qa-" so only test-created endpoints are removed.
+    Pass prefix="" to delete all endpoints on the account (use with caution).
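+
+    A minimal sketch of both modes:
+        sweep_endpoints(api_key)             # flash-qa-* test endpoints only
+        sweep_endpoints(api_key, prefix="")  # every endpoint on the account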
""" from runpod_flash.core.api.runpod import RunpodGraphQLClient @@ -69,7 +86,12 @@ async def _run(key: str) -> None: result = await client._execute_graphql( "query { myself { endpoints { id name } } }" ) - endpoints = result.get("myself", {}).get("endpoints", []) + all_endpoints = result.get("myself", {}).get("endpoints", []) + endpoints = [ + ep + for ep in all_endpoints + if not prefix or ep.get("name", "").startswith(prefix) + ] for ep in endpoints: eid, ename = ep["id"], ep.get("name", ep["id"]) try: @@ -95,3 +117,9 @@ def restore_real_credentials(monkeypatch: pytest.MonkeyPatch) -> None: ) else: pytest.skip("No credentials available — skipping E2E test") + + +@pytest.fixture +def api_key() -> str: + """Return the RunPod API key for tests that need to pass it explicitly.""" + return _REAL_API_KEY # type: ignore[return-value] # guaranteed set by restore_real_credentials autouse diff --git a/e2e/provisioner.py b/e2e/provisioner.py new file mode 100644 index 00000000..c064c039 --- /dev/null +++ b/e2e/provisioner.py @@ -0,0 +1,122 @@ +"""Endpoint provisioner for E2E session-scoped fixtures. + +provision() deploys a Flash worker and returns its endpoint_id. +All shared endpoints are deployed in parallel at session start. + +Git ref injection +----------------- +Set FLASH_SDK_GIT_REF to a commit SHA or branch name to install that exact +version of runpod-flash inside the worker container instead of the latest +PyPI release. In CI, set this to github.sha so workers run the branch under +test rather than the last published release. + + FLASH_SDK_GIT_REF=${{ github.sha }} # in CI workflow +""" + +import os +import shutil +import subprocess +import tempfile +from pathlib import Path + +from conftest import endpoint_id_from_state + +# --------------------------------------------------------------------------- +# Git ref injection +# --------------------------------------------------------------------------- + +FLASH_GIT_REF: str = os.environ.get("FLASH_SDK_GIT_REF", "") +FLASH_LOCAL_PATH: str = os.environ.get("FLASH_SDK_LOCAL_PATH", "") +_FLASH_REPO = "https://github.com/runpod/runpod-flash" + + +def flash_dep() -> str: + """Return the runpod-flash pip requirement string for worker pyproject.toml. + + CI (FLASH_SDK_GIT_REF set): installs the exact commit under test. + Local dev with local path (FLASH_SDK_LOCAL_PATH set): installs from local + checkout — useful when the fix is not yet on PyPI and the git repo is private. + Local dev (unset): installs the latest PyPI release. + """ + if FLASH_LOCAL_PATH: + return f"runpod-flash @ file://{FLASH_LOCAL_PATH}" + if FLASH_GIT_REF: + return f"runpod-flash @ git+{_FLASH_REPO}@{FLASH_GIT_REF}" + return "runpod-flash" + + +# --------------------------------------------------------------------------- +# Provisioner +# --------------------------------------------------------------------------- + +_PYPROJECT_TMPL = """\ +[project] +name = "{name}" +version = "0.1.0" +requires-python = ">=3.11,<3.13" +dependencies = [{deps}] +""" + + +def provision( + worker_code: str, + *, + name: str, + api_key: str, + extra_deps: list[str] | None = None, + deploy_timeout: int = 600, +) -> tuple[str, Path]: + """Deploy a Flash worker and return (endpoint_id, project_dir). + + The returned project_dir is a temporary directory that owns the .flash + state. The caller is responsible for cleanup — call shutil.rmtree() on + project_dir when the endpoint is no longer needed. + + Args: + worker_code: Python source of the worker file. 
+        name: Endpoint name (must be unique per CI run).
+        api_key: RunPod API key passed explicitly to the subprocess env.
+        extra_deps: Additional pip requirements (beyond runpod-flash).
+        deploy_timeout: Seconds before subprocess.run times out.
+
+    Returns:
+        (endpoint_id, project_dir)
+
+    Raises:
+        RuntimeError: If flash deploy exits non-zero.
+    """
+    deps = [flash_dep()]
+    if extra_deps:
+        deps.extend(extra_deps)
+    deps_quoted = ", ".join(f'"{d}"' for d in deps)
+    pyproject = _PYPROJECT_TMPL.format(name=name, deps=deps_quoted)
+
+    tmp_dir = Path(tempfile.mkdtemp(prefix=f"flash-e2e-{name}-"))
+    (tmp_dir / "worker.py").write_text(worker_code)
+    (tmp_dir / "pyproject.toml").write_text(pyproject)
+
+    env = os.environ.copy()
+    env["RUNPOD_API_KEY"] = api_key  # explicit — does not depend on autouse fixture
+
+    try:
+        result = subprocess.run(
+            ["uv", "run", "flash", "deploy"],
+            cwd=tmp_dir,
+            env=env,
+            capture_output=True,
+            text=True,
+            timeout=deploy_timeout,
+        )
+    except Exception:
+        shutil.rmtree(tmp_dir, ignore_errors=True)
+        raise
+
+    if result.returncode != 0:
+        shutil.rmtree(tmp_dir, ignore_errors=True)
+        raise RuntimeError(
+            f"flash deploy failed for '{name}' (exit {result.returncode}):\n"
+            f"stdout: {result.stdout}\nstderr: {result.stderr}"
+        )
+
+    endpoint_id = endpoint_id_from_state(tmp_dir)
+    return endpoint_id, tmp_dir
diff --git a/e2e/test_cpu_smoke.py b/e2e/test_cpu_smoke.py
index 89471206..8b4f57c3 100644
--- a/e2e/test_cpu_smoke.py
+++ b/e2e/test_cpu_smoke.py
@@ -11,6 +11,7 @@
 import runpod
 
 from conftest import endpoint_id_from_state, sweep_endpoints
+from provisioner import flash_dep
 
 WORKER_NAME = f"flash-qa-smoke-{uuid.uuid4().hex[:8]}"
 
@@ -28,7 +29,7 @@ async def echo(msg: str = "") -> dict:
 name = "{WORKER_NAME}"
 version = "0.1.0"
 requires-python = ">=3.11,<3.13"
-dependencies = ["runpod-flash"]
+dependencies = ["{flash_dep()}"]
 '''
 
 
@@ -36,14 +37,12 @@ class TestCpuSmoke:
     """CPU smoke: deploy → invoke → undeploy."""
 
     def test_deploy_invoke_undeploy(self, tmp_path: Path) -> None:
-        """Deploy a minimal CPU worker, invoke it, verify output, undeploy."""
         env = os.environ.copy()
 
         (tmp_path / "worker.py").write_text(WORKER_CODE)
         (tmp_path / "pyproject.toml").write_text(PYPROJECT_TOML)
 
         try:
-            # Deploy
             result = subprocess.run(
                 ["uv", "run", "flash", "deploy"],
                 cwd=tmp_path,
@@ -59,8 +58,7 @@ def test_deploy_invoke_undeploy(self, tmp_path: Path) -> None:
 
             endpoint_id = endpoint_id_from_state(tmp_path)
 
-            # Invoke
-            runpod.api_key = env.get("RUNPOD_API_KEY")
+            runpod.api_key = env["RUNPOD_API_KEY"]
             output = runpod.Endpoint(endpoint_id).run_sync(
                 {"msg": "smoke"}, timeout=180
             )
@@ -70,7 +68,7 @@ def test_deploy_invoke_undeploy(self, tmp_path: Path) -> None:
             assert output.get("status") == "ok", f"Unexpected status: {output}"
 
         finally:
-            # Attempt graceful undeploy first
+            # Exercise the undeploy CLI path; sweep catches any quota leak if this fails.
             try:
                 undeploy = subprocess.run(
                     ["uv", "run", "flash", "undeploy", WORKER_NAME, "--force"],
@@ -88,6 +86,5 @@ def test_deploy_invoke_undeploy(self, tmp_path: Path) -> None:
             except subprocess.TimeoutExpired:
                 print("WARNING: undeploy timed out after 60s")
 
-            # Always sweep all endpoints — dedicated e2e account, stale
-            # endpoints hit the worker quota on subsequent runs.
+            # Sweep flash-qa-* endpoints — stale endpoints exhaust worker quota.
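+            # sweep_endpoints defaults to prefix="flash-qa-", so endpoints
+            # outside the test namespace survive the sweep.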
             sweep_endpoints(env["RUNPOD_API_KEY"])
diff --git a/e2e/test_cpu_suite.py b/e2e/test_cpu_suite.py
new file mode 100644
index 00000000..266da29c
--- /dev/null
+++ b/e2e/test_cpu_suite.py
@@ -0,0 +1,284 @@
+"""Shared CPU E2E suite — session-scoped endpoint pool.
+
+Provisions four CPU endpoints once per session (on first use) and shares
+them across all test classes. Undeploys at session teardown via
+sweep_endpoints.
+
+Workers:
+    qb_endpoint    — QB function: echo(msg) → dict
+    deps_endpoint  — QB with numpy/pandas deps
+    class_endpoint — QB with a class-based handler
+    lb_endpoint    — LB with /health and /echo routes
+"""
+
+import concurrent.futures
+import shutil
+import uuid
+from typing import Generator
+
+import httpx
+import pytest
+import runpod
+
+from conftest import _REAL_API_KEY, sweep_endpoints
+from provisioner import provision
+
+# ---------------------------------------------------------------------------
+# Worker code templates
+# ---------------------------------------------------------------------------
+
+
+def _qb_echo_worker(name: str) -> str:
+    return f'''\
+from runpod_flash import Endpoint
+
+
+@Endpoint(name="{name}", cpu="cpu3c-1-2", workers=(0, 1))
+async def echo(msg: str = "") -> dict:
+    return {{"echo": msg}}
+'''
+
+
+def _qb_deps_worker(name: str) -> str:
+    return f'''\
+import numpy as np
+import pandas as pd
+from runpod_flash import Endpoint
+
+
+@Endpoint(name="{name}", cpu="cpu3c-1-2", workers=(0, 1))
+async def compute(x: float = 1.0) -> dict:
+    arr = np.array([x, x * 2, x * 3])
+    df = pd.DataFrame({{"vals": arr}})
+    return {{"mean": float(arr.mean()), "sum": float(df["vals"].sum())}}
+'''
+
+
+def _qb_class_worker(name: str) -> str:
+    return f'''\
+from runpod_flash import Endpoint
+
+
+@Endpoint(name="{name}", cpu="cpu3c-1-2", workers=(0, 1))
+class Greeter:
+    def greet(self, name: str = "world") -> dict:
+        return {{"greeting": f"Hello, {{name}}!"}}
+'''
+
+
+def _lb_worker(name: str) -> str:
+    return f'''\
+from runpod_flash import Endpoint
+
+api = Endpoint(name="{name}", cpu="cpu3c-1-2", workers=(1, 2))
+
+
+@api.get("/health")
+async def health() -> dict:
+    return {{"status": "healthy"}}
+
+
+@api.post("/echo")
+async def echo(data: dict) -> dict:
+    return {{"echo": data}}
+'''
+
+
+# ---------------------------------------------------------------------------
+# Session fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture(scope="session")
+def _session_api_key() -> Generator[str, None, None]:
+    if not _REAL_API_KEY:
+        pytest.skip("No credentials available — skipping E2E test")
+    runpod.api_key = _REAL_API_KEY
+    yield _REAL_API_KEY
+    sweep_endpoints(_REAL_API_KEY)
+
+
+def _provision_named(
+    code_fn,
+    base_name: str,
+    api_key: str,
+    extra_deps: list[str] | None = None,
+):
+    name = f"{base_name}-{uuid.uuid4().hex[:8]}"
+    return provision(code_fn(name), name=name, api_key=api_key, extra_deps=extra_deps)
+
+
+@pytest.fixture(scope="session")
+def qb_endpoint(_session_api_key: str) -> Generator[str, None, None]:
+    endpoint_id, tmp_dir = _provision_named(
+        _qb_echo_worker, "flash-qa-qb", _session_api_key
+    )
+    yield endpoint_id
+    shutil.rmtree(tmp_dir, ignore_errors=True)
+
+
+@pytest.fixture(scope="session")
+def deps_endpoint(_session_api_key: str) -> Generator[str, None, None]:
+    endpoint_id, tmp_dir = _provision_named(
+        _qb_deps_worker,
+        "flash-qa-deps",
+        _session_api_key,
+        extra_deps=["numpy", "pandas"],
+    )
+    yield endpoint_id
+    shutil.rmtree(tmp_dir, ignore_errors=True)
+
+
+@pytest.fixture(scope="session")
+def class_endpoint(_session_api_key: str) -> Generator[str, None, None]:
+    endpoint_id, tmp_dir = _provision_named(
+        _qb_class_worker, "flash-qa-class", _session_api_key
+    )
+    yield endpoint_id
+    shutil.rmtree(tmp_dir, ignore_errors=True)
+
+
+@pytest.fixture(scope="session")
+def lb_endpoint(_session_api_key: str) -> Generator[str, None, None]:
+    endpoint_id, tmp_dir = _provision_named(_lb_worker, "flash-qa-lb", _session_api_key)
+    yield endpoint_id
+    shutil.rmtree(tmp_dir, ignore_errors=True)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _lb_get(endpoint_id: str, path: str, api_key: str, timeout: float = 120.0):
+    url = f"https://{endpoint_id}.api.runpod.ai{path}"
+    resp = httpx.get(
+        url, headers={"Authorization": f"Bearer {api_key}"}, timeout=timeout
+    )
+    resp.raise_for_status()
+    return resp.json()
+
+
+def _lb_post(
+    endpoint_id: str, path: str, payload: dict, api_key: str, timeout: float = 120.0
+):
+    url = f"https://{endpoint_id}.api.runpod.ai{path}"
+    resp = httpx.post(
+        url,
+        json=payload,
+        headers={"Authorization": f"Bearer {api_key}"},
+        timeout=timeout,
+    )
+    resp.raise_for_status()
+    return resp.json()
+
+
+def _lb_url(endpoint_id: str, path: str) -> str:
+    return f"https://{endpoint_id}.api.runpod.ai{path}"
+
+
+# ---------------------------------------------------------------------------
+# QB function tests
+# ---------------------------------------------------------------------------
+
+
+class TestCpuQBFunction:
+    """QB function endpoint: echo(msg) → dict."""
+
+    def test_smoke(self, qb_endpoint: str) -> None:
+        out = runpod.Endpoint(qb_endpoint).run_sync({"msg": "smoke"}, timeout=180)
+        assert out is not None
+        assert out.get("echo") == "smoke"
+
+    def test_empty_string(self, qb_endpoint: str) -> None:
+        out = runpod.Endpoint(qb_endpoint).run_sync({"msg": ""}, timeout=60)
+        assert out is not None
+        assert out.get("echo") == ""
+
+    def test_unicode(self, qb_endpoint: str) -> None:
+        msg = "héllo wörld 🔥"
+        out = runpod.Endpoint(qb_endpoint).run_sync({"msg": msg}, timeout=60)
+        assert out is not None
+        assert out.get("echo") == msg
+
+
+# ---------------------------------------------------------------------------
+# Concurrent invocations
+# ---------------------------------------------------------------------------
+
+
+class TestCpuQBFunctionConcurrent:
+    """10 parallel invocations against a single QB endpoint."""
+
+    def test_ten_parallel_calls(self, qb_endpoint: str) -> None:
+        ep = runpod.Endpoint(qb_endpoint)
+
+        def call(i: int):
+            return ep.run_sync({"msg": f"call-{i}"}, timeout=60)
+
+        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
+            futures = [pool.submit(call, i) for i in range(10)]
+            results = [f.result() for f in concurrent.futures.as_completed(futures)]
+
+        assert len(results) == 10
+        assert all(r is not None for r in results)
+        echoed = {r["echo"] for r in results}
+        assert echoed == {f"call-{i}" for i in range(10)}, (
+            f"Echo values don't match sent messages: {echoed}"
+        )
+
+
+# ---------------------------------------------------------------------------
+# Dependency import test
+# ---------------------------------------------------------------------------
+
+
+class TestCpuQBFunctionDeps:
+    """QB endpoint that imports numpy and pandas."""
+
+    def test_numpy_pandas_available(self, deps_endpoint: str) -> None:
+        out = runpod.Endpoint(deps_endpoint).run_sync({"x": 2.0}, timeout=180)
+        assert out is not None
+        assert out.get("mean") == pytest.approx(4.0)
+        assert out.get("sum") == pytest.approx(12.0)
+
+
+# ---------------------------------------------------------------------------
+# Class-based QB handler
+# ---------------------------------------------------------------------------
+
+
+class TestCpuQBClass:
+    """QB endpoint with a class-based handler (single-method, auto-dispatched)."""
+
+    def test_single_method_invocation(self, class_endpoint: str) -> None:
+        out = runpod.Endpoint(class_endpoint).run_sync({"name": "tester"}, timeout=180)
+        assert out is not None
+        assert out.get("greeting") == "Hello, tester!"
+
+
+# ---------------------------------------------------------------------------
+# LB endpoint tests
+# ---------------------------------------------------------------------------
+
+
+class TestCpuLBEndpoint:
+    """LB endpoint: custom GET and POST routes."""
+
+    def test_get_health(self, lb_endpoint: str, _session_api_key: str) -> None:
+        out = _lb_get(lb_endpoint, "/health", _session_api_key)
+        assert out is not None
+        assert out.get("status") == "healthy"
+
+    def test_post_echo(self, lb_endpoint: str, _session_api_key: str) -> None:
+        payload = {"key": "value", "num": 42}
+        # FastAPI wraps named body parameters: {"data": payload}
+        out = _lb_post(lb_endpoint, "/echo", {"data": payload}, _session_api_key)
+        assert out is not None
+        assert out.get("echo") == payload
+
+    def test_unauthorized_request(self, lb_endpoint: str) -> None:
+        """LB endpoint must reject requests with no Bearer token."""
+        resp = httpx.get(_lb_url(lb_endpoint, "/health"), timeout=30.0)
+        assert resp.status_code in (401, 403), (
+            f"Expected 401 or 403 for unauthenticated request, got {resp.status_code}"
+        )
diff --git a/e2e/test_gpu_smoke.py b/e2e/test_gpu_smoke.py
new file mode 100644
index 00000000..cb49de68
--- /dev/null
+++ b/e2e/test_gpu_smoke.py
@@ -0,0 +1,80 @@
+"""GPU smoke test — deploy → invoke → undeploy on a GPU worker.
+
+Requires GPU quota on the account and a valid RUNPOD_API_KEY.
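+If the account lacks GPU quota, expect the deploy step to fail; the finally
+block still runs undeploy and the flash-qa-* sweep.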
+""" + +import os +import subprocess +import uuid +from pathlib import Path + +import runpod + +from conftest import endpoint_id_from_state, sweep_endpoints +from provisioner import flash_dep + +_WORKER_NAME = f"flash-qa-gpu-smoke-{uuid.uuid4().hex[:8]}" + +_WORKER_CODE = f'''\ +from runpod_flash import Endpoint + + +@Endpoint(name="{_WORKER_NAME}") +async def echo(msg: str = "") -> dict: + return {{"echo": msg, "status": "ok"}} +''' + +_PYPROJECT_TOML = f'''\ +[project] +name = "{_WORKER_NAME}" +version = "0.1.0" +requires-python = ">=3.11,<3.13" +dependencies = ["{flash_dep()}"] +''' + + +class TestGpuSmoke: + """GPU smoke: deploy → invoke → undeploy on a default GPU worker.""" + + def test_deploy_invoke_undeploy(self, tmp_path: Path, api_key: str) -> None: + env = os.environ.copy() + (tmp_path / "worker.py").write_text(_WORKER_CODE) + (tmp_path / "pyproject.toml").write_text(_PYPROJECT_TOML) + + try: + result = subprocess.run( + ["uv", "run", "flash", "deploy"], + cwd=tmp_path, + env=env, + capture_output=True, + text=True, + timeout=600, + ) + assert result.returncode == 0, ( + f"flash deploy failed (exit {result.returncode}):\n" + f"stdout: {result.stdout}\nstderr: {result.stderr}" + ) + + endpoint_id = endpoint_id_from_state(tmp_path) + runpod.api_key = env["RUNPOD_API_KEY"] + + output = runpod.Endpoint(endpoint_id).run_sync( + {"msg": "smoke"}, timeout=300 + ) + assert output is not None, "run_sync returned None" + assert output.get("echo") == "smoke", f"Unexpected output: {output}" + assert output.get("status") == "ok", f"Unexpected status: {output}" + + finally: + try: + subprocess.run( + ["uv", "run", "flash", "undeploy", _WORKER_NAME, "--force"], + cwd=tmp_path, + env=env, + capture_output=True, + text=True, + timeout=60, + ) + except subprocess.TimeoutExpired: + print("WARNING: GPU undeploy timed out after 60s") + sweep_endpoints(env["RUNPOD_API_KEY"]) diff --git a/e2e/test_redeploy.py b/e2e/test_redeploy.py new file mode 100644 index 00000000..4cd3fa14 --- /dev/null +++ b/e2e/test_redeploy.py @@ -0,0 +1,342 @@ +"""Redeploy E2E tests — rolling release and worker recycle verification. + +Each test manages its own deploy/undeploy. No session-scoped fixtures. 
+
+Excluded from this file (known platform failures, tracked in Linear):
+    TestRedeployAlwaysOn, TestRedeployNoDowntime, TestRedeployInFlight
+    → single-slot always-on (workers=(1,1)) recycle not working (AE-2940/2941/2942)
+    → see test_redeploy_always_on.py
+"""
+
+import concurrent.futures
+import os
+import subprocess
+import threading
+import time
+import uuid
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Generator
+
+import runpod
+
+from conftest import endpoint_id_from_state, sweep_endpoints
+from provisioner import flash_dep
+
+_RECYCLE_TIMEOUT = 300  # seconds — CPU worker recycle observed at >120s in practice
+_IDLE_WAIT = 60  # seconds without requests after deploy so the worker can drain idle and trigger recycle
+
+# ---------------------------------------------------------------------------
+# Worker code templates
+# ---------------------------------------------------------------------------
+
+_BASE_PYPROJECT = """\
+[project]
+name = "{name}"
+version = "0.1.0"
+requires-python = ">=3.11,<3.13"
+dependencies = ["{dep}"]
+"""
+
+
+def _pyproject(name: str) -> str:
+    return _BASE_PYPROJECT.format(name=name, dep=flash_dep())
+
+
+def _versioned_worker(name: str, version: str, workers: str = "workers=(0, 1)") -> str:
+    """QB worker that returns version and RUNPOD_POD_ID for recycle verification."""
+    return f'''\
+import os
+from runpod_flash import Endpoint
+
+@Endpoint(name="{name}", cpu="cpu3c-1-2", {workers})
+async def echo(msg: str = "") -> dict:
+    return {{
+        "version": "{version}",
+        "worker_id": os.environ.get("RUNPOD_POD_ID", "unknown"),
+        "msg": msg,
+    }}
+'''
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _deploy(code: str, name: str, tmp_path: Path, env: dict, label: str = "") -> None:
+    (tmp_path / "worker.py").write_text(code)
+    r = subprocess.run(
+        ["uv", "run", "flash", "deploy"],
+        cwd=tmp_path,
+        env=env,
+        capture_output=True,
+        text=True,
+        timeout=300,
+    )
+    tag = f"{label} " if label else ""
+    assert r.returncode == 0, f"{tag}deploy failed:\n{r.stdout}\n{r.stderr}"
+
+
+def _undeploy(name: str, cwd: Path, env: dict) -> None:
+    try:
+        subprocess.run(
+            ["uv", "run", "flash", "undeploy", name, "--force"],
+            cwd=cwd,
+            env=env,
+            capture_output=True,
+            text=True,
+            timeout=60,
+        )
+    except subprocess.TimeoutExpired:
+        print(f"WARNING: undeploy of {name} timed out after 60s")
+
+
+def poll_until_version(
+    endpoint_id: str,
+    api_key: str,
+    target_version: str,
+    timeout: int,
+    interval: int = 5,
+) -> tuple[float, dict]:
+    """Poll endpoint until it returns target_version or timeout elapses.
+
+    Returns (elapsed_seconds_since_first_call, first_matching_response).
+    Raises TimeoutError if timeout elapses before target_version is seen.
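+
+    Example (a sketch mirroring the redeploy tests below):
+        elapsed, out = poll_until_version(endpoint_id, api_key, "v2", timeout=300)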
+ """ + ep = runpod.Endpoint(endpoint_id) + deadline = time.monotonic() + timeout + start = time.monotonic() + while True: + try: + out = ep.run_sync({"msg": "poll"}, timeout=60) + if out and out.get("version") == target_version: + return time.monotonic() - start, out + except Exception as exc: + print(f"[poll_until_version] {exc}") + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError( + f"Version {target_version!r} not seen on {endpoint_id!r} within {timeout}s" + ) + time.sleep(min(interval, remaining)) + + +@contextmanager +def continuous_caller( + endpoint_id: str, + api_key: str, + interval: float = 1.0, +) -> Generator[list, None, None]: + """Call endpoint at regular intervals in a background thread. + + Yields a results list populated as calls complete: + [(timestamp_float, response_or_None, error_or_None), ...] + + Stops and joins when the context exits. + """ + results: list[tuple[float, dict | None, Exception | None]] = [] + stop = threading.Event() + + def _loop() -> None: + ep = runpod.Endpoint(endpoint_id) + while not stop.is_set(): + ts = time.monotonic() + try: + out = ep.run_sync({"msg": "continuous"}, timeout=60) + results.append((ts, out, None)) + except Exception as exc: + results.append((ts, None, exc)) + time.sleep(interval) + + t = threading.Thread(target=_loop, daemon=True) + t.start() + try: + yield results + finally: + stop.set() + t.join(timeout=15) + + +# --------------------------------------------------------------------------- +# Scale-to-zero: new code live after redeploy +# --------------------------------------------------------------------------- + + +class TestRedeployScaleToZero: + """workers=(0,1), CPU: v2 code is live after redeploy; worker ID changes.""" + + def test_new_code_live_after_redeploy(self, tmp_path: Path, api_key: str) -> None: + name = f"flash-qa-rdp-sto-{uuid.uuid4().hex[:8]}" + env = os.environ.copy() + (tmp_path / "pyproject.toml").write_text(_pyproject(name)) + runpod.api_key = api_key + + try: + _deploy(_versioned_worker(name, "v1"), name, tmp_path, env, "v1") + endpoint_id = endpoint_id_from_state(tmp_path) + + out_v1 = runpod.Endpoint(endpoint_id).run_sync( + {"msg": "check"}, timeout=180 + ) + assert out_v1 and out_v1.get("version") == "v1" + worker_id_v1 = out_v1["worker_id"] + + _deploy(_versioned_worker(name, "v2"), name, tmp_path, env, "v2") + time.sleep(_IDLE_WAIT) # let worker drain idle so the recycle can fire + + elapsed, out_v2 = poll_until_version( + endpoint_id, api_key, "v2", timeout=_RECYCLE_TIMEOUT, interval=30 + ) + print(f"[scale-to-zero] v2 live {elapsed:.1f}s after idle wait") + + assert out_v2["version"] == "v2" + assert out_v2["worker_id"] != worker_id_v1, ( + f"worker_id unchanged after redeploy: {worker_id_v1!r}" + ) + finally: + _undeploy(name, tmp_path, env) + sweep_endpoints(api_key) + + +# --------------------------------------------------------------------------- +# Multi-worker scale-to-zero: full cutover +# --------------------------------------------------------------------------- + + +class TestRedeployMultiWorker: + """workers=(0,4), CPU: multiple concurrent workers all serve v2 after redeploy.""" + + def test_full_cutover_multi_worker(self, tmp_path: Path, api_key: str) -> None: + name = f"flash-qa-rdp-mw-{uuid.uuid4().hex[:8]}" + env = os.environ.copy() + (tmp_path / "pyproject.toml").write_text(_pyproject(name)) + runpod.api_key = api_key + + try: + _deploy( + _versioned_worker(name, "v1", "workers=(0, 4)"), + name, + tmp_path, + env, + "v1", + ) + endpoint_id = 
+
+            # Spin up multiple workers concurrently to populate the worker pool
+            ep = runpod.Endpoint(endpoint_id)
+            worker_ids_v1: set[str] = set()
+            with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
+                futs = [
+                    pool.submit(lambda: ep.run_sync({"msg": "spin-up"}, timeout=120))
+                    for _ in range(8)
+                ]
+                for fut in concurrent.futures.as_completed(futs):
+                    out = fut.result()
+                    if out:
+                        worker_ids_v1.add(out.get("worker_id", "unknown"))
+            print(f"[multi-worker] v1 worker IDs seen: {worker_ids_v1}")
+
+            _deploy(
+                _versioned_worker(name, "v2", "workers=(0, 4)"),
+                name,
+                tmp_path,
+                env,
+                "v2",
+            )
+            time.sleep(_IDLE_WAIT)  # let workers drain idle so the recycle can fire
+
+            elapsed, _ = poll_until_version(
+                endpoint_id, api_key, "v2", timeout=_RECYCLE_TIMEOUT, interval=30
+            )
+            print(f"[multi-worker] first v2 response {elapsed:.1f}s after idle wait")
+
+            # After first v2, no further v1 responses should appear
+            worker_ids_v2: set[str] = set()
+            for _ in range(8):
+                out = ep.run_sync({"msg": "verify"}, timeout=60)
+                if out:
+                    assert out.get("version") == "v2", (
+                        f"Stale v1 worker still serving after cutover: {out}"
+                    )
+                    worker_ids_v2.add(out.get("worker_id", "unknown"))
+            print(f"[multi-worker] v2 worker IDs seen: {worker_ids_v2}")
+        finally:
+            _undeploy(name, tmp_path, env)
+            sweep_endpoints(api_key)
+
+
+# ---------------------------------------------------------------------------
+# Multi-worker always-on: full cutover with zero errors
+# ---------------------------------------------------------------------------
+
+
+class TestRedeployMultiWorkerAlwaysOn:
+    """workers=(2,4), CPU: all workers cut over to v2 with zero errors."""
+
+    def test_all_workers_cut_over_to_v2(self, tmp_path: Path, api_key: str) -> None:
+        name = f"flash-qa-rdp-mwao-{uuid.uuid4().hex[:8]}"
+        env = os.environ.copy()
+        (tmp_path / "pyproject.toml").write_text(_pyproject(name))
+        runpod.api_key = api_key
+
+        try:
+            _deploy(
+                _versioned_worker(name, "v1", "workers=(2, 4)"),
+                name,
+                tmp_path,
+                env,
+                "v1",
+            )
+            endpoint_id = endpoint_id_from_state(tmp_path)
+
+            out = runpod.Endpoint(endpoint_id).run_sync({"msg": "warmup"}, timeout=180)
+            assert out and out.get("version") == "v1"
+
+            with continuous_caller(endpoint_id, api_key, interval=10.0) as results:
+                time.sleep(30)  # baseline before deploy
+
+                _deploy(
+                    _versioned_worker(name, "v2", "workers=(2, 4)"),
+                    name,
+                    tmp_path,
+                    env,
+                    "v2",
+                )
+
+                # Wait until last 5 results are all v2 or timeout
+                deadline = time.monotonic() + _RECYCLE_TIMEOUT
+                while time.monotonic() < deadline:
+                    recent = [r for r in results[-5:] if r[1]]
+                    if len(recent) >= 5 and all(
+                        r[1].get("version") == "v2" for r in recent
+                    ):
+                        break
+                    time.sleep(2)
+
+                time.sleep(5)  # capture stable v2 period
+
+            log_entries = [
+                (f"t={ts:.1f}s", resp.get("version"), resp.get("worker_id"))
+                for ts, resp, _ in results
+                if resp
+            ]
+            print(f"[multi-worker-ao] transition sequence ({len(log_entries)} calls):")
+            for entry in log_entries:
+                print(f"  {entry}")
+
+            errors = [err for _, _, err in results if err]
+            versions = [resp.get("version") for _, resp, _ in results if resp]
+
+            assert "v2" in versions, (
+                f"v2 never observed in {len(versions)} responses; "
+                f"versions seen: {set(versions)}"
+            )
+            assert versions[-1] == "v2", f"Last response not v2: {versions[-1]!r}"
+            assert len(errors) == 0, (
+                f"{len(errors)} error(s) during transition "
+                f"(indicates hard-kill worker recycle — graceful drain expected): {errors}"
+            )
+        finally:
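+            # Cleanup runs even if the transition assertions fail; stale
+            # always-on endpoints keep workers running and hold quota.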
+            _undeploy(name, tmp_path, env)
+            sweep_endpoints(api_key)
diff --git a/e2e/test_rolling_release.py b/e2e/test_rolling_release.py
new file mode 100644
index 00000000..fd3b0f6c
--- /dev/null
+++ b/e2e/test_rolling_release.py
@@ -0,0 +1,179 @@
+"""Rolling release E2E tests — drift detection.
+
+Verifies the reconcile/update path that runs on every flash deploy:
+
+    - No spurious release when code and config are unchanged
+    - Genuine config change triggers a real update
+"""
+
+import os
+import subprocess
+import uuid
+from pathlib import Path
+
+import runpod
+
+from conftest import endpoint_id_from_state, sweep_endpoints
+from provisioner import flash_dep
+
+# ---------------------------------------------------------------------------
+# Worker code
+# ---------------------------------------------------------------------------
+
+_PYPROJECT_TMPL = """\
+[project]
+name = "{name}"
+version = "0.1.0"
+requires-python = ">=3.11,<3.13"
+dependencies = ["{dep}"]
+"""
+
+
+def _pyproject(name: str) -> str:
+    return _PYPROJECT_TMPL.format(name=name, dep=flash_dep())
+
+
+def _echo_worker(name: str, workers: str = "workers=(1, 1)") -> str:
+    return f'''\
+import os
+from runpod_flash import Endpoint
+
+
+@Endpoint(name="{name}", cpu="cpu3c-1-2", {workers})
+async def echo(msg: str = "") -> dict:
+    return {{
+        "echo": msg,
+        "worker_id": os.environ.get("RUNPOD_POD_ID", "unknown"),
+    }}
+'''
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _deploy(code: str, name: str, cwd: Path, env: dict) -> subprocess.CompletedProcess:
+    (cwd / "worker.py").write_text(code)
+    return subprocess.run(
+        ["uv", "run", "flash", "deploy"],
+        cwd=cwd,
+        env=env,
+        capture_output=True,
+        text=True,
+        timeout=300,
+    )
+
+
+def _undeploy(name: str, cwd: Path, env: dict) -> None:
+    try:
+        subprocess.run(
+            ["uv", "run", "flash", "undeploy", name, "--force"],
+            cwd=cwd,
+            env=env,
+            capture_output=True,
+            text=True,
+            timeout=60,
+        )
+    except subprocess.TimeoutExpired:
+        print(f"WARNING: undeploy of {name} timed out")
+
+
+def _deploy_env(api_key: str) -> dict:
+    env = os.environ.copy()
+    env["RUNPOD_API_KEY"] = api_key
+    env["NO_COLOR"] = "1"  # strip ANSI from rich output so stdout is plain text
+    env.setdefault(
+        "LOG_LEVEL", "INFO"
+    )  # ensure log.info("Updating endpoint") appears in captured output
+    return env
+
+
+class TestRollingReleaseNoSpuriousRelease:
+    """Two successive deploys with identical code and config must be a no-op.
+
+    The second deploy is verified as a no-op by comparing worker_id values
+    across deploys, since current CLI output may still say 'Deployed to
+    production' rather than explicitly indicating a cached result.
+    Worker ID must be unchanged — no new release was triggered.
+    Uses workers=(1,1) to keep a warm worker for a stable worker_id.
+ """ + + def test_identical_redeploy_is_cached(self, tmp_path: Path, api_key: str) -> None: + name = f"flash-qa-rr-nsr-{uuid.uuid4().hex[:8]}" + env = _deploy_env(api_key) + (tmp_path / "pyproject.toml").write_text(_pyproject(name)) + runpod.api_key = api_key + + try: + r1 = _deploy(_echo_worker(name), name, tmp_path, env) + assert r1.returncode == 0, ( + f"Initial deploy failed:\n{r1.stdout}\n{r1.stderr}" + ) + endpoint_id = endpoint_id_from_state(tmp_path) + + out1 = runpod.Endpoint(endpoint_id).run_sync({"msg": "before"}, timeout=180) + assert out1 is not None, "First invocation returned None" + worker_id_before = out1.get("worker_id", "") + + # Second deploy — identical code and config + r2 = _deploy(_echo_worker(name), name, tmp_path, env) + assert r2.returncode == 0, ( + f"Second deploy failed:\n{r2.stdout}\n{r2.stderr}" + ) + + # v1.14.0 CLI always prints "Deployed to production" regardless of whether + # the platform triggered a worker recycle — no "cached" signal in output. + # Worker ID comparison below is the authoritative behavioral check. + + out2 = runpod.Endpoint(endpoint_id).run_sync({"msg": "after"}, timeout=60) + assert out2 is not None, "Post-redeploy invocation returned None" + worker_id_after = out2.get("worker_id", "") + + assert worker_id_before == worker_id_after, ( + f"Worker ID changed after no-op redeploy — spurious rolling release fired: " + f"{worker_id_before!r} → {worker_id_after!r}" + ) + finally: + _undeploy(name, tmp_path, env) + sweep_endpoints(api_key) + + +class TestRollingReleaseConfigChangeTriggersDrift: + """Changing workers=(0,1) to workers=(1,1) must trigger a real update. + + Verifies that drift detection is not suppressed for genuine config changes. + """ + + def test_config_change_triggers_update(self, tmp_path: Path, api_key: str) -> None: + name = f"flash-qa-rr-ccd-{uuid.uuid4().hex[:8]}" + env = _deploy_env(api_key) + (tmp_path / "pyproject.toml").write_text(_pyproject(name)) + + try: + # Deploy with scale-to-zero + r1 = _deploy( + _echo_worker(name, workers="workers=(0, 1)"), name, tmp_path, env + ) + assert r1.returncode == 0, ( + f"Initial deploy failed:\n{r1.stdout}\n{r1.stderr}" + ) + + # Re-deploy with always-on (config change only — same code) + r2 = _deploy( + _echo_worker(name, workers="workers=(1, 1)"), name, tmp_path, env + ) + assert r2.returncode == 0, ( + f"Config-change deploy failed:\n{r2.stdout}\n{r2.stderr}" + ) + + # v1.14.0 CLI always prints "Deployed to production" — no distinct + # "drift detected" vs "cached" signal. The endpoint update log line is + # the only CLI observable; it appears only when the endpoint is mutated. + combined = r2.stdout + r2.stderr + assert "updating endpoint" in combined.lower(), ( + f"Expected endpoint update log in config-change deploy output:\n{combined}" + ) + finally: + _undeploy(name, tmp_path, env) + sweep_endpoints(api_key) diff --git a/uv.lock b/uv.lock index 0ac99698..039a4e78 100644 --- a/uv.lock +++ b/uv.lock @@ -2850,7 +2850,7 @@ dependencies = [ [[package]] name = "runpod-flash" -version = "1.13.0" +version = "1.14.0" source = { editable = "." } dependencies = [ { name = "cloudpickle" },