diff --git a/include/core/message.hpp b/include/core/message.hpp
index ad3bd9c1..7eb6e4e7 100644
--- a/include/core/message.hpp
+++ b/include/core/message.hpp
@@ -133,6 +133,9 @@ struct KeystoneMessage {
   // Phase 1.2 (Issue #52): Task cancellation
   std::optional<std::string> task_id;  ///< Optional task ID for tracking/cancellation
 
+  // Issue #285: Cross-host tracing
+  std::optional<std::string> correlation_id;  ///< Optional correlation ID for distributed tracing
+
   // Payload and timing
   std::string command;  ///< Command string to execute (legacy/convenience)
   std::optional<std::string> payload;  ///< Optional payload data
diff --git a/include/core/message_serializer.hpp b/include/core/message_serializer.hpp
index dfdc4e1c..7aa75752 100644
--- a/include/core/message_serializer.hpp
+++ b/include/core/message_serializer.hpp
@@ -33,6 +33,10 @@ struct SerializableMessage {
   bool has_payload;
   int64_t timestamp_ns;  // Timestamp as nanoseconds since epoch
 
+  // Issue #285: Cross-host tracing correlation ID
+  cista::offset::string correlation_id;
+  bool has_correlation_id{false};
+
   /**
    * @brief Convert KeystoneMessage to SerializableMessage
    */
diff --git a/include/monitoring/health_check_server.hpp b/include/monitoring/health_check_server.hpp
index 18aa8c55..363a639e 100644
--- a/include/monitoring/health_check_server.hpp
+++ b/include/monitoring/health_check_server.hpp
@@ -37,15 +37,27 @@ class HealthCheckServer {
    */
   using ReadinessCheck = std::function<bool()>;
 
+  /**
+   * @brief NatsConnection connectivity check function type (issue #204)
+   *
+   * Lambda/callback that returns true when the NATS connection is established.
+   * Supplied by the caller to decouple HealthCheckServer from NatsConnection.
+   * When set, the readiness probe returns 503 until this callback returns true.
+   */
+  using NatsConnectionCheck = std::function<bool()>;
+
   /**
    * @brief Construct health check server with configuration
    * @param port HTTP server port (default: 8080 for Kubernetes)
    * @param readiness_check Optional custom readiness check function
    * @param nats_status Optional NATS status tracker (non-owning); used by /v1/health
+   * @param nats_connection_check Optional NATS connection check (issue #204);
+   *        when supplied the readiness probe is not ready until this returns true
    */
   explicit HealthCheckServer(uint16_t port = 8080,
                              ReadinessCheck readiness_check = nullptr,
-                             NatsStatusTracker* nats_status = nullptr);
+                             NatsStatusTracker* nats_status = nullptr,
+                             NatsConnectionCheck nats_connection_check = nullptr);
 
   /**
    * @brief Destructor - stops server if running
@@ -85,6 +97,15 @@ class HealthCheckServer {
    */
   void setReadinessCheck(ReadinessCheck check);
 
+  /**
+   * @brief Set NATS connection check function (issue #204)
+   *
+   * Wires a NatsConnection::isConnected()-style lambda into the readiness
+   * probe. When set, /ready returns 503 until the callback returns true.
+   * Pass nullptr to clear (readiness probe ignores NATS connectivity again).
+   */
+  void setNatsConnectionCheck(NatsConnectionCheck check);
+
  private:
   /**
    * @brief HTTP server loop (runs in background thread)
@@ -123,7 +144,8 @@ class HealthCheckServer {
   std::atomic<int> server_fd_{-1};
   mutable std::mutex readiness_mutex_;
   ReadinessCheck readiness_check_;
-  NatsStatusTracker* nats_status_{nullptr};  // non-owning
+  NatsConnectionCheck nats_connection_check_;  // issue #204: gates /ready on NATS connectivity
+  NatsStatusTracker* nats_status_{nullptr};    // non-owning
 };
 
 }  // namespace monitoring
diff --git a/include/network/nats_listener.hpp b/include/network/nats_listener.hpp
index b4499dcc..55d1912c 100644
--- a/include/network/nats_listener.hpp
+++ b/include/network/nats_listener.hpp
@@ -15,6 +15,7 @@ struct NATSListenerConfig {
   std::string subject;       ///< NATS subject pattern, e.g. "hi.tasks.>"
   std::string durable_name;  ///< Durable consumer name for JetStream
   int max_ack_pending{1};    ///< Max unacked messages per CLAUDE.md rate-limit
+  int max_attempts{3};       ///< Maximum subscribe attempts before giving up (issue #331)
 };
 
 /// Callback invoked when a terminal task event advances the DAG.
diff --git a/src/core/message_serializer.cpp b/src/core/message_serializer.cpp
index 25945455..9e7d0c17 100644
--- a/src/core/message_serializer.cpp
+++ b/src/core/message_serializer.cpp
@@ -40,6 +40,15 @@ SerializableMessage SerializableMessage::fromKeystoneMessage(const KeystoneMessa
   auto duration = msg.timestamp.time_since_epoch();
   smsg.timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
 
+  // Issue #285: Propagate correlation_id for cross-host tracing
+  if (msg.correlation_id.has_value()) {
+    smsg.correlation_id = cista::offset::string{msg.correlation_id.value().c_str()};
+    smsg.has_correlation_id = true;
+  } else {
+    smsg.correlation_id = cista::offset::string{""};
+    smsg.has_correlation_id = false;
+  }
+
   return smsg;
 }
 
@@ -76,6 +85,13 @@ KeystoneMessage SerializableMessage::toKeystoneMessage() const {
   msg.priority = Priority::NORMAL;
   msg.deadline = std::nullopt;
 
+  // Issue #285: Restore correlation_id from serialized form
+  if (has_correlation_id) {
+    msg.correlation_id = std::string{correlation_id.data(), correlation_id.size()};
+  } else {
+    msg.correlation_id = std::nullopt;
+  }
+
   return msg;
 }
 
diff --git a/src/monitoring/health_check_server.cpp b/src/monitoring/health_check_server.cpp
index ee02c574..2a29682d 100644
--- a/src/monitoring/health_check_server.cpp
+++ b/src/monitoring/health_check_server.cpp
@@ -56,10 +56,12 @@ class SocketHandle {
 
 HealthCheckServer::HealthCheckServer(uint16_t port,
                                      ReadinessCheck readiness_check,
-                                     NatsStatusTracker* nats_status)
+                                     NatsStatusTracker* nats_status,
+                                     NatsConnectionCheck nats_connection_check)
     : port_(port),
       server_fd_(-1),
       readiness_check_(std::move(readiness_check)),
+      nats_connection_check_(std::move(nats_connection_check)),
       nats_status_(nats_status) {}
 
 HealthCheckServer::~HealthCheckServer() {
@@ -165,6 +167,11 @@ void HealthCheckServer::setReadinessCheck(ReadinessCheck check) {
   readiness_check_ = std::move(check);
 }
 
+void HealthCheckServer::setNatsConnectionCheck(NatsConnectionCheck check) {
+  std::lock_guard<std::mutex> lock(readiness_mutex_);
+  nats_connection_check_ = std::move(check);
+}
+
 void HealthCheckServer::serverLoop() {
   while (running_.load()) {
     // Use poll() with timeout to check for incoming connections
@@ -308,7 +315,11 @@ void HealthCheckServer::handleRequest(int client_fd) {
     bool ready = true;
     {
       std::lock_guard<std::mutex> lock(readiness_mutex_);
-      if (readiness_check_) {
+      // issue #204: gate on NATS connection readiness first
+      if (nats_connection_check_ && !nats_connection_check_()) {
+        ready = false;
+      }
+      if (ready && readiness_check_) {
         ready = readiness_check_();
       }
     }
diff --git a/src/network/nats_listener.cpp b/src/network/nats_listener.cpp
index 6e793ab4..bb4700bb 100644
--- a/src/network/nats_listener.cpp
+++ b/src/network/nats_listener.cpp
@@ -115,13 +115,26 @@ natsStatus NATSListener::start(jsCtx* js) {
   jsSubOptions_Init(&sub_opts);
   sub_opts.Config.MaxAckPending = cfg_.max_ack_pending;
 
   jsErrCode jerr = jsErrCode(0);
-  natsStatus s =
-      js_Subscribe(&sub_, js, cfg_.subject.c_str(), on_msg, this, nullptr, &sub_opts, &jerr);
+  // Non-positive max_attempts is treated as a single attempt.
+  const int attempts = cfg_.max_attempts > 0 ? cfg_.max_attempts : 1;
+  natsStatus s = NATS_ERR;
+  for (int attempt = 1; attempt <= attempts; ++attempt) {
+    // jerr lives outside the loop so the final error log can report the last
+    // JetStream error code; reset it so a stale code is never carried over.
+    jerr = jsErrCode(0);
+    s = js_Subscribe(&sub_, js, cfg_.subject.c_str(), on_msg, this, nullptr, &sub_opts, &jerr);
+    if (s == NATS_OK) {
+      break;
+    }
+    spdlog::warn("NATSListener: subscribe attempt {}/{} failed status={} jerr={}",
+                 attempt,
+                 attempts,
+                 static_cast<int>(s),
+                 static_cast<int>(jerr));
+  }
   if (s != NATS_OK) {
-    spdlog::error("NATSListener: subscribe failed status={} jerr={}",
-                  static_cast<int>(s),
-                  static_cast<int>(jerr));
+    spdlog::error("NATSListener: all {} subscribe attempt(s) failed status={} jerr={}",
+                  attempts,
+                  static_cast<int>(s),
+                  static_cast<int>(jerr));
   }
   return s;
 }