Skip to content

Commit 66e0453

Browse files
committed
chore(runner): refactored agent runner, preparing for parallel delegate tasks
1 parent 48c3c33 commit 66e0453

10 files changed

Lines changed: 158 additions & 137 deletions

File tree

AgentCrew/app.py

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,8 @@ def run_job(
303303
memory_llm: Optional[str] = None,
304304
output_schema: Optional[str] = None,
305305
) -> str:
306-
from AgentCrew.modules.chat import MessageHandler
306+
from AgentCrew.modules.agents.agent_runner import run_agent_loop
307+
from AgentCrew.modules.agents.base import MessageType
307308
from AgentCrew.modules.mcpclient import MCPSessionManager
308309
from AgentCrew.modules.llm.model_registry import ModelRegistry
309310

@@ -361,37 +362,63 @@ def run_job(
361362

362363
self.agent_manager.select_agent(current_agent.name)
363364

364-
message_handler = MessageHandler(
365-
services["memory"], services["context_persistent"]
366-
)
367-
message_handler.is_non_interactive = True
368-
message_handler.agent = current_agent
365+
history: List[Dict[str, Any]] = []
369366

370367
if files:
368+
from AgentCrew.modules.utils.file_handler import FileHandler
369+
370+
file_handler = FileHandler()
371+
all_file_contents: List[Dict[str, Any]] = []
371372
for file_path in files:
372-
asyncio.run(
373-
message_handler.process_user_input(f"/file {file_path}")
373+
file_path = os.path.expanduser(file_path.strip())
374+
file_content = file_handler.process_file(file_path)
375+
if not file_content:
376+
file_content = current_agent.format_message(
377+
MessageType.FileContent, {"file_uri": file_path}
378+
)
379+
if file_content:
380+
all_file_contents.append(file_content)
381+
if all_file_contents:
382+
history.append(
383+
{
384+
"role": "user",
385+
"content": all_file_contents,
386+
}
374387
)
375388

389+
history.append(
390+
{
391+
"role": "user",
392+
"content": [{"type": "text", "text": task}],
393+
}
394+
)
395+
376396
max_attempts = 4
377397
attempt = 0
378398
response = None
379399

380-
asyncio.run(message_handler.process_user_input(task))
381-
382400
while attempt < max_attempts:
383401
attempt += 1
384-
response, _, _ = asyncio.run(
385-
message_handler.get_assistant_response()
402+
response = asyncio.run(
403+
run_agent_loop(
404+
agent=current_agent,
405+
history=history,
406+
)
386407
)
387408
if not output_schema or not schema_dict:
388409
break
389410

390411
if response is None:
391-
asyncio.run(
392-
message_handler.process_user_input(
393-
"No response was generated. Please try again."
394-
)
412+
history.append(
413+
{
414+
"role": "user",
415+
"content": [
416+
{
417+
"type": "text",
418+
"text": "No response was generated. Please try again.",
419+
}
420+
],
421+
}
395422
)
396423
continue
397424

@@ -402,12 +429,17 @@ def run_job(
402429
break
403430
else:
404431
if retry_message:
405-
asyncio.run(
406-
message_handler.process_user_input(retry_message)
432+
history.append(
433+
{
434+
"role": "user",
435+
"content": [
436+
{"type": "text", "text": retry_message}
437+
],
438+
}
407439
)
408440

409441
MCPSessionManager.get_instance().cleanup()
410-
return response.strip() if response else ""
442+
return self._clean_json_response(response).strip() if response else ""
411443
else:
412444
raise ValueError(f"Agent '{agent}' not found")
413445

AgentCrew/modules/a2a/task_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ async def on_send_message(
156156
(m for m in message.get("content", []) if m.get("type", "text") == "file"),
157157
None,
158158
):
159-
from AgentCrew.modules.chat.file_handler import FileHandler
159+
from AgentCrew.modules.utils.file_handler import FileHandler
160160

161161
new_parts = []
162162
if self.file_handler is None:
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
from typing import Any, Callable, Dict, List, Optional
2+
3+
from AgentCrew.modules.agents.local_agent import LocalAgent
4+
from AgentCrew.modules.agents.base import MessageType
5+
6+
7+
async def run_agent_loop(
    agent: LocalAgent,
    history: List[Dict[str, Any]],
    *,
    tool_filter: Optional[Callable[[Dict[str, Any]], bool]] = None,
) -> str:
    """Drive *agent* until it produces a final text answer.

    Streams one assistant turn over *history*, executes any requested tool
    calls (appending assistant / thinking / tool-result messages to *history*
    in place), then repeats until a turn finishes with no surviving tool uses.

    Args:
        agent: Local agent exposing ``process_messages``, ``format_message``
            and ``execute_tool_call``.
        history: Mutable provider-format message list; extended in place as
            the loop progresses.
        tool_filter: Optional predicate; tool uses for which it returns a
            falsy value are dropped (neither executed nor recorded).

    Returns:
        The agent's final text response (may be the empty string).
    """
    # Token usage is accumulated across rounds; currently informational only
    # (not returned), kept for parity with the callback signature.
    total_input_tokens = 0
    total_output_tokens = 0

    # Iterate instead of tail-calling ourselves: one recursion level per tool
    # round can exhaust the recursion limit on long tool chains.
    while True:
        current_response = ""
        thinking_content = ""
        thinking_signature = ""
        tool_uses: List[Dict[str, Any]] = []

        def process_result(_tool_uses, _input_tokens, _output_tokens):
            # Callback invoked by the provider stream with this turn's
            # requested tool uses and token counts.
            nonlocal tool_uses, total_input_tokens, total_output_tokens
            tool_uses = _tool_uses
            total_input_tokens += _input_tokens
            total_output_tokens += _output_tokens

        async for (
            response_message,
            _chunk_text,  # incremental text chunk — unused here
            thinking_chunk,
        ) in agent.process_messages(history, callback=process_result):
            if response_message:
                current_response = response_message
            if thinking_chunk:
                think_text_chunk, signature = thinking_chunk
                if think_text_chunk:
                    thinking_content += think_text_chunk
                if signature:
                    thinking_signature += signature

        # No tool requests at all: the turn is a final answer.
        if not tool_uses:
            return current_response

        filtered = (
            [t for t in tool_uses if tool_filter(t)] if tool_filter else tool_uses
        )
        if not filtered:
            # Every requested tool was filtered out; treat as final.
            return current_response

        # Record the model's reasoning before the tool round. Only format
        # when there is thinking content — the pre-refactor delegate code
        # guarded this call, and formatting a ``None`` payload was never
        # intended.
        if thinking_content:
            thinking_message = agent.format_message(
                MessageType.Thinking,
                {"thinking": (thinking_content, thinking_signature)},
            )
            if thinking_message:
                history.append(thinking_message)

        assistant_message = agent.format_message(
            MessageType.Assistant,
            {"message": current_response, "tool_uses": filtered},
        )
        if assistant_message:
            history.append(assistant_message)

        # Execute each surviving tool call; failures become error-flagged
        # tool results so the agent can see and react to them next round.
        for tool_use in filtered:
            try:
                tool_result = await agent.execute_tool_call(
                    tool_use["name"], tool_use["input"]
                )
            except Exception as e:
                error_msg = agent.format_message(
                    MessageType.ToolResult,
                    {"tool_use": tool_use, "tool_result": str(e), "is_error": True},
                )
                if error_msg:
                    history.append(error_msg)
                continue

            result_msg = agent.format_message(
                MessageType.ToolResult,
                {"tool_use": tool_use, "tool_result": tool_result},
            )
            if result_msg:
                history.append(result_msg)

AgentCrew/modules/agents/tools/delegate.py

Lines changed: 11 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
import asyncio
33

44
from AgentCrew.modules.agents import AgentManager
5-
6-
# from AgentCrew.modules.llm.message import MessageTransformer
7-
from AgentCrew.modules.agents.base import MessageType
5+
from AgentCrew.modules.agents.agent_runner import run_agent_loop
86

97

108
def get_delegate_tool_definition(provider="claude") -> Dict[str, Any]:
@@ -121,10 +119,10 @@ def handler(**params) -> str:
121119

122120
try:
123121
# Get the target agent
124-
target_agent = agent_manager.get_agent(target_agent_name)
122+
target_agent = agent_manager.get_local_agent(target_agent_name)
125123
if not target_agent:
126124
raise ValueError(
127-
f"Error: Could not retrieve agent '{target_agent_name}'"
125+
f"Error: Could not retrieve local agent '{target_agent_name}'"
128126
)
129127

130128
# Prepare context from current conversation
@@ -177,106 +175,14 @@ def handler(**params) -> str:
177175

178176
try:
179177

180-
async def _process_delegation():
181-
"""Process delegation similar to _process_agent_task - runs until no tool uses"""
182-
183-
current_response = ""
184-
tool_uses = []
185-
input_tokens = 0
186-
output_tokens = 0
187-
thinking_content = ""
188-
189-
def process_result(_tool_uses, _input_tokens, _output_tokens):
190-
nonlocal tool_uses, input_tokens, output_tokens
191-
tool_uses = _tool_uses
192-
input_tokens += _input_tokens
193-
output_tokens += _output_tokens
194-
195-
if not target_agent:
196-
return current_response
197-
async for (
198-
response_message,
199-
chunk_text,
200-
thinking_chunk,
201-
) in target_agent.process_messages(
202-
delegate_history, callback=process_result
203-
):
204-
if response_message:
205-
current_response = response_message
206-
207-
if thinking_chunk:
208-
think_text_chunk, _ = thinking_chunk
209-
if think_text_chunk:
210-
thinking_content += think_text_chunk
211-
212-
if tool_uses and len(tool_uses) > 0:
213-
filtered_tool_uses = [
214-
t
215-
for t in tool_uses
216-
if t["name"] not in ["transfer", "delegate"]
217-
]
218-
219-
if not filtered_tool_uses:
220-
return current_response
221-
222-
# Add thinking content as a separate message if available
223-
thinking_data = (
224-
(thinking_content, None) if thinking_content else None
225-
)
226-
if thinking_data:
227-
thinking_message = target_agent.format_message(
228-
MessageType.Thinking, {"thinking": thinking_data}
229-
)
230-
if thinking_message:
231-
delegate_history.append(thinking_message)
232-
233-
# Format assistant message with the response and tool uses
234-
assistant_message = target_agent.format_message(
235-
MessageType.Assistant,
236-
{
237-
"message": current_response,
238-
"tool_uses": filtered_tool_uses,
239-
},
240-
)
241-
if assistant_message:
242-
delegate_history.append(assistant_message)
243-
244-
# Process each tool use
245-
for tool_use in filtered_tool_uses:
246-
try:
247-
tool_result = await target_agent.execute_tool_call(
248-
tool_use["name"],
249-
tool_use["input"],
250-
)
251-
252-
tool_result_message = target_agent.format_message(
253-
MessageType.ToolResult,
254-
{
255-
"tool_use": tool_use,
256-
"tool_result": tool_result,
257-
},
258-
)
259-
if tool_result_message:
260-
delegate_history.append(tool_result_message)
261-
262-
except Exception as e:
263-
error_message = target_agent.format_message(
264-
MessageType.ToolResult,
265-
{
266-
"tool_use": tool_use,
267-
"tool_result": str(e),
268-
"is_error": True,
269-
},
270-
)
271-
if error_message:
272-
delegate_history.append(error_message)
273-
274-
# Continue the loop to process more messages
275-
return await _process_delegation()
276-
277-
return current_response
278-
279-
response = asyncio.run(_process_delegation())
178+
async def _do_delegation():
179+
return await run_agent_loop(
180+
agent=target_agent,
181+
history=delegate_history,
182+
tool_filter=lambda t: t["name"] not in ["transfer", "delegate"],
183+
)
184+
185+
response = asyncio.run(_do_delegation())
280186

281187
except Exception as e:
282188
raise ValueError(

AgentCrew/modules/chat/message/command_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Optional, Tuple, List, Dict, Any
44
import os
55

6-
from AgentCrew.modules.chat.file_handler import FileHandler
6+
from AgentCrew.modules.utils.file_handler import FileHandler
77
from AgentCrew.modules.llm.model_registry import ModelRegistry
88
from AgentCrew.modules.llm.service_manager import ServiceManager
99
from AgentCrew.modules.chat.consolidation import ConversationConsolidator

AgentCrew/modules/chat/message/handler.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from AgentCrew.modules.agents.base import MessageType
1010
from AgentCrew.modules.chat.history import ChatHistoryManager
1111
from AgentCrew.modules.agents import AgentManager
12-
from AgentCrew.modules.chat.file_handler import FileHandler
12+
from AgentCrew.modules.utils.file_handler import FileHandler
1313

1414
from AgentCrew.modules.memory import (
1515
BaseMemoryService,
@@ -74,7 +74,6 @@ def __init__(
7474
self._queued_attached_files = []
7575
self.stop_streaming = False
7676
self.streamline_messages = []
77-
self.is_non_interactive = False
7877
self.current_conversation_id: Optional[str] = None # ID for persistence
7978

8079
# Initialize components
@@ -419,11 +418,6 @@ def process_result(_tool_uses, _input_tokens, _output_tokens):
419418
# Final assistant message
420419
self._notify("response_completed", assistant_response)
421420

422-
if self.is_non_interactive:
423-
# No need to persist or store conversation turn since is single turn task
424-
return assistant_response, input_tokens, output_tokens
425-
426-
# --- Start of Persistence Logic ---
427421
if self.current_conversation_id and self.last_assisstant_response_idx >= 0:
428422
try:
429423
messages_for_this_turn = self.get_recent_agent_responses()

AgentCrew/modules/chat/message/tool_manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ async def execute_tool(self, tool_use: Dict[str, Any]):
125125
return
126126

127127
if (
128-
not self.message_handler.is_non_interactive
129-
and not self.get_effective_yolo_mode()
128+
not self.get_effective_yolo_mode()
130129
and tool_name not in self._auto_approved_tools
131130
):
132131
# Request confirmation from the user

0 commit comments

Comments
 (0)