Skip to content

feat: add graceful cancellation support (aclose/cancel) for streaming outputs#5313

Open
devin-ai-integration[bot] wants to merge 2 commits intomainfrom
devin/1775554369-streaming-graceful-cancellation
Open

feat: add graceful cancellation support (aclose/cancel) for streaming outputs#5313
devin-ai-integration[bot] wants to merge 2 commits intomainfrom
devin/1775554369-streaming-graceful-cancellation

Conversation

@devin-ai-integration
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot commented Apr 7, 2026

Summary

Adds aclose() and cancel() methods to CrewStreamingOutput and FlowStreamingOutput (via the shared StreamingOutputBase) so callers can abort in-flight streaming and release resources promptly — e.g. when an HTTP client disconnects from a FastAPI StreamingResponse.

Types (types/streaming.py):

  • New _cancelled flag, _cancel_event (asyncio), _cancel_thread_event (threading), _background_task, _background_thread attributes on StreamingOutputBase
  • aclose() — async cancellation: sets cancel events, cancels the background asyncio.Task, awaits cleanup
  • cancel() — sync cancellation: sets cancel events, fires task cancellation (no await), marks completed
  • is_cancelled property
  • Both are idempotent (no-op on already-completed streams)

Utilities (utilities/streaming.py):

  • create_chunk_generator (sync): switched from blocking queue.get() to polling with 0.1s timeout so cancellation can be detected between chunks
  • create_async_chunk_generator: switched from await queue.get() to asyncio.wait() racing queue-get vs cancel-event, with proper task cancellation in the finally block
  • Both generators wire their cancel primitives to the streaming output object via a _wire_cancel() helper called each iteration

Tests: 14 new tests in TestStreamingCancellation covering async close, sync cancel, background task cancellation, idempotency, no-op on completed streams, FlowStreamingOutput support, and multi-call safety.

Closes #5312

Review & Testing Checklist for Human

  • Performance regression in sync streaming: create_chunk_generator now polls with a 0.1s timeout instead of a blocking queue.get(). Verify this doesn't introduce noticeable latency in chunk delivery for real LLM streaming workloads.
  • Async generator overhead: create_async_chunk_generator now creates two futures (ensure_future) and calls asyncio.wait per chunk. Confirm this is acceptable overhead vs the original single await queue.get().
  • Thread safety of _wire_cancel: The wiring mutates output_holder[0]'s internal attributes on every iteration. Verify no race with concurrent cancel()/aclose() calls from another thread/task.
  • cancel() does not join the background thread: By design, sync cancel() only signals and marks complete — the daemon thread may still run briefly. Verify this is acceptable for the intended FastAPI disconnect use case.
  • Integration test gap: All new tests use mocked generators. Consider testing aclose() through the full Crew.akickoff(stream=True) → cancel path to validate end-to-end behavior with the real event bus and queue wiring.

Notes

  • The _background_thread attribute is stored but not used for active cleanup (Python threads can't be cancelled). Cleanup relies on the polling loop checking the threading.Event.
  • _wire_cancel() is called every loop iteration; it could be optimized to wire once, but the cost is minimal (just attribute assignment + truthiness check).

Link to Devin session: https://app.devin.ai/sessions/d15a57dd90ab46c2a89fa4288605d1f4


Note

Medium Risk
Adds new cancellation paths that alter sync/async streaming control flow and background task/thread lifecycle handling; mistakes could cause leaked handlers/tasks or prematurely completed streams under race conditions.

Overview
Adds graceful cancellation to streaming outputs via new StreamingOutputBase.aclose() (async) and cancel() (sync), plus an is_cancelled flag, so callers can abort in-flight crew/flow streaming and mark streams completed.

Updates the sync and async chunk generators to support cooperative cancellation by wiring per-stream cancel events into the output object and changing queue waits to be interruptible (sync polling with timeout; async wait() racing queue-get vs cancel-event), with adjusted cleanup logic for background tasks/threads.

Extends test_streaming.py with a new cancellation-focused test suite covering idempotency, completed-stream no-ops, background task cancellation, and both CrewStreamingOutput and FlowStreamingOutput behavior.

Reviewed by Cursor Bugbot for commit da65140. Bugbot is set up for automated code reviews on this repo. Configure here.

… outputs

Implements aclose() and cancel() methods on CrewStreamingOutput and
FlowStreamingOutput to allow callers to abort in-flight streaming and
release resources promptly (e.g. when an HTTP client disconnects).

- Add aclose() async method for async streaming contexts
- Add cancel() sync method for sync streaming contexts
- Add is_cancelled property to check cancellation state
- Update create_async_chunk_generator to support cancel events
- Update create_chunk_generator to support cancel thread events
- Add comprehensive tests for cancellation scenarios

Closes #5312

Co-Authored-By: João <joao@crewai.com>
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

Prompt hidden (unlisted session)

@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions github-actions bot added the size/L label Apr 7, 2026
Addresses code quality review comment about bare 'await task' having
no visible effect. Now consistently handles CancelledError/Exception
in both the normal and cancellation paths of the finally block.

Co-Authored-By: João <joao@crewai.com>
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit da65140. Configure here.

except (asyncio.CancelledError, Exception): # noqa: S110
pass

self._completed = True
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_wire_cancel ignores prior cancellation, losing cancel signal

Medium Severity

Calling aclose() or cancel() before a streaming generator starts iterating doesn't effectively stop the process. The cancellation events and background tasks are only wired during the generator's first iteration. This means pre-requested cancellations are ignored, and the stream runs to completion, resulting in an inconsistent state where is_cancelled is true.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit da65140. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support for Graceful Cancellation and Resource Cleanup via aclose()/cancel() on CrewStreamingOutput Streaming Objects

0 participants