From 8ed2d60dc0a98474801ef8a1b3ad624917c2ca52 Mon Sep 17 00:00:00 2001 From: CodeMaster4711 Date: Sun, 8 Mar 2026 19:50:44 +0100 Subject: [PATCH 1/5] fix: nix os config version --- nixos-node/modules/server-configuration.nix | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nixos-node/modules/server-configuration.nix b/nixos-node/modules/server-configuration.nix index 9b9d1a0..1508dde 100644 --- a/nixos-node/modules/server-configuration.nix +++ b/nixos-node/modules/server-configuration.nix @@ -158,7 +158,7 @@ services: restart: unless-stopped api-gateway: - image: ghcr.io/cs-foundry/csf-ce-api-gateway:0.2.2-alpha.353 + image: ghcr.io/cs-foundry/csf-ce-api-gateway:0.2.2-alpha.361 container_name: csf-api-gateway environment: DATABASE_URL: postgres://csf:csfpassword@patroni:5432/csf_core @@ -186,7 +186,7 @@ services: start_period: 30s registry: - image: ghcr.io/cs-foundry/csf-ce-registry:0.2.2-alpha.353 + image: ghcr.io/cs-foundry/csf-ce-registry:0.2.2-alpha.361 container_name: csf-registry environment: DATABASE_URL: postgres://csf:csfpassword@patroni:5432/csf_core @@ -202,7 +202,7 @@ services: restart: unless-stopped scheduler: - image: ghcr.io/cs-foundry/csf-ce-scheduler:0.2.2-alpha.353 + image: ghcr.io/cs-foundry/csf-ce-scheduler:0.2.2-alpha.361 container_name: csf-scheduler environment: DATABASE_URL: postgres://csf:csfpassword@patroni:5432/csf_core @@ -217,7 +217,7 @@ services: restart: unless-stopped volume-manager: - image: ghcr.io/cs-foundry/csf-ce-volume-manager:0.2.2-alpha.353 + image: ghcr.io/cs-foundry/csf-ce-volume-manager:0.2.2-alpha.361 container_name: csf-volume-manager environment: DATABASE_URL: postgres://csf:csfpassword@patroni:5432/csf_core @@ -234,7 +234,7 @@ services: restart: unless-stopped failover-controller: - image: ghcr.io/cs-foundry/csf-ce-failover-controller:0.2.2-alpha.353 + image: ghcr.io/cs-foundry/csf-ce-failover-controller:0.2.2-alpha.361 container_name: csf-failover-controller environment: DATABASE_URL: postgres://csf:csfpassword@patroni:5432/csf_core @@ -250,7 +250,7 @@ services: restart: unless-stopped sdn-controller: - image: ghcr.io/cs-foundry/csf-ce-sdn-controller:0.2.2-alpha.353 + image: ghcr.io/cs-foundry/csf-ce-sdn-controller:0.2.2-alpha.361 container_name: csf-sdn-controller environment: DATABASE_URL: postgres://csf:csfpassword@patroni:5432/csf_core From 4ac4ce842d18dfa0d281d83b59628855d8de7207 Mon Sep 17 00:00:00 2001 From: CodeMaster4711 Date: Sun, 8 Mar 2026 20:43:59 +0100 Subject: [PATCH 2/5] fix: metrics error agent --- agent/src/client.rs | 20 ++----- control-plane/api-gateway/src/auth/agent.rs | 56 ++++++++++++++++++ .../api-gateway/src/auth/middleware.rs | 1 - control-plane/api-gateway/src/auth/mod.rs | 1 + .../api-gateway/src/routes/agents.rs | 58 ++++++++++++++++++- 5 files changed, 120 insertions(+), 16 deletions(-) create mode 100644 control-plane/api-gateway/src/auth/agent.rs diff --git a/agent/src/client.rs b/agent/src/client.rs index dbb9f3c..bf94ba4 100644 --- a/agent/src/client.rs +++ b/agent/src/client.rs @@ -195,20 +195,19 @@ impl ApiClient { &self, api_key: &str, ) -> Result> { - let url = format!("{}/api/workloads", self.gateway_url); + let url = format!("{}/api/agents/self/workloads", self.gateway_url); let resp = self .client .get(&url) .header("X-API-Key", api_key) - .header("Authorization", format!("Bearer {}", api_key)) .send() .await .context("Failed to fetch workloads")?; if !resp.status().is_success() { let status = resp.status(); - anyhow::bail!("Failed to fetch workloads status={}", status); + anyhow::bail!("Failed to fetch workloads status={} {}", status, resp.text().await.unwrap_or_default()); } let all: Vec = resp @@ -218,10 +217,7 @@ impl ApiClient { Ok(all .into_iter() - .filter(|w| { - w.status == "scheduled" - && w.container_id.is_none() - }) + .filter(|w| w.status == "scheduled" && w.container_id.is_none()) .collect()) } @@ -230,20 +226,19 @@ impl ApiClient { _agent_id: Uuid, api_key: &str, ) -> Result> { - let url = format!("{}/api/volumes", self.gateway_url); + let url = format!("{}/api/agents/self/volumes", self.gateway_url); let resp = self .client .get(&url) .header("X-API-Key", api_key) - .header("Authorization", format!("Bearer {}", api_key)) .send() .await .context("Failed to fetch volumes")?; if !resp.status().is_success() { let status = resp.status(); - anyhow::bail!("Failed to fetch volumes status={}", status); + anyhow::bail!("Failed to fetch volumes status={} {}", status, resp.text().await.unwrap_or_default()); } let all: Vec = resp @@ -253,10 +248,7 @@ impl ApiClient { Ok(all .into_iter() - .filter(|v| { - v.status == "in_use" - && v.mapped_device.is_none() - }) + .filter(|v| v.status == "in_use" && v.mapped_device.is_none()) .collect()) } } diff --git a/control-plane/api-gateway/src/auth/agent.rs b/control-plane/api-gateway/src/auth/agent.rs new file mode 100644 index 0000000..f49853c --- /dev/null +++ b/control-plane/api-gateway/src/auth/agent.rs @@ -0,0 +1,56 @@ +use crate::AppState; +use axum::{ + async_trait, + extract::FromRequestParts, + http::{request::Parts, StatusCode}, +}; +use entity::entities::{agent_api_keys, agents}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; +use sha2::{Digest, Sha256}; +use uuid::Uuid; + +pub struct AgentApiKey { + pub agent_id: Uuid, +} + +fn hash_key(key: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(key.as_bytes()); + format!("{:x}", hasher.finalize()) +} + +#[async_trait] +impl FromRequestParts for AgentApiKey { + type Rejection = StatusCode; + + async fn from_request_parts( + parts: &mut Parts, + state: &AppState, + ) -> Result { + let raw_key = parts + .headers + .get("X-API-Key") + .and_then(|v| v.to_str().ok()) + .ok_or(StatusCode::UNAUTHORIZED)?; + + let key_hash = hash_key(raw_key); + + let key_record = agent_api_keys::Entity::find() + .filter(agent_api_keys::Column::KeyHash.eq(&key_hash)) + .filter(agent_api_keys::Column::IsActive.eq(true)) + .one(&state.db_conn) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::UNAUTHORIZED)?; + + agents::Entity::find_by_id(key_record.agent_id) + .one(&state.db_conn) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::UNAUTHORIZED)?; + + Ok(AgentApiKey { + agent_id: key_record.agent_id, + }) + } +} diff --git a/control-plane/api-gateway/src/auth/middleware.rs b/control-plane/api-gateway/src/auth/middleware.rs index 261f2de..c9ffed8 100644 --- a/control-plane/api-gateway/src/auth/middleware.rs +++ b/control-plane/api-gateway/src/auth/middleware.rs @@ -22,7 +22,6 @@ impl FromRequestParts for AuthenticatedUser { parts: &mut Parts, state: &AppState, ) -> Result { - tracing::info!("Request headers: {:?}", parts.headers); let token = parts .headers .get("Authorization") diff --git a/control-plane/api-gateway/src/auth/mod.rs b/control-plane/api-gateway/src/auth/mod.rs index 89bb67b..95cde4b 100644 --- a/control-plane/api-gateway/src/auth/mod.rs +++ b/control-plane/api-gateway/src/auth/mod.rs @@ -1,3 +1,4 @@ +pub mod agent; pub mod crypto; pub mod jwt; pub mod middleware; diff --git a/control-plane/api-gateway/src/routes/agents.rs b/control-plane/api-gateway/src/routes/agents.rs index 069c847..c46e046 100644 --- a/control-plane/api-gateway/src/routes/agents.rs +++ b/control-plane/api-gateway/src/routes/agents.rs @@ -5,13 +5,14 @@ use axum::{ routing::{get, post}, Router, }; -use entity::entities::{agent_metrics, agents}; +use entity::entities::{agent_metrics, agents, volumes, workloads}; use sea_orm::{ ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, }; use serde::{Deserialize, Serialize}; use uuid::Uuid; +use crate::auth::agent::AgentApiKey; use crate::auth::rbac::CanViewAgents; use crate::AppState; @@ -304,14 +305,69 @@ pub async fn get_agent_metrics( Ok(Json(metrics)) } +pub async fn get_agent_metrics_latest( + State(state): State, + _perm: CanViewAgents, + axum::extract::Path(agent_id): axum::extract::Path, +) -> Result { + let metric = agent_metrics::Entity::find() + .filter(agent_metrics::Column::AgentId.eq(agent_id)) + .order_by_desc(agent_metrics::Column::Timestamp) + .one(&state.db_conn) + .await + .map_err(|e| { + tracing::error!(error = %e, "failed to fetch latest agent metrics"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or(StatusCode::NOT_FOUND)?; + + Ok(Json(metric)) +} + +pub async fn get_self_workloads( + agent: AgentApiKey, + State(state): State, +) -> Result { + let rows = workloads::Entity::find() + .filter(workloads::Column::AssignedAgentId.eq(agent.agent_id)) + .all(&state.db_conn) + .await + .map_err(|e| { + tracing::error!(error = %e, "failed to fetch workloads for agent"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + Ok(Json(rows)) +} + +pub async fn get_self_volumes( + agent: AgentApiKey, + State(state): State, +) -> Result { + let rows = volumes::Entity::find() + .filter(volumes::Column::AttachedToAgent.eq(agent.agent_id)) + .all(&state.db_conn) + .await + .map_err(|e| { + tracing::error!(error = %e, "failed to fetch volumes for agent"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + Ok(Json(rows)) +} + pub fn agents_routes() -> Router { Router::new() // Public endpoints (for agents) .route("/agents/register", post(register_agent)) .route("/agents/heartbeat", post(heartbeat)) .route("/agents/metrics", post(receive_metrics)) + // Agent-authenticated endpoints (X-API-Key) + .route("/agents/self/workloads", get(get_self_workloads)) + .route("/agents/self/volumes", get(get_self_volumes)) // Protected endpoints (for frontend) .route("/agents", get(list_agents)) .route("/agents/:id", get(get_agent)) .route("/agents/:id/metrics", get(get_agent_metrics)) + .route("/agents/:id/metrics/latest", get(get_agent_metrics_latest)) } From 2259013cd36d995360cc79e35699132616174f48 Mon Sep 17 00:00:00 2001 From: CodeMaster4711 Date: Sun, 8 Mar 2026 20:55:29 +0100 Subject: [PATCH 3/5] fix: node deduplication and cluster telemetry --- .../api-gateway/src/routes/agents.rs | 11 +- .../api-gateway/src/routes/system.rs | 148 ++++++++++++++++-- 2 files changed, 147 insertions(+), 12 deletions(-) diff --git a/control-plane/api-gateway/src/routes/agents.rs b/control-plane/api-gateway/src/routes/agents.rs index c46e046..92e77e4 100644 --- a/control-plane/api-gateway/src/routes/agents.rs +++ b/control-plane/api-gateway/src/routes/agents.rs @@ -249,6 +249,11 @@ pub async fn receive_metrics( Ok(StatusCode::CREATED) } +fn is_container_id(hostname: &str) -> bool { + let h = hostname.trim(); + h.len() == 12 && h.chars().all(|c| c.is_ascii_hexdigit()) +} + /// List all agents pub async fn list_agents( State(state): State, @@ -263,7 +268,11 @@ pub async fn list_agents( StatusCode::INTERNAL_SERVER_ERROR })?; - let response: Vec = agents.into_iter().map(Into::into).collect(); + let response: Vec = agents + .into_iter() + .filter(|a| !is_container_id(&a.hostname)) + .map(Into::into) + .collect(); Ok(Json(response)) } diff --git a/control-plane/api-gateway/src/routes/system.rs b/control-plane/api-gateway/src/routes/system.rs index 1c97d44..760876c 100644 --- a/control-plane/api-gateway/src/routes/system.rs +++ b/control-plane/api-gateway/src/routes/system.rs @@ -1,4 +1,6 @@ -use axum::{extract::State, response::Json, routing::get, Router}; +use axum::{extract::State, http::StatusCode, response::Json, routing::get, Router}; +use entity::entities::{agent_metrics, agents}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; use serde::{Deserialize, Serialize}; use crate::auth::middleware::AuthenticatedUser; @@ -22,16 +24,47 @@ pub struct SystemMetricsResponse { pub metrics: LocalSystemMetrics, } +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeStats { + pub agent_id: String, + pub name: String, + pub hostname: String, + pub status: String, + pub cpu_usage_percent: Option, + pub memory_total_bytes: Option, + pub memory_used_bytes: Option, + pub disk_total_bytes: Option, + pub disk_used_bytes: Option, + pub uptime_seconds: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ClusterStats { + pub node_count: usize, + pub online_count: usize, + pub total_cpu_cores: i64, + pub avg_cpu_usage_percent: f32, + pub total_memory_bytes: i64, + pub used_memory_bytes: i64, + pub total_disk_bytes: i64, + pub used_disk_bytes: i64, + pub nodes: Vec, +} + +fn is_container_id(hostname: &str) -> bool { + let trimmed = hostname.trim(); + trimmed.len() == 12 + && trimmed.chars().all(|c| c.is_ascii_hexdigit()) +} + pub fn routes() -> Router { Router::new() .route("/system/health", get(health_check)) .route("/system/info", get(get_system_info)) .route("/system/metrics", get(get_system_metrics)) + .route("/system/stats", get(get_cluster_stats)) } -/// Health check endpoint -/// -/// Simple endpoint to check if the service is running async fn health_check() -> Json { Json(serde_json::json!({ "status": "healthy", @@ -39,9 +72,6 @@ async fn health_check() -> Json { })) } -/// Get local system information -/// -/// Returns static system information like hostname, OS, CPU details async fn get_system_info( _auth: AuthenticatedUser, State(_state): State, @@ -61,15 +91,111 @@ async fn get_system_info( }) } -/// Get current system metrics -/// -/// Returns real-time metrics like CPU usage, memory usage, disk usage, network stats async fn get_system_metrics( _auth: AuthenticatedUser, State(_state): State, ) -> Json { let collector = LocalSystemCollector::new(); let metrics = collector.collect(); - Json(SystemMetricsResponse { metrics }) } + +async fn get_cluster_stats( + _auth: AuthenticatedUser, + State(state): State, +) -> Result, StatusCode> { + let all_agents = agents::Entity::find() + .all(&state.db_conn) + .await + .map_err(|e| { + tracing::error!(error = %e, "failed to fetch agents for cluster stats"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let physical_agents: Vec<_> = all_agents + .into_iter() + .filter(|a| !is_container_id(&a.hostname)) + .collect(); + + let mut nodes = Vec::with_capacity(physical_agents.len()); + let mut total_cpu_cores: i64 = 0; + let mut cpu_usage_sum: f32 = 0.0; + let mut cpu_usage_count: usize = 0; + let mut total_memory: i64 = 0; + let mut used_memory: i64 = 0; + let mut total_disk: i64 = 0; + let mut used_disk: i64 = 0; + let mut online_count: usize = 0; + + for agent in &physical_agents { + if agent.status == "online" { + online_count += 1; + } + + let latest_metric = agent_metrics::Entity::find() + .filter(agent_metrics::Column::AgentId.eq(agent.id)) + .order_by_desc(agent_metrics::Column::Timestamp) + .one(&state.db_conn) + .await + .map_err(|e| { + tracing::error!(error = %e, agent_id = %agent.id, "failed to fetch agent metrics"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let (cpu_pct, mem_total, mem_used, disk_total, disk_used, uptime, cores) = + latest_metric.as_ref().map(|m| { + ( + m.cpu_usage_percent, + m.memory_total_bytes, + m.memory_used_bytes, + m.disk_total_bytes, + m.disk_used_bytes, + m.uptime_seconds, + m.cpu_cores, + ) + }).unwrap_or_default(); + + if let Some(c) = cores { + total_cpu_cores += c as i64; + } + if let Some(p) = cpu_pct { + cpu_usage_sum += p; + cpu_usage_count += 1; + } + total_memory += mem_total.unwrap_or(0); + used_memory += mem_used.unwrap_or(0); + total_disk += disk_total.unwrap_or(0); + used_disk += disk_used.unwrap_or(0); + + nodes.push(NodeStats { + agent_id: agent.id.to_string(), + name: agent.name.clone(), + hostname: agent.hostname.clone(), + status: agent.status.clone(), + cpu_usage_percent: cpu_pct, + memory_total_bytes: mem_total, + memory_used_bytes: mem_used, + disk_total_bytes: disk_total, + disk_used_bytes: disk_used, + uptime_seconds: uptime, + }); + } + + let avg_cpu = if cpu_usage_count > 0 { + cpu_usage_sum / cpu_usage_count as f32 + } else { + 0.0 + }; + + Ok(Json(ClusterStats { + node_count: physical_agents.len(), + online_count, + total_cpu_cores, + avg_cpu_usage_percent: avg_cpu, + total_memory_bytes: total_memory, + used_memory_bytes: used_memory, + total_disk_bytes: total_disk, + used_disk_bytes: used_disk, + nodes, + })) +} From a6a4fc9bcfceca80bb3a10dae50ff3cd7b389e5e Mon Sep 17 00:00:00 2001 From: CodeMaster4711 Date: Wed, 11 Mar 2026 19:40:37 +0100 Subject: [PATCH 4/5] fix: ssh error for dev --- nixos-node/modules/node-configuration.nix | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nixos-node/modules/node-configuration.nix b/nixos-node/modules/node-configuration.nix index 021f9d3..408e0ec 100644 --- a/nixos-node/modules/node-configuration.nix +++ b/nixos-node/modules/node-configuration.nix @@ -20,7 +20,13 @@ users.users.root.hashedPassword = "!"; - services.openssh.enable = false; + services.openssh = { + enable = true; + settings = { + PermitRootLogin = "no"; + PasswordAuthentication = false; + }; + }; services.csf-daemon = { enable = true; From 1547ee838877589f000946f0278a70baff698f26 Mon Sep 17 00:00:00 2001 From: CodeMaster4711 Date: Wed, 11 Mar 2026 20:00:10 +0100 Subject: [PATCH 5/5] fix: dev bootstrap --- agent/src/client.rs | 26 ++++++++++++++ agent/src/main.rs | 12 +++++-- .../api-gateway/src/routes/registry.rs | 35 +++++++++++++++++++ nixos-node/modules/node-configuration.nix | 1 - 4 files changed, 71 insertions(+), 3 deletions(-) diff --git a/agent/src/client.rs b/agent/src/client.rs index bf94ba4..58722ef 100644 --- a/agent/src/client.rs +++ b/agent/src/client.rs @@ -221,6 +221,32 @@ impl ApiClient { .collect()) } + pub async fn fetch_bootstrap_token(&self) -> Result { + let url = format!("{}/api/registry/internal/bootstrap-token", self.gateway_url); + + let resp = self + .client + .get(&url) + .send() + .await + .context("Failed to fetch bootstrap token")?; + + if !resp.status().is_success() { + let status = resp.status(); + anyhow::bail!("Bootstrap token fetch failed status={}", status); + } + + let body: serde_json::Value = resp + .json() + .await + .context("Failed to parse bootstrap token response")?; + + body["token"] + .as_str() + .map(|s| s.to_string()) + .context("Bootstrap token response missing 'token' field") + } + pub async fn fetch_assigned_volumes( &self, _agent_id: Uuid, diff --git a/agent/src/main.rs b/agent/src/main.rs index afb9255..320afa6 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -103,8 +103,16 @@ async fn perform_registration( heartbeat_interval_secs: u64, agent_pki: &pki::AgentPki, ) -> Result<(uuid::Uuid, String)> { - let token = std::env::var("CSF_REGISTRATION_TOKEN") - .context("CSF_REGISTRATION_TOKEN is required for first-time registration")?; + let token = match std::env::var("CSF_REGISTRATION_TOKEN") { + Ok(t) => t, + Err(_) => { + info!("CSF_REGISTRATION_TOKEN not set, fetching bootstrap token from gateway"); + client + .fetch_bootstrap_token() + .await + .context("Failed to fetch bootstrap token from gateway")? + } + }; let info = system::collect_info(); diff --git a/control-plane/api-gateway/src/routes/registry.rs b/control-plane/api-gateway/src/routes/registry.rs index 4199626..ae687d7 100644 --- a/control-plane/api-gateway/src/routes/registry.rs +++ b/control-plane/api-gateway/src/routes/registry.rs @@ -838,8 +838,43 @@ pub async fn registry_health(State(state): State) -> impl IntoResponse } } +pub async fn internal_bootstrap_token( + State(state): State, + axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, +) -> Result)> { + let ip = addr.ip(); + let allowed = ip.is_loopback() + || match ip { + std::net::IpAddr::V4(v4) => { + let octets = v4.octets(); + octets[0] == 10 + || (octets[0] == 172 && octets[1] >= 16 && octets[1] <= 31) + || (octets[0] == 192 && octets[1] == 168) + } + std::net::IpAddr::V6(_) => false, + }; + + if !allowed { + return Err(( + StatusCode::FORBIDDEN, + Json(json!({"error": "internal endpoint not accessible from this network"})), + )); + } + + proxy_to_registry( + &state, + reqwest::Method::POST, + "/admin/bootstrap-tokens", + Some(json!({"description": "auto-issued for node bootstrap", "ttl_hours": 1, "max_uses": 1})), + None, + ) + .await +} + pub fn registry_routes() -> Router { Router::new() + // Internal - node bootstrap (no auth, IP-restricted) + .route("/registry/internal/bootstrap-token", get(internal_bootstrap_token)) // Admin routes - Pre-Registration (NEW WORKFLOW) .route( "/registry/admin/agents/pre-register", diff --git a/nixos-node/modules/node-configuration.nix b/nixos-node/modules/node-configuration.nix index 408e0ec..41b2f18 100644 --- a/nixos-node/modules/node-configuration.nix +++ b/nixos-node/modules/node-configuration.nix @@ -32,7 +32,6 @@ enable = true; package = csf.agentPackage; apiGateway = "http://gateway.csf.local:8000"; - registrationToken = "csf-bootstrap.change_me"; heartbeatInterval = 60; logLevel = "info"; };