fix(jobs): mark failed or lost images as failed so job can be marked complete #1244
Two small primitives that the upcoming mark_lost_images_failed reconciler (next commit) needs to inspect running async_api jobs:

- `TaskQueueManager.get_consumer_state(job_id) -> ConsumerState | None` — a thin, mockable projection over `nats.js.api.ConsumerInfo`. Returns only `num_pending` / `num_ack_pending` / `num_redelivered`, the counters that matter for diagnostic logging. A missing stream or consumer returns `None` rather than raising, mirroring `log_consumer_stats_snapshot`'s tolerance: a job that was already cleaned up should not blow up a caller iterating over a batch.
- `AsyncJobStateManager.get_pending_image_ids() -> set[str]` — SUNION of both stage pending sets. Used by the reconciler to find ids that Redis still tracks as pending-for-processing. Returns an empty set on `RedisError` so one pooled-connection blip does not abort a sweep.

No call sites added in this commit. Unit tests for these primitives come in the reconciler commit, where they are exercised as part of the reconciliation path.

Co-Authored-By: Claude <noreply@anthropic.com>
When NATS exhausts `max_deliver=2` (per #1234) for a message whose processing kept failing non-retriably, JetStream stops redelivering but keeps the message in the consumer's `num_ack_pending` indefinitely. Redis still tracks the image id in `pending_images:{process,results}`. The job sits at <100% progress until `check_stale_jobs` REVOKEs it at 10 min, wiping all the legitimately processed work alongside the lost images.

Adds `mark_lost_images_failed()` — a new sub-check wired into `jobs_health_check` ahead of `check_stale_jobs`. For each running async_api job whose `updated_at` is older than `STALLED_JOBS_MAX_MINUTES` and whose Redis pending sets still hold ids, it SREMs those ids from pending, SADDs them to `failed_images`, and runs `_update_job_progress` for both stages. The existing completion logic (`FAILURE_THRESHOLD = 0.5`) then decides SUCCESS vs FAILURE on accurate per-image counts instead of REVOKE-ing the whole job.

Design decisions:

* The idle-threshold signal is `Job.updated_at`, not NATS consumer counters. JetStream does not clear `num_ack_pending` when `max_deliver` is hit — guarding on `num_ack_pending > 0` would block the exact production case this is meant to reconcile. Verified empirically with `chaos_monkey exhaust_max_deliver` (next commit) against a live NATS JetStream.
* `get_consumer_state()` is still called per candidate, but only to log current counters in the diagnostic appended to `progress.errors`. The reconcile decision itself does not read them.
* Runs BEFORE `_run_stale_jobs_check` so a reconcilable job lands in its natural completion state instead of being REVOKEd.

Tests: 8 `TransactionTestCase` cases in `TestMarkLostImagesFailed` covering the main job-2421 shape, the mixed-failure path, the >50% FAILURE threshold, the idle-threshold guard, and two cases (`num_pending > 0`, `num_ack_pending > 0`) that now reconcile because the idle cutoff is the load-bearing signal — previously (in earlier revisions of this branch) those were guarded noops.
Co-Authored-By: Claude <noreply@anthropic.com>
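The reconcile-then-decide flow in the commit above can be sketched with plain sets standing in for the Redis keys (illustrative; `FAILURE_THRESHOLD = 0.5` is the only value taken from the PR, and the function names here are hypothetical simplifications of `_reconcile_lost_images` and the existing completion logic):

```python
FAILURE_THRESHOLD = 0.5  # same threshold the existing completion logic uses


def reconcile_lost_images(pending: set[str], failed: set[str]) -> int:
    """Move every still-pending id into failed (SREM + SADD in the real code).

    Returns how many ids were newly marked, mirroring the SREM count."""
    lost = set(pending)           # snapshot: ids Redis still tracks as pending
    pending.clear()               # SREM from pending_images:{process,results}
    newly_failed = lost - failed  # SADD is idempotent on overlap
    failed |= lost
    return len(newly_failed)


def final_state(total_images: int, failed_count: int) -> str:
    # Existing completion rule: FAILURE only past the 50% threshold,
    # so a 17/20 job with 3 lost images still lands SUCCESS.
    return "FAILURE" if failed_count / total_images > FAILURE_THRESHOLD else "SUCCESS"
```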
Fault-injection tool for validating mark_lost_images_failed (and any
future reconciler logic) against a live NATS JetStream without needing
to run ADC and a real ML pipeline.
Given a job_id and a set of image ids, publishes one message per id on
the job's subject, then pulls without ACK and waits ack_wait — repeating
NATS_MAX_DELIVER times so each message hits its delivery budget. After
this the consumer sits in (num_pending=0, num_ack_pending=len(ids),
num_redelivered>0), which empirically is the post-exhaustion resting
state (JetStream does not clear num_ack_pending for messages that hit
max_deliver).
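The post-exhaustion resting state described above can be modeled with a toy counter simulation (illustrative only — real counters come from `nats.js.api.ConsumerInfo`; the model assumes `num_redelivered` counts attempts after the first delivery, which matches the empirical numbers quoted in this commit):

```python
def simulate_exhaustion(num_messages: int, max_deliver: int) -> dict[str, int]:
    """Toy model of the resting state: every delivery goes un-ACKed, so each
    message burns its delivery budget and then parks in ack-pending
    (JetStream does not clear that counter when max_deliver is hit)."""
    num_redelivered = 0
    for _ in range(num_messages):
        deliveries = max_deliver           # pull without ACK, wait ack_wait, repeat
        num_redelivered += deliveries - 1  # redeliveries exclude the first attempt
    return {
        "num_pending": 0,                  # nothing left undelivered
        "num_ack_pending": num_messages,   # all parked, never cleared
        "num_redelivered": num_redelivered,
    }
```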
Complements the existing 'flush redis' / 'flush nats' subcommands.
Takes ~NATS_MAX_DELIVER × (TASK_TTR + 3s) to run (default ≈ 66s).
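The quoted default runtime reduces to a small arithmetic sketch (the `TASK_TTR` value of 30 s is an assumption chosen to be consistent with the quoted ≈ 66 s default; the constant names mirror the settings referenced above but are hypothetical here):

```python
NATS_MAX_DELIVER = 2    # default per the referenced settings (assumed)
TASK_TTR_SECONDS = 30   # assumed default ack_wait / TTR, consistent with ~66s


def estimated_runtime_seconds(max_deliver: int = NATS_MAX_DELIVER,
                              ttr: int = TASK_TTR_SECONDS,
                              settle: int = 3) -> int:
    # Each delivery round waits out ack_wait (~TTR) plus a small settle margin.
    return max_deliver * (ttr + settle)
```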
Run either against a real dispatched job, or with --ensure-stream to
create the stream+consumer itself (useful for standalone reconciler
tests against a fake job_id).
Usage:

    python manage.py chaos_monkey exhaust_max_deliver \
        --job-id 999999 \
        --image-ids img-a,img-b,img-c \
        --ensure-stream
Co-Authored-By: Claude <noreply@anthropic.com>
… + chaos runbook

* processing-lifecycle.md — add a new failure-mode row describing the symptom (job stuck at <100% indefinitely, some images still in Redis pending sets, NATS consumer shows `num_redelivered > 0` and `num_ack_pending` stays elevated) and point at `mark_lost_images_failed` as the fix. Distinct from Bug A (pre-#1234 ACK/SREM ordering) — that was a crash-window race; this one is `max_deliver` giving up.
* chaos-scenarios.md — add Scenario F: a step-by-step runbook for validating the reconciler against a live stack using the new `chaos_monkey exhaust_max_deliver` subcommand. No code patches or celeryworker restart required, unlike Scenarios B/D which rely on sentinel-file patches.

Co-Authored-By: Claude <noreply@anthropic.com>
Pull request overview
Adds a reconciliation path to finalize async_api jobs that become stuck when NATS stops redelivering messages but Redis still tracks a small remainder of pending image IDs, so jobs land in their natural SUCCESS/FAILURE outcome (via the existing FAILURE_THRESHOLD) instead of being blanket-REVOKEd by the stale-job reaper.
Changes:
- Add a `mark_lost_images_failed()` sub-check (run before `check_stale_jobs`) that moves “lost” pending IDs into `failed_images` and drives progress completion via existing `_update_job_progress` logic.
- Add small supporting primitives: `ConsumerState` + `TaskQueueManager.get_consumer_state()`, and `AsyncJobStateManager.get_pending_image_ids()`.
- Add tests, a chaos-monkey scenario to exhaust `max_deliver`, and docs/runbook updates describing the new failure mode and recovery workflow.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| docs/claude/processing-lifecycle.md | Documents the new “max_deliver exhaustion / pending IDs remain in Redis” failure mode and points to the reconciler fix. |
| docs/claude/debugging/chaos-scenarios.md | Adds a runbook scenario to reproduce max-deliver exhaustion and validate reconciliation behavior. |
| ami/ml/orchestration/nats_queue.py | Introduces ConsumerState and a tolerant get_consumer_state() helper for health-check diagnostics. |
| ami/ml/orchestration/async_job_state.py | Adds get_pending_image_ids() to query union of pending IDs across stages. |
| ami/jobs/tasks.py | Implements mark_lost_images_failed(), reconciliation helper, and wires it into jobs_health_check before the stale-job reaper. |
| ami/jobs/tests/test_tasks.py | Adds transaction-level regression tests covering reconciliation outcomes and integration ordering in jobs_health_check. |
| ami/jobs/tests/test_periodic_beat_tasks.py | Updates expected jobs_health_check result shape to include the new lost_images sub-check. |
| ami/jobs/management/commands/chaos_monkey.py | Adds exhaust_max_deliver subcommand to drive a consumer past max_deliver without ADC. |
The module defined ``logger = logging.getLogger(__name__)`` but never used it (all output goes through ``self.stdout.write``). flake8 F401/F841 would flag on a future lint pass. Spotted in Copilot review of #1244. Co-Authored-By: Claude <noreply@anthropic.com>
The new failure-mode row was committed with a placeholder PR reference; replace it now that #1244 is open. Co-Authored-By: Claude <noreply@anthropic.com>
Three Copilot review findings on #1244 rolled up:

- Re-validate inside ``select_for_update`` at the top of ``_reconcile_lost_images`` before issuing any Redis SREM/SADD. Closes the window where a late ``process_nats_pipeline_result`` arrives between ``mark_lost_images_failed``'s candidate selection and reconciliation, bumps ``updated_at``, and would otherwise cause counter inflation (the same image counted as both processed and failed).
- Cap the id list embedded in ``progress.errors`` at 10 entries plus "and N more". The full list still goes to ``job.logger.warning`` — recoverable from the UI's logs panel without bloating the JSONB field surfaced in the job detail payload.
- Fix the ``_run_mark_lost_images_failed_check`` docstring that claimed the reconciler skips when ``num_redelivered=0``. It does not — that guard was removed when we discovered NATS keeps exhausted messages in ``num_ack_pending`` indefinitely; the only skip conditions now are a missing consumer or no pending ids.

``_reconcile_lost_images`` now returns one of ``"marked_failed"``, ``"raced"``, or ``"state_disappeared"`` so the caller can record the distinction in the per-tick result dict.

Two new tests:

- ``test_reconcile_skips_when_job_updated_at_bumped_after_candidate_select`` — TDD-verified: fails without the new ``cutoff`` parameter.
- ``test_progress_errors_truncates_long_id_list`` — TDD-verified: fails without the new preview-limit constant.

Co-Authored-By: Claude <noreply@anthropic.com>
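The truncation rule above can be sketched as a small helper (the constant name `ERROR_PREVIEW_LIMIT` and the function name are hypothetical; only the 10-entry cap and the "and N more" suffix come from the commit):

```python
ERROR_PREVIEW_LIMIT = 10  # assumed name for the new preview-limit constant


def format_lost_ids(ids: list[str], limit: int = ERROR_PREVIEW_LIMIT) -> str:
    """Cap the id list embedded in progress.errors; in the real code the
    full list still goes to job.logger.warning."""
    if len(ids) <= limit:
        return ", ".join(ids)
    return ", ".join(ids[:limit]) + f" and {len(ids) - limit} more"
```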
@coderabbitai review now?

✅ Actions performed: review triggered.
Claude says: End-to-end validated in production on demo job 88 — the first real stuck-job recovery via this reconciler. The job had 544 images, 16 stranded after the NATS stream drained. What the validation confirms:
Ready for review.
Summary
When NATS gives up redelivering a message (after `max_deliver=2`) because the handler kept raising, Redis still tracks that image as pending. The job sits at something like 17/20 done indefinitely. At the 10-min `STALLED_JOBS_MAX_MINUTES` cutoff, `check_stale_jobs` REVOKEs the whole job — throwing away the 17 images of successful work alongside the 3 that are actually lost.

This PR adds a reconciler inside `jobs_health_check` that runs before `check_stale_jobs`: for each running async_api job that has been idle past the cutoff while Redis still tracks images as pending, mark those images as failed and let the existing completion logic land the job in SUCCESS or FAILURE based on the same `FAILURE_THRESHOLD = 0.5` used everywhere else. A 17/20 job lands SUCCESS with `failed=3`. A 2/20 job still lands FAILURE, but with the full stage breakdown visible.

Complements the three PRs merged this week (#1231, #1234, #1235). Those fixed the transient/terminal Redis boundary, the ACK/SREM ordering, and made the stale-job reaper faster. Those plus this close the loop: the reaper is now the last-resort safety net for truly unreachable jobs, not the primary recovery path for a single stuck image.
Real production symptom this addresses
demo.antenna.insectai.org/projects/5/jobs/83 — the job completed 6 short of 100% and will hang for 10 minutes before the whole job is marked FAILURE, rather than landing SUCCESS/FAILURE with only the 6 lost images marked failed.

What this is NOT

Not a `max_deliver` change. The 5→2 reduction in #1234 (fix(jobs): prevent jobs from hanging in STARTED state with no progress) is the right knob for persistent bad data; this PR is downstream of that setting and agnostic to its value.

Architecture
Matches the sub-check pattern established by #1239 (zombie streams) and #1235 (stale jobs):
- `mark_lost_images_failed()` in `ami/jobs/tasks.py` — scans running async_api jobs with `updated_at < now - STALLED_JOBS_MAX_MINUTES`, checks each for leftover Redis pending ids, and reconciles the ones it finds.
- `_reconcile_lost_images()` — SREMs the lost ids from both `pending_images:process` and `pending_images:results`, SADDs them to `failed_images`, and runs `_update_job_progress` for both stages so the existing completion path (which calls `cleanup_async_job_if_needed`) fires naturally.
- `_run_mark_lost_images_failed_check()` — wraps the call in an `IntegrityCheckResult` and adds it to `JobsHealthCheckResult`. Runs before `_run_stale_jobs_check` inside the `jobs_health_check` umbrella, so a reconcilable job never gets REVOKEd.

Reuses existing primitives — no new FAILURE path, no new threshold, no new Celery task, no new settings. Adds three small pieces:

- `ConsumerState` dataclass + `TaskQueueManager.get_consumer_state()` (diagnostic only — reads `num_pending`/`num_ack_pending`/`num_redelivered` for the log line).
- `AsyncJobStateManager.get_pending_image_ids()` (SUNION of both stage sets).
- `chaos_monkey exhaust_max_deliver` subcommand — drives a consumer past `max_deliver` without ADC for validation. Pairs with the existing `flush redis`/`flush nats` pattern from #1234's chaos runbook.

Design choice: the idle signal is
`Job.updated_at`, not NATS counters

Initial draft gated on `num_pending == 0 AND num_ack_pending == 0 AND num_redelivered > 0`. Empirical finding during e2e (`chaos_monkey exhaust_max_deliver` against a live NATS JetStream): after `max_deliver=2` is hit, the messages stay in `num_ack_pending` indefinitely — JetStream does not clear that counter until the stream is deleted. Three more `ack_wait` cycles: still `num_ack_pending=3`. This means a
`num_ack_pending > 0` guard would skip the exact production case. Removed both counter guards. The idle signal is now the one that actually correlates with stuckness: `Job.updated_at` is bumped on every successful result save, so a job that hasn't updated in 10+ min has genuinely stopped making progress — whatever the NATS counters say.

Why this is safe
Late deliveries after reconciliation are idempotent, as documented in #1234's
processing-lifecycle.md §2:

- `save_results` dedupes on `(detection, source_image)` — detections still land if ADC recovers and pushes the result.
- `SREM` on already-removed ids is a no-op, with `newly_removed == 0` gating counter accumulation, so detections/classifications/captures don't double.
- `_update_job_progress` clamps percentage with `max()` and preserves FAILURE once set — a late result can't regress progress or un-fail a job.

Worst case: a genuinely slow-but-alive ADC gets its 10-min-old images marked failed, but its late result still saves detections. The Job shows FAILURE in the UI; the detections exist in the DB — same as any partial-failure async job.
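The `newly_removed == 0` gating can be illustrated with plain sets (a sketch; the real code reads the return value of Redis `SREM`, and the function names here are hypothetical):

```python
def srem_like(s: set[str], *ids: str) -> int:
    """Redis SREM semantics on a plain set: returns how many were actually removed."""
    removed = sum(1 for i in ids if i in s)
    s.difference_update(ids)
    return removed


def apply_result(pending: set[str], counters: dict[str, int], image_id: str) -> None:
    # Only a *newly* removed id bumps the counters, so a duplicate or
    # late delivery of the same result is a harmless no-op.
    if srem_like(pending, image_id):
        counters["processed"] += 1
```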
Files
- `ami/ml/orchestration/nats_queue.py` — `ConsumerState` dataclass + `TaskQueueManager.get_consumer_state`
- `ami/ml/orchestration/async_job_state.py` — `AsyncJobStateManager.get_pending_image_ids`
- `ami/jobs/tasks.py` — `mark_lost_images_failed` + `_reconcile_lost_images` + `_run_mark_lost_images_failed_check` sub-check + `jobs_health_check` wiring
- `ami/jobs/tests/test_tasks.py` — `TestMarkLostImagesFailed` — 8 TransactionTestCase cases
- `ami/jobs/tests/test_periodic_beat_tasks.py` — `lost_images` field on `JobsHealthCheckResult`
- `ami/jobs/management/commands/chaos_monkey.py` — `exhaust_max_deliver` subcommand
- `docs/claude/processing-lifecycle.md`
- `docs/claude/debugging/chaos-scenarios.md`

Verified locally
Unit —
`ami.jobs` suite, 84 tests pass: `docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.jobs --keepdb` — green. `TestMarkLostImagesFailed` (8 cases, `TransactionTestCase` with real Redis and real DB, `TaskQueueManager` mocked) specifically covers:

- `process.failed = 3`, `remaining = 0`; job lands SUCCESS (3/10 < threshold). Asserts stage params, Redis drained, diagnostic in `progress.errors`.
- Failures above `FAILURE_THRESHOLD` → job lands FAILURE not SUCCESS, with accurate per-image counts. Verifies the reconciler doesn't accidentally mask genuine failure.
- `explicit_failures` plus newly-SADDed `lost_ids` land as a single set; SADD is idempotent on accidental overlap. Verifies we don't inflate `failed` by counting overlap twice.
- A fresh `updated_at` (inside the cutoff window) skips reconciliation even when all other signals say "stuck". This is the one true guard.
- `num_pending > 0` and `num_ack_pending > 0` cases now reconcile (tests flipped from "noop" to "reconciles") because the empirical NATS behavior showed those counters are unreliable indicators.
- `num_redelivered=0` with stuck Redis ids (the pre-#1234 ACK-before-SREM signature) also reconciles cleanly. If that bug ever resurfaces, this reconciler catches it.
- `jobs_health_check` runs `lost_images` before `stale_jobs` so a reconcilable job lands in its natural terminal state instead of REVOKE.
test_periodic_beat_tasks.pyconfirm the newlost_imagesIntegrityCheckResultfield populates correctly across healthy / idle-deployment paths.E2E path 1 —
chaos_monkey exhaust_max_deliver+ reconciler against live stack (fake job_id)Validates what unit tests cannot mock: real NATS JetStream consumer behavior.
What it verified:
TaskQueueManager.get_consumer_state()returnsNonefor a non-existent consumer (tolerant path).get_consumer_state()returns correctly-populatedConsumerStateagainst a real consumer:num_pending=3, num_ack_pending=0, num_redelivered=0after publishing 3 messages pre-delivery.max_deliverexhaustion resting state isnum_pending=0, num_ack_pending=3, num_redelivered=3— and it stays there through three moreack_waitcycles. This is the finding that drove the guard-removal design change; it is now covered by a test in the suite.AsyncJobStateManager.get_pending_image_ids()returns the SUNION of both stage sets against real Redis._update_job_progress/cleanup_async_job_if_neededall fire in sequence. Redis cleaned up. NATS stream + consumer deleted on completion (via the existing cleanup trigger, not new code). Diagnostic appended toprogress.errorswith the real counter values.E2E path 2 — real ADC happy path
Dispatched job 1529: project 18, pipeline
quebec_vermont_moths_2023, 20-image collection,dispatch_mode=async_api. ADC worker running against the local stack, pulling real messages.What it verified:
updated_atstays fresh as batches land).jobs_health_checkumbrella task continues to run without interference from the new sub-check.E2E path 3 — real Job row stuck via
chaos_monkey(ADC stopped)Job 1530, real DB row referencing real pipeline + collection + project,
dispatch_mode=ASYNC_API,status=STARTED. Back-datedupdated_atby 15 min to cross the cutoff.chaos_monkey exhaust_max_deliver --job-id 1530 --image-ids img-x,img-y,img-z --ensure-streamdrove the consumer pastmax_deliver.What it verified:
- The real job passed the `updated_at < cutoff + dispatch_mode=ASYNC_API + status in running_states` filter (not a synthetic fake-job edge case).
- `process` and `results` land at 100% FAILURE (3/3 = 100% > threshold).
- `cleanup_async_job_if_needed` fires on `is_complete()`, draining Redis and deleting the NATS stream — proves the reconciler integrates correctly with the existing cleanup lifecycle.
- `progress.errors` contains the actual live counter values.

To verify in staging
Before any synthetic traffic
- Watch `jobs_health_check` beat-task duration on the first few ticks post-deploy — confirm the new sub-check doesn't materially increase tick time on an idle stack (expected: tens of ms).
- Confirm the `lost_images` `IntegrityCheckResult` counters on healthy jobs: `checked=0, fixed=0, unfixable=0` — anything non-zero without a matching real stuck job warrants investigation.

Real-traffic observation (first 24-72h)
- Watch for `jobs_health_check: marked N image(s) as failed (job idle past cutoff; ...)` WARN lines in celeryworker logs. Expected: rare (only when a real handler failure + `max_deliver` exhaustion happens). Any spike should correlate with a visible cluster of stuck jobs.
- `check_stale_jobs` REVOKEs should drop relative to pre-deploy — partial-failure jobs now finalize through the reconciler instead of being reaped.
- Jobs landing FAILURE via the reconciler should have a populated `progress.errors` starting with `jobs_health_check:` — visible both via API and in the Job-detail UI.
- Cross-check any `fixed > 0` entry against the job's actual progress — if the job was still making progress within 10 min, that's a bug.

Targeted scenario — reproduce a real max_deliver exhaustion
Production can't wait 10 min reliably, so this is best done in staging with a controlled job:
- Use a small collection (so `filter_processed_images` doesn't strip everything).
- Wait past the cutoff (`STALLED_JOBS_MAX_MINUTES`).
- Confirm the `jobs_health_check` beat tick runs `mark_lost_images_failed` before `check_stale_jobs`.
- Confirm `progress.errors` has the diagnostic, the NATS stream is cleaned up, the Redis keys are cleaned up, and there is no REVOKE in the Celery task state for this job.
- Confirm no late writes via `save_results` or `process_nats_pipeline_result`.

Sanity checks to run manually on any stuck-looking job post-deploy
From
`docs/claude/processing-lifecycle.md` §5 (copy-paste block, replace `JOB_ID`):

- DB: job status + progress (including the `errors` trailer).
- Redis: `SCARD` on both pending sets + `failed_images`.
- NATS: consumer counters (`num_pending`, `num_ack_pending`, `num_redelivered`).

A job showing
`status=STARTED + process/results < 100% + updated_at < 10 min old` is not reconcilable — that's an in-flight job, possibly slow. A job showing `status=STARTED + Redis pending > 0 + updated_at > 10 min old` is reconcilable and should have flipped to SUCCESS/FAILURE on the most recent beat tick.

Relationship to concurrent / recently-merged work
- #1231 — the transient/terminal Redis boundary (`_fail_job`). This PR relies on the crisp `update_state` contract it established.
- #1234 — the `task_failure` guard, `max_deliver` 5→2. This PR's raison d'être is the downstream symptom of `max_deliver=2` exhaustion. Touches the same `processing-lifecycle.md` section (new row).
- #1235 — split `FAILED_CUTOFF_HOURS` into display-hours vs. reaper-minutes. The reaper runs at 10 min. This PR's reconciler runs before the reaper at the same 10-min cutoff so jobs land naturally rather than being REVOKEd.
- #1241 — `progress.errors`. Both PRs append to `progress.errors`; the formats differ but coexist without conflict. If #1241 (feat(jobs): make missing-Redis-state FAILUREs loud and self-diagnosing) lands first, a small `_append_progress_error(job, reason)` helper could consolidate the two sites.

Generated with Claude Code