feat(tool-streams): add tool streams#1584

Draft
paul-nechifor wants to merge 1 commit into dev from paul/feat/allow-streams

Conversation

@paul-nechifor
Contributor

Problem

Closes DIM-XXX

Solution

Breaking Changes

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

@paul-nechifor paul-nechifor marked this pull request as draft March 17, 2026 00:55
@greptile-apps
Contributor

greptile-apps bot commented Mar 17, 2026

Greptile Summary

This PR introduces a ToolStream abstraction that lets long-running skills (e.g. PersonFollowSkillContainer, PerceiveLoopSkill) push real-time status updates back to the LLM agent without blocking. Messages are published over LCM, fanned out through an SSE endpoint on the MCP server, and received by a new background SSE-listener thread in the MCP client, which injects them as HumanMessages into the agent's processing queue. The Agent class is also wired to accept the same updates via a direct tool_streams reactive input.
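The lifecycle described above can be sketched roughly as follows. This is an illustration only, not the PR's actual code: the transport class here is a stand-in for pLCMTransport, and every name other than ToolStream and the message fields visible in the diff below is hypothetical.

```python
import uuid

class FakeTransport:
    """Stand-in for pLCMTransport: records broadcast payloads instead of publishing over LCM."""
    def __init__(self):
        self.messages = []
    def start(self):
        pass
    def stop(self):
        pass
    def broadcast(self, _, msg):
        # Two positional arguments: a channel slot (unused here) and the payload.
        self.messages.append(msg)

class ToolStreamSketch:
    """Rough shape of the ToolStream lifecycle: start, push updates, close."""
    def __init__(self, tool_name, transport):
        self.id = str(uuid.uuid4())
        self.tool_name = tool_name
        self._transport = transport
    def start(self):
        self._transport.start()
    def send(self, text):
        self._transport.broadcast(None, {
            "stream_id": self.id, "tool_name": self.tool_name,
            "type": "update", "message": text,
        })
    def stop(self):
        # Correct two-argument form; the review below flags a call that omits the first argument.
        self._transport.broadcast(None, {
            "stream_id": self.id, "tool_name": self.tool_name,
            "type": "close",
        })
        self._transport.stop()

transport = FakeTransport()
stream = ToolStreamSketch("PerceiveLoopSkill", transport)
stream.start()
stream.send("person spotted")
stream.stop()
```

On the server side, each broadcast payload would then be fanned out to SSE clients and ultimately injected into the agent's message queue.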

Key issues found:

  • ToolStream.stop() always crashes (dimos/agents/mcp/tool_stream.py:67): pLCMTransport.broadcast requires two positional arguments (_, msg), but the close-message call passes only one. Every ToolStream.stop() invocation will raise TypeError: broadcast() missing 1 required positional argument: 'msg'.
  • PerceiveLoopSkill.start() crashes on startup (dimos/perception/perceive_loop_skill.py:55): self._tool_stream is None until look_out_for() is called, yet start() unconditionally calls self._tool_stream.start(), raising AttributeError on every module startup.
  • Race condition on sse_queues (dimos/agents/mcp/mcp_server.py:219): The plain list app.state.sse_queues is iterated inside a sync subscriber thread while async coroutines append and remove items, which can raise RuntimeError: list changed size during iteration.

Confidence Score: 1/5

  • Not safe to merge — two bugs cause crashes on every normal startup and teardown path.
  • The PR contains two bugs that are guaranteed to crash at runtime on normal usage: (1) ToolStream.stop() calls broadcast() with the wrong number of arguments, raising TypeError every time a skill finishes; (2) PerceiveLoopSkill.start() dereferences self._tool_stream while it is still None, crashing the module on startup. These are not edge-case issues — they occur on the happy path. Additionally, the unprotected sse_queues list introduces a thread-safety hazard. None of the three issues are covered by tests or documentation in the PR.
  • dimos/agents/mcp/tool_stream.py (wrong broadcast call in stop()) and dimos/perception/perceive_loop_skill.py (None dereference in start()) require fixes before merging.

Important Files Changed

Filename Overview
dimos/agents/mcp/tool_stream.py New file introducing the ToolStream class; contains a critical bug in stop() where broadcast() is called with only one argument instead of the required two, causing a TypeError at runtime.
dimos/perception/perceive_loop_skill.py Migrated from AgentSpec.add_message to ToolStream; introduces a bug in start() that calls self._tool_stream.start() on a None value, crashing the module on every startup.
dimos/agents/mcp/mcp_server.py Adds the /mcp/streams SSE endpoint and wires the tool_streams reactive input to broadcast to connected SSE clients; has an unprotected sse_queues list accessed from both async and sync thread contexts.
dimos/agents/mcp/mcp_client.py Adds an SSE listener thread that reconnects on failure and injects tool-stream updates directly into the LLM message queue; reconnection logic and stop handling look correct.
dimos/agents/agent.py Adds a tool_streams input stream and subscribes to it, forwarding update-type messages as HumanMessages into the agent queue — straightforward and looks correct.
dimos/agents/skills/person_follow.py Migrated stop-reason delivery from AgentSpec.add_message to ToolStream; the ToolStream is correctly started just before the follow thread and stopped in _send_stop_reason.

Sequence Diagram

sequenceDiagram
    participant Skill as Skill (PersonFollow / PerceiveLoop)
    participant TS as ToolStream
    participant LCM as pLCMTransport (LCM /tool_streams)
    participant McpSrv as McpServer (subscriber)
    participant SSEQueue as asyncio.Queue (per client)
    participant McpClient as McpClient (SSE thread)
    participant Agent as Agent / McpClient LLM queue

    Skill->>TS: start()
    TS->>LCM: transport.start()

    Skill->>TS: send("update message")
    TS->>LCM: broadcast(None, {type:"update", ...})
    LCM-->>McpSrv: _on_tool_stream_message(msg)
    McpSrv->>SSEQueue: queue.put(msg) [run_coroutine_threadsafe]
    SSEQueue-->>McpClient: SSE line "data: {...}"
    McpClient->>Agent: message_queue.put(HumanMessage)

    Skill->>TS: stop()
    TS->>LCM: broadcast(None, {type:"close", ...})
    LCM-->>McpSrv: _on_tool_stream_message(msg)
    McpSrv->>SSEQueue: queue.put({type:"close"})
    TS->>LCM: transport.stop()

Last reviewed commit: 4e1e816

Comment on lines +67 to +73
self._transport.broadcast(
    {
        "stream_id": self.id,
        "tool_name": self.tool_name,
        "type": "close",
    },
)

Wrong argument count in broadcast() call

pLCMTransport.broadcast has the signature def broadcast(self, _: Out[T] | None, msg: T) -> None: — it requires two positional arguments. In ToolStream.send() this is called correctly with self._transport.broadcast(None, {...}). However, in stop() the close message is passed as the first argument (_), completely omitting the required msg positional argument. This will raise a TypeError: broadcast() missing 1 required positional argument: 'msg' at runtime every time a ToolStream is stopped.

Suggested change
self._transport.broadcast(
    {
        "stream_id": self.id,
        "tool_name": self.tool_name,
        "type": "close",
    },
)
try:
    self._transport.broadcast(
        None,
        {
            "stream_id": self.id,
            "tool_name": self.tool_name,
            "type": "close",
        },
    )

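The failure mode is easy to reproduce with a minimal stand-in for the transport's signature (a sketch; the generic Out[T] annotation is simplified away):

```python
class TransportSketch:
    """Mimics the two-positional-argument shape of pLCMTransport.broadcast."""
    def broadcast(self, _, msg):
        return msg

t = TransportSketch()

# Correct two-argument form, as used in ToolStream.send():
t.broadcast(None, {"type": "close"})

# The buggy one-argument form from stop(): the payload lands in `_`,
# and `msg` is left unbound, so Python raises a TypeError noting the
# missing positional argument 'msg'.
try:
    t.broadcast({"type": "close"})
except TypeError as e:
    print(e)
```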
Comment on lines 53 to +55
def start(self) -> None:
super().start()
self._tool_stream.start()

AttributeError on None during start()

self._tool_stream is initialised to None in __init__ and is only assigned a real ToolStream instance inside look_out_for(). Calling self._tool_stream.start() unconditionally in start() will therefore raise AttributeError: 'NoneType' object has no attribute 'start' every time the module is started, before any lookout has been requested.

The ToolStream instance is already started immediately after it is created inside look_out_for(), so this call in start() appears to be leftover/accidental and should be removed entirely.

Suggested change
def start(self) -> None:
    super().start()
    self._tool_stream.start()
@rpc
def start(self) -> None:
    super().start()
Comment on lines +216 to +220
def _on_tool_stream_message(msg: dict[str, Any]) -> None:
    if loop is None:
        return
    for queue in app.state.sse_queues:
        asyncio.run_coroutine_threadsafe(queue.put(msg), loop)

Race condition on sse_queues list

app.state.sse_queues is a plain list that is mutated from multiple concurrent contexts without any synchronisation:

  • Async context: streams_sse_endpoint appends a queue, and the event_generator finaliser removes it.
  • Sync thread context: _on_tool_stream_message (called from the RxPY subscriber thread) iterates over the list with for queue in app.state.sse_queues.

Python's GIL protects individual atomic operations, but iterating over a list while another coroutine or thread appends/removes items from it can still raise RuntimeError: list changed size during iteration or silently skip/double-process entries.

Consider protecting mutations and the iteration with a threading.Lock, or replacing the list with a set guarded by a lock, to make concurrent access safe.
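A minimal sketch of the lock-guarded approach. The queue-registry name mirrors the PR's sse_queues, but the surrounding class is a simplified stand-in for the FastAPI app state, and assumes Python 3.10+ (where asyncio.Queue no longer binds a loop at construction):

```python
import asyncio
import threading

class StreamFanout:
    """Fan tool-stream messages out to per-client asyncio queues,
    safely across the sync subscriber thread and the event loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self._loop = loop
        self._sse_queues: list[asyncio.Queue] = []
        self._lock = threading.Lock()

    def register(self) -> asyncio.Queue:
        # Called when an SSE client connects.
        q: asyncio.Queue = asyncio.Queue()
        with self._lock:
            self._sse_queues.append(q)
        return q

    def unregister(self, q: asyncio.Queue) -> None:
        # Called from the endpoint's finaliser when a client disconnects.
        with self._lock:
            self._sse_queues.remove(q)

    def publish(self, msg: dict) -> None:
        # Called from the sync subscriber thread: snapshot the list under
        # the lock, then schedule the puts without holding it, so a slow
        # event loop never blocks registration or removal.
        with self._lock:
            targets = list(self._sse_queues)
        for q in targets:
            asyncio.run_coroutine_threadsafe(q.put(msg), self._loop)
```

Snapshotting under the lock means the iteration never observes a concurrent append or remove, which is exactly the hazard the plain list has.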
