feat(jobs): make NATS queue activity visible in async ML job logs#1222
Conversation
TaskQueueManager now accepts an optional job_logger. When set, lifecycle events (stream/consumer create+reuse with state/config snapshot, publish failures, cleanup deletions, and a forensic consumer-stats line before deletion) are mirrored to the per-job logger in addition to the module logger. The UI job log now reflects what the NATS layer is actually doing for that specific job instead of a silent gap.

Per-message and per-poll paths (publish_task success, reserve_tasks, acknowledge_task) intentionally stay on the module logger only — a 10k-image job would otherwise drown its own log. Lifecycle log lines are deduped per manager session, so a loop over N images still only emits a single "Created NATS stream" line per job.

cleanup_async_job_resources and queue_images_to_nats pass job.logger through to TaskQueueManager, so real async_api jobs pick up the new logging without further caller changes.

Closes #1220
📝 Walkthrough
Extended orchestration and TaskQueueManager to accept an optional per-job logger, mirror selected NATS lifecycle and error messages into that job logger, add per-session deduplication of lifecycle messages, and propagate the job logger through the calling paths.
Sequence Diagram(s)
sequenceDiagram
participant Caller as Job / Caller
participant TQM as TaskQueueManager
participant NATS as NATS JetStream
participant ModLog as Module Logger
participant JobLog as Job Logger
Caller->>TQM: __init__(..., job_logger=job.logger)
Caller->>TQM: publish_task(job_id, payload)
TQM->>NATS: stream_info(job_<id>)
alt stream exists
TQM->>ModLog: log(stream reuse)
TQM->>JobLog: _log(stream reuse + stats)
else stream missing
TQM->>NATS: add_stream(job_<id>)
TQM->>ModLog: log(stream created)
TQM->>JobLog: _log(stream created)
end
TQM->>NATS: consumer_info(job_<id>)
alt consumer exists
TQM->>ModLog: log(consumer reuse)
TQM->>JobLog: _log(consumer reuse + stats)
else consumer missing
TQM->>NATS: add_consumer(job_<id>)
TQM->>ModLog: log(consumer created)
TQM->>JobLog: _log(consumer created + config)
end
TQM->>NATS: publish()
alt publish succeeds
TQM->>ModLog: debug(success)
else publish fails
TQM->>ModLog: error(...)
TQM->>JobLog: _log(ERROR, ...)
end
Caller->>TQM: cleanup_job_resources(job_id)
TQM->>NATS: consumer_info(job_<id>)
TQM->>JobLog: _log_final_consumer_stats(job_id)
TQM->>NATS: delete_consumer / delete_stream
TQM->>ModLog: deletion logs
TQM->>JobLog: _log(deletion logs)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed (1 warning)
TaskQueueManager._log() was calling job_logger.log() synchronously from inside async methods (_ensure_stream, _ensure_consumer, cleanup). That triggers JobLogHandler.emit(), which does a Django ORM refresh_from_db + save — forbidden from an event loop. Every lifecycle line was silently dropped with "Failed to save logs for job #N: You cannot call this from an async context", defeating the point of the original change.

Convert _log to async and await sync_to_async(job_logger.log)(...) so the ORM work runs in a thread. Update all call sites to await. Apply the same fix to the publish-failure path in queue_images_to_nats.

Verified on ami-demo with job #74: lifecycle lines fired on the module logger, but "Failed to save logs" errors swallowed the job-logger mirror.
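A minimal sketch of the fix described above. The actual change uses asgiref's `sync_to_async`; this self-contained version substitutes the stdlib `asyncio.to_thread`, which has the same effect of moving the synchronous call off the event loop. The function name `log_to_job` is illustrative.

```python
import asyncio
import logging


async def log_to_job(job_logger: logging.Logger, level: int, msg: str) -> None:
    # JobLogHandler.emit() does ORM work (refresh_from_db + save), which
    # Django forbids inside a running event loop. Hand the synchronous
    # log call to a worker thread instead of calling it inline.
    await asyncio.to_thread(job_logger.log, level, msg)
```

Every call site then becomes `await log_to_job(...)`, which is why all callers of `_log` had to be updated to await it.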
Pull request overview
This PR makes NATS JetStream queue lifecycle events (stream/consumer setup, cleanup snapshots, and publish failures) visible in the per-job UI log by mirroring selected module-level logs to an optional job_logger, with async-safe bridging for Django ORM-backed log handlers.
Changes:
- Add optional `job_logger` to `TaskQueueManager` and mirror key lifecycle/error logs via an async `_log()` helper using `sync_to_async`.
- Wire `job.logger` into NATS publishing and cleanup paths so job logs include stream/consumer setup, delete events, and forensic pre-delete consumer stats.
- Add unit tests validating mirroring, deduping, reuse stats formatting, cleanup snapshot ordering, and failure surfacing.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| ami/ml/orchestration/nats_queue.py | Adds job_logger support, async-safe log mirroring, lifecycle dedupe, and pre-delete consumer stats logging. |
| ami/ml/orchestration/jobs.py | Passes per-job logger into TaskQueueManager for publish/cleanup and bridges one async error log write via sync_to_async. |
| ami/ml/orchestration/tests/test_nats_queue.py | Adds unit tests covering job-logger mirroring behavior and dedupe/forensics expectations. |
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/ml/orchestration/nats_queue.py (1)
Lines 238-254: ⚠️ Potential issue | 🟠 Major. Only catch `NotFoundError` to detect missing consumers.

The current code catches all exceptions, treating any `consumer_info()` failure as "consumer missing". If JetStream returns an auth, API, or transient error here, the code falls through to `add_consumer()`, which hides the real failure and can emit misleading lifecycle logs.

Change the broad `except Exception` to `except nats.js.errors.NotFoundError` so other exceptions propagate correctly. This requires adding the import `from nats.js.errors import NotFoundError`.

Suggested fix:

```diff
 try:
     info = await asyncio.wait_for(
         self.js.consumer_info(stream_name, consumer_name),
         timeout=NATS_JETSTREAM_TIMEOUT,
     )
     if job_id not in self._consumers_logged:
         await self._log(
             logging.INFO,
             f"Reusing NATS consumer {consumer_name} ({self._format_consumer_stats(info)})",
         )
         self._consumers_logged.add(job_id)
     return
 except asyncio.TimeoutError:
     raise  # NATS unreachable — let caller handle it
-except Exception:
+except nats.js.errors.NotFoundError:
     # Consumer doesn't exist, fall through to create it.
     pass
```

Also add to the imports near the top: `from nats.js.errors import NotFoundError`

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 238 - 254, The try/except in the block calling self.js.consumer_info incorrectly swallows all exceptions (masking auth/API/transient errors) — replace the broad "except Exception" with "except NotFoundError" (from nats.js.errors) so only missing-consumer errors fall through to add_consumer; add the import "from nats.js.errors import NotFoundError" near the top and keep the existing asyncio.TimeoutError handling and logging using self._log, consumer_name, _format_consumer_stats, and _consumers_logged unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/ml/orchestration/jobs.py`:
- Around line 47-50: The cleanup function currently forwards whatever _logger it
receives into TaskQueueManager; change cleanup_async_job_resources to accept
job_logger: logging.Logger | None = None and only forward an actual per-job
logger (job.logger) from callers — update callers (e.g., where Job lookup
succeeds) to pass job.logger, and where Job lookup fails pass None; inside
cleanup_async_job_resources use a conditional context: if job_logger is not None
then "async with TaskQueueManager(job_logger=job_logger) as manager" else "async
with TaskQueueManager() as manager" so unrelated module loggers are not used for
per-job NATS lifecycle logging.
---
Outside diff comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 238-254: The try/except in the block calling self.js.consumer_info
incorrectly swallows all exceptions (masking auth/API/transient errors) —
replace the broad "except Exception" with "except NotFoundError" (from
nats.js.errors) so only missing-consumer errors fall through to add_consumer;
add the import "from nats.js.errors import NotFoundError" near the top and keep
the existing asyncio.TimeoutError handling and logging using self._log,
consumer_name, _format_consumer_stats, and _consumers_logged unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ef374171-e1d2-4120-8bd7-e1dce1735926
📒 Files selected for processing (3)
- ami/ml/orchestration/jobs.py
- ami/ml/orchestration/nats_queue.py
- ami/ml/orchestration/tests/test_nats_queue.py
…gger to TaskQueueManager in cleanup

Addresses CodeRabbit review on #1222.

1. _ensure_consumer caught a broad Exception when consumer_info() failed, masking auth/API/transient JetStream errors as "consumer missing" and emitting misleading creation logs. Narrowed to NotFoundError to match the pattern already used in _ensure_stream (line 208).
2. cleanup_async_job_resources forwarded its `_logger` argument into TaskQueueManager as `job_logger`. One caller (_fail_job on the Job.DoesNotExist path in ami/jobs/tasks.py:198) passes a plain module logger, which would then have cleanup lifecycle lines mirrored into an unrelated logger via sync_to_async. Added a separate `job_logger` parameter, defaulted to None, and updated the two callers that have real job context (_fail_job happy path, cleanup_async_job_if_needed) to pass `job.logger` explicitly. The DoesNotExist path leaves job_logger=None, so TaskQueueManager falls through to the module logger only.

Tests: 18/18 in test_nats_queue.py pass; pre-commit clean.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)
Lines 375-395: Consider passing `job_logger` in `_ack_task_via_nats` for consistency.

The `_ack_task_via_nats` function in ami/jobs/tasks.py (lines 204-211) has access to `job_logger` but doesn't forward it to `TaskQueueManager`. While `acknowledge_task` only logs at debug/error level and doesn't involve lifecycle events, passing `job_logger` would ensure any acknowledgment failures surface in the job log.

This is a minor consistency improvement and can be deferred if desired.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 375 - 395, Update TaskQueueManager.acknowledge_task to accept an optional job_logger parameter and use it instead of the module-level logger for debug/error outputs; then update the caller _ack_task_via_nats in ami/jobs/tasks.py to forward its job_logger when calling TaskQueueManager.acknowledge_task so acknowledgment failures are logged to the job-specific logger. Ensure the new parameter is optional (defaulting to None) to maintain backward compatibility and only use job_logger if provided.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 375-395: Update TaskQueueManager.acknowledge_task to accept an
optional job_logger parameter and use it instead of the module-level logger for
debug/error outputs; then update the caller _ack_task_via_nats in
ami/jobs/tasks.py to forward its job_logger when calling
TaskQueueManager.acknowledge_task so acknowledgment failures are logged to the
job-specific logger. Ensure the new parameter is optional (defaulting to None)
to maintain backward compatibility and only use job_logger if provided.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3d05cb6e-5e33-404c-926f-5e2a45ac59d3
📒 Files selected for processing (3)
- ami/jobs/tasks.py
- ami/ml/orchestration/jobs.py
- ami/ml/orchestration/nats_queue.py
…e job_logger pattern

Drop the separate `_logger` parameter that was redundant with `job_logger`. Follow the existing convention (e.g. save_results in ami/jobs/tasks.py): `_log = job_logger or logger` — use the per-job logger when available, the module logger otherwise.

Callers now read cleanly:

```python
cleanup_async_job_resources(job.pk, job_logger=job.logger)  # has job
cleanup_async_job_resources(job_id)  # no job
```

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/ml/orchestration/jobs.py`:
- Around line 120-127: The except block that currently awaits
sync_to_async(job.logger.error)(...) should also emit the full traceback to the
module logger so ops logs keep stack traces; add a module-level
logger.exception(...) call (using the module-level logger variable) inside the
except handler (e.g., in the same except Exception as e in publish/publish_task
path) before or immediately after the sync_to_async(job.logger.error) call so
the exception and traceback are recorded, while keeping the existing
job.logger.error call to write the message to job.logs.stdout.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8803dbbe-b042-4439-a8ce-bba8e51ce8c0
📒 Files selected for processing (2)
- ami/jobs/tasks.py
- ami/ml/orchestration/jobs.py
🚧 Files skipped from review as they are similar to previous changes (1)
- ami/jobs/tasks.py
```python
# Per-message success logs stay at module level (noise in 10k-image
# jobs), but a failure on even a single publish deserves to surface
# in the job log — otherwise the failure path is invisible to users.
await self._log(logging.ERROR, f"Failed to publish task to stream for job '{job_id}': {e}")
```
…dcoding in creation log The "Created NATS consumer" log line was hardcoding max_deliver=5 and interpolating TASK_TTR for ack_wait. Now reads from the ConsumerInfo returned by add_consumer(), so the log always reflects what the server accepted. Added _format_consumer_config() alongside the existing _format_consumer_stats() for the two different log contexts (creation vs runtime stats). Updated test mocks to return ConsumerInfo-like objects with a config sub-object from add_consumer. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)
Lines 217-226: Consider defensive access for `info.state` fields.

While `StreamInfo.state` should always be present when `stream_info()` succeeds, the code accesses `info.state.messages` and `info.state.last_seq` directly. For consistency with the defensive pattern used in `_format_consumer_stats()`, you might want to handle potential `None` values.

♻️ Optional defensive access:

```diff
 if job_id not in self._streams_logged:
+    state = info.state
+    messages = state.messages if state else "?"
+    last_seq = state.last_seq if state else "?"
     await self._log(
         logging.INFO,
         f"Reusing NATS stream {stream_name} "
-        f"(messages={info.state.messages}, last_seq={info.state.last_seq})",
+        f"(messages={messages}, last_seq={last_seq})",
     )
```
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 217 - 226, The code accesses info.state.messages and info.state.last_seq directly; make this defensive like _format_consumer_stats by first grabbing state = info.state or a default, then use getattr(state, "messages", 0) and getattr(state, "last_seq", 0) (or sensible string defaults) when building the log message in the block that calls self.js.stream_info(stream_name), and keep using self._log and self._streams_logged as before so the log prints safe values even if info.state is None.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 121-127: The ack_wait value from ConsumerInfo.config (ack_wait) is
in nanoseconds and is being logged directly, producing misleading values; update
the string construction that builds the consumer config summary (the return that
references cfg.ack_wait) to convert ack_wait from nanoseconds to seconds by
dividing by 1e9 when present so the log shows human-readable seconds (e.g., 30s)
instead of nanoseconds.
---
Nitpick comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 217-226: The code accesses info.state.messages and
info.state.last_seq directly; make this defensive like _format_consumer_stats by
first grabbing state = info.state or a default, then use getattr(state,
"messages", 0) and getattr(state, "last_seq", 0) (or sensible string defaults)
when building the log message in the block that calls
self.js.stream_info(stream_name), and keep using self._log and
self._streams_logged as before so the log prints safe values even if info.state
is None.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d5c4336b-33b5-4131-ae23-a5487ad2a802
📒 Files selected for processing (2)
- ami/ml/orchestration/nats_queue.py
- ami/ml/orchestration/tests/test_nats_queue.py
✅ Files skipped from review due to trivial changes (1)
- ami/ml/orchestration/tests/test_nats_queue.py
…m rendering in config log

Three cleanup items found on self-review:

1. _ensure_stream and _ensure_consumer were calling stream_info/consumer_info on every publish_task (once per image). Since the stream and consumer are never deleted mid-flight (cleanup uses a separate manager session), added an early return when job_id is already in the logged set. Saves 2 NATS round-trips per image after the first.
2. _format_consumer_config was rendering enum fields as "DeliverPolicy.ALL" instead of "all" — added a _val() helper that unwraps .value when present.
3. Removed now-redundant inner dedup checks (the early return makes them always-true).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
♻️ Duplicate comments (1)
ami/ml/orchestration/nats_queue.py (1)
Lines 126-132: ⚠️ Potential issue | 🟡 Minor. Verify `ack_wait` units before rendering as seconds.

At line 128, `ack_wait` is logged with an `s` suffix directly from `ConsumerInfo.config`. If this value is nanoseconds in your nats-py version, the UI log will be misleading.

In the exact nats-py version used by this repository, what unit is `ConsumerInfo.config.ack_wait` returned in by `consumer_info()` / `add_consumer()` responses: seconds or nanoseconds?
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b5e8d277-43a6-40ab-88b0-fce7f830e46d
📒 Files selected for processing (1)
ami/ml/orchestration/nats_queue.py
…tate access Addresses two CodeRabbit findings: 1. queue_images_to_nats publish exception path only wrote a stringified error to the job log. Added logger.exception() on the module logger so ops dashboards keep the full traceback. The job.logger.error bridge still writes the user-facing message. 2. _ensure_stream accessed info.state.messages / info.state.last_seq without defending against None, unlike _format_consumer_stats which already does. Match the defensive pattern so the reuse line doesn't blow up if the server ever returns a StreamInfo with no state. Also pushing back on the "ack_wait nanoseconds" finding in the review thread — nats-py does convert it to seconds in from_response via _convert_nanoseconds (source: val / _NANOSECOND where _NANOSECOND = 1e9). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…sources

Match the pattern used by save_results in ami/jobs/tasks.py: take only job_id, resolve job.logger internally, fall back to the module logger when the Job row is missing. Keeps call sites consistent across the codebase — cleanup_async_job_resources(job.pk) everywhere — and makes the job object available inside the function for future use.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…h TaskQueueManager._log

Two _log identifiers in the same module had incompatible semantics:

- nats_queue.TaskQueueManager._log(level, msg) — an async coroutine that fans out to module + job loggers with sync_to_async bridging.
- jobs.cleanup_async_job_resources local _log = job_logger or logger — a plain Logger instance called as _log.info(...) / .error(...).

Rename the local to job_logger and assign directly from `job.logger if job else logger`, matching the save_results pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Drop redundant ternary: cfg = info.config, not `info.config if info.config is not None else None`
- Merge two-line f-string in _log_final_consumer_stats
- Drop redundant `except asyncio.TimeoutError: raise` in _ensure_consumer (now that the catch is narrowed to NotFoundError, TimeoutError propagates naturally — matches _ensure_stream style)
- Explicit comment on the intentionally-broad `except Exception` in _log_final_consumer_stats clarifying it's different from _ensure_consumer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rename TaskQueueManager._log → log_async and route every log line in the queue_images_to_nats async block through it (debug, error + traceback). Drops the ad-hoc sync_to_async(job.logger.error) bridge and the separate logger.exception call — one consistent API, one place that knows how to bridge JobLogHandler's ORM save through sync_to_async. log_async also now accepts exc_info=True so callers don't need to pair a module-only logger.exception with a job-logger error call. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
log_async currently mirrors granular per-job lifecycle to both the module and job loggers. Document why this is intentional for now (async ML processing still stabilizing, stdout visibility helping us debug) and the eventual target shape (route to job logger only at INFO/DEBUG, mirror at WARNING+). Matches the pattern in ami.jobs.tasks.save_results, where job.logger.propagate=False keeps granular per-job state out of ops logs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…dedup

The original docstring claimed the stream/consumer won't be deleted mid-flight because cleanup uses a separate manager session. That's incomplete — Job.cancel() runs cleanup_async_job_resources in the request thread while queue_images_to_nats is still in its publish loop in the Celery worker, so a concurrent delete across manager sessions is possible.

The early return is still safe in that scenario, but for a different reason than the original claim: downstream publish_task fails loudly (returns False, logs ERROR) when the stream is gone, rather than silently recreating an orphan stream without a consumer (which is what the non-deduped baseline would do). Updates _ensure_stream and _ensure_consumer docstrings to describe the actual safety argument accurately.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Without this gate, every log_async call fires the job-logger mirror through sync_to_async — a ThreadPoolExecutor submit per call — regardless of whether the effective level would drop the record. For a 10k-image queue this amounts to 10k unnecessary thread-pool submissions when DEBUG is off.

stdlib Logger.log does the same isEnabledFor check internally before formatting. We need to do it explicitly here because the mirror goes through sync_to_async, bypassing the in-logger short-circuit.

No behavior change when at least one logger is enabled for the level; a pure short-circuit when both are gated out.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
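A sketch of the gate. As before, stdlib `asyncio.to_thread` stands in for asgiref's `sync_to_async`, and the free function `log_async` is an illustrative reduction of the method on `TaskQueueManager`:

```python
import asyncio
import logging


async def log_async(module_logger, job_logger, level: int, msg: str) -> None:
    # Logger.log() does its own isEnabledFor check, but the job-logger
    # mirror hops through a thread pool first, so gate explicitly to
    # avoid a pointless executor submission per dropped record.
    module_enabled = module_logger.isEnabledFor(level)
    job_enabled = job_logger is not None and job_logger.isEnabledFor(level)
    if not (module_enabled or job_enabled):
        return  # both gated out: no formatting, no thread-pool hop
    if module_enabled:
        module_logger.log(level, msg)
    if job_enabled:
        await asyncio.to_thread(job_logger.log, level, msg)
```

With both loggers at INFO, a DEBUG record returns before any formatting or thread-pool work; an INFO record still reaches both destinations as before.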
…pshots

Two Celery Beat periodic tasks so operators can see ML job health without waiting for a job to finish:

1. check_stale_jobs_task (every 15 min) — thin wrapper around the existing check_stale_jobs() that reconciles jobs stuck in running states past FAILED_CUTOFF_HOURS. Previously the function existed but was only reachable via the update_stale_jobs management command, so nothing ran it automatically.
2. log_running_async_job_stats (every 5 min) — for each incomplete async_api job, opens a TaskQueueManager with that job's logger and logs a delivered/ack_floor/num_pending/num_ack_pending/num_redelivered snapshot of the NATS consumer. Read-only; no status changes.

Builds on the lifecycle logging landed in #1222, so long-running jobs now get mid-flight visibility, not just create + cleanup snapshots. Both are registered via migration 0020 so existing deployments pick them up on the next migrate without manual Beat configuration.

Co-Authored-By: Claude <noreply@anthropic.com>
* feat(jobs): schedule periodic stale-job reconcile + NATS consumer snapshots

* fix(jobs): address review comments on PR #1227

- log_running_async_job_stats: reuse a single TaskQueueManager per tick instead of opening N NATS connections. Cost is now O(1) in the number of running async jobs rather than O(n). Guarded outer connection setup so a NATS outage drops the tick cleanly instead of crashing the task.
- check_stale_jobs_task: bump expires from 10 to 14 minutes so a delayed copy still runs within the 15-minute schedule instead of expiring before a worker picks it up under broker pressure.
- migration 0020: use apps.get_model for django_celery_beat models and declare an explicit migration dependency so the data migration uses historical model state.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): fall back to per-job manager when shared path fails

If opening the shared TaskQueueManager raises at setup (or teardown), try each job with its own fresh manager before giving up. Defends against a regression in the shared mutation pattern silently costing us snapshot visibility for every running async job on every tick. If NATS itself is down, the fallback loop will fail too and log once per job — the same end state as before the refactor, just noisier.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor(jobs): rename beat task to jobs_health_check umbrella

The periodic task is renamed from `check_stale_jobs_task` to `jobs_health_check` so new job-health checks can share the 15-minute cadence and `expires` guarantees without a new beat entry. The current body runs a single `_run_stale_jobs_check()`; future checks plug in alongside it and return the same `{checked, fixed, unfixable}` shape.

Names chosen to parallel #1188's integrity-check pattern:
- Beat task = noun phrase (`jobs_health_check`) — reads well in Flower
- (Future) management command = verb (`manage.py check_jobs`)

Migration 0020 updated in place since it hasn't shipped yet.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor(jobs): fold snapshot task into umbrella, adopt IntegrityCheckResult

Addresses the takeaway-review findings on PR #1227:

- Fix reverse migration: delete the row we create. The forward step registers `jobs.health_check` but the old `delete_periodic_tasks` still referenced the pre-rename `jobs.check_stale_jobs`, leaving a stranded row on rollback.
- Collapse `log_running_async_job_stats` into a sub-check of the umbrella. Drops the second `PeriodicTask` and the shared-connection fallback path — on a 15-minute cadence there's no reason to keep two beat tasks or defend against a per-manager bug, and a quietly hung job now gets a snapshot in the same tick the reconciler will decide whether to revoke it.
- Adopt `IntegrityCheckResult` as the shared sub-check shape, housed in a new `ami.main.checks.schemas` module so PR #1188 can re-target its import without a merge-order dance.

Wrap the two sub-checks in a `JobsHealthCheckResult` parent dataclass; the umbrella returns `dataclasses.asdict(...)` for Celery's JSON backend. Observation checks leave `fixed=0` and count per-item errors in `unfixable`.

Tests collapse to one `JobsHealthCheckTest` class covering both sub-checks (reconcile + snapshot) and the edge cases that matter: per-job snapshot failure, shared-connection setup failure, non-async jobs skipped, idle deployment returns all zeros.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): isolate sub-checks + pre-resolve loggers off event loop

Addresses review findings from the re-review on 6dc7e6e:

- The umbrella had no guard around sub-check calls, so a DB hiccup in ``_run_stale_jobs_check`` would kill the snapshot check and fail the whole task. Wrap each call in ``_safe_run_sub_check``, which catches, logs, and returns ``IntegrityCheckResult(unfixable=1)`` as a sentinel — operators watching the task result in Flower see that a sub-check failed rather than reading all-zeros and assuming all-clear.
- ``Job.logger`` attaches a ``JobLogHandler`` on first access, which touches the ORM; the file's own docstring says resolve outside the event loop, but two accesses were inside the coroutine. Pre-resolve into a list of ``(job, job_logger)`` tuples before entering ``async_to_sync``.
- Escalate the ``running_job_snapshots`` summary log to WARNING when ``errors > 0`` so persistent NATS unavailability is distinguishable from a quiet tick in aggregated logs.
- Document that the outer shared-connection except overwrites per-iteration error counts on the rare ``__aexit__`` teardown path.

New tests:

- ``test_sub_check_exception_does_not_block_the_other`` — patches the snapshot sub-check to raise; stale-jobs still reports correctly and snapshots come back as the ``unfixable=1`` sentinel.
- ``test_stale_jobs_fixed_counts_celery_updated_and_revoked_paths`` — one stale job with a terminal Celery ``task_id``, one without; both branches of ``fixed`` counted so a future refactor dropping one branch breaks the test.
- Explicit ``fixed == 0`` assertion in the snapshot test locks the observation-only contract.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
## Summary
Closes #1220. Makes NATS queue lifecycle events visible in the per-job log that users see in the UI, so async ML jobs no longer have a silent gap where their NATS activity used to be invisible.
`TaskQueueManager` now accepts an optional `job_logger`. When set, lifecycle events are mirrored to both the module logger (unchanged — still feeds stdout / container logs / observability pipelines) and the per-job logger:

### Setup
- `Created NATS stream job_<id>` / `Reusing NATS stream job_<id> (messages=N, last_seq=N)`
- `Created NATS consumer job-<id>-consumer (max_deliver=5, ack_wait=30.0s, max_ack_pending=1000, deliver_policy=all, ack_policy=explicit)` — config snapshot read from the `ConsumerInfo` returned by the server
- `Reusing NATS consumer job-<id>-consumer (delivered=N ack_floor=N num_pending=N num_ack_pending=N num_redelivered=N)`

### Cleanup (forensic)
- `Finalizing NATS consumer job-<id>-consumer before deletion (delivered=N ack_floor=N num_pending=N num_ack_pending=N num_redelivered=N)` — logged before the delete calls. This is the single most useful line for post-mortem investigations of jobs that ended in a weird state; without it, the consumer is already gone by the time anyone asks.
- `Deleted NATS consumer job-<id>-consumer` / `Deleted NATS stream job_<id>` and matching failure lines

### Error paths
- `Failed to publish task to stream for job '<id>': <error>` — previously only on the module logger, now also on the job logger, with the traceback preserved on the module side via `exc_info=True`

### What's deliberately NOT logged to `job_logger`
Per-message and per-poll success events stay on the module logger only — a 10k-image job would otherwise drown its own log:
- `publish_task` success (one per image)
- `reserve_tasks` (one per worker poll, every few seconds)
- `acknowledge_task` success (one per result)

Per-image queueing is routed through `manager.log_async` at `DEBUG` level, so it's available in the job log when debug logging is intentionally enabled (off by default). These might be worth surfacing more permanently when we can identify each processing service (#1194).

## Implementation notes
**Async bridging.** Logging from inside the async block goes through the manager's `log_async(level, msg, exc_info=False)` helper — one entry point that:

- logs to the module logger
- logs to `job.logger` via `sync_to_async` when one was passed in

The bridge is needed because Django's `JobLogHandler.emit()` does ORM writes (`refresh_from_db` + `save`), which are forbidden from the event loop. Without it, every lifecycle line was silently dropped — the handler's broad `except` swallowed `SynchronousOnlyOperation` and routed a warning to the module logger that nobody was watching.

Centralizing this in
`log_async` means the bridge lives in one place instead of being re-implemented at every call site (an earlier draft of this PR had three different logging styles in the publish loop alone — module-direct, module-with-traceback, and a hand-rolled `sync_to_async(job.logger.error)` bridge).

**Future intent — route, not mirror.**
`log_async` currently fans granular per-job lifecycle out to both loggers. The longer-term preference (documented inline in the `log_async` docstring) is to route — granular lifecycle lives on `job.logger` only, matching the convention in `ami.jobs.tasks.save_results` and friends, where `job.logger.propagate = False` and per-job detail is kept out of stdout / NR. The module logger then becomes a clean ops channel for true infra signals (connection failures, NATS-side errors) plus an automatic mirror at `WARNING+` so error signals always reach ops dashboards. Kept symmetric for now because async ML processing is still being stabilized and the extra stdout visibility is actively helping debug. Switching the fan-out direction is a follow-up once the per-job UI log is trusted as the canonical inspection surface.

**Dedup and early-return.**
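A minimal sketch of the per-session dedup and early-return named above, assuming a simplified manager (the `lifecycle_lines` list is an illustrative stand-in for the job log; the real code keeps `set[int]` fields keyed by job id on `TaskQueueManager`):

```python
class DedupSketch:
    """Illustrative stand-in for TaskQueueManager's per-session dedup."""

    def __init__(self) -> None:
        self._streams_logged: set[int] = set()
        self.lifecycle_lines: list[str] = []  # stand-in for the job log

    def _ensure_stream(self, job_id: int) -> None:
        # Early return: after the first call per manager session, skip the
        # NATS round-trip and the repeated lifecycle log line entirely.
        if job_id in self._streams_logged:
            return
        # ...the real code would create-or-inspect the stream here...
        self.lifecycle_lines.append(f"Created NATS stream job_{job_id}")
        self._streams_logged.add(job_id)

    def publish_task(self, job_id: int, payload: bytes) -> bool:
        self._ensure_stream(job_id)  # called once per image
        # ...the real publish is elided...
        return True
```

A loop over N images then still emits a single "Created NATS stream" line per job.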
`_ensure_stream` and `_ensure_consumer` are called on every `publish_task` (once per image). After the first call per manager session, subsequent calls skip the NATS round-trip entirely via an early return keyed on two `set[int]` fields (`_streams_logged`, `_consumers_logged`).

**Concurrency note:**
`Job.cancel()` can trigger `cleanup_async_job_resources` (running in the request thread) while `queue_images_to_nats` is still in its publish loop (running in the Celery worker), so the stream/consumer can be deleted mid-flight from a different manager session. The early-return stays safe in that case — subsequent publishes in the queue loop fail loudly (`publish_task` returns `False` and logs an ERROR) rather than silently recreating an orphan stream without a consumer. That's actually better than the non-deduped baseline behavior, where `_ensure_stream` would silently recreate the stream on every iteration and the subsequent publishes would succeed to a stream that no worker will ever consume from. Not exercised by a concurrent-cancel test yet; worth adding if we lean harder on this claim.

**Exception narrowing.**
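The narrowing named above can be sketched as follows; `NotFoundError` is a local stand-in for `nats.js.errors.NotFoundError`, and the `js` object and `ensure_consumer` signature are illustrative, not the real API surface:

```python
class NotFoundError(Exception):
    """Local stand-in for nats.js.errors.NotFoundError."""

async def ensure_consumer(js, stream: str, name: str, config: dict):
    try:
        # Only a genuine "consumer missing" falls through to creation;
        # auth/API/transient errors now propagate instead of being
        # masked by a broad `except Exception`.
        return await js.consumer_info(stream, name)
    except NotFoundError:
        return await js.add_consumer(stream, config)
```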
`_ensure_consumer` previously caught broad `Exception` on the `consumer_info()` call, masking auth/API/transient errors as "consumer missing." Narrowed to `nats.js.errors.NotFoundError` to match the pattern already used in `_ensure_stream` and `_stream_exists`.

**Caller wiring.**
`cleanup_async_job_resources(job_id)` now takes just `job_id` and resolves the `Job` (and its per-job logger) internally — matching the `save_results` pattern in `ami/jobs/tasks.py`. If the `Job` row is gone (e.g. the `Job.DoesNotExist` path in `_fail_job`), it falls back to the module logger. `queue_images_to_nats` passes `job.logger` directly to `TaskQueueManager`. Callers without a job context (`_ack_task_via_nats`, the worker-poll view, the DLQ management command) construct `TaskQueueManager()` without a logger.

## Tests
Added `TestTaskQueueManagerJobLogger` in `ami/ml/orchestration/tests/test_nats_queue.py` (7 new tests):

- `cleanup_job_resources` logs the forensic snapshot before delete calls
- `cleanup_job_resources` tolerates a missing consumer without raising
- `publish_task` failures surface on job_logger at ERROR level
- `TaskQueueManager()` without a `job_logger` still works (module logger fallback)

Tests use a real `logging.Logger` with a capture handler (not `MagicMock`), so the `sync_to_async` bridge in `log_async()` is exercised on the real code path. All 18 tests in `test_nats_queue.py` pass. Not covered: concurrent-cancel-during-queue (see Dedup and early-return above).

## Manual verification
Deployed to a staging environment and ran async_api jobs end-to-end against real ML workers.
Small smoke test (18 images) — two jobs completed the full lifecycle. A third was canceled mid-flight; its UI log showed:
Larger test (636 images) — canceled after ~16 minutes (staging server backlogged on result ingestion). The forensic snapshot:
920 delivery attempts for 636 tasks (~1.45× factor from redeliveries), 169 acked, 43 in flight, 141 redelivered. Workers were pulling but result POSTs were timing out, causing ack-before-ack_wait failures. Without this line you'd be guessing.
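The redelivery factor quoted above is just delivered attempts over task count; a trivial helper (name illustrative) makes the arithmetic checkable:

```python
def redelivery_factor(delivered: int, total_tasks: int) -> float:
    """Delivery attempts per task; 1.0 means no redeliveries."""
    return delivered / total_tasks

# Numbers from the 636-image staging run described above.
factor = redelivery_factor(delivered=920, total_tasks=636)  # ~1.45
```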
## What this PR is NOT
- A behavior change to `reserve_tasks` or the worker-poll path
- A fix for #1219 (transient Redis error conflation, where `RedisError` and "state actually missing" are conflated into a single fatal path killing `async_api` jobs during `update_state`) — but it makes those diagnostics visible in the job log once #1219 lands
- A switch of `log_async` from "mirror to both" to "route, then auto-mirror at WARNING+" — see Future intent above

## Test plan
- `pytest ami/ml/orchestration/tests/test_nats_queue.py` — 18 tests pass
- `pytest ami/ml/orchestration/tests/test_cleanup.py` — integration tests pass (real NATS + Redis)
- `black`, `isort`, `flake8`, `pyupgrade` clean on all touched files (via pre-commit)
- `job.logs.stdout` end-to-end

🤖 Generated with Claude Code