From 8168eece900d715b12061f74a3b6cc8ae984e486 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sun, 8 Mar 2026 15:46:17 +0000 Subject: [PATCH] feat(orchestrator): Implement subagent orchestrator - Implemented `scripts/orchestrator.py` to assign pending tasks to active subagents using `Comm` bus. - Added monitoring logic to process task completion signals. - Added tests for `Orchestrator` methods. - Updated task FEATURES-20260305-171348-HVS to `completed`. Co-authored-by: julwrites <18639913+julwrites@users.noreply.github.com> --- ...e-3-subagent-orchestration-orchestrator.md | 2 +- scripts/orchestrator.py | 126 ++++++++++++++++++ tests/test_orchestrator.py | 72 ++++++++++ 3 files changed, 199 insertions(+), 1 deletion(-) create mode 100755 scripts/orchestrator.py create mode 100644 tests/test_orchestrator.py diff --git a/docs/tasks/features/FEATURES-20260305-171348-HVS-implement-phase-3-subagent-orchestration-orchestrator.md b/docs/tasks/features/FEATURES-20260305-171348-HVS-implement-phase-3-subagent-orchestration-orchestrator.md index 3fc99ab..85a1700 100644 --- a/docs/tasks/features/FEATURES-20260305-171348-HVS-implement-phase-3-subagent-orchestration-orchestrator.md +++ b/docs/tasks/features/FEATURES-20260305-171348-HVS-implement-phase-3-subagent-orchestration-orchestrator.md @@ -1,6 +1,6 @@ --- id: FEATURES-20260305-171348-HVS -status: pending +status: completed title: Implement Phase 3 Subagent Orchestration Orchestrator priority: medium created: 2026-03-05 17:13:48 diff --git a/scripts/orchestrator.py b/scripts/orchestrator.py new file mode 100755 index 0000000..ce5c07f --- /dev/null +++ b/scripts/orchestrator.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +import os +import sys +import json +import time +import argparse + +# Setup paths +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +REPO_ROOT = os.getenv("TASKS_REPO_ROOT", os.path.dirname(SCRIPT_DIR)) +sys.path.append(REPO_ROOT) + +from scripts import tasks +from scripts import comm + +class Orchestrator: + def __init__(self, agent_id="orchestrator"): + self.agent_id = agent_id + self.comm_bus = comm.Comm(agent_id=self.agent_id, role="orchestrator") + self.comm_bus.register() + + def get_pending_tasks(self): + """Retrieve tasks with 'pending' status.""" + # Using tasks module directly. list_tasks can return list of dicts. + pending = tasks.list_tasks(status="pending", output_format="json") + if isinstance(pending, str): + try: + pending = json.loads(pending) + except json.JSONDecodeError: + pending = [] + return pending + + def assign_tasks(self): + """Assign pending tasks to available agents.""" + pending_tasks = self.get_pending_tasks() + if not pending_tasks: + return 0 + + agents = self.comm_bus.list_agents() + available_agents = [a for a in agents if a.get("id") != self.agent_id] + if not available_agents: + return 0 + + assigned_count = 0 + for task in pending_tasks: + # Simple round-robin or greedy assignment + agent = available_agents[assigned_count % len(available_agents)] + agent_id = agent.get("id") + + # Send message to agent + msg_content = json.dumps({"task_id": task["id"], "action": "execute"}) + self.comm_bus.send(recipient_id=agent_id, content=msg_content, type="task_assignment", request_receipt=True) + + # Update task status to in_progress + tasks.update_task_status(task["id"], "in_progress", output_format="json") + assigned_count += 1 + + return assigned_count + + def monitor(self): + """Monitor messages for task completions and update statuses.""" + messages = self.comm_bus.read() + processed_count = 0 + for msg in messages: + msg_type = msg.get("type", "") + content = msg.get("content", "") + + if msg_type == "task_completion": + try: + data = json.loads(content) + task_id = data.get("task_id") + if task_id: + tasks.update_task_status(task_id, "completed", output_format="json") + processed_count += 1 + except json.JSONDecodeError: + pass + return processed_count + + def run(self, interval=5): + """Unified loop to repeatedly assign tasks and monitor.""" + print(f"Orchestrator {self.agent_id} starting run loop. Press Ctrl+C to stop.") + try: + while True: + assigned = self.assign_tasks() + if assigned > 0: + print(f"Assigned {assigned} tasks.") + + processed = self.monitor() + if processed > 0: + print(f"Processed {processed} completion messages.") + + time.sleep(interval) + except KeyboardInterrupt: + print("Orchestrator stopped.") + +def main(): + parser = argparse.ArgumentParser(description="Subagent Orchestrator") + subparsers = parser.add_subparsers(dest="command") + + # Assign + subparsers.add_parser("assign", help="Assign pending tasks to available agents once") + + # Monitor + subparsers.add_parser("monitor", help="Check for completion messages once") + + # Run + run_parser = subparsers.add_parser("run", help="Run the orchestrator loop") + run_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds") + + args = parser.parse_args() + + orchestrator = Orchestrator() + + if args.command == "assign": + count = orchestrator.assign_tasks() + print(f"Assigned {count} tasks.") + elif args.command == "monitor": + count = orchestrator.monitor() + print(f"Processed {count} messages.") + elif args.command == "run": + orchestrator.run(interval=args.interval) + else: + parser.print_help() + +if __name__ == "__main__": + main() diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py new file mode 100644 index 0000000..af31e0a --- /dev/null +++ b/tests/test_orchestrator.py @@ -0,0 +1,72 @@ +import unittest +from unittest.mock import patch, MagicMock +import json +import os +import sys + +# Setup paths +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +REPO_ROOT = os.path.dirname(SCRIPT_DIR) +sys.path.append(REPO_ROOT) + +from scripts import orchestrator +from scripts import tasks +from scripts import comm + +class TestOrchestrator(unittest.TestCase): + def setUp(self): + with patch('scripts.comm.Comm.register'): + self.orch = orchestrator.Orchestrator() + + @patch('scripts.tasks.list_tasks') + def test_get_pending_tasks(self, mock_list_tasks): + mock_tasks = [{"id": "TASK-1", "status": "pending"}] + mock_list_tasks.return_value = mock_tasks + + pending = self.orch.get_pending_tasks() + + mock_list_tasks.assert_called_once_with(status="pending", output_format="json") + self.assertEqual(pending, mock_tasks) + + @patch('scripts.tasks.list_tasks') + def test_get_pending_tasks_string_response(self, mock_list_tasks): + mock_tasks = [{"id": "TASK-1", "status": "pending"}] + mock_list_tasks.return_value = json.dumps(mock_tasks) + + pending = self.orch.get_pending_tasks() + + mock_list_tasks.assert_called_once_with(status="pending", output_format="json") + self.assertEqual(pending, mock_tasks) + + @patch.object(orchestrator.Orchestrator, 'get_pending_tasks') + @patch('scripts.comm.Comm.list_agents') + @patch('scripts.comm.Comm.send') + @patch('scripts.tasks.update_task_status') + def test_assign_tasks(self, mock_update, mock_send, mock_list_agents, mock_get_pending): + mock_get_pending.return_value = [{"id": "TASK-1", "status": "pending"}] + mock_list_agents.return_value = [{"id": "agent-1", "role": "worker"}] + + count = self.orch.assign_tasks() + + self.assertEqual(count, 1) + mock_send.assert_called_once() + args, kwargs = mock_send.call_args + self.assertEqual(kwargs['recipient_id'], "agent-1") + self.assertEqual(kwargs['type'], "task_assignment") + mock_update.assert_called_once_with("TASK-1", "in_progress", output_format="json") + + @patch('scripts.comm.Comm.read') + @patch('scripts.tasks.update_task_status') + def test_monitor(self, mock_update, mock_read): + mock_read.return_value = [ + {"type": "task_completion", "content": json.dumps({"task_id": "TASK-1"})}, + {"type": "other", "content": "hello"} + ] + + count = self.orch.monitor() + + self.assertEqual(count, 1) + mock_update.assert_called_once_with("TASK-1", "completed", output_format="json") + +if __name__ == '__main__': + unittest.main()