Skip to content

fix: add run_id scoping to streaming handlers to prevent cross-run chunk contamination (#5376)#5377

Open
devin-ai-integration[bot] wants to merge 2 commits intomainfrom
devin/1775713725-fix-streaming-concurrent-run-isolation
Open

fix: add run_id scoping to streaming handlers to prevent cross-run chunk contamination (#5376)#5377
devin-ai-integration[bot] wants to merge 2 commits intomainfrom
devin/1775713725-fix-streaming-concurrent-run-isolation

Conversation

@devin-ai-integration
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot commented Apr 9, 2026

Summary

Fixes #5376 — the singleton CrewAIEventsBus fans out LLMStreamChunkEvent to all registered stream handlers. When multiple streaming runs execute concurrently, each run's handler receives chunks from every other run, causing cross-run chunk contamination.

Approach: Each streaming run is assigned a unique run_id (UUID) that flows through via contextvars.ContextVar. Events are stamped with the run_id at emission time, and handlers filter to only accept events matching their own run_id.

Changes:

  • LLMStreamChunkEvent — new optional run_id: str | None = None field
  • streaming.py — new _current_stream_run_id context var, get_current_stream_run_id() accessor; create_streaming_state() generates + sets run_id; _create_stream_handler() filters by run_id
  • base_llm.py / llm.py — all 4 LLMStreamChunkEvent emission sites stamp run_id=get_current_stream_run_id()
  • 5 new tests in TestStreamingRunIsolation covering handler filtering, concurrent state isolation, and multi-thread isolation

Review & Testing Checklist for Human

  • Filter semantics when run_id is None: The handler passes events through when either the handler's or event's run_id is None (streaming.py:148-150). This preserves backward compatibility but means un-stamped events still fan out to all handlers. Verify this is the desired behavior for your use cases.
  • Context propagation in all thread/async paths: contextvars.ContextVar does NOT auto-propagate to child threads. create_chunk_generator already uses contextvars.copy_context() — verify no other code paths spawn threads without context copying that would lose the run_id.
  • Test fragility in _get_run_id_from_handler: One test introspects handler closure cells to extract the captured run_id by matching UUID-shaped strings. This works but is coupled to closure internals.

Suggested test plan: Run two concurrent Crew(stream=True).kickoff() calls against a real or mocked LLM and verify each stream's chunks contain only content from its own run.

Notes

  • run_id defaults to None so all existing code/serialization is unaffected
  • All 39 existing streaming tests continue to pass alongside the 5 new ones

Link to Devin session: https://app.devin.ai/sessions/bc659bf095044869b49955ca3f222f65


Note

Medium Risk
Changes core streaming/event-bus plumbing by introducing a per-run run_id and filtering handlers, which could affect chunk delivery in concurrent or nested streaming scenarios (especially when run_id is missing). Covered by new concurrency-focused tests but still touches a central execution path.

Overview
Fixes concurrent streaming chunk cross-talk by introducing a per-streaming-run run_id and scoping LLMStreamChunkEvent delivery to matching handlers.

create_streaming_state() now generates/reuses a UUID run scope via a ContextVar, LLM streaming emission sites stamp run_id onto each LLMStreamChunkEvent, and stream handlers drop chunks from other runs. Adds a new TestStreamingRunIsolation suite to verify handler filtering and isolation across contexts/threads.

Reviewed by Cursor Bugbot for commit 0a22307. Bugbot is set up for automated code reviews on this repo. Configure here.

…unk contamination (#5376)

The singleton event bus fans out LLMStreamChunkEvent to all registered
handlers. When multiple streaming runs execute concurrently, each run's
handler receives chunks from all other runs, causing cross-run chunk
contamination.

Fix:
- Add run_id field to LLMStreamChunkEvent
- Use contextvars.ContextVar to track the current streaming run_id
- create_streaming_state() generates a unique run_id per run and sets it
  in the context var
- LLM emit paths (base_llm.py, llm.py) stamp run_id on emitted events
- Stream handler filters events by matching run_id

Tests:
- Handler rejects events from different run_id
- Two concurrent streaming states receive only their own events
- Two concurrent threads with isolated contextvars receive only their
  own chunks
- run_id field defaults to None for backward compatibility

Co-Authored-By: João <joao@crewai.com>
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

Prompt hidden (unlisted session)

@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions github-actions bot added the size/L label Apr 9, 2026
…lation

Co-Authored-By: João <joao@crewai.com>
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 0a22307. Configure here.


# Filter: only accept events belonging to this streaming run.
if run_id is not None and event.run_id is not None and event.run_id != run_id:
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Permissive filter allows un-stamped event cross-contamination

Medium Severity

The three-way AND filter (run_id is not None and event.run_id is not None and event.run_id != run_id) only rejects events when both run_ids are set and differ. When event.run_id is None, the check short-circuits and accepts the event into every handler. This means if context propagation fails (e.g., a thread spawned without contextvars.copy_context()), get_current_stream_run_id() returns None, events are emitted without a run_id, and they leak to all concurrent handlers — reproducing the exact cross-run chunk contamination this PR aims to fix. A two-condition check (run_id is not None and event.run_id != run_id) would provide complete isolation, which is safe since all four emission sites now stamp run_id.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 0a22307. Configure here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The three-way check is intentionally permissive for backward compatibility. External code or plugins may emit LLMStreamChunkEvent without stamping run_id. Tightening to a two-condition check would cause those un-stamped events to be silently dropped by all handlers, breaking backward compatibility.

The second commit (0a22307) also addresses the "context var overwrite" concern (comment 5) by reusing the parent run_id when one exists: run_id = _current_stream_run_id.get() or str(uuid.uuid4()). This ensures nested streaming (flow → crew) shares the same scope, and create_chunk_generator's contextvars.copy_context() correctly captures the run_id at iteration time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Streaming handlers on singleton event bus can fan out chunks across concurrent runs

0 participants