Skip to content

Commit 620bbcf

Browse files
authored
Merge pull request #48 from TogetherCrew/fix/47-unlimited-workflow-call
feat: enhance Hivemind agent execution with max iterations and improve error handling!
2 parents 6bff1c6 + 1ba0ddd commit 620bbcf

2 files changed

Lines changed: 31 additions & 8 deletions

File tree

tasks/hivemind/agent.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,11 @@ def do_rag_query(self) -> str:
176176

177177
# Run the agent
178178
agent_executor = AgentExecutor(
179-
agent=agent, tools=tools, verbose=True, return_intermediate_steps=False
179+
agent=agent,
180+
tools=tools,
181+
verbose=True,
182+
return_intermediate_steps=False,
183+
max_iterations=3,
180184
)
181185

182186
result = agent_executor.invoke({"input": self.state.user_query})

tasks/hivemind/query_data_sources.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import os
3+
import logging
34

45
import nest_asyncio
56
from dotenv import load_dotenv
@@ -8,6 +9,7 @@
89
from tc_temporal_backend.client import TemporalClient
910
from tc_temporal_backend.schema.hivemind import HivemindQueryPayload
1011
from temporalio.common import RetryPolicy
12+
from temporalio.client import WorkflowFailureError
1113

1214
nest_asyncio.apply()
1315

@@ -42,13 +44,30 @@ async def query(self, query: str) -> str | None:
4244
payload.workflow_id = self.workflow_id
4345

4446
hivemind_queue = self.load_hivemind_queue()
45-
result = await client.execute_workflow(
46-
"HivemindWorkflow",
47-
payload,
48-
id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
49-
task_queue=hivemind_queue,
50-
retry_policy=RetryPolicy(maximum_attempts=3),
51-
)
47+
try:
48+
result = await client.execute_workflow(
49+
"HivemindWorkflow",
50+
payload,
51+
id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
52+
task_queue=hivemind_queue,
53+
retry_policy=RetryPolicy(maximum_attempts=3),
54+
)
55+
except WorkflowFailureError as e:
56+
logging.error(f"WorkflowFailureError: {e} for workflow {self.workflow_id}", exc_info=True)
57+
return None
58+
except Exception as e:
59+
logging.error(f"Exception: {e} for workflow {self.workflow_id}", exc_info=True)
60+
return None
61+
62+
# Normalize Temporal failure-shaped responses that may be returned as data
63+
if isinstance(result, dict) and (
64+
"workflowExecutionFailedEventAttributes" in result or "failure" in result
65+
):
66+
logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
67+
return None
68+
if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
69+
logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
70+
return None
5271

5372
return result
5473

0 commit comments

Comments
 (0)