Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ def _fail_job(job_id: int, reason: str) -> None:
job.save(update_fields=["status", "progress", "finished_at"])

job.logger.error(f"Job {job_id} marked as FAILURE: {reason}")
cleanup_async_job_resources(job.pk, job.logger)
cleanup_async_job_resources(job.pk)
except Job.DoesNotExist:
logger.error(f"Cannot fail job {job_id}: not found")
cleanup_async_job_resources(job_id, logger)
cleanup_async_job_resources(job_id)


def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> None:
Expand Down Expand Up @@ -423,7 +423,7 @@ def cleanup_async_job_if_needed(job) -> None:
# import here to avoid circular imports
from ami.ml.orchestration.jobs import cleanup_async_job_resources

cleanup_async_job_resources(job.pk, job.logger)
cleanup_async_job_resources(job.pk)


@task_prerun.connect(sender=run_job)
Expand Down
55 changes: 42 additions & 13 deletions ami/ml/orchestration/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
logger = logging.getLogger(__name__)


def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool:
def cleanup_async_job_resources(job_id: int) -> bool:
"""
Clean up NATS JetStream and Redis resources for a completed job.

Expand All @@ -21,37 +21,53 @@ def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool:

Cleanup failures are logged but don't fail the job - data is already saved.

Resolves the job (and its per-job logger) internally so callers only need
to pass the ``job_id`` — matches the pattern used by ``save_results`` in
``ami/jobs/tasks.py``. If the ``Job`` row is gone (e.g. the
``Job.DoesNotExist`` path in ``_fail_job``), the function falls back to
the module logger and TaskQueueManager's module-logger path.

Args:
job_id: The Job ID (integer primary key)
_logger: Logger to use for logging cleanup results
job_id: The Job ID (integer primary key).
Returns:
bool: True if both cleanups succeeded, False otherwise
"""
# Resolve the logger up front: job.logger when the Job exists, module
# logger otherwise. Matches the pattern used by save_results.
job: Job | None = None
try:
job = Job.objects.get(pk=job_id)
except Job.DoesNotExist:
pass
job_logger: logging.Logger = job.logger if job else logger

redis_success = False
nats_success = False

# Cleanup Redis state
try:
state_manager = AsyncJobStateManager(job_id)
state_manager.cleanup()
_logger.info(f"Cleaned up Redis state for job {job_id}")
job_logger.info(f"Cleaned up Redis state for job {job_id}")
redis_success = True
except Exception as e:
_logger.error(f"Error cleaning up Redis state for job {job_id}: {e}")
job_logger.error(f"Error cleaning up Redis state for job {job_id}: {e}")

# Cleanup NATS resources
# Cleanup NATS resources. Only forward a real per-job logger to
# TaskQueueManager — passing the module logger would mirror cleanup
# lifecycle lines into an unrelated logger.
async def cleanup():
async with TaskQueueManager() as manager:
async with TaskQueueManager(job_logger=job.logger if job else None) as manager:
return await manager.cleanup_job_resources(job_id)

try:
nats_success = async_to_sync(cleanup)()
if nats_success:
_logger.info(f"Cleaned up NATS resources for job {job_id}")
job_logger.info(f"Cleaned up NATS resources for job {job_id}")
else:
_logger.warning(f"Failed to clean up NATS resources for job {job_id}")
job_logger.warning(f"Failed to clean up NATS resources for job {job_id}")
except Exception as e:
_logger.error(f"Error cleaning up NATS resources for job {job_id}: {e}")
job_logger.error(f"Error cleaning up NATS resources for job {job_id}: {e}")

return redis_success and nats_success

Expand Down Expand Up @@ -97,16 +113,29 @@ async def queue_all_images():
successful_queues = 0
failed_queues = 0

async with TaskQueueManager() as manager:
# Pass job.logger so stream/consumer setup, per-image debug lines, and
# publish failures all appear in the UI job log (not just the module
# logger). All log calls inside this block go through manager.log_async
# so module + job logger stay in sync with one consistent API — and
# the sync_to_async bridge for JobLogHandler's ORM save lives in one
# place instead of being re-implemented at every call site.
async with TaskQueueManager(job_logger=job.logger) as manager:
for image_pk, task in tasks:
try:
logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
await manager.log_async(
logging.DEBUG,
f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}",
)
success = await manager.publish_task(
job_id=job.pk,
data=task,
)
except Exception as e:
logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
await manager.log_async(
logging.ERROR,
f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}",
exc_info=True,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
success = False

if success:
Expand Down
Loading
Loading