Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ def run_job(self, job_id: int) -> None:
raise e
# self.retry(exc=e, countdown=1, max_retries=1)
else:
job.logger.info(f"Running job {job}")
# Log the Redis target at task start so cross-host DB-index drift surfaces
# in every job's log without needing to know where to look. The cluster
# convention is DB 0 = cache, DB 1 = Celery results (see
# config/settings/base.py); the Job state manager also uses the "default"
# connection, so every worker in the pool must agree on the DB number or
# initialize_job writes to one DB while update_state reads from another.
job.logger.info(f"Running job {job} on {_describe_redis_target()}")
try:
job.run()
except Exception as e:
Expand Down Expand Up @@ -171,9 +177,17 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
if not progress_info:
# State keys genuinely missing (the total-images key returned None).
# Ack so NATS stops redelivering and fail the job — there's no state
# left to reconcile against.
# left to reconcile against. The reason string is built from a live
# Redis snapshot (DB index, keys present under job:{id}:*) so the
# FAILURE log and the UI progress.errors entry name the actual cause
# instead of the previous hardcoded "likely cleaned up concurrently"
# guess — which conflated DB-index misconfig, eviction, and genuine
# concurrent cleanup into a single misleading string.
_ack_task_via_nats(reply_subject, logger)
_fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)")
_fail_job(
job_id,
f"Job state missing from Redis (stage=process): {state_manager.diagnose_missing_state()}",
)
Comment on lines 186 to +190
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the missing-state path, update_state() already logs a warning that calls diagnose_missing_state(), and this call site invokes diagnose_missing_state() again to build the failure reason. That duplicates the Redis diagnostic work on the same event. Consider capturing the diagnosis once (e.g., have update_state return it alongside None/raise a dedicated exception carrying it) and reuse it for both logging and _fail_job/UI errors.

Copilot uses AI. Check for mistakes.
return

try:
Expand Down Expand Up @@ -255,7 +269,10 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
# first so NATS stops redelivering a message whose state is gone,
# then fail the job. Mirrors the stage=process missing-state path.
_ack_task_via_nats(reply_subject, job.logger)
_fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)")
_fail_job(
job_id,
f"Job state missing from Redis (stage=results): {state_manager.diagnose_missing_state()}",
)
return

# update complete state based on latest progress info after saving results
Expand Down Expand Up @@ -304,6 +321,24 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
job.logger.error(error)


def _describe_redis_target() -> str:
"""Return a ``host:port/dbN`` string for the "default" Redis connection.

Logged at the start of every ``run_job`` so DB-index drift across hosts
(the class of misconfig that manifests as silent ``process_nats_pipeline_result``
FAILUREs on whichever worker happens to read state from the wrong DB) is
visible in each job's log without requiring a separate diagnostic.
"""
try:
from django_redis import get_redis_connection

redis = get_redis_connection("default")
kwargs = getattr(redis.connection_pool, "connection_kwargs", {}) or {}
return f"redis={kwargs.get('host', '?')}:{kwargs.get('port', '?')}/db{kwargs.get('db', '?')}"
Comment on lines +324 to +337
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_describe_redis_target() claims it returns a host:port/dbN string, but it currently returns redis=host:port/dbN (and the exception case returns redis=(unavailable: ...)). Align the docstring with the actual format, or adjust the return value to match the documented output.

Copilot uses AI. Check for mistakes.
except Exception as e:
return f"redis=(unavailable: {e})"


def _fail_job(job_id: int, reason: str) -> None:
from ami.jobs.models import Job, JobState
from ami.ml.orchestration.jobs import cleanup_async_job_resources
Expand All @@ -313,6 +348,15 @@ def _fail_job(job_id: int, reason: str) -> None:
job = Job.objects.select_for_update().get(pk=job_id)
if job.status in (JobState.CANCELING, *JobState.final_states()):
return
# Mirror the reason into progress.errors so the UI surfaces it
# alongside the FAILURE state. Previously the reason lived only in
# job.logger, which meant the UI showed errors=[] and operators had
# to dig into Celery worker logs to find out why a job died.
try:
job.progress.errors.append(reason)
except Exception:
# Don't let diagnostic-write failures mask the original FAILURE.
pass
job.update_status(JobState.FAILURE, save=False)
job.finished_at = datetime.datetime.now()
job.save(update_fields=["status", "progress", "finished_at"])
Expand Down
99 changes: 96 additions & 3 deletions ami/jobs/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,14 @@ def test_genuinely_missing_state_acks_and_fails_job(self, mock_manager_class, mo

mock_ack.assert_called_once()
mock_fail.assert_called_once()
# New, accurate message — no longer the misleading "Redis state missing"
# that users saw in the UI for transient connection drops.
# Reason string now leads with the stage and embeds a live Redis
# snapshot (DB index + key listing from diagnose_missing_state) so the
# failure cause — DB-index drift, eviction, or never-initialized —
# is visible in the FAILURE log instead of the previous single
# hardcoded "likely cleaned up concurrently" guess.
args, _ = mock_fail.call_args
self.assertIn("Job state keys not found in Redis", args[1])
self.assertIn("Job state missing from Redis", args[1])
self.assertIn("stage=process", args[1])

@patch("ami.jobs.tasks._ack_task_via_nats")
@patch("ami.jobs.tasks.TaskQueueManager")
Expand Down Expand Up @@ -625,6 +629,95 @@ def test_task_failure_marks_sync_api_job_failure_and_cleans_up(self, mock_cleanu
mock_cleanup.assert_called_once()


class TestFailJob(TransactionTestCase):
    """
    Regression tests for ``_fail_job`` — specifically for the reason-string
    mirroring into ``progress.errors`` that this PR adds.

    The FAILURE log line alone is not enough for operators; the UI reads
    ``progress.errors``, and prior to this PR that list stayed empty on the
    missing-Redis-state path. Any regression that stops appending the reason
    (e.g. silently dropping it via the defensive ``try/except``) would put
    operators back in the position of digging through Celery worker logs to
    find out why a job died.
    """

    def setUp(self):
        # Fresh cache per test so leftover job state from a previous test
        # cannot leak into these assertions.
        cache.clear()
        self.project = Project.objects.create(name="FailJob Test Project")
        self.pipeline = Pipeline.objects.create(name="FailJob Pipeline", slug="fail-job-pipeline")
        self.pipeline.projects.add(self.project)
        self.collection = SourceImageCollection.objects.create(name="FailJob Collection", project=self.project)

    def tearDown(self):
        cache.clear()

    def _make_job(self, dispatch_mode: JobDispatchMode = JobDispatchMode.ASYNC_API) -> Job:
        # Helper: create an ML job wired to the fixtures above and move it to
        # STARTED, so _fail_job's final-state guard does not short-circuit.
        job = Job.objects.create(
            job_type_key=MLJob.key,
            project=self.project,
            name=f"{dispatch_mode} fail-job test",
            pipeline=self.pipeline,
            source_image_collection=self.collection,
            dispatch_mode=dispatch_mode,
        )
        job.update_status(JobState.STARTED, save=True)
        return job

    @patch("ami.ml.orchestration.jobs.cleanup_async_job_resources")
    def test_fail_job_appends_reason_to_progress_errors(self, mock_cleanup):
        """
        Reason string must end up in ``job.progress.errors`` (persisted) so the
        UI shows the cause of the FAILURE alongside the status change. Before
        this PR the reason lived only in ``job.logger`` and the UI showed
        ``errors=[]``. A silent regression here would not be caught by the
        ``_fail_job`` call-site tests in ``TestProcessNatsPipelineResultError``
        (they mock ``_fail_job`` entirely).
        """
        from ami.jobs.tasks import _fail_job

        job = self._make_job()
        reason = "Job state missing from Redis (stage=process): redis=host:6379/db1 keys_for_job=<none>"

        _fail_job(job.pk, reason)

        job.refresh_from_db()
        self.assertEqual(job.status, JobState.FAILURE)
        self.assertIn(
            reason,
            job.progress.errors,
            f"expected reason in progress.errors, got: {job.progress.errors!r}",
        )
        # Sanity: the fix also propagates to the DB-persisted copy (i.e. the
        # update_fields tuple on job.save includes 'progress'). Re-read from a
        # fresh Job instance to prove the append wasn't only visible on the
        # in-memory object returned by select_for_update.
        reloaded = Job.objects.get(pk=job.pk)
        self.assertIn(reason, reloaded.progress.errors)
        mock_cleanup.assert_called_once_with(job.pk)

    @patch("ami.ml.orchestration.jobs.cleanup_async_job_resources")
    def test_fail_job_is_noop_on_already_final_job(self, mock_cleanup):
        """
        If the job is already in a final state (e.g. concurrent cleanup
        beat us), ``_fail_job`` must return early without touching status
        or progress. This protects against double-failing a job that has
        already been reconciled to SUCCESS by the reconciler path.
        """
        from ami.jobs.tasks import _fail_job

        job = self._make_job()
        job.update_status(JobState.SUCCESS, save=True)
        # Snapshot errors before the call so the no-op can be asserted exactly.
        errors_before = list(job.progress.errors)

        _fail_job(job.pk, "should be ignored")

        job.refresh_from_db()
        self.assertEqual(job.status, JobState.SUCCESS)
        self.assertEqual(job.progress.errors, errors_before)
        mock_cleanup.assert_not_called()


class TestResultEndpointWithError(APITestCase):
"""Integration test for the result API endpoint with error results."""

Expand Down
59 changes: 59 additions & 0 deletions ami/ml/orchestration/async_job_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ def update_state(
newly_removed = results[0] if processed_image_ids else 0

if total_raw is None:
# Loud diagnostic before the silent None return. The caller will mark
# the job FAILURE based on this result, so the operator needs to see
# *why* the total key is gone. Distinguishes three different causes
# that previously all surfaced as the same hardcoded "likely cleaned
# up concurrently" reason string: Redis DB mismatch across hosts,
# key eviction, and genuinely-never-initialized state.
logger.warning(
"Job %s state missing in Redis (stage=%s): %s",
self.job_id,
stage,
self.diagnose_missing_state(),
)
return None

total = int(total_raw)
Expand All @@ -182,6 +194,53 @@ def update_state(
newly_removed=newly_removed,
)

def diagnose_missing_state(self) -> str:
    """
    One-line snapshot of what Redis actually holds for this job.

    Called from the missing-state path in ``update_state`` and from
    ``process_nats_pipeline_result`` so the FAILURE log and the UI
    ``progress.errors`` entry distinguish the three common causes — DB
    mismatch across hosts, key eviction, and never-initialized state —
    instead of a single hardcoded "likely cleaned up concurrently" guess
    that all three collapse to.

    Cost: the internal ``SCAN`` runs only on the failure path (once per
    job-lifetime FAILURE), and the per-job key fanout is at most four
    (pending:process, pending:results, failed, total).
    NOTE(review): ``SCAN`` with ``MATCH`` still walks the full keyspace
    server-side; on a very large or loaded Redis, probing the fixed set of
    expected keys directly would be cheaper — confirm before relying on
    this in a hot path.

    Intentionally defensive: any failure to collect diagnostics is
    swallowed, because the caller is already about to fail the job and
    an exception from diagnostics would mask the original cause.
    """
    try:
        redis = self._get_redis()
        # connection_kwargs may be absent or None on exotic pool classes.
        kwargs = getattr(redis.connection_pool, "connection_kwargs", {}) or {}
        db = kwargs.get("db", "?")
        host = kwargs.get("host", "?")
        port = kwargs.get("port", "?")
        # Cursor-safe SCAN over the job's keyspace; sorted for a stable,
        # comparable log line. Keys may come back as bytes depending on the
        # client's decode_responses setting.
        keys = sorted(k.decode() if isinstance(k, bytes) else k for k in redis.scan_iter(match=self._pattern()))
        sizes: list[str] = []
        for key in keys:
            if key == self._total_key:
                # The total key holds a plain string, not a set — SCARD
                # would raise WRONGTYPE, so just mark its type.
                sizes.append(f"{key}=<str>")
                continue
            try:
                sizes.append(f"{key}=SCARD:{redis.scard(key)}")
            except RedisError:
                # e.g. WRONGTYPE on an unexpected key — record and move on.
                sizes.append(f"{key}=<err>")
        keys_summary = ", ".join(sizes) if sizes else "<none>"
        return f"redis={host}:{port}/db{db} keys_for_job={keys_summary}"
    except Exception as e:
        # Never let diagnostics mask the original failure being reported.
        return f"(diagnostics failed: {e})"

def _pattern(self) -> str:
return f"job:{self.job_id}:*"

def get_progress(self, stage: str) -> "JobStateProgress | None":
"""Read-only progress snapshot for the given stage."""
try:
Expand Down
29 changes: 29 additions & 0 deletions ami/ml/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,35 @@ def test_update_state_returns_none_when_state_genuinely_missing(self):
progress = self.manager.update_state({"img1", "img2"}, "process")
self.assertIsNone(progress)

def test_diagnose_missing_state_when_never_initialized(self):
    """
    Diagnostic string for the "never initialized" case: no keys are
    present under ``job:{id}:*``. Output must still name the Redis host
    and DB so cross-host DB drift is distinguishable from eviction and
    truly-never-initialized state in one log line.
    """
    # initialize_job has NOT been called; nothing under job:123:*.
    diagnosis = self.manager.diagnose_missing_state()
    # The host/port and DB index must always appear...
    self.assertIn("redis=", diagnosis)
    self.assertIn("/db", diagnosis)
    # ...and the key listing must explicitly say "nothing here".
    self.assertIn("keys_for_job=<none>", diagnosis)

def test_diagnose_missing_state_lists_present_keys(self):
    """
    Diagnostic string for the partial-cleanup / eviction case: some keys
    remain under ``job:{id}:*`` and their SCARDs should appear so the
    operator can tell "total key evicted but pending sets still present"
    from "nothing here, this DB never saw the job".
    """
    self.manager.initialize_job(self.image_ids)
    # Drop the total key to simulate eviction while pending sets survive.
    redis = self.manager._get_redis()
    redis.delete(self.manager._total_key)

    diagnosis = self.manager.diagnose_missing_state()
    # Surviving pending set shows up with its cardinality...
    self.assertIn(f"job:{self.job_id}:pending_images:process=SCARD:", diagnosis)
    # ...while the deleted total key must be absent from the listing.
    self.assertNotIn(self.manager._total_key, diagnosis)


class TestSaveResultsRefreshesDeploymentCounts(TestCase):
"""save_results must refresh Deployment cached counts, not just Event counts.
Expand Down
Loading