Add task_expires_seconds for async endpoint task expiration#754
Add task_expires_seconds for async endpoint task expiration#754lukasewecker wants to merge 8 commits intomainfrom
Conversation
Add configurable task expiration for async endpoints, allowing users to specify how long a task can wait in the queue before being discarded by Celery workers. Defaults to 86400 seconds (24 hours) if not specified. Changes: - Add task_expires_seconds field to DTOs, entities, and ORM model - Pass expires parameter to Celery task queue gateway - Add K8s annotations for celery autoscaler configuration - Include database migration for new column - Add comprehensive unit tests for the feature Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| self, | ||
| topic: str, | ||
| predict_request: EndpointPredictV1Request, | ||
| task_timeout_seconds: int, |
There was a problem hiding this comment.
I believe this is being used in async_inference_use_cases.py
There was a problem hiding this comment.
I just renamed the parameter to task_expires_seconds here. Can you explain more what you mean?
There was a problem hiding this comment.
oh i was mentioning that this is currently used in https://github.com/scaleapi/llm-engine/blob/main/model-engine/model_engine_server/domain/use_cases/async_inference_use_cases.py#L72
There was a problem hiding this comment.
The issue was that it wasnt properly passed down to when celery creates the task:
Also it wasnt possible to configure this parameter. So basically what I did was
- rename it (you could argue if thats necessary)
- pass it down properly to downstream code
- make it configurable from outside
|
incorporated changes from this other PR #760 as requested by @dmchoiboi reasoning for these timeout changes:
|
model-engine/model_engine_server/common/dtos/llms/model_endpoints.py
Outdated
Show resolved
Hide resolved
...-engine/model_engine_server/infra/gateways/resources/sqs_queue_endpoint_resource_delegate.py
Outdated
Show resolved
Hide resolved
...-engine/model_engine_server/infra/gateways/resources/sqs_queue_endpoint_resource_delegate.py
Outdated
Show resolved
Hide resolved
...-engine/model_engine_server/infra/gateways/resources/asb_queue_endpoint_resource_delegate.py
Outdated
Show resolved
Hide resolved
...l_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml
Show resolved
Hide resolved
| task_expires_seconds: Optional[int] = Field( | ||
| default=None, | ||
| description="For async endpoints, how long a task can wait in queue before expiring (in seconds).", | ||
| ) |
There was a problem hiding this comment.
can we add queue_message_timeout_seconds to this
| @@ -35,6 +35,7 @@ class BuildEndpointRequest(BaseModel): | |||
| high_priority: Optional[bool] = None | |||
| default_callback_url: Optional[str] = None | |||
| default_callback_auth: Optional[CallbackAuth] = None | |||
There was a problem hiding this comment.
do we need to pipe task_expires_seconds in here as well?
| resource_state: Optional[ModelEndpointResourceState] = Field(default=None) | ||
| num_queued_items: Optional[int] = Field(default=None) | ||
| public_inference: Optional[bool] = Field(default=None) | ||
| task_expires_seconds: Optional[int] = Field(default=None) |
There was a problem hiding this comment.
add queue_message_timeout_seconds
| resource_state=(None if infra_state is None else infra_state.resource_state), | ||
| num_queued_items=(None if infra_state is None else infra_state.num_queued_items), | ||
| public_inference=model_endpoint.record.public_inference, | ||
| task_expires_seconds=model_endpoint.record.task_expires_seconds, |
There was a problem hiding this comment.
add queue_message_timeout_seconds
| pass | ||
|
|
||
| if queue_message_timeout_seconds is not None: | ||
| lock_duration = timedelta(seconds=min(queue_message_timeout_seconds, 300)) |
There was a problem hiding this comment.
extract 300 to a const, and can we bump to a higher default? some requests can take > 5min
Pull Request Summary
Add configurable task expiration for async endpoints, allowing users to specify how long a task can wait in the queue before being discarded by Celery workers. Defaults to 86400 seconds (24 hours) if not specified.
Keep in mind that this change does not automatically discard a task from the queue once it is stale. It only marks a task as stale, which then waits in the queue to be picked up by the worker. Once picked up by a worker, the worker then notices that the task is stale and will not process it. This means that when a stale task is never picked up by the worker (e.g. no worker is available), it stays in the queue.
Changes:
Note
alembic update head).task_timeout_secondsbefore, which was not used in the downstream tasks though. It was all renamed totask_expires_seconds, and is now passed properly to the celery tasks.Example API usage
When creating a new model endpoint via the api, you can now pass the parameter:
Test Plan and Usage Guide
Ran unit tests and added new tests for the new feature.
Also, tested locally with this script (spun up Redis locally):
Greptile Summary
This PR adds configurable task expiration for async endpoints via a new
task_expires_secondsfield, allowing users to control how long a Celery task can wait in the queue before being discarded by workers (defaults to 86400 seconds / 24 hours). It also introducesqueue_message_timeout_secondsto configure SQS VisibilityTimeout and ASB lock_duration for queue message processing. The changes span the full stack: DTOs, domain entities, ORM model, database migration, use cases, gateways, K8s templates, and tests.task_expires_seconds(persisted to DB) andqueue_message_timeout_seconds(passed to queue infrastructure) to create/update endpoint flows for both base and LLM endpointstask_timeout_secondstotask_expires_secondsthroughout the codebase for clarity and now actually passes it to Celery'sexpiresparametercelery.scaleml.autoscaler/taskExpiresSecondsto async deployment templates for autoscaler awarenesstask_expires_secondscolumn and comprehensive unit testscharts/model-engine/templates/_helpers.tpl) was not updated with the newtaskExpiresSecondsannotation, creating an inconsistency with the CircleCI templateConfidence Score: 4/5
taskExpiresSecondsannotation in the Helm chart template, which could cause inconsistent behavior for Helm-based deployments. All other changes follow existing patterns correctly.charts/model-engine/templates/_helpers.tplneeds thetaskExpiresSecondsannotation added tomodelEngine.serviceTemplateAsyncAnnotationsfor parity with the CircleCI template.Important Files Changed
queue_message_timeout_secondsandtask_expires_secondsfields to Create/Update request DTOs and Get response DTO with appropriate validation (gt=0 for task_expires, ge=1/le=43200 for queue_message_timeout).DEFAULT_TASK_TIMEOUT_SECONDStoDEFAULT_TASK_EXPIRES_SECONDS, readstask_expires_secondsfrom the endpoint record with fallback to default. Correctly passes the value to the inference gateway.expiresparameter through tocelery_dest.send_task(). Simple and correct change.TASK_EXPIRES_SECONDSto_AsyncDeploymentArgumentsTypedDict and populates it in all 4 async deployment argument builders usingmodel_endpoint_record.task_expires_seconds or DEFAULT_TASK_EXPIRES_SECONDS.celery.scaleml.autoscaler/taskExpiresSecondsannotation to all 4 async deployment templates. However, the equivalent Helm chart template (charts/model-engine/templates/_helpers.tpl) was not updated.queue_message_timeout_secondssupport for SQS VisibilityTimeout. Updates existing queues when parameter is provided, and uses it (with 43200 fallback) for new queues.queue_message_timeout_secondssupport for ASB lock_duration with proper guard (only updates when not None) and 300-second cap. Handles errors gracefully with logging.task_expires_secondsinteger column to theendpointstable. Proper upgrade/downgrade implemented.task_expires_secondsthrough create/update flows to record repository andqueue_message_timeout_secondsto infra gateway.task_expires_secondsto both create and update paths, usingdict_not_nonefor the update to correctly handle None values.Sequence Diagram
sequenceDiagram participant Client participant API participant UseCase as AsyncInferenceUseCase participant Service as ModelEndpointService participant Record as EndpointRecord (DB) participant InfraGW as InfraGateway participant Queue as Queue (SQS/ASB/Redis) participant CeleryGW as CeleryTaskQueueGateway participant Worker as Celery Worker Note over Client,Worker: Endpoint Creation Flow Client->>API: POST /model-endpoints (task_expires_seconds=3600, queue_message_timeout_seconds=600) API->>Service: create_model_endpoint(task_expires_seconds, queue_message_timeout_seconds) Service->>Record: create_record(task_expires_seconds=3600) Service->>InfraGW: create_infra(queue_message_timeout_seconds=600) InfraGW->>Queue: create_queue(VisibilityTimeout=600) InfraGW-->>Service: K8s deployment with taskExpiresSeconds annotation Note over Client,Worker: Async Task Submission Flow Client->>API: POST /async-tasks API->>UseCase: execute(endpoint_id, request) UseCase->>Record: read task_expires_seconds (3600) UseCase->>CeleryGW: send_task(expires=3600) CeleryGW->>Queue: send_task(expires=3600) Queue->>Worker: pick up task Worker->>Worker: check if task expired (>3600s old → discard)Last reviewed commit: a60fb88