From 548b8dc95ccda64264d517bb64c4b90d90ea6a64 Mon Sep 17 00:00:00 2001 From: Micah Villmow <4211002+mvillmow@users.noreply.github.com> Date: Sat, 25 Apr 2026 17:38:37 -0700 Subject: [PATCH 1/2] fix(agents): fix failure propagation in lead agent dispatch chain - isSubordinateResult() (ModuleLeadAgent) now explicitly excludes TASK_FAILED messages so they always reach processSubordinateFailure() and are never silently treated as successes (Closes #184). - processSubordinateFailure() (LeadAgentBase default) now sends a TASK_FAILED message to coordination_.getRequesterId() once all subordinates have reported terminal events, preventing the ComponentLeadAgent from remaining stuck in WAITING_FOR_MODULES permanently (Closes #185). - Added processTaskResult() to ModuleLeadAgent and ComponentLeadAgent for the gRPC async result path; calls coordination_.recordFailure() when hmas::TaskResult::success() is false so hasFailures() is set correctly and upward propagation fires (Closes #186). - Added unit tests covering all three scenarios, including a three- level integration test that verifies TASK_FAILED arrives at the grandparent inbox. 
Closes #184 Closes #185 Closes #186 Co-Authored-By: Claude Sonnet 4.6 --- include/agents/component_lead_agent.hpp | 13 +++ include/agents/lead_agent_base.hpp | 12 +++ include/agents/module_lead_agent.hpp | 13 +++ src/agents/component_lead_agent.cpp | 34 ++++++ src/agents/module_lead_agent.cpp | 39 ++++++- tests/unit/test_component_lead_agent.cpp | 52 +++++++++ tests/unit/test_module_lead_agent.cpp | 128 +++++++++++++++++++++++ 7 files changed, 290 insertions(+), 1 deletion(-) diff --git a/include/agents/component_lead_agent.hpp b/include/agents/component_lead_agent.hpp index 6b869e74..63ece3ef 100644 --- a/include/agents/component_lead_agent.hpp +++ b/include/agents/component_lead_agent.hpp @@ -95,6 +95,19 @@ class ComponentLeadAgent : public LeadAgentBase { */ void processYamlComponent(const std::string& yaml_spec); + /** + * @brief Handle a gRPC TaskResult callback from a child ModuleLeadAgent (Issue #186) + * + * Called externally when the coordinator delivers a module result via gRPC. + * Checks hmas::TaskResult::success() and calls coordination_.recordFailure() + * when the result is not successful so that hasFailures() is set correctly + * and the agent does not remain permanently stuck in WAITING_FOR_MODULES. + * + * @param subtask_id Identifier of the child module task (child task_id) + * @param result gRPC TaskResult from the coordinator + */ + void processTaskResult(const std::string& subtask_id, const hmas::TaskResult& result); + /** * @brief Start heartbeat thread (sends heartbeat every 1s) */ diff --git a/include/agents/lead_agent_base.hpp b/include/agents/lead_agent_base.hpp index 4b38186e..81d4c7d5 100644 --- a/include/agents/lead_agent_base.hpp +++ b/include/agents/lead_agent_base.hpp @@ -150,6 +150,18 @@ class LeadAgentBase : public AsyncAgent { bool all_done = coordination_.recordFailure(error); if (all_done) { coordination_.transitionTo(error_state_, stateToString(error_state_)); + + // Propagate failure upward to grandparent (Issue #185). 
+ // When all subordinates have reported terminal events and at least one + // has failed, send a TASK_FAILED message to our own requester so the + // ComponentLeadAgent (or ChiefArchitect) above us does not remain + // permanently stuck in WAITING_FOR_MODULES / WAITING_FOR_TASKS. + const std::string& parent_id = coordination_.getRequesterId(); + if (!parent_id.empty()) { + auto failed_msg = core::KeystoneMessage::create( + agent_id_, parent_id, core::ActionType::TASK_FAILED, failure_msg.session_id, error); + sendMessage(failed_msg); + } } } diff --git a/include/agents/module_lead_agent.hpp b/include/agents/module_lead_agent.hpp index 7db3b940..b8c9a79b 100644 --- a/include/agents/module_lead_agent.hpp +++ b/include/agents/module_lead_agent.hpp @@ -95,6 +95,19 @@ class ModuleLeadAgent : public LeadAgentBase { */ void processYamlModule(const std::string& yaml_spec); + /** + * @brief Handle a gRPC TaskResult callback from a child TaskAgent (Issue #186) + * + * Called externally when the coordinator delivers a task result via gRPC. + * Checks hmas::TaskResult::success() and calls coordination_.recordFailure() + * when the result is not successful so that hasFailures() is set correctly + * and the agent does not remain permanently stuck in WAITING_FOR_TASKS. 
+ * + * @param subtask_id Identifier of the child task (child task_id) + * @param result gRPC TaskResult from the coordinator + */ + void processTaskResult(const std::string& subtask_id, const hmas::TaskResult& result); + /** * @brief Start heartbeat thread (sends heartbeat every 1s) */ diff --git a/src/agents/component_lead_agent.cpp b/src/agents/component_lead_agent.cpp index ae0661f0..9f3a139e 100644 --- a/src/agents/component_lead_agent.cpp +++ b/src/agents/component_lead_agent.cpp @@ -271,6 +271,40 @@ void ComponentLeadAgent::processYamlComponent(const std::string& yaml_spec) { } } +void ComponentLeadAgent::processTaskResult(const std::string& subtask_id, + const hmas::TaskResult& result) { + // Feed result into the result aggregator (if present) so it tracks completion + if (result_aggregator_) { + result_aggregator_->addResult(subtask_id, result); + } + + if (result.success()) { + // Record success — check if all modules are done + bool all_done = coordination_.recordResult(result.result_yaml()); + if (all_done && !coordination_.hasFailures()) { + coordination_.transitionTo(State::AGGREGATING, stateToString(State::AGGREGATING)); + } else if (all_done) { + coordination_.transitionTo(State::ERROR, stateToString(State::ERROR)); + } + } else { + // gRPC path: record failure so hasFailures() is set correctly (Issue #186) + std::string error = result.error_message().empty() ? 
"subordinate module failed" + : result.error_message(); + bool all_done = coordination_.recordFailure(error); + if (all_done) { + coordination_.transitionTo(State::ERROR, stateToString(State::ERROR)); + + // Propagate failure upward to parent (ChiefArchitectAgent) — Issue #185 + const std::string& parent_id = coordination_.getRequesterId(); + if (!parent_id.empty()) { + auto failed_msg = core::KeystoneMessage::create( + agent_id_, parent_id, core::ActionType::TASK_FAILED, "", error); + sendMessage(failed_msg); + } + } + } +} + void ComponentLeadAgent::startHeartbeat() { // Delegate to coordination template coordination_.startHeartbeat(agent_id_); diff --git a/src/agents/module_lead_agent.cpp b/src/agents/module_lead_agent.cpp index 227e545d..61833a50 100644 --- a/src/agents/module_lead_agent.cpp +++ b/src/agents/module_lead_agent.cpp @@ -38,7 +38,10 @@ void ModuleLeadAgent::setAvailableTaskAgents(const std::vector& tas // === Hook Method Implementations (override LeadAgentBase pure virtuals) === bool ModuleLeadAgent::isSubordinateResult(const core::KeystoneMessage& msg) { - // Exclude TASK_FAILED so processSubordinateFailure() handles it instead + // Check if this is a task result (from TaskAgent). + // Explicitly exclude TASK_FAILED messages so they are handled by + // processSubordinateFailure() and not silently treated as successes + // (Issue #184). 
return msg.command == "response" && msg.action_type != core::ActionType::TASK_FAILED; } @@ -269,6 +272,40 @@ void ModuleLeadAgent::processYamlModule(const std::string& yaml_spec) { } } +void ModuleLeadAgent::processTaskResult(const std::string& subtask_id, + const hmas::TaskResult& result) { + // Feed result into the result aggregator (if present) so it tracks completion + if (result_aggregator_) { + result_aggregator_->addResult(subtask_id, result); + } + + if (result.success()) { + // Record success — check if all subtasks are done + bool all_done = coordination_.recordResult(result.result_yaml()); + if (all_done && !coordination_.hasFailures()) { + coordination_.transitionTo(State::SYNTHESIZING, stateToString(State::SYNTHESIZING)); + } else if (all_done) { + coordination_.transitionTo(State::ERROR, stateToString(State::ERROR)); + } + } else { + // gRPC path: record failure so hasFailures() is set correctly (Issue #186) + std::string error = result.error_message().empty() ? "subordinate task failed" + : result.error_message(); + bool all_done = coordination_.recordFailure(error); + if (all_done) { + coordination_.transitionTo(State::ERROR, stateToString(State::ERROR)); + + // Propagate failure upward to parent (ComponentLeadAgent) — Issue #185 + const std::string& parent_id = coordination_.getRequesterId(); + if (!parent_id.empty()) { + auto failed_msg = core::KeystoneMessage::create( + agent_id_, parent_id, core::ActionType::TASK_FAILED, "", error); + sendMessage(failed_msg); + } + } + } +} + void ModuleLeadAgent::startHeartbeat() { // Delegate to coordination template coordination_.startHeartbeat(agent_id_); diff --git a/tests/unit/test_component_lead_agent.cpp b/tests/unit/test_component_lead_agent.cpp index d4965c5f..991dc867 100644 --- a/tests/unit/test_component_lead_agent.cpp +++ b/tests/unit/test_component_lead_agent.cpp @@ -230,6 +230,58 @@ TEST_F(ComponentLeadAgentTest, StateTransitionFlow) { (void)trace; // Suppress unused variable warning } +// 
============================================================================ +// Issue #185: processSubordinateFailure propagates failure upward +// ============================================================================ + +TEST_F(ComponentLeadAgentTest, ModuleFailurePropagatedUpwardToRequester) { + // When a module reports TASK_FAILED and all modules are terminal, the + // ComponentLeadAgent must forward a TASK_FAILED to its own requester. + auto component = std::make_shared("component_1"); + auto module1 = std::make_shared("module_1"); + + // Dummy parent that will receive the propagated failure + auto parent = std::make_shared("parent_chief"); + + component->setMessageBus(bus_.get()); + module1->setMessageBus(bus_.get()); + parent->setMessageBus(bus_.get()); + + bus_->registerAgent(component->getAgentId(), component); + bus_->registerAgent(module1->getAgentId(), module1); + bus_->registerAgent(parent->getAgentId(), parent); + + component->setAvailableModuleLeads({"module_1"}); + + // parent_chief sends a goal to component_1 for 1 module + component + ->processMessage(core::KeystoneMessage::create("parent_chief", + "component_1", + "Implement Core component: Messaging(10)")) + .get(); + + // component_1 is now in WAITING_FOR_MODULES waiting for module_1 + // Deliver a TASK_FAILED from module_1 + auto failure_msg = + core::KeystoneMessage::create("module_1", "component_1", "module_result", "module crashed"); + failure_msg.action_type = core::ActionType::TASK_FAILED; + component->processMessage(failure_msg).get(); + + // ComponentLeadAgent must be in ERROR + EXPECT_EQ(component->getCurrentState(), agents::ComponentLeadAgent::State::ERROR); + + // parent_chief inbox should contain a TASK_FAILED from component_1 + bool received_failure = false; + std::optional msg; + while ((msg = parent->getMessage()).has_value()) { + if (msg->action_type == core::ActionType::TASK_FAILED) { + received_failure = true; + break; + } + } + EXPECT_TRUE(received_failure) << "Parent 
did not receive TASK_FAILED from ComponentLeadAgent"; +} + TEST_F(ComponentLeadAgentTest, ConcurrentCoordination) { auto component = std::make_shared("component_1"); component->setMessageBus(bus_.get()); diff --git a/tests/unit/test_module_lead_agent.cpp b/tests/unit/test_module_lead_agent.cpp index 412c3e05..bbec168b 100644 --- a/tests/unit/test_module_lead_agent.cpp +++ b/tests/unit/test_module_lead_agent.cpp @@ -11,6 +11,7 @@ * Total: 12 tests */ +#include "agents/component_lead_agent.hpp" #include "agents/module_lead_agent.hpp" #include "agents/task_agent.hpp" #include "core/message_bus.hpp" @@ -337,6 +338,133 @@ TEST_F(ModuleLeadAgentTest, SuccessResultAfterFailureStillCountsTowardCompletion EXPECT_NE(state, agents::ModuleLeadAgent::State::WAITING_FOR_TASKS); } +// ============================================================================ +// Issue #184: isSubordinateResult does not intercept TASK_FAILED messages +// ============================================================================ + +TEST_F(ModuleLeadAgentTest, TaskFailedNotTreatedAsSuccessResult) { + // A TASK_FAILED message with command == "response" must NOT be processed by + // processSubordinateResult() — it must reach processSubordinateFailure() + // so the failure is recorded rather than silently counted as a success. 
+ auto module = std::make_shared("module_1"); + module->setMessageBus(bus_.get()); + bus_->registerAgent(module->getAgentId(), module); + + std::vector task_ids = {"task_1"}; + module->setAvailableTaskAgents(task_ids); + + // Prime coordination for 1 task + module->processMessage(core::KeystoneMessage::create("chief", "module_1", "Calculate: 7")).get(); + + // Send a TASK_FAILED with command == "response" (the ambiguous case from #184) + auto failure_msg = + core::KeystoneMessage::create("task_1", "module_1", "response", "task failed badly"); + failure_msg.action_type = core::ActionType::TASK_FAILED; + module->processMessage(failure_msg).get(); + + // Must be in ERROR, not SYNTHESIZING — the failure was not treated as success + EXPECT_EQ(module->getCurrentState(), agents::ModuleLeadAgent::State::ERROR); +} + +// ============================================================================ +// Issue #185: processSubordinateFailure propagates failure upward +// ============================================================================ + +TEST_F(ModuleLeadAgentTest, FailurePropagatedUpwardToRequester) { + // When all subordinates have reported terminal events and at least one has + // failed, processSubordinateFailure() must send a TASK_FAILED message to + // the requester so the ComponentLeadAgent does not permanently stall. 
+ auto component = std::make_shared("component_1"); + auto module = std::make_shared("module_1"); + auto task1 = std::make_shared("task_1"); + + component->setMessageBus(bus_.get()); + module->setMessageBus(bus_.get()); + task1->setMessageBus(bus_.get()); + + bus_->registerAgent(component->getAgentId(), component); + bus_->registerAgent(module->getAgentId(), module); + bus_->registerAgent(task1->getAgentId(), task1); + + // Configure component → module → task hierarchy + component->setAvailableModuleLeads({"module_1"}); + module->setAvailableTaskAgents({"task_1"}); + + // ComponentLeadAgent receives a goal and delegates to module_1 + // Use a goal pattern that decomposes to exactly 1 module + component + ->processMessage(core::KeystoneMessage::create("chief", + "component_1", + "Implement Core component: Messaging(42)")) + .get(); + + // module_1 should have received a goal — prime module coordination for 1 task + module->processMessage(core::KeystoneMessage::create("chief", "module_1", "Calculate: 42")).get(); + + // Deliver a TASK_FAILED to the module from task_1 + auto failure_msg = + core::KeystoneMessage::create("task_1", "module_1", "response", "execution failed"); + failure_msg.action_type = core::ActionType::TASK_FAILED; + module->processMessage(failure_msg).get(); + + // module_1 must be in ERROR + EXPECT_EQ(module->getCurrentState(), agents::ModuleLeadAgent::State::ERROR); + + // component_1 inbox should contain a TASK_FAILED message from module_1 + bool received_failure = false; + std::optional msg; + while ((msg = component->getMessage()).has_value()) { + if (msg->action_type == core::ActionType::TASK_FAILED) { + received_failure = true; + break; + } + } + EXPECT_TRUE(received_failure) << "ComponentLeadAgent did not receive TASK_FAILED from module"; +} + +TEST_F(ModuleLeadAgentTest, FailureNotPropagatedUntilAllTerminal) { + // Upward propagation must be deferred until ALL subordinates have reported + // (success or failure) — partial failure must not 
immediately fire upward. + auto module = std::make_shared("module_1"); + module->setMessageBus(bus_.get()); + bus_->registerAgent(module->getAgentId(), module); + + // Register a dummy parent to capture upward messages + auto parent = std::make_shared("parent_lead"); + parent->setMessageBus(bus_.get()); + bus_->registerAgent(parent->getAgentId(), parent); + + std::vector task_ids = {"task_1", "task_2"}; + module->setAvailableTaskAgents(task_ids); + + // Prime module with a 2-task goal, setting parent as requester + module + ->processMessage( + core::KeystoneMessage::create("parent_lead", "module_1", "Calculate: 10 + 20")) + .get(); + + // First task fails — only 1 of 2 done, upward propagation must NOT fire yet + auto failure_msg = core::KeystoneMessage::create("task_1", "module_1", "response", "boom"); + failure_msg.action_type = core::ActionType::TASK_FAILED; + module->processMessage(failure_msg).get(); + + // No TASK_FAILED should have been delivered to parent yet + bool premature_failure = false; + std::optional msg; + while ((msg = parent->getMessage()).has_value()) { + if (msg->action_type == core::ActionType::TASK_FAILED) { + premature_failure = true; + } + } + EXPECT_FALSE(premature_failure) << "TASK_FAILED was sent upward before all subtasks terminated"; + + // Second task succeeds — all done, now upward failure message must arrive + auto success_msg = core::KeystoneMessage::create("task_2", "module_1", "response", "20"); + module->processMessage(success_msg).get(); + + EXPECT_EQ(module->getCurrentState(), agents::ModuleLeadAgent::State::ERROR); +} + TEST_F(ModuleLeadAgentTest, ConcurrentCoordination) { auto module = std::make_shared("module_1"); module->setMessageBus(bus_.get()); From 6b9fa05b30d12475d109db592cfd74a843202da7 Mon Sep 17 00:00:00 2001 From: Micah Villmow <4211002+mvillmow@users.noreply.github.com> Date: Sun, 26 Apr 2026 16:05:32 -0700 Subject: [PATCH 2/2] fix(agents): correct failure propagation path in ModuleLeadAgent MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs: 1. processSubordinateResult() transitioned to SYNTHESIZING even when prior failures existed — missing hasFailures() check. Now transitions to ERROR when all subtasks complete but any failed (mirrors gRPC path). 2. FailurePropagatedUpwardToRequester test sent module goal from 'chief' instead of 'component_1', so requester_id_ was set to an unregistered agent and the upward TASK_FAILED was silently dropped. Closes #184 Closes #185 Closes #186 Co-Authored-By: Claude Sonnet 4.6 --- src/agents/module_lead_agent.cpp | 6 +++++- tests/unit/test_module_lead_agent.cpp | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/agents/module_lead_agent.cpp b/src/agents/module_lead_agent.cpp index 61833a50..72d4085f 100644 --- a/src/agents/module_lead_agent.cpp +++ b/src/agents/module_lead_agent.cpp @@ -99,7 +99,11 @@ void ModuleLeadAgent::processSubordinateResult(const core::KeystoneMessage& resu // Check if we've received all results if (all_complete) { - coordination_.transitionTo(State::SYNTHESIZING, stateToString(State::SYNTHESIZING)); + if (coordination_.hasFailures()) { + coordination_.transitionTo(State::ERROR, stateToString(State::ERROR)); + } else { + coordination_.transitionTo(State::SYNTHESIZING, stateToString(State::SYNTHESIZING)); + } } } diff --git a/tests/unit/test_module_lead_agent.cpp b/tests/unit/test_module_lead_agent.cpp index bbec168b..a87a4de7 100644 --- a/tests/unit/test_module_lead_agent.cpp +++ b/tests/unit/test_module_lead_agent.cpp @@ -399,7 +399,9 @@ TEST_F(ModuleLeadAgentTest, FailurePropagatedUpwardToRequester) { .get(); // module_1 should have received a goal — prime module coordination for 1 task - module->processMessage(core::KeystoneMessage::create("chief", "module_1", "Calculate: 42")).get(); + // Sender must be component_1 so requester_id_ is set correctly for upward propagation. 
+ module->processMessage(core::KeystoneMessage::create("component_1", "module_1", "Calculate: 42")) + .get(); // Deliver a TASK_FAILED to the module from task_1 auto failure_msg =