(feat): temporal agent works with the agentex ui#153
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
When creating a temporal chatbot updates are not streamed to the UI. that requires constant refreshing of the UI to see new messages. This fix aligns these and ensures that the temporal-based chatbots will work in the agentex UI.
Greptile Summary
This PR enables real-time UI updates for temporal-based chatbots by publishing Redis stream events whenever task messages are created or updated. Previously, the
TaskMessageServiceonly persisted messages to MongoDB without notifying subscribed UI clients, requiring manual page refreshes to see new messages.TaskMessageServicenow publishesStreamTaskMessageFullevents to Redis on everyappend_message,append_messages,update_message, andupdate_messagescall, using the sametask:{task_id}topic that the SSE stream endpoint reads from.stream_repositorydependency injected intoTaskMessageService, following the same pattern already used byAgentTaskService._publish_message_full_eventcatches and logs exceptions without disrupting the core message CRUD operations.stream_repositoryparameter.Confidence Score: 4/5
_publish_message_full_eventprevents stream failures from breaking message operations. All test fixtures are properly updated. The only consideration is potential duplicate Redis events when the ACP streaming path (which already yields NDJSON) also triggers these new Redis publishes, but this is harmless and actually beneficial for SSE subscribers.agentex/src/domain/services/task_message_service.py— core logic change with new Redis stream publishing on all message mutation pathsImportant Files Changed
_publish_message_full_eventto all message create/update paths. Newstream_repositorydependency injected. Exception handling prevents stream failures from breaking message operations. Hardcodedindex=0is acceptable for standalone full events.stream_repository/redis_stream_repositorytoTaskMessageService. Keyword argument names corrected to match the constructor.stream_repositoryfrom isolated repositories when constructingTaskMessageService. Aligns with the new constructor signature.redis_stream_repositorytoTaskMessageService. No test logic changes needed since stream publishing is fire-and-forget with error swallowing.redis_stream_repositorytoTaskMessageServiceconstructor. No test logic changes required.Sequence Diagram
sequenceDiagram participant Temporal as Temporal Agent participant TMS as TaskMessageService participant MongoDB as MongoDB participant Redis as Redis Stream participant SSE as SSE Endpoint participant UI as Agentex UI Temporal->>TMS: append_message(task_id, content) TMS->>MongoDB: create(task_message) MongoDB-->>TMS: created_message TMS->>Redis: send_data(task:{task_id}, StreamTaskMessageFull) Note over Redis: New! Previously missing TMS-->>Temporal: created_message UI->>SSE: GET /tasks/{task_id}/stream SSE->>Redis: read_messages(task:{task_id}) Redis-->>SSE: StreamTaskMessageFull event SSE-->>UI: SSE event (real-time update)Last reviewed commit: bcd22b3
(5/5) You can turn off certain types of comments like style here!