feat(processing_services): v2 worker mode for minimal stub#1252
Conversation
Adds a design document for extending processing_services/minimal/ with a v2 worker mode so agents and CI can exercise the pull path without spinning up the heavyweight ami-data-companion worker. Co-Authored-By: Claude <noreply@anthropic.com>
Extends processing_services/minimal/ to support Antenna's v2 pull/async/worker
processing-service paradigm alongside the existing v1 push mode. One container
image, three modes via the MODE env var:
- MODE=api        v1 FastAPI only (unchanged, CI default)
- MODE=worker     v2 HTTP poll loop (reserve tasks -> run stub pipeline -> post results)
- MODE=api+worker both (local dev default)
The worker talks HTTP-only to Antenna's job queue API (POST /jobs/{id}/tasks/,
POST /jobs/{id}/result/), matching the ADC contract. Pipelines are the same
stubs used by /process (ConstantPipeline, RandomDetectionRandomSpeciesPipeline),
so v1 and v2 produce identical detections.
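The reserve -> run -> post cycle described above can be sketched as a single iteration. Everything here (function names, payload shapes, the injected `post` callable) is illustrative and is not the actual layout of the worker modules; it only shows the shape of the loop, including the rule that a failing pipeline still produces a result so the ACK path fires.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class PollClient:
    """Minimal stand-in for the worker's HTTP client; `post` is injected
    so the loop logic can be exercised without a live Antenna."""
    base_url: str
    post: Callable[[str, dict], Any]


def run_one_job(client: PollClient, job_id: int,
                pipeline: Callable[[dict], dict]) -> list[dict]:
    # Reserve a batch of tasks (POST /jobs/{id}/tasks/ in the real contract).
    tasks = client.post(f"{client.base_url}/jobs/{job_id}/tasks/", {"batch": 4})
    results = []
    for task in tasks:
        try:
            detections = pipeline(task)
            results.append({"task_id": task["id"], "detections": detections})
        except Exception as exc:
            # Errors still become results, so the server-side ACK path fires.
            results.append({"task_id": task["id"], "error": str(exc)})
    # Post everything back (POST /jobs/{id}/result/ in the real contract).
    client.post(f"{client.base_url}/jobs/{job_id}/result/", {"results": results})
    return results
```

A fake `post` callable is enough to drive one full iteration in a test, which is the point of keeping the HTTP boundary injectable.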
Out-of-the-box automation:
- ami/main/management/commands/ensure_default_project.py: idempotent bootstrap
that creates the default superuser + "Default Project" on Django startup.
Guarded by ENSURE_DEFAULT_PROJECT=1, set in .envs/.local/.django and
.envs/.ci/.django.
- compose/local/django/start runs it before starting gunicorn.
- processing_services/minimal/register.py self-provisions a ProcessingService
and registers the stub pipelines with the default project. Supports user
token auth (current main) and has TODOs for API-key auth (PR #1194).
- processing_services/docker-compose.yml sets MODE=api+worker by default.
This unblocks agent-driven e2e work on the async_api path without needing to
stand up the external ami-data-companion worker.
Design doc: docs/claude/planning/2026-04-17-minimal-worker-design.md
Co-Authored-By: Claude <noreply@anthropic.com>
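The idempotence of the bootstrap command described above comes down to keying `get_or_create` on a natural name instead of a primary key. A toy sketch, with a plain dict standing in for the Django ORM (all names and defaults here are illustrative):

```python
def get_or_create(table: dict, key: str, defaults: dict) -> tuple[dict, bool]:
    """Mimic Django's QuerySet.get_or_create, keyed on a natural key (name)."""
    if key in table:
        return table[key], False  # existing row reused, nothing created
    table[key] = dict(defaults)
    return table[key], True


def ensure_default_project(db: dict, superuser: str = "admin",
                           project_name: str = "Default Project") -> dict:
    # Safe to run on every boot: second and later calls are no-ops.
    get_or_create(db.setdefault("users", {}), superuser, {"is_superuser": True})
    project, _created = get_or_create(db.setdefault("projects", {}),
                                      project_name, {"owner": superuser})
    return project
```

Because lookup is by name, a long-lived dev database that already has other rows (or other primary keys) is simply reused rather than duplicated.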
mihow
left a comment
See my inline comments. Make it clear in the PR description and/or design doc what mirrors or diverges from the implementation in the AMI Data Companion worker. Also review this previous implementation for adding a worker to the docker compose stack, but for connecting to Celery directly. We will stick with the NATS / API version, but I'm curious for you to read the details of this implementation and write up a comparison. #1011
…, simpler loop

Apply feedback from PR #1252 review:

- Consolidate schemas into `processing_services/minimal/api/schemas.py`. v1 push and v2 worker now share a single source of truth; the separate `worker/schemas.py` mirror is removed. Client/runner/register/loop use real Pydantic types in signatures (PipelineProcessingTask, PipelineTaskResult, ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest).
- Move defaults into `processing_services/minimal/.env.dev`. Strip the hard-coded `os.environ.get(..., "<default>")` fallbacks in register.py and worker_main.py; replace with `os.environ[...]`. The inline environment block in docker-compose.yml becomes `env_file:`.
- Replace the loop's job→slug reverse-lookup with per-slug iteration: the outer loop variable IS the slug, so `_slug_for_job` is gone.
- Fix register.py docstring: the PS is identified by the Authorization header's user (main) or the API key (#1194), not by `processing_service_name`. That field just labels the DB row `get_or_create(name=...)` lands on.
- Import pipelines directly in register.py instead of HTTP-GETting /info from the co-located FastAPI. Removes the FastAPI readiness wait entirely, and means register.py works in MODE=worker (where FastAPI isn't running at all).

Co-Authored-By: Claude <noreply@anthropic.com>
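The "real Pydantic types in signatures" change can be illustrated with stdlib dataclasses standing in for the project's Pydantic models. The field sets below are guesses from the class names, not the actual schema definitions; the point is the typed signature replacing bare dicts:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PipelineProcessingTask:  # hypothetical fields, for illustration only
    id: int
    image_url: str


@dataclass
class PipelineTaskResult:      # hypothetical fields, for illustration only
    task_id: int
    detections: list = field(default_factory=list)
    error: Optional[str] = None


def process_task(task: PipelineProcessingTask) -> PipelineTaskResult:
    """Typed signature: callers get attribute access and static checking
    instead of passing untyped dicts through client/runner/register/loop."""
    return PipelineTaskResult(task_id=task.id,
                              detections=[{"source": task.image_url}])
```

With real Pydantic models the benefit is the same plus validation at the HTTP boundary; the consolidation means v1 and v2 import these shapes from one module.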
- Add "Comparison with prior/external implementations" section covering what the stub mirrors vs. where it diverges from the ADC worker, and why the NATS/API path was picked over the Celery-direct path in PR #1011.
- Update the env-var table to reflect the `.env.dev` file as the single source of defaults (no code fallbacks).
- Update directory layout and poll-loop pseudocode to match the current per-slug iteration (no job→slug reverse-lookup).

Co-Authored-By: Claude <noreply@anthropic.com>
…name my_slugs

Follow-up to the previous review round: v2-only classes go in worker/schemas.py so the v1 push path doesn't have to know about pull-mode types. The v1 schemas and v1/v2 shared types (PipelineResultsResponse, PipelineConfigResponse, SourceImageResponse, ...) stay in api/schemas.py.

Also rename the awkward `my_slugs` in loop.py. The list comes directly from `pipeline_choices` (the dict of stub pipelines exported by api.api), so iterate it directly instead of copying keys into a local variable.

Co-Authored-By: Claude <noreply@anthropic.com>
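The rename boils down to iterating the pipelines dict itself. A sketch of that loop shape; the `pipeline_choices` contents and both callbacks are stand-ins, not the real `api.api` exports:

```python
# Stand-in for the dict of stub pipelines exported by api.api.
pipeline_choices = {
    "constant": lambda task: [{"label": "constant", "score": 1.0}],
    "random-detection-random-species": lambda task: [{"label": "moth", "score": 0.5}],
}


def poll_once(fetch_job_ids, handle_job):
    """One poll pass. The outer loop variable IS the pipeline slug, so there
    is no local copy of the keys and no job-to-slug reverse lookup later."""
    handled = []
    for slug in pipeline_choices:
        for job_id in fetch_job_ids(slug):
            handle_job(job_id, slug)
            handled.append((job_id, slug))
    return handled
```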
Claude says: Follow-up in
Live-stack e2e is up next.
The /jobs/ endpoint's `pipeline` filter is a ModelChoiceFilter expecting a DB id, so slug values get rejected with 400 "Select a valid choice". The slug-based alias exposed by JobFilterSet is `pipeline__slug`.

Caught by running the minimal worker end-to-end against a live Antenna stack — the poll loop was logging 400 warnings every WORKER_POLL_INTERVAL. After the fix, two async_api test jobs (pipelines `constant` and `random-detection-random-species`) flow through worker → reserve tasks → process → submit results → NATS ACK → Job.SUCCESS.

Co-Authored-By: Claude <noreply@anthropic.com>
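The fix can be shown as a URL builder for the poll query. The endpoint shape and parameters come from the poll loop described in this PR; the helper name itself is made up:

```python
from urllib.parse import urlencode


def jobs_poll_url(base: str, slug: str) -> str:
    # `pipeline` is a ModelChoiceFilter expecting a DB id; the slug-based
    # alias that JobFilterSet exposes is `pipeline__slug`.
    query = urlencode({"pipeline__slug": slug,
                       "status": "STARTED",
                       "ids_only": "true"})
    return f"{base}/api/v2/jobs/?{query}"
```

Polling with `pipeline=<slug>` instead would hit the id-based filter and return the 400 described above.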
Summary
Extends `processing_services/minimal/` to support Antenna's v2 pull/async/worker paradigm alongside the existing v1 push mode, so agents and CI can exercise the NATS/Redis/Celery lifecycle without spinning up the external ami-data-companion (ADC) worker.

One container, three modes via the `MODE` env var:

- `MODE=api` — v1 FastAPI only (unchanged; CI default)
- `MODE=worker` — v2 HTTP poll loop
- `MODE=api+worker` — both (local dev default)

Intention & context
Antenna supports two processing-service paradigms:

- v1 push: Antenna POSTs a `PipelineRequest` to `/process`. Good for interactive single-image inference, `/api/v2/docs/` schema exposure, and admin smoke tests.
- v2 pull: workers `POST /api/v2/jobs/{id}/tasks/` to reserve work, process, and `POST /api/v2/jobs/{id}/result/` to report back. Antenna proxies NATS internally so workers can live behind firewalls.

Today the only v2 implementation is the external ami-data-companion (heavyweight: conda + torch + CUDA warmup). This PR adds a deterministic stub analogous to what `minimal/` already is for v1.

Design doc: `docs/claude/planning/2026-04-17-minimal-worker-design.md` — includes an explicit comparison to the ADC worker (what this mirrors vs. where it diverges) and to PR #1011 (the earlier Celery-direct attempt, and why NATS/API was chosen over it).

What changed
Processing service
- `processing_services/minimal/start.sh` — MODE-dispatching orchestrator with signal forwarding for `TERM`/`INT`. In `api+worker` mode: FastAPI first (so `/process` stays available while register.py retries), then register.py, then the worker loop.
- `processing_services/minimal/register.py` — self-registration. Auth priority: `ANTENNA_API_KEY` → `ANTENNA_API_AUTH_TOKEN` → login with `ANTENNA_USER`/`ANTENNA_PASSWORD`. Pipelines are imported directly from the `api` module (no HTTP round-trip to self). Sends both `processing_service_name` (main's serializer expects it as the label for `ProcessingService.get_or_create(name=...)`) and `client_info` (read by PR #1194, "feat: API key auth and identity for processing services").
- `processing_services/minimal/worker_main.py` — entry for `MODE=worker`. Reads the cached auth header that register.py drops at `/tmp/antenna_auth_header`, falls back to env-based login if absent.
- `processing_services/minimal/worker/` — three small modules: `client.py` (HTTP wrapper with retry), `loop.py` (per-slug poll), `runner.py` (task → result via the existing `api/pipelines.py` stubs). No duplicated pipeline or schema code.
- `processing_services/minimal/api/schemas.py` — v2 classes added alongside v1 (`PipelineProcessingTask`, `PipelineTaskResult`, `PipelineResultsError`, `ProcessingServiceClientInfo`, `AsyncPipelineRegistrationRequest`). Single source of truth for both paths.
- `processing_services/minimal/.env.dev` — dev defaults for MODE, Antenna target, auth fallback, and worker tuning. Loaded via `env_file:` in `processing_services/docker-compose.yml`. No more duplicated defaults in Python code or in `environment:` blocks.
- `processing_services/minimal/Dockerfile` — `ENV MODE=api` (CI default), `CMD ["/app/start.sh"]`.

Out-of-the-box sequencing
- `ami/main/management/commands/ensure_default_project.py` — idempotent bootstrap. Creates the default superuser (`createsuperuser` from `DJANGO_SUPERUSER_*` env vars) and a named project (default `"Default Project"`). Looks up by name, not PK, so long-lived dev DBs that already have PK 1 claimed are safe.
- `compose/local/django/start` — runs the command when `ENSURE_DEFAULT_PROJECT=1`.
- `.envs/.local/.django`, `.envs/.ci/.django` — set `ENSURE_DEFAULT_PROJECT=1` and (for CI) `DJANGO_SUPERUSER_*`.

Review-round changes (2026-04-17)
Applied @mihow's review feedback:
- Schemas consolidated into `api/schemas.py`; the separate `worker/schemas.py` mirror is deleted. `client.py`, `runner.py`, and `register.py` now take/return real Pydantic types rather than `dict`s.
- Defaults moved to `processing_services/minimal/.env.dev`. `os.environ.get(..., "<default>")` fallbacks in `register.py` and `worker_main.py` replaced with `os.environ[...]`; inline `environment:` duplication in `docker-compose.yml` removed.
- `loop.py` iterates `for slug in my_slugs` at the outer level so `runner.process_task(task, slug)` gets the slug as a loop variable. The prior `_slug_for_job` reverse-lookup is gone.
- The register.py docstring now states what actually identifies the processing service (`processing_service_name` just labels the DB row via `get_or_create(name=...)`) and covers the PR #1194 transition ("feat: API key auth and identity for processing services"): the API key identifies the PS directly.

The flow on a cold boot
1. `ensure_default_project` creates/finds a superuser + "Default Project".
2. `start.sh` runs FastAPI in the background.
3. `register.py` imports its pipelines from `api.api`, logs in to Antenna (or uses `ANTENNA_API_AUTH_TOKEN`), resolves the project by name, and POSTs pipelines to `/api/v2/projects/{id}/pipelines/`.
4. `worker_main.py` starts polling `GET /api/v2/jobs/?pipeline__slug=<slug>&status=STARTED&ids_only=true` once per slug. When a STARTED async_api job matches, it reserves a batch via `POST /jobs/{id}/tasks/`, runs the stub pipeline, and POSTs `PipelineTaskResult` items back.

Errors during stub execution are captured and sent as `PipelineResultsError` so the NATS ACK path still fires — important for exercising retry / stale-job-cutoff paths.

What's verified vs. what still needs verification
Verified:

- `docker compose -f processing_services/docker-compose.yml build ml_backend_minimal` — builds clean
- `MODE=api` container — backward compatible with existing CI (same `/process`, `/info`, `/livez`, `/readyz`)
- `MODE=api+worker` container — boots, runs register.py, retries login when Antenna unreachable
- `python -m py_compile` — all new Python compiles

Still needs verification:

- Live-stack e2e: dispatch an async_api job on `constant` or `random-detection-random-species`, confirm worker picks it up, processes it, and `process_nats_pipeline_result` fires. Not yet run.
- `ensure_default_project` idempotence across repeated Django boots
- `MODE=api` via ENV (same behavior, different surface)
- `processing_service_name`

Test plan (for reviewers)
Out of scope (called out in design doc)
- `example/` to `global_moths/complete` — deferred follow-up.

Related

- `example/` service evolution