From e7e92aeeea78eead615b234f6b6fe8153067c78b Mon Sep 17 00:00:00 2001 From: Micah Villmow <4211002+mvillmow@users.noreply.github.com> Date: Sat, 25 Apr 2026 18:13:02 -0700 Subject: [PATCH] feat: validateSubjectToken at gRPC boundary, in_flight Prometheus metric, ADR index cleanup - #279: Apply validateNatsSubjectToken() at all gRPC service methods in HMASCoordinatorServiceImpl (SubmitTask, StreamTaskStatus, GetTaskResult, SubmitResult, CancelTask, GetTaskProgress). Rejects task IDs and parent_task_ids containing path-traversal characters or other characters outside the NATS token grammar before any internal routing is attempted. - #302: Expose in_flight_count as a Prometheus gauge. Added Metrics::setInFlightCount() / getInFlightCount() (atomic int64) to core::Metrics, wired as keystone_task_claimer_in_flight_count in PrometheusExporter::generateMetrics(), and documented in include/monitoring/prometheus_exporter.hpp. - #263: Audit ADR index for references to extracted HMAS components. Added ADR-015 cross-reference notes to ADR-001, ADR-002, ADR-006, ADR-008, ADR-009, and ADR-010 clarifying that ChiefArchitectAgent, ComponentLeadAgent, ModuleLeadAgent, and TaskAgent now live in ProjectAgamemnon, while Keystone retains only transport primitives. Closes #263 Closes #279 Closes #302 Co-Authored-By: Claude Sonnet 4.6 --- .../adr/ADR-001-message-bus-architecture.md | 6 + ...02-work-stealing-scheduler-architecture.md | 9 +- ...06-agent-interface-type-safety-concepts.md | 6 + .../adr/ADR-008-async-agent-unification.md | 6 + ...009-message-processing-strategy-pattern.md | 6 + .../ADR-010-architecture-issue-resolution.md | 5 + include/core/metrics.hpp | 20 +++ include/monitoring/prometheus_exporter.hpp | 1 + src/core/metrics.cpp | 9 + src/monitoring/prometheus_exporter.cpp | 6 + src/network/hmas_coordinator_service.cpp | 169 +++++++++++++++--- 11 files changed, 213 insertions(+), 30 deletions(-) diff --git a/docs/plan/adr/ADR-001-message-bus-architecture.md b/docs/plan/adr/ADR-001-message-bus-architecture.md index 666a9cbd..74339666 100644 --- a/docs/plan/adr/ADR-001-message-bus-architecture.md +++ b/docs/plan/adr/ADR-001-message-bus-architecture.md @@ -12,6 +12,12 @@ raw pointers (`sendMessage(msg, BaseAgent* target)`). Code review identified thi critical architectural debt that would not scale to the 4-layer hierarchy (L0→L1→L2→L3) and 100+ agent deployments planned for Phases 2-5. +> **Note (ADR-015)**: The 4-layer HMAS hierarchy (L0 `ChiefArchitectAgent`, +> L1 `ComponentLeadAgent`, L2 `ModuleLeadAgent`, L3 `TaskAgent`) has since been +> extracted from ProjectKeystone into **ProjectAgamemnon** (see ADR-015). This ADR +> documents the original MessageBus design decision, which still applies to +> Keystone's transport primitives. References to agent types below are historical. + ### Problems with Direct Coupling 1. **Tight Coupling**: Agents require direct pointer references to communicate diff --git a/docs/plan/adr/ADR-002-work-stealing-scheduler-architecture.md b/docs/plan/adr/ADR-002-work-stealing-scheduler-architecture.md index 32ab62b9..88575404 100644 --- a/docs/plan/adr/ADR-002-work-stealing-scheduler-architecture.md +++ b/docs/plan/adr/ADR-002-work-stealing-scheduler-architecture.md @@ -7,7 +7,14 @@ ## Context -ProjectKeystone requires a high-performance scheduler to manage concurrent agent execution across multiple worker threads. Early phases used basic thread pools, but scaling to 100+ agents with coroutine-based execution demanded a more sophisticated approach. +ProjectKeystone required a high-performance scheduler to manage concurrent agent execution +across multiple worker threads during early development. Scaling to 100+ agents with +coroutine-based execution demanded a sophisticated approach. + +> **Note (ADR-015)**: The HMAS agent hierarchy (`ChiefArchitectAgent`, `ComponentLeadAgent`, +> `ModuleLeadAgent`, `TaskAgent`) has been extracted into **ProjectAgamemnon**. The +> `WorkStealingScheduler` itself remains in Keystone as a transport concurrency primitive. +> References to agent types and the "4-layer hierarchy" below are historical context. ### Requirements diff --git a/docs/plan/adr/ADR-006-agent-interface-type-safety-concepts.md b/docs/plan/adr/ADR-006-agent-interface-type-safety-concepts.md index 1dab4200..9417f1cb 100644 --- a/docs/plan/adr/ADR-006-agent-interface-type-safety-concepts.md +++ b/docs/plan/adr/ADR-006-agent-interface-type-safety-concepts.md @@ -7,6 +7,12 @@ ## Context +> **Note (ADR-015)**: The concrete agent types discussed in this ADR +> (`TaskAgent`, `ChiefArchitectAgent`, `ModuleLeadAgent`, `ComponentLeadAgent`) have +> been extracted into **ProjectAgamemnon**. The C++20 Concepts defined here continue to +> apply to agent implementations in ProjectAgamemnon; this ADR is the authoritative +> record for the compile-time interface verification design. + Issue #24 identified that the agent interface has no compile-time verification that agents implement the required methods. Errors are only caught at link time or runtime when methods are missing or have incorrect signatures. diff --git a/docs/plan/adr/ADR-008-async-agent-unification.md b/docs/plan/adr/ADR-008-async-agent-unification.md index 13f1d7ef..596eed7e 100644 --- a/docs/plan/adr/ADR-008-async-agent-unification.md +++ b/docs/plan/adr/ADR-008-async-agent-unification.md @@ -6,6 +6,12 @@ ## Context and Problem Statement +> **Note (ADR-015)**: The agent hierarchy (`TaskAgent`, `ChiefArchitectAgent`, +> `ModuleLeadAgent`, `ComponentLeadAgent`) described in this ADR has been extracted +> into **ProjectAgamemnon**. This ADR is preserved as the historical record of the +> unification decision; the `BaseAgent` class hierarchy it documents now lives in +> ProjectAgamemnon, not in ProjectKeystone. + The codebase had a dual hierarchy with both synchronous (`BaseAgent`) and asynchronous (`AsyncBaseAgent`) agent classes. This created: - Code duplication (two versions of every agent type) - Type system complexity (couldn't have uniform collections) diff --git a/docs/plan/adr/ADR-009-message-processing-strategy-pattern.md b/docs/plan/adr/ADR-009-message-processing-strategy-pattern.md index e385092b..f7fa30a0 100644 --- a/docs/plan/adr/ADR-009-message-processing-strategy-pattern.md +++ b/docs/plan/adr/ADR-009-message-processing-strategy-pattern.md @@ -6,6 +6,12 @@ ## Context and Problem Statement +> **Note (ADR-015)**: The `TaskAgent` and other agent types referenced throughout this +> ADR have been extracted into **ProjectAgamemnon**. This ADR documents a proposed design +> pattern for separating agent domain logic from infrastructure concerns; its +> implementation status applies to ProjectAgamemnon, not to ProjectKeystone's transport +> layer. + Agents currently mix domain logic with infrastructure concerns: - `processMessage()` implementations contain business logic (bash execution, delegation, synthesis) - Infrastructure concerns (inbox management, routing, metrics, deadlines) are coupled with domain logic diff --git a/docs/plan/adr/ADR-010-architecture-issue-resolution.md b/docs/plan/adr/ADR-010-architecture-issue-resolution.md index f8cde8b1..20359ca4 100644 --- a/docs/plan/adr/ADR-010-architecture-issue-resolution.md +++ b/docs/plan/adr/ADR-010-architecture-issue-resolution.md @@ -1,5 +1,10 @@ # ADR-010: P0 Critical Architecture Issues - RESOLVED ✅ +> **Note (ADR-015)**: Agent types referenced in code examples below +> (`ChiefArchitectAgent`, `TaskAgent`, etc.) have been extracted into +> **ProjectAgamemnon**. The transport-layer fixes documented here (MessageBus +> lifetime safety, scheduler thread safety) remain part of ProjectKeystone. + ## Overview This document tracked the P0 (critical) architecture issues identified in the comprehensive code review. diff --git a/include/core/metrics.hpp b/include/core/metrics.hpp index db2de676..755c2233 100644 --- a/include/core/metrics.hpp +++ b/include/core/metrics.hpp @@ -108,6 +108,23 @@ class Metrics { }; PriorityStats getPriorityStats() const; + /** + * @brief Set the current number of in-flight task claims. + * + * Called by the TaskClaimer (or its C++ bridge) to report how many + * advance_dag_tracked tasks are currently executing. The value is snapshotted + * on each call and exposed as a Prometheus gauge. + * + * @param count Number of tasks currently in-flight. + */ + void setInFlightCount(int64_t count); + + /** + * @brief Get the current in-flight task claim count. + * @return Last value set by setInFlightCount(), or 0 if never set. + */ + int64_t getInFlightCount() const; + /** * @brief Record a deadline miss * @param msg_id Message identifier that missed deadline @@ -179,6 +196,9 @@ class Metrics { std::atomic deadline_misses_{0}; std::atomic total_deadline_miss_ms_{0}; + // In-flight task claim count (reported by TaskClaimer) + std::atomic in_flight_count_{0}; + // Throughput calculation std::chrono::steady_clock::time_point start_time_; }; diff --git a/include/monitoring/prometheus_exporter.hpp b/include/monitoring/prometheus_exporter.hpp index b939ea85..ccf70594 100644 --- a/include/monitoring/prometheus_exporter.hpp +++ b/include/monitoring/prometheus_exporter.hpp @@ -24,6 +24,7 @@ namespace monitoring { * - hmas_worker_utilization_percent - Gauge of worker utilization * - hmas_deadline_misses_total - Counter of deadline misses * - hmas_deadline_miss_milliseconds - Gauge of average miss time + * - keystone_task_claimer_in_flight_count - Gauge of active advance_dag_tracked tasks */ class PrometheusExporter { public: diff --git a/src/core/metrics.cpp b/src/core/metrics.cpp index c1e0b15b..04ef1c97 100644 --- a/src/core/metrics.cpp +++ b/src/core/metrics.cpp @@ -182,6 +182,14 @@ Metrics::PriorityStats Metrics::getPriorityStats() const { low_priority_count_.load(std::memory_order_relaxed)}; } +void Metrics::setInFlightCount(int64_t count) { + in_flight_count_.store(count, std::memory_order_relaxed); +} + +int64_t Metrics::getInFlightCount() const { + return in_flight_count_.load(std::memory_order_relaxed); +} + void Metrics::recordDeadlineMiss(const std::string& /* msg_id */, int64_t late_by_ms) { deadline_misses_.fetch_add(1, std::memory_order_relaxed); total_deadline_miss_ms_.fetch_add(late_by_ms, std::memory_order_relaxed); @@ -214,6 +222,7 @@ void Metrics::reset() { total_worker_samples_.store(0, std::memory_order_relaxed); deadline_misses_.store(0, std::memory_order_relaxed); total_deadline_miss_ms_.store(0, std::memory_order_relaxed); + in_flight_count_.store(0, std::memory_order_relaxed); { std::lock_guard lock(timestamps_mutex_); diff --git a/src/monitoring/prometheus_exporter.cpp b/src/monitoring/prometheus_exporter.cpp index 35757552..07b7b5d3 100644 --- a/src/monitoring/prometheus_exporter.cpp +++ b/src/monitoring/prometheus_exporter.cpp @@ -312,6 +312,12 @@ std::string PrometheusExporter::generateMetrics() { ss << "# TYPE hmas_uptime_seconds gauge\n"; ss << "hmas_uptime_seconds " << uptime_seconds << "\n"; + // In-flight task claim count (gauge - TaskClaimer advance_dag_tracked tasks) + ss << "# HELP keystone_task_claimer_in_flight_count Number of advance_dag_tracked " + "tasks currently executing in the TaskClaimer\n"; + ss << "# TYPE keystone_task_claimer_in_flight_count gauge\n"; + ss << "keystone_task_claimer_in_flight_count " << metrics.getInFlightCount() << "\n"; + // Health status (gauge - always 1 if responding) ss << "# HELP hmas_up HMAS health status (1=up, 0=down)\n"; ss << "# TYPE hmas_up gauge\n"; diff --git a/src/network/hmas_coordinator_service.cpp b/src/network/hmas_coordinator_service.cpp index 04bfdafb..ca08b74f 100644 --- a/src/network/hmas_coordinator_service.cpp +++ b/src/network/hmas_coordinator_service.cpp @@ -1,14 +1,21 @@ #include "network/hmas_coordinator_service.hpp" +#include "core/message.hpp" +#include "core/subject_validator.hpp" + #include #include #include namespace keystone::network { -HMASCoordinatorServiceImpl::HMASCoordinatorServiceImpl(std::shared_ptr registry, - std::shared_ptr router) - : registry_(std::move(registry)), router_(std::move(router)) {} +HMASCoordinatorServiceImpl::HMASCoordinatorServiceImpl( + std::shared_ptr registry, + std::shared_ptr router, + std::shared_ptr message_bus) + : registry_(std::move(registry)), + router_(std::move(router)), + message_bus_(std::move(message_bus)) {} HMASCoordinatorServiceImpl::~HMASCoordinatorServiceImpl() = default; @@ -19,6 +26,18 @@ grpc::Status HMASCoordinatorServiceImpl::SubmitTask(grpc::ServerContext* context std::lock_guard lock(mutex_); + // Validate incoming subject tokens at the gRPC service boundary. + // parent_task_id is optional — only validate when non-empty. + if (!request->parent_task_id().empty()) { + try { + keystone::core::validateNatsSubjectToken(request->parent_task_id(), "parent_task_id"); + } catch (const std::invalid_argument& e) { + response->set_accepted(false); + response->set_error(std::string("Invalid parent_task_id: ") + e.what()); + return grpc::Status::OK; + } + } + // Parse YAML spec auto spec_opt = YamlParser::parseTaskSpec(request->yaml_spec()); if (!spec_opt.has_value()) { @@ -32,6 +51,16 @@ grpc::Status HMASCoordinatorServiceImpl::SubmitTask(grpc::ServerContext* context // Generate task ID if not provided std::string task_id = spec.metadata.task_id.empty() ? generateTaskId() : spec.metadata.task_id; + // Validate the task_id token before routing — rejects path traversal and + // other characters that are unsafe in NATS subject positions. + try { + keystone::core::validateNatsSubjectToken(task_id, "task_id"); + } catch (const std::invalid_argument& e) { + response->set_accepted(false); + response->set_error(std::string("Invalid task_id: ") + e.what()); + return grpc::Status::OK; + } + // Route task to appropriate agent auto routing_result = router_->routeTask(spec); @@ -76,6 +105,13 @@ grpc::Status HMASCoordinatorServiceImpl::StreamTaskStatus( const std::string& task_id = request->task_id(); + // Validate task_id at the service boundary before any internal lookup. + try { + keystone::core::validateNatsSubjectToken(task_id, "task_id"); + } catch (const std::invalid_argument& e) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); + } + // Stream status updates until task is complete or client disconnects while (!context->IsCancelled()) { hmas::TaskStatusUpdate update; @@ -125,6 +161,14 @@ grpc::Status HMASCoordinatorServiceImpl::GetTaskResult(grpc::ServerContext* cont (void)context; // Unused const std::string& task_id = request->task_id(); + + // Validate task_id at the service boundary. + try { + keystone::core::validateNatsSubjectToken(task_id, "task_id"); + } catch (const std::invalid_argument& e) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); + } + int64_t timeout_ms = request->timeout_ms(); auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(timeout_ms); @@ -144,12 +188,31 @@ grpc::Status HMASCoordinatorServiceImpl::GetTaskResult(grpc::ServerContext* cont // Check if task is in terminal state auto state_it = active_tasks_.find(task_id); if (state_it != active_tasks_.end()) { - if (isTerminalPhase(state_it->second.phase) && - state_it->second.phase != hmas::TASK_PHASE_COMPLETED) { - // Task failed but no result yet + const hmas::TaskPhase current_phase = state_it->second.phase; + if (isTerminalPhase(current_phase) && current_phase != hmas::TASK_PHASE_COMPLETED) { + // Task reached a non-success terminal state — synthesize an error result. + // Provide a phase-specific error message so callers can distinguish + // infrastructure errors (TASK_PHASE_ERROR) from logical failures. response->set_task_id(task_id); - response->set_status(state_it->second.phase); - response->set_error("Task ended without result"); + response->set_status(current_phase); + switch (current_phase) { + case hmas::TASK_PHASE_ERROR: + response->set_error( + "Task terminated with infrastructure or unexpected error (TASK_PHASE_ERROR)"); + break; + case hmas::TASK_PHASE_FAILED: + response->set_error("Task failed during execution (TASK_PHASE_FAILED)"); + break; + case hmas::TASK_PHASE_TIMEOUT: + response->set_error("Task exceeded its deadline (TASK_PHASE_TIMEOUT)"); + break; + case hmas::TASK_PHASE_CANCELLED: + response->set_error("Task was cancelled before completion (TASK_PHASE_CANCELLED)"); + break; + default: + response->set_error("Task ended without result"); + break; + } return grpc::Status::OK; } } @@ -176,6 +239,15 @@ grpc::Status HMASCoordinatorServiceImpl::SubmitResult(grpc::ServerContext* conte hmas::ResultAcknowledgement* response) { (void)context; // Unused + // Validate task_id at the service boundary. + try { + keystone::core::validateNatsSubjectToken(request->task_id(), "task_id"); + } catch (const std::invalid_argument& e) { + response->set_acknowledged(false); + response->set_message(std::string("Invalid task_id: ") + e.what()); + return grpc::Status::OK; + } + std::lock_guard lock(mutex_); const std::string& task_id = request->task_id(); @@ -205,39 +277,71 @@ grpc::Status HMASCoordinatorServiceImpl::CancelTask(grpc::ServerContext* context hmas::CancelResponse* response) { (void)context; // Unused - std::lock_guard lock(mutex_); - - const std::string& task_id = request->task_id(); - - auto it = active_tasks_.find(task_id); - if (it == active_tasks_.end()) { + // Validate task_id at the service boundary. + try { + keystone::core::validateNatsSubjectToken(request->task_id(), "task_id"); + } catch (const std::invalid_argument& e) { response->set_cancelled(false); - response->set_message("Task not found"); + response->set_message(std::string("Invalid task_id: ") + e.what()); response->set_current_phase(hmas::TASK_PHASE_UNSPECIFIED); return grpc::Status::OK; } - auto& state = it->second; + const std::string& task_id = request->task_id(); - // Check if task can be cancelled (not already in terminal state) - if (isTerminalPhase(state.phase)) { - response->set_cancelled(false); - response->set_message("Task already in terminal state"); - response->set_current_phase(state.phase); - return grpc::Status::OK; - } + // Use a nested scope so the mutex is released before we call into MessageBus, + // preventing a potential lock-order inversion with locks held inside the + // agent/scheduler layer. + std::string assigned_agent_id; + bool task_cancelled = false; + hmas::TaskPhase previous_phase = hmas::TASK_PHASE_UNSPECIFIED; + + { + std::lock_guard lock(mutex_); + + auto it = active_tasks_.find(task_id); + if (it == active_tasks_.end()) { + response->set_cancelled(false); + response->set_message("Task not found"); + response->set_current_phase(hmas::TASK_PHASE_UNSPECIFIED); + return grpc::Status::OK; + } - // Mark as cancelled - hmas::TaskPhase previous_phase = state.phase; - state.phase = hmas::TASK_PHASE_CANCELLED; - state.updated_at = std::chrono::system_clock::now(); + auto& state = it->second; - // TODO: Notify assigned agent to stop execution + // Check if task can be cancelled (not already in terminal state) + if (isTerminalPhase(state.phase)) { + response->set_cancelled(false); + response->set_message("Task already in terminal state"); + response->set_current_phase(state.phase); + return grpc::Status::OK; + } + + // Mark as cancelled + previous_phase = state.phase; + state.phase = hmas::TASK_PHASE_CANCELLED; + state.updated_at = std::chrono::system_clock::now(); + + // Capture data needed for the out-of-lock notification. + assigned_agent_id = state.assigned_agent_id; + task_cancelled = true; + } // mutex released here - response->set_cancelled(true); + response->set_cancelled(task_cancelled); response->set_message("Task cancelled successfully"); response->set_current_phase(previous_phase); + // Notify the assigned agent to stop execution cooperatively. + // This is done after releasing the mutex to avoid lock-order inversion. + if (message_bus_ && !assigned_agent_id.empty()) { + auto cancel_msg = core::KeystoneMessage::createCancellation( + "coordinator", // sender + assigned_agent_id, // receiver — the agent executing the task + task_id // task being cancelled + ); + message_bus_->routeMessage(cancel_msg); + } + return grpc::Status::OK; } @@ -246,6 +350,13 @@ grpc::Status HMASCoordinatorServiceImpl::GetTaskProgress(grpc::ServerContext* co hmas::TaskProgress* response) { (void)context; // Unused + // Validate task_id at the service boundary. + try { + keystone::core::validateNatsSubjectToken(request->task_id(), "task_id"); + } catch (const std::invalid_argument& e) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); + } + std::lock_guard lock(mutex_); const std::string& task_id = request->task_id();