Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/core/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ struct KeystoneMessage {
// Phase 1.2 (Issue #52): Task cancellation
std::optional<std::string> task_id; ///< Optional task ID for tracking/cancellation

// Issue #285: Cross-host tracing
std::optional<std::string> correlation_id; ///< Optional correlation ID for distributed tracing

// Payload and timing
std::string command; ///< Command string to execute (legacy/convenience)
std::optional<std::string> payload; ///< Optional payload data
Expand Down
4 changes: 4 additions & 0 deletions include/core/message_serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ struct SerializableMessage {
bool has_payload;
int64_t timestamp_ns; // Timestamp as nanoseconds since epoch

// Issue #285: Cross-host tracing correlation ID
cista::offset::string correlation_id;
bool has_correlation_id{false};

/**
* @brief Convert KeystoneMessage to SerializableMessage
*/
Expand Down
26 changes: 24 additions & 2 deletions include/monitoring/health_check_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,27 @@ class HealthCheckServer {
*/
using ReadinessCheck = std::function<bool()>;

/**
* @brief NatsConnection connectivity check function type (issue #204)
*
* Lambda/callback that returns true when the NATS connection is established.
* Supplied by the caller to decouple HealthCheckServer from NatsConnection.
* When set, the readiness probe returns 503 until this callback returns true.
*/
using NatsConnectionCheck = std::function<bool()>;

/**
* @brief Construct health check server with configuration
* @param port HTTP server port (default: 8080 for Kubernetes)
* @param readiness_check Optional custom readiness check function
* @param nats_status Optional NATS status tracker (non-owning); used by /v1/health
* @param nats_connection_check Optional NATS connection check (issue #204);
* when supplied the readiness probe is not ready until this returns true
*/
explicit HealthCheckServer(uint16_t port = 8080,
ReadinessCheck readiness_check = nullptr,
NatsStatusTracker* nats_status = nullptr);
NatsStatusTracker* nats_status = nullptr,
NatsConnectionCheck nats_connection_check = nullptr);

/**
* @brief Destructor - stops server if running
Expand Down Expand Up @@ -85,6 +97,15 @@ class HealthCheckServer {
*/
void setReadinessCheck(ReadinessCheck check);

/**
* @brief Set NATS connection check function (issue #204)
*
* Wires a NatsConnection::isConnected()-style lambda into the readiness
* probe. When set, /ready returns 503 until the callback returns true.
* Pass nullptr to clear (readiness probe ignores NATS connectivity again).
*/
void setNatsConnectionCheck(NatsConnectionCheck check);

private:
/**
* @brief HTTP server loop (runs in background thread)
Expand Down Expand Up @@ -123,7 +144,8 @@ class HealthCheckServer {
std::atomic<int> server_fd_{-1};
mutable std::mutex readiness_mutex_;
ReadinessCheck readiness_check_;
NatsStatusTracker* nats_status_{nullptr}; // non-owning
NatsConnectionCheck nats_connection_check_; // issue #204: gates /ready on NATS connectivity
NatsStatusTracker* nats_status_{nullptr}; // non-owning
};

} // namespace monitoring
Expand Down
1 change: 1 addition & 0 deletions include/network/nats_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct NATSListenerConfig {
std::string subject; ///< NATS subject pattern, e.g. "hi.tasks.>"
std::string durable_name; ///< Durable consumer name for JetStream
int max_ack_pending{1}; ///< Max unacked messages per CLAUDE.md rate-limit
int max_attempts{3}; ///< Maximum subscribe attempts before giving up (issue #331)
};

/// Callback invoked when a terminal task event advances the DAG.
Expand Down
16 changes: 16 additions & 0 deletions src/core/message_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ SerializableMessage SerializableMessage::fromKeystoneMessage(const KeystoneMessa
auto duration = msg.timestamp.time_since_epoch();
smsg.timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();

// Issue #285: Propagate correlation_id for cross-host tracing
if (msg.correlation_id.has_value()) {
smsg.correlation_id = cista::offset::string{msg.correlation_id.value().c_str()};
smsg.has_correlation_id = true;
} else {
smsg.correlation_id = cista::offset::string{""};
smsg.has_correlation_id = false;
}

return smsg;
}

Expand Down Expand Up @@ -76,6 +85,13 @@ KeystoneMessage SerializableMessage::toKeystoneMessage() const {
msg.priority = Priority::NORMAL;
msg.deadline = std::nullopt;

// Issue #285: Restore correlation_id from serialized form
if (has_correlation_id) {
msg.correlation_id = std::string{correlation_id.data(), correlation_id.size()};
} else {
msg.correlation_id = std::nullopt;
}

return msg;
}

Expand Down
15 changes: 13 additions & 2 deletions src/monitoring/health_check_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ class SocketHandle {

HealthCheckServer::HealthCheckServer(uint16_t port,
ReadinessCheck readiness_check,
NatsStatusTracker* nats_status)
NatsStatusTracker* nats_status,
NatsConnectionCheck nats_connection_check)
: port_(port),
server_fd_(-1),
readiness_check_(std::move(readiness_check)),
nats_connection_check_(std::move(nats_connection_check)),
nats_status_(nats_status) {}

HealthCheckServer::~HealthCheckServer() {
Expand Down Expand Up @@ -165,6 +167,11 @@ void HealthCheckServer::setReadinessCheck(ReadinessCheck check) {
readiness_check_ = std::move(check);
}

void HealthCheckServer::setNatsConnectionCheck(NatsConnectionCheck check) {
std::lock_guard<std::mutex> lock(readiness_mutex_);
nats_connection_check_ = std::move(check);
}

void HealthCheckServer::serverLoop() {
while (running_.load()) {
// Use poll() with timeout to check for incoming connections
Expand Down Expand Up @@ -308,7 +315,11 @@ void HealthCheckServer::handleRequest(int client_fd) {
bool ready = true;
{
std::lock_guard<std::mutex> lock(readiness_mutex_);
if (readiness_check_) {
// issue #204: gate on NATS connection readiness first
if (nats_connection_check_ && !nats_connection_check_()) {
ready = false;
}
if (ready && readiness_check_) {
ready = readiness_check_();
}
}
Expand Down
23 changes: 17 additions & 6 deletions src/network/nats_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,24 @@ natsStatus NATSListener::start(jsCtx* js) {
jsSubOptions_Init(&sub_opts);
sub_opts.Config.MaxAckPending = cfg_.max_ack_pending;

jsErrCode jerr = jsErrCode(0);
natsStatus s =
js_Subscribe(&sub_, js, cfg_.subject.c_str(), on_msg, this, nullptr, &sub_opts, &jerr);
const int attempts = cfg_.max_attempts > 0 ? cfg_.max_attempts : 1;
natsStatus s = NATS_ERR;
for (int attempt = 1; attempt <= attempts; ++attempt) {
jsErrCode jerr = jsErrCode(0);
s = js_Subscribe(&sub_, js, cfg_.subject.c_str(), on_msg, this, nullptr, &sub_opts, &jerr);
if (s == NATS_OK) {
break;
}
spdlog::warn("NATSListener: subscribe attempt {}/{} failed status={} jerr={}",
attempt,
attempts,
static_cast<int>(s),
static_cast<int>(jerr));
}
if (s != NATS_OK) {
spdlog::error("NATSListener: subscribe failed status={} jerr={}",
static_cast<int>(s),
static_cast<int>(jerr));
spdlog::error("NATSListener: all {} subscribe attempt(s) failed status={}",
attempts,
static_cast<int>(s));
}
return s;
}
Expand Down
Loading