Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/agents/component_lead_agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ class ComponentLeadAgent : public LeadAgentBase<ComponentLeadState> {
*/
void processYamlComponent(const std::string& yaml_spec);

/**
* @brief Handle a gRPC TaskResult callback from a child ModuleLeadAgent (Issue #186)
*
* Called externally when the coordinator delivers a module result via gRPC.
* The result is first handed to the result aggregator when one is set.
*
* - Successful result: recorded via coordination_.recordResult(). Once every
*   module has reported, the agent moves to AGGREGATING if no failure was
*   recorded, otherwise to ERROR.
* - Unsuccessful result: recorded via coordination_.recordFailure() so that
*   hasFailures() is set correctly and the agent does not remain permanently
*   stuck in WAITING_FOR_MODULES. If the result carries no error message, a
*   generic one is substituted. When this failure is the last outstanding
*   report, the agent moves to ERROR and sends a TASK_FAILED message to its
*   requester (if a requester id is set).
*
* @param subtask_id Identifier of the child module task (child task_id)
* @param result gRPC TaskResult from the coordinator
*/
void processTaskResult(const std::string& subtask_id, const hmas::TaskResult& result);

/**
* @brief Start heartbeat thread (sends heartbeat every 1s)
*/
Expand Down
12 changes: 12 additions & 0 deletions include/agents/lead_agent_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ class LeadAgentBase : public AsyncAgent {
bool all_done = coordination_.recordFailure(error);
if (all_done) {
coordination_.transitionTo(error_state_, stateToString(error_state_));

// Propagate failure upward to grandparent (Issue #185).
// When all subordinates have reported terminal events and at least one
// has failed, send a TASK_FAILED message to our own requester so the
// ComponentLeadAgent (or ChiefArchitect) above us does not remain
// permanently stuck in WAITING_FOR_MODULES / WAITING_FOR_TASKS.
const std::string& parent_id = coordination_.getRequesterId();
if (!parent_id.empty()) {
auto failed_msg = core::KeystoneMessage::create(
agent_id_, parent_id, core::ActionType::TASK_FAILED, failure_msg.session_id, error);
sendMessage(failed_msg);
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions include/agents/module_lead_agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ class ModuleLeadAgent : public LeadAgentBase<ModuleLeadState> {
*/
void processYamlModule(const std::string& yaml_spec);

/**
* @brief Handle a gRPC TaskResult callback from a child TaskAgent (Issue #186)
*
* Called externally when the coordinator delivers a task result via gRPC.
* The result is first handed to the result aggregator when one is set.
*
* - Successful result: recorded via coordination_.recordResult(). Once every
*   subtask has reported, the agent moves to SYNTHESIZING if no failure was
*   recorded, otherwise to ERROR.
* - Unsuccessful result: recorded via coordination_.recordFailure() so that
*   hasFailures() is set correctly and the agent does not remain permanently
*   stuck in WAITING_FOR_TASKS. If the result carries no error message, a
*   generic one is substituted. When this failure is the last outstanding
*   report, the agent moves to ERROR and sends a TASK_FAILED message to its
*   requester (if a requester id is set).
*
* @param subtask_id Identifier of the child task (child task_id)
* @param result gRPC TaskResult from the coordinator
*/
void processTaskResult(const std::string& subtask_id, const hmas::TaskResult& result);

/**
* @brief Start heartbeat thread (sends heartbeat every 1s)
*/
Expand Down
34 changes: 34 additions & 0 deletions src/agents/component_lead_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,40 @@ void ComponentLeadAgent::processYamlComponent(const std::string& yaml_spec) {
}
}

void ComponentLeadAgent::processTaskResult(const std::string& subtask_id,
                                           const hmas::TaskResult& result) {
  // gRPC entry point for a child module's result (Issue #186).
  //
  // Records the outcome, and once every module has reported moves to either
  // AGGREGATING (no failures) or ERROR. Whenever the component ends in ERROR
  // a TASK_FAILED message is forwarded to the requester so the parent cannot
  // stall waiting for us (Issue #185) -- regardless of whether the *last*
  // result to arrive happened to be a success or a failure.
  //
  // subtask_id: identifier of the child module task.
  // result:     TaskResult delivered by the coordinator.

  // Feed result into the result aggregator (if present) so it tracks completion
  if (result_aggregator_) {
    result_aggregator_->addResult(subtask_id, result);
  }

  bool all_done = false;
  bool ended_in_failure = false;
  std::string error;  // text forwarded upward when the component fails

  if (result.success()) {
    all_done = coordination_.recordResult(result.result_yaml());
    if (all_done && coordination_.hasFailures()) {
      // This result succeeded but an earlier module failed; the original
      // error text is not available here, so report a generic one.
      ended_in_failure = true;
      error = "one or more subordinate modules failed";
    }
  } else {
    // gRPC path: record failure so hasFailures() is set correctly (Issue #186)
    error = result.error_message().empty() ? "subordinate module failed"
                                           : result.error_message();
    all_done = coordination_.recordFailure(error);
    ended_in_failure = true;
  }

  if (!all_done) {
    return;  // still waiting for other modules
  }

  if (!ended_in_failure) {
    coordination_.transitionTo(State::AGGREGATING, stateToString(State::AGGREGATING));
    return;
  }

  coordination_.transitionTo(State::ERROR, stateToString(State::ERROR));

  // Propagate failure upward to parent (ChiefArchitectAgent) — Issue #185
  const std::string& parent_id = coordination_.getRequesterId();
  if (!parent_id.empty()) {
    auto failed_msg = core::KeystoneMessage::create(
        agent_id_, parent_id, core::ActionType::TASK_FAILED, "", error);
    sendMessage(failed_msg);
  }
}

void ComponentLeadAgent::startHeartbeat() {
// Delegate to coordination template
coordination_.startHeartbeat(agent_id_);
Expand Down
45 changes: 43 additions & 2 deletions src/agents/module_lead_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ void ModuleLeadAgent::setAvailableTaskAgents(const std::vector<std::string>& tas
// === Hook Method Implementations (override LeadAgentBase pure virtuals) ===

bool ModuleLeadAgent::isSubordinateResult(const core::KeystoneMessage& msg) {
  // A TASK_FAILED message is never a task result, even when its command is
  // "response": rejecting it here lets processSubordinateFailure() handle it
  // instead of it being silently counted as a success (Issue #184).
  const bool is_failure = msg.action_type == core::ActionType::TASK_FAILED;
  if (is_failure) {
    return false;
  }

  // Anything else is a task result (from a TaskAgent) only if it is a response.
  return msg.command == "response";
}

Expand Down Expand Up @@ -96,7 +99,11 @@ void ModuleLeadAgent::processSubordinateResult(const core::KeystoneMessage& resu

// Check if we've received all results
if (all_complete) {
coordination_.transitionTo(State::SYNTHESIZING, stateToString(State::SYNTHESIZING));
if (coordination_.hasFailures()) {
coordination_.transitionTo(State::ERROR, stateToString(State::ERROR));
} else {
coordination_.transitionTo(State::SYNTHESIZING, stateToString(State::SYNTHESIZING));
}
}
}

Expand Down Expand Up @@ -269,6 +276,40 @@ void ModuleLeadAgent::processYamlModule(const std::string& yaml_spec) {
}
}

void ModuleLeadAgent::processTaskResult(const std::string& subtask_id,
                                        const hmas::TaskResult& result) {
  // gRPC entry point for a child task's result (Issue #186).
  //
  // Records the outcome, and once every subtask has reported moves to either
  // SYNTHESIZING (no failures) or ERROR. Whenever the module ends in ERROR a
  // TASK_FAILED message is forwarded to the requester so the parent cannot
  // stall waiting for us (Issue #185) -- regardless of whether the *last*
  // result to arrive happened to be a success or a failure.
  //
  // subtask_id: identifier of the child task.
  // result:     TaskResult delivered by the coordinator.

  // Feed result into the result aggregator (if present) so it tracks completion
  if (result_aggregator_) {
    result_aggregator_->addResult(subtask_id, result);
  }

  bool all_done = false;
  bool ended_in_failure = false;
  std::string error;  // text forwarded upward when the module fails

  if (result.success()) {
    all_done = coordination_.recordResult(result.result_yaml());
    if (all_done && coordination_.hasFailures()) {
      // This result succeeded but an earlier subtask failed; the original
      // error text is not available here, so report a generic one.
      ended_in_failure = true;
      error = "one or more subordinate tasks failed";
    }
  } else {
    // gRPC path: record failure so hasFailures() is set correctly (Issue #186)
    error = result.error_message().empty() ? "subordinate task failed"
                                           : result.error_message();
    all_done = coordination_.recordFailure(error);
    ended_in_failure = true;
  }

  if (!all_done) {
    return;  // still waiting for other subtasks
  }

  if (!ended_in_failure) {
    coordination_.transitionTo(State::SYNTHESIZING, stateToString(State::SYNTHESIZING));
    return;
  }

  coordination_.transitionTo(State::ERROR, stateToString(State::ERROR));

  // Propagate failure upward to parent (ComponentLeadAgent) — Issue #185
  const std::string& parent_id = coordination_.getRequesterId();
  if (!parent_id.empty()) {
    auto failed_msg = core::KeystoneMessage::create(
        agent_id_, parent_id, core::ActionType::TASK_FAILED, "", error);
    sendMessage(failed_msg);
  }
}

void ModuleLeadAgent::startHeartbeat() {
// Delegate to coordination template
coordination_.startHeartbeat(agent_id_);
Expand Down
52 changes: 52 additions & 0 deletions tests/unit/test_component_lead_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,58 @@ TEST_F(ComponentLeadAgentTest, StateTransitionFlow) {
(void)trace; // Suppress unused variable warning
}

// ============================================================================
// Issue #185: processSubordinateFailure propagates failure upward
// ============================================================================

TEST_F(ComponentLeadAgentTest, ModuleFailurePropagatedUpwardToRequester) {
  // Scenario: the only module of a component reports TASK_FAILED. With every
  // module terminal, the ComponentLeadAgent is expected to end in ERROR and to
  // forward a TASK_FAILED to whoever sent it the goal.
  auto component_lead = std::make_shared<agents::ComponentLeadAgent>("component_1");
  auto module_lead = std::make_shared<agents::ModuleLeadAgent>("module_1");

  // Stand-in requester whose inbox is inspected for the forwarded failure
  auto requester = std::make_shared<agents::ModuleLeadAgent>("parent_chief");

  component_lead->setMessageBus(bus_.get());
  module_lead->setMessageBus(bus_.get());
  requester->setMessageBus(bus_.get());

  bus_->registerAgent(component_lead->getAgentId(), component_lead);
  bus_->registerAgent(module_lead->getAgentId(), module_lead);
  bus_->registerAgent(requester->getAgentId(), requester);

  component_lead->setAvailableModuleLeads({"module_1"});

  // Goal from parent_chief that makes component_1 wait on one module
  auto goal_msg = core::KeystoneMessage::create(
      "parent_chief", "component_1", "Implement Core component: Messaging(10)");
  component_lead->processMessage(goal_msg).get();

  // module_1 answers with a failure while component_1 is waiting on it
  auto module_failure =
      core::KeystoneMessage::create("module_1", "component_1", "module_result", "module crashed");
  module_failure.action_type = core::ActionType::TASK_FAILED;
  component_lead->processMessage(module_failure).get();

  // The component itself must have ended in ERROR
  EXPECT_EQ(component_lead->getCurrentState(), agents::ComponentLeadAgent::State::ERROR);

  // Drain the requester's inbox until a TASK_FAILED shows up (or it is empty)
  bool saw_task_failed = false;
  for (std::optional<core::KeystoneMessage> inbox_msg = requester->getMessage();
       inbox_msg.has_value();
       inbox_msg = requester->getMessage()) {
    if (inbox_msg->action_type == core::ActionType::TASK_FAILED) {
      saw_task_failed = true;
      break;
    }
  }
  EXPECT_TRUE(saw_task_failed) << "Parent did not receive TASK_FAILED from ComponentLeadAgent";
}

TEST_F(ComponentLeadAgentTest, ConcurrentCoordination) {
auto component = std::make_shared<agents::ComponentLeadAgent>("component_1");
component->setMessageBus(bus_.get());
Expand Down
130 changes: 130 additions & 0 deletions tests/unit/test_module_lead_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Total: 12 tests
*/

#include "agents/component_lead_agent.hpp"
#include "agents/module_lead_agent.hpp"
#include "agents/task_agent.hpp"
#include "core/message_bus.hpp"
Expand Down Expand Up @@ -337,6 +338,135 @@ TEST_F(ModuleLeadAgentTest, SuccessResultAfterFailureStillCountsTowardCompletion
EXPECT_NE(state, agents::ModuleLeadAgent::State::WAITING_FOR_TASKS);
}

// ============================================================================
// Issue #184: isSubordinateResult does not intercept TASK_FAILED messages
// ============================================================================

TEST_F(ModuleLeadAgentTest, TaskFailedNotTreatedAsSuccessResult) {
  // Regression for Issue #184: a message that is both command == "response"
  // and action_type == TASK_FAILED must be routed to the failure handler.
  // If it were handled as an ordinary result the module would end up in
  // SYNTHESIZING instead of ERROR.
  auto module_lead = std::make_shared<agents::ModuleLeadAgent>("module_1");
  module_lead->setMessageBus(bus_.get());
  bus_->registerAgent(module_lead->getAgentId(), module_lead);

  module_lead->setAvailableTaskAgents({"task_1"});

  // Goal that makes the module wait on a single task
  auto goal_msg = core::KeystoneMessage::create("chief", "module_1", "Calculate: 7");
  module_lead->processMessage(goal_msg).get();

  // The ambiguous message: a "response" flagged as TASK_FAILED
  auto ambiguous_failure =
      core::KeystoneMessage::create("task_1", "module_1", "response", "task failed badly");
  ambiguous_failure.action_type = core::ActionType::TASK_FAILED;
  module_lead->processMessage(ambiguous_failure).get();

  // ERROR proves the failure was recorded rather than counted as a success
  EXPECT_EQ(module_lead->getCurrentState(), agents::ModuleLeadAgent::State::ERROR);
}

// ============================================================================
// Issue #185: processSubordinateFailure propagates failure upward
// ============================================================================

TEST_F(ModuleLeadAgentTest, FailurePropagatedUpwardToRequester) {
  // Scenario: component_1 -> module_1 -> task_1, and task_1 fails. With its
  // only subordinate terminal and failed, module_1 is expected to end in ERROR
  // and to send a TASK_FAILED to component_1 so that agent does not stall.
  auto component_lead = std::make_shared<agents::ComponentLeadAgent>("component_1");
  auto module_lead = std::make_shared<agents::ModuleLeadAgent>("module_1");
  auto task_agent = std::make_shared<agents::TaskAgent>("task_1");

  component_lead->setMessageBus(bus_.get());
  module_lead->setMessageBus(bus_.get());
  task_agent->setMessageBus(bus_.get());

  bus_->registerAgent(component_lead->getAgentId(), component_lead);
  bus_->registerAgent(module_lead->getAgentId(), module_lead);
  bus_->registerAgent(task_agent->getAgentId(), task_agent);

  // Wire up the component → module → task hierarchy
  component_lead->setAvailableModuleLeads({"module_1"});
  module_lead->setAvailableTaskAgents({"task_1"});

  // Hand the component a goal intended to decompose into exactly one module,
  // which it delegates to module_1
  auto component_goal = core::KeystoneMessage::create(
      "chief", "component_1", "Implement Core component: Messaging(42)");
  component_lead->processMessage(component_goal).get();

  // Give the module a one-task goal. The sender has to be component_1 so the
  // module's requester is the agent whose inbox is checked below.
  auto module_goal = core::KeystoneMessage::create("component_1", "module_1", "Calculate: 42");
  module_lead->processMessage(module_goal).get();

  // task_1 reports a failure to the module
  auto task_failure =
      core::KeystoneMessage::create("task_1", "module_1", "response", "execution failed");
  task_failure.action_type = core::ActionType::TASK_FAILED;
  module_lead->processMessage(task_failure).get();

  // The module itself must have ended in ERROR
  EXPECT_EQ(module_lead->getCurrentState(), agents::ModuleLeadAgent::State::ERROR);

  // Drain component_1's inbox until a TASK_FAILED shows up (or it is empty)
  bool saw_task_failed = false;
  for (std::optional<core::KeystoneMessage> inbox_msg = component_lead->getMessage();
       inbox_msg.has_value();
       inbox_msg = component_lead->getMessage()) {
    if (inbox_msg->action_type == core::ActionType::TASK_FAILED) {
      saw_task_failed = true;
      break;
    }
  }
  EXPECT_TRUE(saw_task_failed) << "ComponentLeadAgent did not receive TASK_FAILED from module";
}

TEST_F(ModuleLeadAgentTest, FailureNotPropagatedUntilAllTerminal) {
  // A failure must not be reported upward while other subordinates are still
  // outstanding: with two tasks, one failing must leave the parent's inbox
  // free of TASK_FAILED until the second task has also reported.
  auto module_lead = std::make_shared<agents::ModuleLeadAgent>("module_1");
  module_lead->setMessageBus(bus_.get());
  bus_->registerAgent(module_lead->getAgentId(), module_lead);

  // Stand-in parent whose inbox captures anything sent upward
  auto requester = std::make_shared<agents::ModuleLeadAgent>("parent_lead");
  requester->setMessageBus(bus_.get());
  bus_->registerAgent(requester->getAgentId(), requester);

  module_lead->setAvailableTaskAgents({"task_1", "task_2"});

  // Two-task goal sent by parent_lead, which makes it the requester
  auto goal_msg = core::KeystoneMessage::create("parent_lead", "module_1", "Calculate: 10 + 20");
  module_lead->processMessage(goal_msg).get();

  // task_1 fails: 1 of 2 subtasks terminal, nothing may go upward yet
  auto first_failure = core::KeystoneMessage::create("task_1", "module_1", "response", "boom");
  first_failure.action_type = core::ActionType::TASK_FAILED;
  module_lead->processMessage(first_failure).get();

  // Drain the parent's whole inbox; any TASK_FAILED in it is premature
  bool sent_too_early = false;
  for (std::optional<core::KeystoneMessage> inbox_msg = requester->getMessage();
       inbox_msg.has_value();
       inbox_msg = requester->getMessage()) {
    if (inbox_msg->action_type == core::ActionType::TASK_FAILED) {
      sent_too_early = true;
    }
  }
  EXPECT_FALSE(sent_too_early) << "TASK_FAILED was sent upward before all subtasks terminated";

  // task_2 succeeds: every subtask is now terminal and one of them failed, so
  // the module must settle in ERROR.
  // NOTE(review): only the resulting state is asserted here; delivery of a
  // TASK_FAILED to parent_lead after this final result is not verified.
  auto second_success = core::KeystoneMessage::create("task_2", "module_1", "response", "20");
  module_lead->processMessage(second_success).get();

  EXPECT_EQ(module_lead->getCurrentState(), agents::ModuleLeadAgent::State::ERROR);
}

TEST_F(ModuleLeadAgentTest, ConcurrentCoordination) {
auto module = std::make_shared<agents::ModuleLeadAgent>("module_1");
module->setMessageBus(bus_.get());
Expand Down
Loading