feat(processing_services): v2 worker mode for minimal stub #1252

Draft
mihow wants to merge 6 commits into main from
worktree-minimal-worker-default-processing-service

Conversation


@mihow mihow commented Apr 17, 2026

Summary

Extends processing_services/minimal/ to support Antenna's v2 pull/async/worker paradigm alongside the existing v1 push mode, so agents and CI can exercise the NATS/Redis/Celery lifecycle without spinning up the external ami-data-companion (ADC) worker.

One container, three modes via MODE env var:

  • MODE=api — v1 FastAPI only (unchanged; CI default)
  • MODE=worker — v2 HTTP poll loop
  • MODE=api+worker — both (local dev default)

Intention & context

Antenna supports two processing-service paradigms:

  1. v1 (push) — synchronous HTTP: Antenna POSTs PipelineRequest to /process. Good for interactive single-image inference, /api/v2/docs/ schema exposure, and admin smoke tests.
  2. v2 (pull/async/worker) — workers poll POST /api/v2/jobs/{id}/tasks/, process, and POST /api/v2/jobs/{id}/result/. Antenna proxies NATS internally so workers can live behind firewalls.
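The v2 round trip above (reserve tasks, process, post results) can be sketched as a transport-agnostic function. The endpoint paths are the ones named in this PR; `http_post` and `run_pipeline` are hypothetical stand-ins for the worker's HTTP client and stub pipeline, not the PR's actual code:

```python
def process_job(job_id, http_post, run_pipeline):
    """One v2 job: reserve a batch of tasks, run each, post results back."""
    # Reserve a batch of tasks for this job (POST /api/v2/jobs/{id}/tasks/).
    tasks = http_post(f"/api/v2/jobs/{job_id}/tasks/", None)
    # Run the stub pipeline on each reserved task.
    results = [run_pipeline(task) for task in tasks]
    # Post results back; Antenna relays the ACK to NATS internally.
    http_post(f"/api/v2/jobs/{job_id}/result/", results)
    return results
```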

Today the only v2 implementation is the external ami-data-companion (heavyweight: conda + torch + CUDA warmup). This PR adds a deterministic stub analogous to what minimal/ already is for v1.

Design doc: docs/claude/planning/2026-04-17-minimal-worker-design.md — includes explicit comparison to the ADC worker (what this mirrors vs. diverges) and to PR #1011 (the earlier Celery-direct attempt, and why NATS/API was chosen over it).

What changed

Processing service

  • processing_services/minimal/start.sh — MODE-dispatching orchestrator with signal forwarding for TERM/INT. In api+worker mode: FastAPI first (so /process stays available while register.py retries), then register.py, then the worker loop.
  • processing_services/minimal/register.py — self-registration. Auth priority: ANTENNA_API_KEY → ANTENNA_API_AUTH_TOKEN → login with ANTENNA_USER/ANTENNA_PASSWORD. Pipelines are imported directly from the api module (no HTTP round-trip to self). Sends both processing_service_name (main's serializer expects it as the label for ProcessingService.get_or_create(name=...)) and client_info (read by PR #1194, feat: API key auth and identity for processing services).
  • processing_services/minimal/worker_main.py — entry for MODE=worker. Reads the cached auth header that register.py drops at /tmp/antenna_auth_header, falls back to env-based login if absent.
  • processing_services/minimal/worker/ — three small modules: client.py (HTTP wrapper with retry), loop.py (per-slug poll), runner.py (task → result via the existing api/pipelines.py stubs). No duplicated pipeline or schema code.
  • processing_services/minimal/api/schemas.py — v2 classes added alongside v1 (PipelineProcessingTask, PipelineTaskResult, PipelineResultsError, ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest). Single source of truth for both paths.
  • processing_services/minimal/.env.dev — dev defaults for MODE, Antenna target, auth fallback, and worker tuning. Loaded via env_file: in processing_services/docker-compose.yml. No more duplicated defaults in Python code or in environment: blocks.
  • processing_services/minimal/Dockerfile — ENV MODE=api (CI default), CMD ["/app/start.sh"].
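register.py's auth priority described above can be sketched as a pure function over the environment. The variable names come from this PR; the header formats and the `login` callable are illustrative assumptions, not the actual implementation:

```python
def resolve_auth(env, login=None):
    """Pick credentials in priority order: API key, pre-issued token, login."""
    if env.get("ANTENNA_API_KEY"):
        # Hypothetical header name; PR #1194 defines the real API-key scheme.
        return {"X-Api-Key": env["ANTENNA_API_KEY"]}
    if env.get("ANTENNA_API_AUTH_TOKEN"):
        return {"Authorization": f"Token {env['ANTENNA_API_AUTH_TOKEN']}"}
    # Last resort: exchange user/password for a token via the login callable.
    token = login(env["ANTENNA_USER"], env["ANTENNA_PASSWORD"])
    return {"Authorization": f"Token {token}"}
```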

Out-of-the-box sequencing

  • ami/main/management/commands/ensure_default_project.py — idempotent bootstrap. Creates the default superuser (createsuperuser from DJANGO_SUPERUSER_* env vars) and a named project (default "Default Project"). Looks up by name, not PK, so long-lived dev DBs that already have PK 1 claimed are safe.
  • compose/local/django/start — runs the command when ENSURE_DEFAULT_PROJECT=1.
  • .envs/.local/.django, .envs/.ci/.django — set ENSURE_DEFAULT_PROJECT=1 and (for CI) DJANGO_SUPERUSER_*.
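The look-up-by-name idempotence described above can be modeled without Django: the point is that repeated boots find the existing row by name instead of assuming PK 1 is free. This dict-backed sketch stands in for the ORM's get_or_create semantics; it is an illustration, not the command's code:

```python
def ensure_default_project(projects, name="Default Project"):
    """Return (pk, created). Idempotent: keyed by name, never by PK."""
    for pk, existing in projects.items():
        if existing == name:
            return pk, False  # already bootstrapped on an earlier boot
    pk = max(projects, default=0) + 1  # PK 1 may already be claimed in a dev DB
    projects[pk] = name
    return pk, True
```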

Review-round changes (2026-04-17)

Applied @mihow's review feedback:

  • Schema consolidation. v2 worker classes moved into api/schemas.py; the separate worker/schemas.py mirror is deleted. client.py, runner.py, and register.py now take/return real Pydantic types rather than dicts.
  • Env file. All dev defaults centralized in processing_services/minimal/.env.dev. os.environ.get(..., "<default>") fallbacks in register.py and worker_main.py replaced with os.environ[...]; inline environment: duplication in docker-compose.yml removed.
  • Per-slug polling. loop.py iterates for slug in my_slugs at the outer level so runner.process_task(task, slug) gets the slug as a loop variable. The prior _slug_for_job reverse-lookup is gone.
  • register.py docstring. Rewritten to describe current-main identity (the PS is identified by the Authorization header's user; processing_service_name just labels the DB row via get_or_create(name=...)) and the transition in PR #1194 (feat: API key auth and identity for processing services), where the API key identifies the PS directly.
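The per-slug loop shape described above can be sketched with the HTTP calls injected as callables. Only the `for slug in pipeline_choices` structure reflects the PR; the function names are illustrative:

```python
def poll_once(pipeline_choices, fetch_job_ids, reserve_tasks, run_task, post_result):
    """One poll pass: per slug, find jobs, reserve tasks, run, post results."""
    handled = []
    for slug in pipeline_choices:           # outer loop variable IS the slug
        for job_id in fetch_job_ids(slug):  # STARTED async_api jobs for this pipeline
            for task in reserve_tasks(job_id):
                # No reverse lookup needed: the slug is already in scope.
                post_result(job_id, run_task(task, slug))
                handled.append((slug, job_id))
    return handled
```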

The flow on a cold boot

  1. Postgres/RabbitMQ/Redis/NATS come up.
  2. Django runs migrations, then ensure_default_project creates/finds a superuser + "Default Project".
  3. Minimal container starts. start.sh runs FastAPI in the background.
  4. register.py imports its pipelines from api.api, logs in to Antenna (or uses ANTENNA_API_AUTH_TOKEN), resolves the project by name, and POSTs pipelines to /api/v2/projects/{id}/pipelines/.
  5. worker_main.py starts polling GET /api/v2/jobs/?pipeline=<slug>&status=STARTED&ids_only=true once per slug. When a STARTED async_api job matches, it reserves a batch via POST /jobs/{id}/tasks/, runs the stub pipeline, and POSTs PipelineTaskResult items back.

Errors during stub execution are captured and sent as PipelineResultsError so the NATS ACK path still fires — important for exercising retry / stale-job-cutoff paths.
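The capture-and-report behavior above can be sketched as a wrapper around task execution. The real code returns a PipelineResultsError Pydantic object; the dict shape here is an assumption for illustration:

```python
def run_task_safely(task, run_pipeline):
    """Never let a stub failure drop the task: report it so the ACK still fires."""
    try:
        return {"ok": True, "result": run_pipeline(task)}
    except Exception as exc:
        # Stand-in for PipelineResultsError: error details travel back to Antenna,
        # keeping the NATS ACK path alive for retry / stale-job-cutoff testing.
        return {"ok": False, "error": {"type": type(exc).__name__, "message": str(exc)}}
```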

What's verified vs. what still needs verification

Verified:

  • docker compose -f processing_services/docker-compose.yml build ml_backend_minimal — builds clean
  • MODE=api container — backward compatible with existing CI (same /process, /info, /livez, /readyz)
  • MODE=api+worker container — boots, runs register.py, retries login when Antenna unreachable
  • python -m py_compile — all new Python compiles
  • Pre-commit (black, isort, flake8) passes

Still needs verification:

  • End-to-end on a live stack — submit an async_api job for constant or random-detection-random-species, confirm the worker picks it up, processes it, and process_nats_pipeline_result fires. Not yet run.
  • ensure_default_project idempotence across repeated Django boots
  • CI still green — docker-compose.ci.yml unchanged, but the image now defaults to MODE=api via ENV (same behavior, different surface)
  • Confirm interop with PR #1194 when it lands (one-line removal of processing_service_name)

Test plan (for reviewers)

# Bring up Antenna core
docker compose up -d

# Wait for django to apply migrations + run ensure_default_project
docker compose logs django | grep -E 'ensure_default_project|Default Project'

# Bring up the minimal container in api+worker mode
docker compose -f processing_services/docker-compose.yml up -d ml_backend_minimal
docker compose -f processing_services/docker-compose.yml logs -f ml_backend_minimal

# Expected: FastAPI up on :2000, register.py says "Registered N pipelines",
# worker loop says "Polling for jobs on pipelines: ['constant', 'random-detection-random-species']"

# Submit a sync job (v1 path still works)
curl -X POST http://localhost:8000/api/v2/jobs/ ...

# Submit an async_api job (v2 path, exercises the new worker)
curl -X POST http://localhost:8000/api/v2/jobs/?start_now=true ...

Out of scope (called out in design doc)

Related

mihow and others added 2 commits April 17, 2026 13:46
Adds a design document for extending processing_services/minimal/ with a
v2 worker mode so agents and CI can exercise the pull path without spinning
up the heavyweight ami-data-companion worker.

Co-Authored-By: Claude <noreply@anthropic.com>
Extends processing_services/minimal/ to support Antenna's v2 pull/async/worker
processing-service paradigm alongside the existing v1 push mode. One container
image, three modes via the MODE env var:

  MODE=api          v1 FastAPI only (unchanged, CI default)
  MODE=worker       v2 HTTP poll loop (reserve tasks -> run stub pipeline -> post results)
  MODE=api+worker   both (local dev default)

The worker talks HTTP-only to Antenna's job queue API (POST /jobs/{id}/tasks/,
POST /jobs/{id}/result/), matching the ADC contract. Pipelines are the same
stubs used by /process (ConstantPipeline, RandomDetectionRandomSpeciesPipeline),
so v1 and v2 produce identical detections.

Out-of-the-box automation:
- ami/main/management/commands/ensure_default_project.py: idempotent bootstrap
  that creates the default superuser + "Default Project" on Django startup.
  Guarded by ENSURE_DEFAULT_PROJECT=1, set in .envs/.local/.django and
  .envs/.ci/.django.
- compose/local/django/start runs it before starting gunicorn.
- processing_services/minimal/register.py self-provisions a ProcessingService
  and registers the stub pipelines with the default project. Supports user
  token auth (current main) and has TODOs for API-key auth (PR #1194).
- processing_services/docker-compose.yml sets MODE=api+worker by default.

This unblocks agent-driven e2e work on the async_api path without needing to
stand up the external ami-data-companion worker.

Design doc: docs/claude/planning/2026-04-17-minimal-worker-design.md

Co-Authored-By: Claude <noreply@anthropic.com>

netlify bot commented Apr 17, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: 442afe5
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/69e2ce5a512bdb000893a627


netlify bot commented Apr 17, 2026

Deploy Preview for antenna-ssec canceled.

🔨 Latest commit: 442afe5
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-ssec/deploys/69e2ce5a95fe670007bb2c24


coderabbitai bot commented Apr 17, 2026

Important

Review skipped

Draft detected.


Review threads:

  • processing_services/minimal/worker/client.py (outdated)
  • processing_services/minimal/register.py (outdated, ×2)
  • processing_services/minimal/worker/loop.py (outdated)
  • processing_services/minimal/worker_main.py (outdated)
  • processing_services/docker-compose.yml (outdated)
  • processing_services/README.md

@mihow mihow left a comment


See my inline comments. Make it clear in the PR description and/or design doc what mirrors or diverges from the implementation in the AMI Data Companion worker. Also review the previous implementation that added a worker to the docker compose stack by connecting to Celery directly. We will stick with the NATS / API version, but I'd like you to read the details of that implementation and write up a comparison: #1011

mihow and others added 3 commits April 17, 2026 16:38
…, simpler loop

Apply feedback from PR #1252 review:

- Consolidate schemas into `processing_services/minimal/api/schemas.py`. v1
  push and v2 worker now share a single source of truth; the separate
  `worker/schemas.py` mirror is removed. Client/runner/register/loop use real
  Pydantic types in signatures (PipelineProcessingTask, PipelineTaskResult,
  ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest).

- Move defaults into `processing_services/minimal/.env.dev`. Strip the
  hard-coded `os.environ.get(..., "<default>")` fallbacks in register.py and
  worker_main.py; replace with `os.environ[...]`. The inline environment
  block in docker-compose.yml becomes `env_file:`.

- Replace the loop's job→slug reverse-lookup with per-slug iteration: the
  outer loop variable IS the slug, so `_slug_for_job` is gone.

- Fix register.py docstring: the PS is identified by the Authorization
  header's user (main) or the API key (#1194), not by
  `processing_service_name`. That field just labels the DB row
  `get_or_create(name=...)` lands on.

- Import pipelines directly in register.py instead of HTTP-GETting
  /info from the co-located FastAPI. Removes the FastAPI readiness wait
  entirely, and means register.py works in MODE=worker (where FastAPI isn't
  running at all).

Co-Authored-By: Claude <noreply@anthropic.com>
- Add "Comparison with prior/external implementations" section covering
  what the stub mirrors vs. diverges from the ADC worker, and why the
  NATS/API path was picked over the Celery-direct path in PR #1011.
- Update the env-var table to reflect the `.env.dev` file as the single
  source of defaults (no code fallbacks).
- Update directory layout and poll-loop pseudocode to match the current
  per-slug iteration (no job→slug reverse-lookup).

Co-Authored-By: Claude <noreply@anthropic.com>
…name my_slugs

Follow-up to the previous review round: v2-only classes go in
worker/schemas.py so the v1 push path doesn't have to know about pull-mode
types. The v1 schemas and v1/v2 shared types (PipelineResultsResponse,
PipelineConfigResponse, SourceImageResponse, ...) stay in api/schemas.py.

Also remove the awkward `my_slugs` local in loop.py. The list comes directly from
`pipeline_choices` (the dict of stub pipelines exported by api.api), so
iterate it directly instead of copying keys into a local variable.

Co-Authored-By: Claude <noreply@anthropic.com>

mihow commented Apr 18, 2026

Claude says: Follow-up in 58b247a9 per your note that the schemas concern was about where the v2 classes live, not about using Pydantic at all:

  • Re-split schemas. v2-only classes moved back out of api/schemas.py into worker/schemas.py (PipelineProcessingTask, PipelineTaskResult, PipelineResultsError, ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest, TasksResponse). The v1 push side no longer imports any pull-mode types. Shared wire-format classes (PipelineResultsResponse, PipelineConfigResponse, SourceImageResponse, ...) stay in api/schemas.py and are imported by worker/schemas.py. Typed signatures throughout client.py / runner.py / loop.py / register.py / worker_main.py are preserved.

  • Dropped my_slugs. It was a redundant local for list(pipeline_choices.keys()). Loop now iterates pipeline_choices directly — for slug in pipeline_choices — so there's no intermediate and no awkward possessive name.

Live-stack e2e is up next.

The /jobs/ endpoint's `pipeline` filter is a ModelChoiceFilter expecting
a DB id, so slug values get rejected with 400 "Select a valid choice".
The slug-based alias exposed by JobFilterSet is `pipeline__slug`.

Caught by running the minimal worker end-to-end against a live Antenna
stack — the poll loop was logging 400 warnings every WORKER_POLL_INTERVAL.
After the fix, two async_api test jobs (pipelines `constant` and
`random-detection-random-species`) flow through worker → reserve tasks →
process → submit results → NATS ACK → Job.SUCCESS.

Co-Authored-By: Claude <noreply@anthropic.com>
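The filter fix in the commit above amounts to one change in how the poll URL is built: use the slug-based `pipeline__slug` alias instead of the id-based `pipeline` filter. A minimal sketch (the base URL and helper name are illustrative):

```python
from urllib.parse import urlencode

def poll_url(base, slug):
    """Build the worker's job-poll URL using the slug alias, not the id filter."""
    # `pipeline=<slug>` is rejected with 400 by the ModelChoiceFilter (expects a DB id);
    # JobFilterSet exposes `pipeline__slug` for slug-based filtering.
    params = {"pipeline__slug": slug, "status": "STARTED", "ids_only": "true"}
    return f"{base}/api/v2/jobs/?{urlencode(params)}"
```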