-
Notifications
You must be signed in to change notification settings - Fork 12
feat(jobs): make missing-Redis-state FAILUREs loud and self-diagnosing #1241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,7 +45,13 @@ def run_job(self, job_id: int) -> None: | |
| raise e | ||
| # self.retry(exc=e, countdown=1, max_retries=1) | ||
| else: | ||
| job.logger.info(f"Running job {job}") | ||
| # Log the Redis target at task start so cross-host DB-index drift surfaces | ||
| # in every job's log without needing to know where to look. The cluster | ||
| # convention is DB 0 = cache, DB 1 = Celery results (see | ||
| # config/settings/base.py); the Job state manager also uses the "default" | ||
| # connection, so every worker in the pool must agree on the DB number or | ||
| # initialize_job writes to one DB while update_state reads from another. | ||
| job.logger.info(f"Running job {job} on {_describe_redis_target()}") | ||
| try: | ||
| job.run() | ||
| except Exception as e: | ||
|
|
@@ -171,9 +177,17 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub | |
| if not progress_info: | ||
| # State keys genuinely missing (the total-images key returned None). | ||
| # Ack so NATS stops redelivering and fail the job — there's no state | ||
| # left to reconcile against. | ||
| # left to reconcile against. The reason string is built from a live | ||
| # Redis snapshot (DB index, keys present under job:{id}:*) so the | ||
| # FAILURE log and the UI progress.errors entry name the actual cause | ||
| # instead of the previous hardcoded "likely cleaned up concurrently" | ||
| # guess — which conflated DB-index misconfig, eviction, and genuine | ||
| # concurrent cleanup into a single misleading string. | ||
| _ack_task_via_nats(reply_subject, logger) | ||
| _fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)") | ||
| _fail_job( | ||
| job_id, | ||
| f"Job state missing from Redis (stage=process): {state_manager.diagnose_missing_state()}", | ||
| ) | ||
| return | ||
|
|
||
| try: | ||
|
|
@@ -255,7 +269,10 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub | |
| # first so NATS stops redelivering a message whose state is gone, | ||
| # then fail the job. Mirrors the stage=process missing-state path. | ||
| _ack_task_via_nats(reply_subject, job.logger) | ||
| _fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)") | ||
| _fail_job( | ||
| job_id, | ||
| f"Job state missing from Redis (stage=results): {state_manager.diagnose_missing_state()}", | ||
| ) | ||
| return | ||
|
|
||
| # update complete state based on latest progress info after saving results | ||
|
|
@@ -304,6 +321,24 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub | |
| job.logger.error(error) | ||
|
|
||
|
|
||
| def _describe_redis_target() -> str: | ||
| """Return a ``host:port/dbN`` string for the "default" Redis connection. | ||
|
|
||
| Logged at the start of every ``run_job`` so DB-index drift across hosts | ||
| (the class of misconfig that manifests as silent ``process_nats_pipeline_result`` | ||
| FAILUREs on whichever worker happens to read state from the wrong DB) is | ||
| visible in each job's log without requiring a separate diagnostic. | ||
| """ | ||
| try: | ||
| from django_redis import get_redis_connection | ||
|
|
||
| redis = get_redis_connection("default") | ||
| kwargs = getattr(redis.connection_pool, "connection_kwargs", {}) or {} | ||
| return f"redis={kwargs.get('host', '?')}:{kwargs.get('port', '?')}/db{kwargs.get('db', '?')}" | ||
|
Comment on lines
+324
to
+337
|
||
| except Exception as e: | ||
| return f"redis=(unavailable: {e})" | ||
|
|
||
|
|
||
| def _fail_job(job_id: int, reason: str) -> None: | ||
| from ami.jobs.models import Job, JobState | ||
| from ami.ml.orchestration.jobs import cleanup_async_job_resources | ||
|
|
@@ -313,6 +348,15 @@ def _fail_job(job_id: int, reason: str) -> None: | |
| job = Job.objects.select_for_update().get(pk=job_id) | ||
| if job.status in (JobState.CANCELING, *JobState.final_states()): | ||
| return | ||
| # Mirror the reason into progress.errors so the UI surfaces it | ||
| # alongside the FAILURE state. Previously the reason lived only in | ||
| # job.logger, which meant the UI showed errors=[] and operators had | ||
| # to dig into Celery worker logs to find out why a job died. | ||
| try: | ||
| job.progress.errors.append(reason) | ||
| except Exception: | ||
| # Don't let diagnostic-write failures mask the original FAILURE. | ||
| pass | ||
| job.update_status(JobState.FAILURE, save=False) | ||
| job.finished_at = datetime.datetime.now() | ||
| job.save(update_fields=["status", "progress", "finished_at"]) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -163,6 +163,18 @@ def update_state( | |||||||||||||||||||
| newly_removed = results[0] if processed_image_ids else 0 | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if total_raw is None: | ||||||||||||||||||||
| # Loud diagnostic before the silent None return. The caller will mark | ||||||||||||||||||||
| # the job FAILURE based on this result, so the operator needs to see | ||||||||||||||||||||
| # *why* the total key is gone. Distinguishes three different causes | ||||||||||||||||||||
| # that previously all surfaced as the same hardcoded "likely cleaned | ||||||||||||||||||||
| # up concurrently" reason string: Redis DB mismatch across hosts, | ||||||||||||||||||||
| # key eviction, and genuinely-never-initialized state. | ||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||
| "Job %s state missing in Redis (stage=%s): %s", | ||||||||||||||||||||
| self.job_id, | ||||||||||||||||||||
| stage, | ||||||||||||||||||||
| self.diagnose_missing_state(), | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| return None | ||||||||||||||||||||
|
|
||||||||||||||||||||
| total = int(total_raw) | ||||||||||||||||||||
|
|
@@ -182,6 +194,53 @@ def update_state( | |||||||||||||||||||
| newly_removed=newly_removed, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def diagnose_missing_state(self) -> str: | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| One-line snapshot of what Redis actually holds for this job. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Called from the missing-state path in ``update_state`` and from | ||||||||||||||||||||
| ``_fail_job`` so the FAILURE log and the UI ``progress.errors`` entry | ||||||||||||||||||||
| distinguish the three common causes — DB mismatch across hosts, key | ||||||||||||||||||||
| eviction, and never-initialized state — instead of a single hardcoded | ||||||||||||||||||||
| "likely cleaned up concurrently" guess that all three collapse to. | ||||||||||||||||||||
|
Comment on lines
+202
to
+205
|
||||||||||||||||||||
| ``_fail_job`` so the FAILURE log and the UI ``progress.errors`` entry | |
| distinguish the three common causes — DB mismatch across hosts, key | |
| eviction, and never-initialized state — instead of a single hardcoded | |
| "likely cleaned up concurrently" guess that all three collapse to. | |
| ``process_nats_pipeline_result`` so the FAILURE log and the UI | |
| ``progress.errors`` entry distinguish the three common causes — DB | |
| mismatch across hosts, key eviction, and never-initialized state — | |
| instead of a single hardcoded "likely cleaned up concurrently" guess | |
| that all three collapse to. |
Copilot
AI
Apr 17, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
diagnose_missing_state() uses scan_iter(match=job:{id}:*), which iterates the full Redis keyspace (SCAN walks the entire DB even when MATCH filters server-side). On a large/loaded Redis (or if this failure repeats due to misconfig), this can add significant latency and load. Consider checking the small, fixed set of expected keys directly (pending:process, pending:results, failed, total) via exists/scard/get, instead of scanning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the missing-state path,
`update_state()` already logs a warning that calls `diagnose_missing_state()`, and this call site invokes `diagnose_missing_state()` again to build the failure reason. That duplicates the Redis diagnostic work on the same event. Consider capturing the diagnosis once (e.g., have `update_state` return it alongside `None`, or raise a dedicated exception carrying it) and reuse it for both logging and `_fail_job`/UI errors.