fix: add run_id scoping to streaming handlers to prevent cross-run chunk contamination (#5376) #5377
…unk contamination (#5376)

The singleton event bus fans out LLMStreamChunkEvent to all registered handlers. When multiple streaming runs execute concurrently, each run's handler receives chunks from all other runs, causing cross-run chunk contamination.

Fix:
- Add run_id field to LLMStreamChunkEvent
- Use contextvars.ContextVar to track the current streaming run_id
- create_streaming_state() generates a unique run_id per run and sets it in the context var
- LLM emit paths (base_llm.py, llm.py) stamp run_id on emitted events
- Stream handler filters events by matching run_id

Tests:
- Handler rejects events from different run_id
- Two concurrent streaming states receive only their own events
- Two concurrent threads with isolated contextvars receive only their own chunks
- run_id field defaults to None for backward compatibility

Co-Authored-By: João <joao@crewai.com>
…lation Co-Authored-By: João <joao@crewai.com>
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 0a22307. Configure here.
    # Filter: only accept events belonging to this streaming run.
    if run_id is not None and event.run_id is not None and event.run_id != run_id:
        return
Permissive filter allows un-stamped event cross-contamination
Medium Severity
The three-way AND filter (run_id is not None and event.run_id is not None and event.run_id != run_id) only rejects events when both run_ids are set and differ. When event.run_id is None, the check short-circuits and accepts the event into every handler. This means if context propagation fails (e.g., a thread spawned without contextvars.copy_context()), get_current_stream_run_id() returns None, events are emitted without a run_id, and they leak to all concurrent handlers — reproducing the exact cross-run chunk contamination this PR aims to fix. A two-condition check (run_id is not None and event.run_id != run_id) would provide complete isolation, which is safe since all four emission sites now stamp run_id.
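To make the difference between the two checks concrete, the sketch below evaluates both conditions over the four relevant combinations of handler and event run_id. The function names permissive_accepts and strict_accepts are hypothetical; only the boolean conditions are copied from the diff and from the suggestion above.

```python
from typing import Optional


def permissive_accepts(handler_run_id: Optional[str], event_run_id: Optional[str]) -> bool:
    """Three-way check from the diff: reject only when both IDs are set and differ."""
    rejected = (
        handler_run_id is not None
        and event_run_id is not None
        and event_run_id != handler_run_id
    )
    return not rejected


def strict_accepts(handler_run_id: Optional[str], event_run_id: Optional[str]) -> bool:
    """Two-condition check suggested in the review: reject anything that does not match."""
    rejected = handler_run_id is not None and event_run_id != handler_run_id
    return not rejected


cases = [
    ("run-a", "run-a"),  # matching IDs
    ("run-a", "run-b"),  # event from another run
    ("run-a", None),     # un-stamped event
    (None, "run-b"),     # handler without a run_id
]
results = {case: (permissive_accepts(*case), strict_accepts(*case)) for case in cases}

for (handler_id, event_id), (permissive, strict) in results.items():
    print(f"handler={handler_id!r:8} event={event_id!r:8} permissive={permissive} strict={strict}")
```

The two checks disagree only for the un-stamped event: the permissive filter accepts it in every handler, while the strict filter drops it.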
The three-way check is intentionally permissive for backward compatibility. External code or plugins may emit LLMStreamChunkEvent without stamping run_id. Tightening to a two-condition check would cause those un-stamped events to be silently dropped by all handlers, breaking backward compatibility.
The second commit (0a22307) also addresses the "context var overwrite" concern (comment 5) by reusing the parent run_id when one exists: run_id = _current_stream_run_id.get() or str(uuid.uuid4()). This ensures nested streaming (flow → crew) shares the same scope, and create_chunk_generator's contextvars.copy_context() correctly captures the run_id at iteration time.


Summary
Fixes #5376: the singleton CrewAIEventsBus fans out LLMStreamChunkEvent to all registered stream handlers. When multiple streaming runs execute concurrently, each run's handler receives chunks from every other run, causing cross-run chunk contamination.

Approach: Each streaming run is assigned a unique run_id (UUID) that propagates via contextvars.ContextVar. Events are stamped with the run_id at emission time, and handlers filter to only accept events matching their own run_id.

Changes:
- LLMStreamChunkEvent: new optional run_id: str | None = None field
- streaming.py: new _current_stream_run_id context var and get_current_stream_run_id() accessor; create_streaming_state() generates and sets run_id; _create_stream_handler() filters by run_id
- base_llm.py / llm.py: all 4 LLMStreamChunkEvent emission sites stamp run_id=get_current_stream_run_id()
- TestStreamingRunIsolation covering handler filtering, concurrent state isolation, and multi-thread isolation

Review & Testing Checklist for Human
- run_id is None: The handler passes events through when either the handler's or the event's run_id is None (streaming.py:148-150). This preserves backward compatibility but means un-stamped events still fan out to all handlers. Verify this is the desired behavior for your use cases.
- contextvars.ContextVar does NOT auto-propagate to child threads. create_chunk_generator already uses contextvars.copy_context(); verify no other code paths spawn threads without context copying that would lose the run_id.
- _get_run_id_from_handler: One test introspects handler closure cells to extract the captured run_id by matching UUID-shaped strings. This works but is coupled to closure internals.

Suggested test plan: Run two concurrent Crew(stream=True).kickoff() calls against a real or mocked LLM and verify each stream's chunks contain only content from its own run.

Notes
- run_id defaults to None so all existing code/serialization is unaffected

Link to Devin session: https://app.devin.ai/sessions/bc659bf095044869b49955ca3f222f65
Note
Medium Risk
Changes core streaming/event-bus plumbing by introducing a per-run run_id and filtering handlers, which could affect chunk delivery in concurrent or nested streaming scenarios (especially when run_id is missing). Covered by new concurrency-focused tests but still touches a central execution path.

Overview
Fixes concurrent streaming chunk cross-talk by introducing a per-streaming-run run_id and scoping LLMStreamChunkEvent delivery to matching handlers.

create_streaming_state() now generates/reuses a UUID run scope via a ContextVar, LLM streaming emission sites stamp run_id onto each LLMStreamChunkEvent, and stream handlers drop chunks from other runs. Adds a new TestStreamingRunIsolation suite to verify handler filtering and isolation across contexts/threads.

Reviewed by Cursor Bugbot for commit 0a22307.