Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ami/jobs/management/commands/update_stale_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument(
"--hours",
"--minutes",
type=int,
default=Job.FAILED_CUTOFF_HOURS,
help="Number of hours to consider a job stale (default: %(default)s)",
default=Job.STALLED_JOBS_MAX_MINUTES,
help="Minutes since last update to consider a job stale (default: %(default)s)",
)
Comment thread
mihow marked this conversation as resolved.
parser.add_argument(
"--dry-run",
Expand All @@ -21,7 +21,7 @@ def add_arguments(self, parser):
)

def handle(self, *args, **options):
results = check_stale_jobs(hours=options["hours"], dry_run=options["dry_run"])
results = check_stale_jobs(minutes=options["minutes"], dry_run=options["dry_run"])

if not results:
self.stdout.write("No stale jobs found.")
Expand Down
27 changes: 20 additions & 7 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,14 @@ def get_job_type_by_inferred_key(job: "Job") -> type[JobType] | None:
class Job(BaseModel):
"""A job to be run by the scheduler"""

# Hide old failed jobs after 3 days
FAILED_CUTOFF_HOURS = 24 * 3
# UI/API: hide failed jobs older than this from listings (display filter only).
FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3
# Reaper: revoke jobs in :meth:`JobState.running_states` whose ``updated_at``
# is older than this. A healthy async_api job bumps ``updated_at`` on every
# Redis SREM-driven progress save, so this is effectively "no progress for
# N minutes". 10 is conservative; raise if legitimate long-running jobs get
# reaped.
STALLED_JOBS_MAX_MINUTES = 10

name = models.CharField(max_length=255)
queue = models.CharField(max_length=255, default="default")
Expand Down Expand Up @@ -1037,14 +1043,21 @@ def update_progress(self, save=True):
else:
for stage in self.progress.stages:
if stage.progress > 0 and stage.status == JobState.CREATED:
# Update any stages that have started but are still in the CREATED state
# Promote stages that have started but are still in the CREATED state.
stage.status = JobState.STARTED
elif stage.status in JobState.final_states() and stage.progress < 1:
# Update any stages that are complete but have a progress less than 1
stage.progress = 1
elif stage.progress == 1 and stage.status not in JobState.final_states():
# Update any stages that are complete but are still in the STARTED state
# Promote stages that have measured-100% progress but are still STARTED.
stage.status = JobState.SUCCESS
# Note: do NOT coerce ``stage.progress = 1`` when status is in a
# final state but progress < 1. That branch used to fire when
# ``_update_job_progress`` wrote ``status=FAILURE`` at partial
# progress (e.g. failed/total temporarily crossed FAILURE_THRESHOLD
# early in an async_api job). The save-time coercion silently
# bumped progress to 100%, which made ``is_complete()`` return True
# and triggered premature ``cleanup_async_job_resources`` while
# NATS was still delivering results. Progress is a measurement;
# leave it alone. Stuck jobs are reaped by ``check_stale_jobs``
# via ``Job.STALLED_JOBS_MAX_MINUTES``.
total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages)

self.progress.summary.progress = total_progress
Expand Down
32 changes: 27 additions & 5 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,15 @@ def _update_job_progress(
cleanup_async_job_if_needed(job)


def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[dict]:
"""
Find jobs stuck in a running state past the cutoff and revoke them.

Cutoff is measured against ``Job.updated_at`` (auto-bumped on every save),
so a job that's actively making progress — including async_api jobs that
bump on each Redis SREM-driven progress save — is never reaped while
healthy. Default cutoff is :attr:`Job.STALLED_JOBS_MAX_MINUTES`.

For each stale job, checks Celery for a terminal task status. REVOKED is
always trusted. For async_api jobs, SUCCESS and FAILURE are only accepted
when job.progress.is_complete() — NATS workers may still be delivering
Expand All @@ -397,10 +402,10 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[di

from ami.jobs.models import Job, JobDispatchMode, JobState

if hours is None:
hours = Job.FAILED_CUTOFF_HOURS
if minutes is None:
minutes = Job.STALLED_JOBS_MAX_MINUTES

cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
cutoff = datetime.datetime.now() - datetime.timedelta(minutes=minutes)
stale_pks = list(
Job.objects.filter(
status__in=JobState.running_states(),
Expand Down Expand Up @@ -449,6 +454,23 @@ def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[di
job.finished_at = datetime.datetime.now()
job.save()
else:
# Per-job diagnostic: surface enough state at revoke time that an
# operator can answer "why was this stalled?" without grepping
# back through tick logs. Pairs with the per-tick NATS consumer
# snapshots logged by ``_run_running_job_snapshot_check``.
stalled_minutes = (datetime.datetime.now() - job.updated_at).total_seconds() / 60
stages_summary = (
", ".join(f"{s.key}={s.progress*100:.1f}% {s.status}" for s in job.progress.stages)
or "(no stages)"
)
job.logger.warning(
f"Reaping stalled job: no progress for {stalled_minutes:.1f} min "
f"(threshold {minutes} min). previous_status={previous_status}, "
f"celery_state={celery_state}, dispatch_mode={job.dispatch_mode}, "
f"stages: {stages_summary}. "
f"For NATS consumer state at the last tick, see prior "
f"running_job_snapshots logs for this job."
)
if not dry_run:
job.update_status(JobState.REVOKED, save=False)
job.finished_at = datetime.datetime.now()
Expand Down Expand Up @@ -492,7 +514,7 @@ class JobsHealthCheckResult:


def _run_stale_jobs_check() -> IntegrityCheckResult:
"""Reconcile jobs stuck in running states past FAILED_CUTOFF_HOURS."""
"""Reconcile jobs stuck in running states past Job.STALLED_JOBS_MAX_MINUTES."""
results = check_stale_jobs()
updated = sum(1 for r in results if r["action"] == "updated")
revoked = sum(1 for r in results if r["action"] == "revoked")
Expand Down
20 changes: 20 additions & 0 deletions ami/jobs/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ def test_create_job(self):
self.assertEqual(job.progress.summary.progress, 0)
self.assertEqual(job.progress.stages, [])

def test_save_does_not_inflate_failed_stage_progress(self):
"""A stage marked FAILURE at partial progress must keep its measured value.

Regression for the premature ``cleanup_async_job_resources`` path: when a
worker writes ``status=FAILURE`` at partial progress (e.g. failed/total
crossed FAILURE_THRESHOLD on an early result), ``Job.update_progress``
used to coerce ``stage.progress = 1`` on the next save. That made
``is_complete()`` return True and triggered cleanup while async results
were still in flight. Progress is a measurement; leave it alone.
"""
job = Job.objects.create(project=self.project, name="Test job - partial failure")
job.progress.add_stage("results")
job.progress.update_stage("results", progress=0.3, status=JobState.FAILURE)
job.save()

results_stage = job.progress.get_stage("results")
self.assertEqual(results_stage.progress, 0.3)
self.assertEqual(results_stage.status, JobState.FAILURE)
self.assertFalse(job.progress.is_complete())

def test_create_job_with_delay(self):
job = Job.objects.create(
job_type_key=MLJob.key,
Expand Down
6 changes: 3 additions & 3 deletions ami/jobs/tests/test_periodic_beat_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ class JobsHealthCheckTest(TestCase):
def setUp(self):
self.project = Project.objects.create(name="Beat schedule test project")

def _create_stale_job(self, status=JobState.STARTED, hours_ago=100):
def _create_stale_job(self, status=JobState.STARTED, minutes_ago=120):
job = Job.objects.create(project=self.project, name="stale", status=status)
Job.objects.filter(pk=job.pk).update(updated_at=timezone.now() - timedelta(hours=hours_ago))
Job.objects.filter(pk=job.pk).update(updated_at=timezone.now() - timedelta(minutes=minutes_ago))
job.refresh_from_db()
return job

Expand Down Expand Up @@ -55,7 +55,7 @@ def test_reports_both_sub_check_results(self, mock_manager_cls, _mock_cleanup):

def test_idle_deployment_returns_all_zeros(self, mock_manager_cls, _mock_cleanup):
# No stale jobs, no running async jobs.
self._create_stale_job(hours_ago=1) # recent — not stale
self._create_stale_job(minutes_ago=5) # recent — not stale
self._stub_manager(mock_manager_cls)

self.assertEqual(
Expand Down
8 changes: 4 additions & 4 deletions ami/jobs/tests/test_update_stale_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ class CheckStaleJobsTest(TestCase):
def setUp(self):
self.project = Project.objects.create(name="Stale jobs test project")

def _create_job(self, status=JobState.STARTED, hours_ago=100, task_id=None):
def _create_job(self, status=JobState.STARTED, minutes_ago=120, task_id=None):
job = Job.objects.create(
project=self.project,
name=f"Test job {status}",
status=status,
)
Job.objects.filter(pk=job.pk).update(
updated_at=timezone.now() - timedelta(hours=hours_ago),
updated_at=timezone.now() - timedelta(minutes=minutes_ago),
)
if task_id is not None:
Job.objects.filter(pk=job.pk).update(task_id=task_id)
Expand Down Expand Up @@ -114,8 +114,8 @@ def test_revokes_when_celery_lookup_fails(self, mock_async_result, mock_cleanup)
@patch("ami.jobs.tasks.cleanup_async_job_if_needed")
def test_skips_recent_and_final_state_jobs(self, mock_cleanup):
"""Recent jobs and jobs in final states are not touched."""
self._create_job(status=JobState.STARTED, hours_ago=1) # recent
self._create_job(status=JobState.SUCCESS, hours_ago=200) # final state
self._create_job(status=JobState.STARTED, minutes_ago=5) # recent
self._create_job(status=JobState.SUCCESS, minutes_ago=300) # final state

results = check_stale_jobs()

Expand Down
2 changes: 1 addition & 1 deletion ami/jobs/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def get_queryset(self) -> QuerySet:
if project:
jobs = jobs.filter(project=project)
cutoff_hours = IntegerField(required=False, min_value=0).clean(
self.request.query_params.get("cutoff_hours", Job.FAILED_CUTOFF_HOURS)
self.request.query_params.get("cutoff_hours", Job.FAILED_JOBS_DISPLAY_MAX_HOURS)
)
# Filter out completed jobs that have not been updated in the last X hours
cutoff_datetime = timezone.now() - timezone.timedelta(hours=cutoff_hours)
Expand Down
Loading