feat: add graceful cancellation support (aclose/cancel) for streaming outputs#5313
feat: add graceful cancellation support (aclose/cancel) for streaming outputs#5313devin-ai-integration[bot] wants to merge 2 commits intomainfrom
Conversation
… outputs Implements aclose() and cancel() methods on CrewStreamingOutput and FlowStreamingOutput to allow callers to abort in-flight streaming and release resources promptly (e.g. when an HTTP client disconnects). - Add aclose() async method for async streaming contexts - Add cancel() sync method for sync streaming contexts - Add is_cancelled property to check cancellation state - Update create_async_chunk_generator to support cancel events - Update create_chunk_generator to support cancel thread events - Add comprehensive tests for cancellation scenarios Closes #5312 Co-Authored-By: João <joao@crewai.com>
|
Prompt hidden (unlisted session) |
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
Addresses code quality review comment about bare 'await task' having no visible effect. Now consistently handles CancelledError/Exception in both the normal and cancellation paths of the finally block. Co-Authored-By: João <joao@crewai.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit da65140. Configure here.
| except (asyncio.CancelledError, Exception): # noqa: S110 | ||
| pass | ||
|
|
||
| self._completed = True |
There was a problem hiding this comment.
_wire_cancel ignores prior cancellation, losing cancel signal
Medium Severity
Calling aclose() or cancel() before a streaming generator starts iterating doesn't effectively stop the process. The cancellation events and background tasks are only wired during the generator's first iteration. This means pre-requested cancellations are ignored, and the stream runs to completion, resulting in an inconsistent state where is_cancelled is true.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit da65140. Configure here.


Summary
Adds
aclose()andcancel()methods toCrewStreamingOutputandFlowStreamingOutput(via the sharedStreamingOutputBase) so callers can abort in-flight streaming and release resources promptly — e.g. when an HTTP client disconnects from a FastAPIStreamingResponse.Types (
types/streaming.py):_cancelledflag,_cancel_event(asyncio),_cancel_thread_event(threading),_background_task,_background_threadattributes onStreamingOutputBaseaclose()— async cancellation: sets cancel events, cancels the backgroundasyncio.Task, awaits cleanupcancel()— sync cancellation: sets cancel events, fires task cancellation (no await), marks completedis_cancelledpropertyUtilities (
utilities/streaming.py):create_chunk_generator(sync): switched from blockingqueue.get()to polling with 0.1s timeout so cancellation can be detected between chunkscreate_async_chunk_generator: switched fromawait queue.get()toasyncio.wait()racing queue-get vs cancel-event, with proper task cancellation in the finally block_wire_cancel()helper called each iterationTests: 14 new tests in
TestStreamingCancellationcovering async close, sync cancel, background task cancellation, idempotency, no-op on completed streams, FlowStreamingOutput support, and multi-call safety.Closes #5312
Review & Testing Checklist for Human
create_chunk_generatornow polls with a 0.1s timeout instead of a blockingqueue.get(). Verify this doesn't introduce noticeable latency in chunk delivery for real LLM streaming workloads.create_async_chunk_generatornow creates two futures (ensure_future) and callsasyncio.waitper chunk. Confirm this is acceptable overhead vs the original singleawait queue.get()._wire_cancel: The wiring mutatesoutput_holder[0]'s internal attributes on every iteration. Verify no race with concurrentcancel()/aclose()calls from another thread/task.cancel()does not join the background thread: By design, synccancel()only signals and marks complete — the daemon thread may still run briefly. Verify this is acceptable for the intended FastAPI disconnect use case.aclose()through the fullCrew.akickoff(stream=True)→ cancel path to validate end-to-end behavior with the real event bus and queue wiring.Notes
_background_threadattribute is stored but not used for active cleanup (Python threads can't be cancelled). Cleanup relies on the polling loop checking thethreading.Event._wire_cancel()is called every loop iteration; it could be optimized to wire once, but the cost is minimal (just attribute assignment + truthiness check).Link to Devin session: https://app.devin.ai/sessions/d15a57dd90ab46c2a89fa4288605d1f4
Note
Medium Risk
Adds new cancellation paths that alter sync/async streaming control flow and background task/thread lifecycle handling; mistakes could cause leaked handlers/tasks or prematurely completed streams under race conditions.
Overview
Adds graceful cancellation to streaming outputs via new
StreamingOutputBase.aclose()(async) andcancel()(sync), plus anis_cancelledflag, so callers can abort in-flight crew/flow streaming and mark streams completed.Updates the sync and async chunk generators to support cooperative cancellation by wiring per-stream cancel events into the output object and changing queue waits to be interruptible (sync polling with timeout; async
wait()racing queue-get vs cancel-event), with adjusted cleanup logic for background tasks/threads.Extends
test_streaming.pywith a new cancellation-focused test suite covering idempotency, completed-stream no-ops, background task cancellation, and bothCrewStreamingOutputandFlowStreamingOutputbehavior.Reviewed by Cursor Bugbot for commit da65140. Bugbot is set up for automated code reviews on this repo. Configure here.