
fix(streaming): prefer event.task_id when building StreamChunk.task_id #5375

Open
llaoj wants to merge 2 commits into crewAIInc:main from llaoj:main

Conversation


@llaoj llaoj commented Apr 9, 2026

Summary

This PR makes StreamChunk.task_id use event.task_id as the primary source, with a backward-compatible fallback to current_task_info["id"].

  • Before:
    • StreamChunk.task_id could be empty when current_task_info["id"] was empty.
  • After:
    • StreamChunk.task_id = event.task_id or current_task_info["id"] or "".
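The fallback chain above can be sketched as a small standalone function. This is an illustrative sketch, not the actual crewAI code: `FakeStreamEvent` and `resolve_task_id` are hypothetical names standing in for `LLMStreamChunkEvent` and the internal chunk-building logic.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeStreamEvent:
    """Stand-in for LLMStreamChunkEvent; only the field used here."""
    task_id: Optional[str] = None


def resolve_task_id(event: FakeStreamEvent, current_task_info: Optional[dict]) -> str:
    # Primary source: the event's own task_id.
    # Fallbacks: the handler-local current_task_info["id"], then "".
    return event.task_id or (current_task_info or {}).get("id") or ""
```

With this chain, `resolve_task_id(FakeStreamEvent("t1"), {"id": ""})` yields `"t1"`, while `resolve_task_id(FakeStreamEvent(None), {"id": "t2"})` still yields `"t2"`, preserving the old behavior.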

Why this matters

LLMStreamChunkEvent.task_id is populated from from_task in event normalization, while current_task_info["id"] may remain empty in some streaming contexts.

So even when task identity exists in the event, downstream consumers can still receive chunks with empty chunk.task_id if StreamChunk.task_id is built only from current_task_info.

Real downstream example (SSE filtering)

In a downstream FastAPI SSE endpoint, we collect crew task IDs and filter stream chunks by chunk.task_id:

import json

crew = CrewAgent().crew()
task_id_set = {str(task.id) for task in crew.tasks}

async for chunk in streaming:
    content = getattr(chunk, "content", None)
    task_id = getattr(chunk, "task_id", None)
    # Forward only chunks that belong to this crew's tasks.
    if content and task_id in task_id_set:
        await queue.put(f"data: {json.dumps(content, ensure_ascii=False)}\n\n")

If chunk.task_id is empty, valid chunks are silently dropped by this filtering logic.

Related concurrency concern (important, not fully solved by this PR)

While each request/run typically has its own queue, stream handlers are registered on the singleton crewai_event_bus.
That means with concurrent streaming runs:

  • Request A registers handler_A -> writes to queue_A
  • Request B registers handler_B -> writes to queue_B
  • A single LLMStreamChunkEvent can be dispatched to all active handlers

So this is not a "shared queue" problem; it is an event fan-out problem: the global event bus dispatches each event to every active handler, which can cause cross-run chunk contamination unless each handler scopes or filters the events it accepts.

This PR addresses task_id correctness in StreamChunk.
A follow-up improvement could add stronger run/stream scoping for handler dispatch.
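One possible shape for that follow-up is handler-level scoping: each run's handler keeps only chunks whose task_id belongs to its own task set. A minimal sketch, assuming dict-shaped chunks and a sentinel-terminated queue (all names here are hypothetical, not the crewai_event_bus API):

```python
import asyncio


async def scoped_handler(events: asyncio.Queue, out: asyncio.Queue,
                         run_task_ids: set) -> None:
    # Hypothetical handler-level scoping: forward only chunks whose
    # task_id belongs to this run, dropping cross-run fan-out.
    while True:
        chunk = await events.get()
        if chunk is None:  # sentinel: stream finished
            return
        if chunk.get("task_id") in run_task_ids:
            await out.put(chunk)


async def demo():
    events, out = asyncio.Queue(), asyncio.Queue()
    # Simulate the global bus fanning one mixed stream out to a handler
    # that should only keep run A's chunks.
    for chunk in ({"task_id": "task_a", "content": "A1"},
                  {"task_id": "task_b", "content": "B1"},
                  {"task_id": "task_a", "content": "A2"}):
        events.put_nowait(chunk)
    events.put_nowait(None)
    await scoped_handler(events, out, {"task_a"})
    return [out.get_nowait() for _ in range(out.qsize())]
```

Running `asyncio.run(demo())` keeps only the two "task_a" chunks; note this filtering only works if chunks carry a non-empty task_id, which is exactly what this PR fixes.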

Compatibility

Backward compatible:
If event.task_id is missing, existing fallback behavior (current_task_info["id"]) is preserved.

Test plan

  • Verify stream chunks now carry non-empty task_id when event contains task context.
  • Verify fallback still works when event.task_id is unavailable.
  • Validate downstream filtering by task IDs no longer drops valid chunks due to empty task_id.
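The third bullet can be exercised with a small mirror of the downstream filter. `filter_chunks` is an illustrative helper, not part of crewAI, showing why an empty task_id drops otherwise valid chunks:

```python
def filter_chunks(chunks: list, task_id_set: set) -> list:
    """Mirror of the SSE endpoint's filter: keep chunks that have
    content and whose task_id is in the run's known task-id set."""
    return [c for c in chunks
            if c.get("content") and c.get("task_id") in task_id_set]
```

With the fix, a chunk like `{"content": "hello", "task_id": "t1"}` passes the filter; before the fix, the same chunk could arrive with `task_id == ""` and be dropped.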
