Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/daemon/main.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
#include "monitoring/health_check_server.hpp"
#include "monitoring/nats_status.hpp"
#include "network/nats_listener.hpp"
#include "transport/nats_connection.hpp"

#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>

namespace {
// Set by signalHandler(); polled by the main loop to drive shutdown.
std::atomic<bool> g_stop{false};

// Async-signal-safe handler: just flips the stop flag (release pairs with the
// acquire load in the main loop).
void signalHandler(int /*sig*/) {
  g_stop.store(true, std::memory_order_release);
}

// Returns the value of environment variable `name`, or `def` when the
// variable is unset or set to the empty string.
std::string envOr(const char* name, std::string def) {
  const char* value = std::getenv(name); // NOLINT(concurrency-mt-unsafe)
  if (value == nullptr || value[0] == '\0') {
    return def; // parameter is implicitly moved on return
  }
  return std::string{value};
}
} // namespace

int main() {
Expand All @@ -29,10 +40,63 @@ int main() {
"http://0.0.0.0:"
<< health_server.getPort() << "/v1/health\n";

// -------------------------------------------------------------------------
// Wire NATSListener into the daemon startup path (Issue #180).
//
// Configuration is drawn from environment variables so the binary stays
// zero-config by default:
// KEYSTONE_NATS_URL — NATS server URL (default: nats://localhost:4222)
// KEYSTONE_NATS_SUBJECT — subject pattern (default: hi.tasks.>)
// KEYSTONE_NATS_DURABLE — durable consumer (default: keystone-daemon)
// -------------------------------------------------------------------------

keystone::transport::NatsConfig nats_cfg;
nats_cfg.url = envOr("KEYSTONE_NATS_URL", "nats://localhost:4222");

keystone::network::NATSListenerConfig listener_cfg;
listener_cfg.subject = envOr("KEYSTONE_NATS_SUBJECT", "hi.tasks.>");
listener_cfg.durable_name = envOr("KEYSTONE_NATS_DURABLE", "keystone-daemon");
listener_cfg.max_ack_pending = 1;

// DAG-advance callback: log the event (production code would call the real
// DAG advancer once it is wired in from ProjectAgamemnon).
auto dag_advance = [](std::string_view team_id, std::string_view task_id) {
std::cout << "keystone-daemon: dag_advance team=" << team_id << " task=" << task_id << '\n';
};

keystone::transport::NatsConnection nats_conn(nats_cfg);
keystone::network::NATSListener listener(listener_cfg, dag_advance);

// Attempt to connect to NATS; log a warning but continue if unavailable so
// the health endpoint remains reachable.
if (nats_conn.connect()) {
jsCtx* js = nats_conn.jsContext();
if (js != nullptr) {
natsStatus s = listener.start(js);
if (s != NATS_OK) {
std::cerr << "keystone-daemon: NATSListener::start failed status=" << static_cast<int>(s)
<< " (continuing without NATS)\n";
} else {
std::cout << "keystone-daemon: NATSListener active subject=" << listener_cfg.subject
<< '\n';
}
} else {
std::cerr
<< "keystone-daemon: failed to obtain JetStream context (continuing without NATS)\n";
}
} else {
std::cerr << "keystone-daemon: NATS unavailable at " << nats_cfg.url
<< " (continuing without NATS)\n";
}

while (!g_stop.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

// Graceful shutdown: stop NATSListener before closing the connection.
listener.stop();
nats_conn.disconnect();

health_server.stop();
std::cout << "Keystone daemon stopped.\n";
return 0;
Expand Down
122 changes: 121 additions & 1 deletion tests/unit/test_component_lead_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
* - Module Decomposition (4 tests)
* - Coordination (4 tests)
* - State Machine (2 tests)
* - Failure Handling / DAG Deadlock Prevention (4 tests) — Issue #183
*
* Total: 12 tests
* Total: 16 tests
*/

#include "agents/component_lead_agent.hpp"
Expand Down Expand Up @@ -250,3 +251,122 @@ TEST_F(ComponentLeadAgentTest, ConcurrentCoordination) {
}
EXPECT_EQ(count, 50);
}

// ============================================================================
// Issue #183: DAG Deadlock Prevention — Failure Handling Tests (4 tests)
// Mirrors the ModuleLeadAgent failure-handling tests added for Issue #87.
// ============================================================================

// A component lead coordinating exactly one module must reach ERROR when that
// module fails — the DAG must not be left deadlocked.
TEST_F(ComponentLeadAgentTest, SingleModuleFailureTransitionsToError) {
  auto lead = std::make_shared<agents::ComponentLeadAgent>("component_1");
  lead->setMessageBus(bus_.get());
  bus_->registerAgent(lead->getAgentId(), lead);

  lead->setAvailableModuleLeads({"module_1"});

  // Decompose "Messaging(10)" into a single module goal; coordination now
  // expects exactly one result.
  auto goal = core::KeystoneMessage::create("chief", "component_1", "Build Core: Messaging(10)");
  lead->processMessage(goal).get();

  // The lone module reports failure.
  auto failed =
      core::KeystoneMessage::create("module_1", "component_1", "response", "compile error");
  failed.action_type = core::ActionType::TASK_FAILED;
  lead->processMessage(failed).get();

  // Every expected result is terminal, so the agent must land in ERROR.
  EXPECT_EQ(lead->getCurrentState(), agents::ComponentLeadAgent::State::ERROR);
}

// After a mixed success/failure outcome, synthesizeComponentResult() must
// surface the failure rather than report silent success.
TEST_F(ComponentLeadAgentTest, SynthesizeAfterModuleFailureReturnsErrorMessage) {
  auto lead = std::make_shared<agents::ComponentLeadAgent>("component_1");
  lead->setMessageBus(bus_.get());
  bus_->registerAgent(lead->getAgentId(), lead);

  lead->setAvailableModuleLeads({"module_1", "module_2"});

  // Decomposition yields two module goals.
  lead->processMessage(core::KeystoneMessage::create(
          "chief", "component_1", "Build Core: Messaging(10) and Concurrency(20)"))
      .get();

  // module_1 succeeds ...
  lead->processMessage(core::KeystoneMessage::create(
          "module_1", "component_1", "module_result", "messaging done"))
      .get();

  // ... while module_2 fails.
  auto failed =
      core::KeystoneMessage::create("module_2", "component_1", "response", "linker error");
  failed.action_type = core::ActionType::TASK_FAILED;
  lead->processMessage(failed).get();

  EXPECT_EQ(lead->getCurrentState(), agents::ComponentLeadAgent::State::ERROR);

  // The synthesized result must carry the error marker.
  const auto summary = lead->synthesizeComponentResult();
  EXPECT_NE(summary.find("ERROR"), std::string::npos);
}

// Issue #183 regression: a failure arriving before the other modules have
// reported must not wedge the coordinator.
TEST_F(ComponentLeadAgentTest, ModuleFailureBeforeAllResultsDoesNotDeadlock) {
  auto component = std::make_shared<agents::ComponentLeadAgent>("component_1");
  component->setMessageBus(bus_.get());
  bus_->registerAgent(component->getAgentId(), component);

  std::vector<std::string> module_ids = {"module_1", "module_2", "module_3"};
  component->setAvailableModuleLeads(module_ids);

  // Decompose into 3 module goals
  component
      ->processMessage(core::KeystoneMessage::create(
          "chief",
          "component_1",
          "Build Core: Messaging(10) and Concurrency(20) and Storage(30)"))
      .get();

  // First module fails immediately — must not leave the other two permanently pending
  auto failure_msg =
      core::KeystoneMessage::create("module_1", "component_1", "response", "fatal error");
  failure_msg.action_type = core::ActionType::TASK_FAILED;
  component->processMessage(failure_msg).get();

  // Both states below are deadlock-free outcomes: the agent may fail fast
  // (ERROR) or keep waiting for the two still-outstanding module results
  // (WAITING_FOR_MODULES) — the terminal failure has been recorded either way.
  // NOTE(review): this disjunction cannot by itself detect a permanent
  // WAITING_FOR_MODULES hang; if the intended contract is fail-fast on the
  // first failure, tighten the expectation to ERROR only.
  auto state = component->getCurrentState();
  EXPECT_TRUE(state == agents::ComponentLeadAgent::State::ERROR ||
              state == agents::ComponentLeadAgent::State::WAITING_FOR_MODULES);
}

// A success that arrives after an earlier failure must still be counted toward
// completion: once every expected result is terminal, the agent may not stay
// in WAITING_FOR_MODULES.
TEST_F(ComponentLeadAgentTest, SuccessResultAfterModuleFailureStillCountsTowardCompletion) {
  auto lead = std::make_shared<agents::ComponentLeadAgent>("component_1");
  lead->setMessageBus(bus_.get());
  bus_->registerAgent(lead->getAgentId(), lead);

  lead->setAvailableModuleLeads({"module_1", "module_2"});

  // Decomposition yields two module goals.
  lead->processMessage(core::KeystoneMessage::create(
          "chief", "component_1", "Build Core: Messaging(10) and Concurrency(20)"))
      .get();

  // The failure lands first ...
  auto failed = core::KeystoneMessage::create("module_1", "component_1", "response", "timeout");
  failed.action_type = core::ActionType::TASK_FAILED;
  lead->processMessage(failed).get();

  // ... followed by a success, making every expected result terminal.
  lead->processMessage(core::KeystoneMessage::create(
          "module_2", "component_1", "module_result", "concurrency done"))
      .get();

  EXPECT_NE(lead->getCurrentState(), agents::ComponentLeadAgent::State::WAITING_FOR_MODULES);
}
Loading