Commit 1c6be7a

mihow and claude authored
feat(jobs): make NATS queue activity visible in async ML job logs (#1222)
* feat(nats): surface queue lifecycle events on the per-job logger

  TaskQueueManager now accepts an optional job_logger. When set, lifecycle
  events (stream/consumer create+reuse with state/config snapshot, publish
  failures, cleanup deletions, and a forensic consumer-stats line before
  deletion) are mirrored to the per-job logger in addition to the module
  logger. The UI job log now reflects what the NATS layer is actually doing
  for that specific job instead of a silent gap.

  Per-message and per-poll paths (publish_task success, reserve_tasks,
  acknowledge_task) intentionally stay on the module logger only — a
  10k-image job would otherwise drown its own log. Lifecycle log lines are
  deduped per manager session, so a loop over N images still emits only a
  single "Created NATS stream" line per job.

  cleanup_async_job_resources and queue_images_to_nats pass job.logger
  through to TaskQueueManager so real async_api jobs pick up the new logging
  without further caller changes.

  Closes #1220

* fix(nats): bridge job-logger mirroring through sync_to_async

  TaskQueueManager._log() was calling job_logger.log() synchronously from
  inside async methods (_ensure_stream, _ensure_consumer, cleanup). That
  triggers JobLogHandler.emit(), which does a Django ORM refresh_from_db +
  save — forbidden from an event loop. Every lifecycle line was silently
  dropped with "Failed to save logs for job #N: You cannot call this from an
  async context", defeating the point of the original change.

  Convert _log to async and await sync_to_async(job_logger.log)(...) so the
  ORM work runs in a thread. Update all call sites to await. Apply the same
  fix to the publish-failure path in queue_images_to_nats.

  Verified on ami-demo with job #74: lifecycle lines fired on the module
  logger, but "Failed to save logs" errors swallowed the job-logger mirror.

* fix(nats): narrow consumer_info exception + stop forwarding module logger
  to TaskQueueManager in cleanup

  Addresses CodeRabbit review on #1222.

  1. _ensure_consumer caught broad Exception when consumer_info() failed,
     masking auth/API/transient JetStream errors as "consumer missing" and
     emitting misleading creation logs. Narrowed to NotFoundError to match
     the pattern already used in _ensure_stream (line 208).

  2. cleanup_async_job_resources forwarded its `_logger` argument into
     TaskQueueManager as `job_logger`. One caller (_fail_job on the
     Job.DoesNotExist path in ami/jobs/tasks.py:198) passes a plain module
     logger, which would then have cleanup lifecycle lines mirrored into an
     unrelated logger via sync_to_async. Added a separate `job_logger`
     parameter, defaulted to None, and updated the two callers that have
     real job context (_fail_job happy path, cleanup_async_job_if_needed)
     to pass `job.logger` explicitly. The DoesNotExist path leaves
     job_logger=None, so TaskQueueManager falls through to the module
     logger only.

  Tests: 18/18 in test_nats_queue.py pass, pre-commit clean.

* refactor(jobs): simplify cleanup_async_job_resources to match codebase
  job_logger pattern

  Drop the separate `_logger` parameter that was redundant with
  `job_logger`. Follow the existing convention (e.g. save_results in
  ami/jobs/tasks.py): `_log = job_logger or logger` — use the per-job logger
  when available, the module logger otherwise. Callers now read cleanly:

      cleanup_async_job_resources(job.pk, job_logger=job.logger)  # has job
      cleanup_async_job_resources(job_id)                         # no job

* refactor(nats): read consumer config from ConsumerInfo instead of
  hardcoding in creation log

  The "Created NATS consumer" log line was hardcoding max_deliver=5 and
  interpolating TASK_TTR for ack_wait. Now it reads from the ConsumerInfo
  returned by add_consumer(), so the log always reflects what the server
  accepted. Added _format_consumer_config() alongside the existing
  _format_consumer_stats() for the two different log contexts (creation vs
  runtime stats). Updated test mocks to return ConsumerInfo-like objects
  with a config sub-object from add_consumer.

* refactor(nats): skip redundant NATS calls after first ensure, fix enum
  rendering in config log

  Three cleanup items found on self-review:

  1. _ensure_stream and _ensure_consumer were calling
     stream_info/consumer_info on every publish_task (once per image).
     Since the stream and consumer are never deleted mid-flight (cleanup
     uses a separate manager session), added an early return when job_id is
     already in the logged set. Saves 2 NATS round-trips per image after
     the first.

  2. _format_consumer_config was rendering enum fields as
     "DeliverPolicy.ALL" instead of "all" — added a _val() helper that
     unwraps .value when present.

  3. Removed now-redundant inner dedup checks (the early return makes them
     always true).

* fix(nats): preserve traceback on publish failure + defensive stream state
  access

  Addresses two CodeRabbit findings:

  1. The queue_images_to_nats publish exception path only wrote a
     stringified error to the job log. Added logger.exception() on the
     module logger so ops dashboards keep the full traceback. The
     job.logger.error bridge still writes the user-facing message.

  2. _ensure_stream accessed info.state.messages / info.state.last_seq
     without defending against None, unlike _format_consumer_stats which
     already does. Match the defensive pattern so the reuse line doesn't
     blow up if the server ever returns a StreamInfo with no state.

  Also pushing back on the "ack_wait nanoseconds" finding in the review
  thread — nats-py does convert it to seconds in from_response via
  _convert_nanoseconds (source: val / _NANOSECOND where _NANOSECOND = 1e9).

* refactor(jobs): resolve job logger internally in
  cleanup_async_job_resources

  Match the pattern used by save_results in ami/jobs/tasks.py: take only
  job_id, resolve job.logger internally, and fall back to the module logger
  when the Job row is missing. Keeps call sites consistent across the
  codebase — cleanup_async_job_resources(job.pk) everywhere — and makes the
  job object available inside the function for future use.

* refactor(jobs): rename _log to job_logger to avoid name collision with
  TaskQueueManager._log

  Two _log identifiers in the same module had incompatible semantics:

  - nats_queue.TaskQueueManager._log(level, msg) — an async coroutine that
    fans out to the module + job loggers with sync_to_async bridging.
  - jobs.cleanup_async_job_resources local `_log = job_logger or logger` —
    a plain Logger instance called as _log.info(...) / .error(...).

  Rename the local to job_logger and assign directly from
  `job.logger if job else logger`, matching the save_results pattern.

* chore(nats): tidy up minor code smells from self-review

  - Drop a redundant ternary: cfg = info.config, not
    `info.config if info.config is not None else None`.
  - Merge a two-line f-string in _log_final_consumer_stats.
  - Drop the redundant `except asyncio.TimeoutError: raise` in
    _ensure_consumer (now that the catch is narrowed to NotFoundError,
    TimeoutError propagates naturally — matches _ensure_stream style).
  - Add an explicit comment on the intentionally-broad `except Exception`
    in _log_final_consumer_stats, clarifying how it differs from
    _ensure_consumer.

* refactor(nats): unify log fan-out behind manager.log_async

  Rename TaskQueueManager._log → log_async and route every log line in the
  queue_images_to_nats async block through it (debug, error + traceback).
  Drops the ad-hoc sync_to_async(job.logger.error) bridge and the separate
  logger.exception call — one consistent API, one place that knows how to
  bridge JobLogHandler's ORM save through sync_to_async. log_async also now
  accepts exc_info=True so callers don't need to pair a module-only
  logger.exception with a job-logger error call.

* docs(nats): note future intent to route, not mirror, lifecycle logs

  log_async currently mirrors granular per-job lifecycle events to both the
  module and job loggers. Document why this is intentional for now (async
  ML processing is still stabilizing, and stdout visibility is helping us
  debug) and the eventual target shape (route to the job logger only at
  INFO/DEBUG, mirror at WARNING+). Matches the pattern in
  ami.jobs.tasks.save_results, where job.logger.propagate=False keeps
  granular per-job state out of ops logs.

* docs(nats): correct safety claim for _ensure_stream/_ensure_consumer dedup

  The original docstring claimed the stream/consumer won't be deleted
  mid-flight because cleanup uses a separate manager session. That's
  incomplete — Job.cancel() runs cleanup_async_job_resources in the request
  thread while queue_images_to_nats is still in its publish loop in the
  Celery worker, so a concurrent delete across manager sessions is possible.

  The early return is still safe in that scenario, but for a different
  reason than the original claim: downstream publish_task fails loudly
  (returns False, logs ERROR) when the stream is gone, rather than silently
  recreating an orphan stream without a consumer (which is what the
  non-deduped baseline would do). Updates the _ensure_stream and
  _ensure_consumer docstrings to describe the actual safety argument
  accurately.

* perf(nats): gate log_async on isEnabledFor before sync_to_async mirror

  Without this gate, every log_async call fires the job-logger mirror
  through sync_to_async — a ThreadPoolExecutor submit per call — regardless
  of whether the effective level would drop the record. For a 10k-image
  queue this amounts to 10k unnecessary thread-pool submissions when DEBUG
  is off.

  stdlib Logger.log does the same isEnabledFor check internally before
  formatting. We need to do it explicitly here because the mirror goes
  through sync_to_async, bypassing the in-logger short-circuit. No behavior
  change when at least one logger is enabled for the level; it is a pure
  short-circuit when both are gated out.

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
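The core mechanism running through these commits (one fan-out API that mirrors a line to the per-job logger, bridges its ORM-backed handler out of the event loop, and gates the mirror on effective level) can be sketched roughly as follows. This is a hedged reconstruction from the commit message, not the actual nats_queue.py source; the attribute names and handler internals are assumptions.

    import logging

    from asgiref.sync import sync_to_async

    logger = logging.getLogger(__name__)


    class TaskQueueManager:
        """Sketch: only the logging fan-out described in this commit."""

        def __init__(self, job_logger: logging.Logger | None = None):
            # Optional per-job logger; lifecycle events are mirrored to it.
            self.job_logger = job_logger

        async def log_async(self, level: int, msg: str, exc_info: bool = False) -> None:
            # Module logger first: its handlers are ORM-free and safe to call
            # directly from the event loop (Logger.log already short-circuits
            # internally via isEnabledFor before formatting).
            logger.log(level, msg, exc_info=exc_info)
            if self.job_logger is None or not self.job_logger.isEnabledFor(level):
                # Explicit gate: the mirror below costs a thread-pool
                # submission per call, so drop a disabled-level record here
                # rather than inside the logger.
                return
            # JobLogHandler.emit() does a Django ORM refresh_from_db + save,
            # which Django forbids inside an event loop ("You cannot call
            # this from an async context") — bridge it to a worker thread.
            await sync_to_async(self.job_logger.log)(level, msg, exc_info=exc_info)

With this shape, the call sites in the diffs below (await manager.log_async(logging.DEBUG, ...) on the happy path, exc_info=True on the failure path) read the same whether or not a job logger was supplied.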
1 parent 8f8b177 commit 1c6be7a

4 files changed

Lines changed: 568 additions & 58 deletions

ami/jobs/tasks.py

Lines changed: 3 additions & 3 deletions
@@ -192,10 +192,10 @@ def _fail_job(job_id: int, reason: str) -> None:
         job.save(update_fields=["status", "progress", "finished_at"])
 
         job.logger.error(f"Job {job_id} marked as FAILURE: {reason}")
-        cleanup_async_job_resources(job.pk, job.logger)
+        cleanup_async_job_resources(job.pk)
     except Job.DoesNotExist:
         logger.error(f"Cannot fail job {job_id}: not found")
-        cleanup_async_job_resources(job_id, logger)
+        cleanup_async_job_resources(job_id)
 
 
 def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> None:
@@ -423,7 +423,7 @@ def cleanup_async_job_if_needed(job) -> None:
     # import here to avoid circular imports
     from ami.ml.orchestration.jobs import cleanup_async_job_resources
 
-    cleanup_async_job_resources(job.pk, job.logger)
+    cleanup_async_job_resources(job.pk)
 
 
 @task_prerun.connect(sender=run_job)
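The nats_queue.py side of the change is not among the diffs shown on this page, but the narrowed exception handling and per-session dedup described in the commit message would look roughly like the sketch below. The stream/consumer naming, the ensured set, and the ConsumerConfig values are illustrative assumptions, not the project's actual code.

    import logging

    from nats.js import JetStreamContext
    from nats.js.api import ConsumerConfig
    from nats.js.errors import NotFoundError

    logger = logging.getLogger(__name__)


    async def ensure_consumer(js: JetStreamContext, job_id: int, ensured: set[int]) -> None:
        if job_id in ensured:
            # Deduped per manager session: after the first ensure, skip the
            # consumer_info round-trip that would otherwise fire once per image.
            return
        stream, durable = f"job-{job_id}", f"job-{job_id}-consumer"
        try:
            info = await js.consumer_info(stream, durable)
            logger.info("Reusing NATS consumer %s (pending=%s)", durable, info.num_pending)
        except NotFoundError:
            # Only a genuinely missing consumer falls through to creation;
            # auth/API/timeout errors now propagate instead of being masked
            # as "consumer missing".
            info = await js.add_consumer(stream, config=ConsumerConfig(durable_name=durable))
            # Log what the server actually accepted, not hardcoded values.
            logger.info("Created NATS consumer %s: %s", durable, info.config)
        ensured.add(job_id)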

ami/ml/orchestration/jobs.py

Lines changed: 42 additions & 13 deletions
@@ -11,7 +11,7 @@
 logger = logging.getLogger(__name__)
 
 
-def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool:
+def cleanup_async_job_resources(job_id: int) -> bool:
     """
     Clean up NATS JetStream and Redis resources for a completed job.
 
@@ -21,37 +21,53 @@ def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool:
 
     Cleanup failures are logged but don't fail the job - data is already saved.
 
+    Resolves the job (and its per-job logger) internally so callers only need
+    to pass the ``job_id`` — matches the pattern used by ``save_results`` in
+    ``ami/jobs/tasks.py``. If the ``Job`` row is gone (e.g. the
+    ``Job.DoesNotExist`` path in ``_fail_job``), the function falls back to
+    the module logger and TaskQueueManager's module-logger path.
+
     Args:
-        job_id: The Job ID (integer primary key)
-        _logger: Logger to use for logging cleanup results
+        job_id: The Job ID (integer primary key).
 
     Returns:
         bool: True if both cleanups succeeded, False otherwise
     """
+    # Resolve the logger up front: job.logger when the Job exists, module
+    # logger otherwise. Matches the pattern used by save_results.
+    job: Job | None = None
+    try:
+        job = Job.objects.get(pk=job_id)
+    except Job.DoesNotExist:
+        pass
+    job_logger: logging.Logger = job.logger if job else logger
+
     redis_success = False
     nats_success = False
 
     # Cleanup Redis state
     try:
        state_manager = AsyncJobStateManager(job_id)
        state_manager.cleanup()
-        _logger.info(f"Cleaned up Redis state for job {job_id}")
+        job_logger.info(f"Cleaned up Redis state for job {job_id}")
        redis_success = True
     except Exception as e:
-        _logger.error(f"Error cleaning up Redis state for job {job_id}: {e}")
+        job_logger.error(f"Error cleaning up Redis state for job {job_id}: {e}")
 
-    # Cleanup NATS resources
+    # Cleanup NATS resources. Only forward a real per-job logger to
+    # TaskQueueManager — passing the module logger would mirror cleanup
+    # lifecycle lines into an unrelated logger.
     async def cleanup():
-        async with TaskQueueManager() as manager:
+        async with TaskQueueManager(job_logger=job.logger if job else None) as manager:
             return await manager.cleanup_job_resources(job_id)
 
     try:
         nats_success = async_to_sync(cleanup)()
         if nats_success:
-            _logger.info(f"Cleaned up NATS resources for job {job_id}")
+            job_logger.info(f"Cleaned up NATS resources for job {job_id}")
         else:
-            _logger.warning(f"Failed to clean up NATS resources for job {job_id}")
+            job_logger.warning(f"Failed to clean up NATS resources for job {job_id}")
     except Exception as e:
-        _logger.error(f"Error cleaning up NATS resources for job {job_id}: {e}")
+        job_logger.error(f"Error cleaning up NATS resources for job {job_id}: {e}")
 
     return redis_success and nats_success
 
@@ -97,16 +113,29 @@ async def queue_all_images():
     successful_queues = 0
     failed_queues = 0
 
-    async with TaskQueueManager() as manager:
+    # Pass job.logger so stream/consumer setup, per-image debug lines, and
+    # publish failures all appear in the UI job log (not just the module
+    # logger). All log calls inside this block go through manager.log_async
+    # so module + job logger stay in sync with one consistent API — and
+    # the sync_to_async bridge for JobLogHandler's ORM save lives in one
+    # place instead of being re-implemented at every call site.
+    async with TaskQueueManager(job_logger=job.logger) as manager:
         for image_pk, task in tasks:
             try:
-                logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
+                await manager.log_async(
+                    logging.DEBUG,
+                    f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}",
+                )
                 success = await manager.publish_task(
                     job_id=job.pk,
                     data=task,
                 )
             except Exception as e:
-                logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
+                await manager.log_async(
+                    logging.ERROR,
+                    f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}",
+                    exc_info=True,
+                )
                 success = False
 
             if success:

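Two smaller items from the commit message, sketched for concreteness (the helper names _val and the config-formatting idea come from the message; the exact fields rendered are illustrative): the enum unwrap that fixes "DeliverPolicy.ALL" appearing in the creation log, and the ack_wait units. Note nats-py's from_response already divides the server's nanoseconds by 1e9, so ConsumerConfig.ack_wait is in seconds by the time it is logged.

    from nats.js.api import ConsumerConfig


    def _val(field):
        # Enum members (e.g. DeliverPolicy.ALL) carry the wire string in
        # .value ("all"); plain ints/floats/None pass through unchanged.
        return getattr(field, "value", field)


    def format_consumer_config(cfg: ConsumerConfig) -> str:
        # Renders "deliver_policy=all", not "deliver_policy=DeliverPolicy.ALL".
        return (
            f"deliver_policy={_val(cfg.deliver_policy)}"
            f" ack_policy={_val(cfg.ack_policy)}"
            f" ack_wait={cfg.ack_wait}s"  # already seconds, per the note above
            f" max_deliver={cfg.max_deliver}"
        )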