Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions agent/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,19 @@ impl ApiClient {
&self,
api_key: &str,
) -> Result<Vec<AssignedWorkload>> {
let url = format!("{}/api/workloads", self.gateway_url);
let url = format!("{}/api/agents/self/workloads", self.gateway_url);

let resp = self
.client
.get(&url)
.header("X-API-Key", api_key)
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await
.context("Failed to fetch workloads")?;

if !resp.status().is_success() {
let status = resp.status();
anyhow::bail!("Failed to fetch workloads status={}", status);
anyhow::bail!("Failed to fetch workloads status={} {}", status, resp.text().await.unwrap_or_default());
}

let all: Vec<AssignedWorkload> = resp
Expand All @@ -218,32 +217,54 @@ impl ApiClient {

Ok(all
.into_iter()
.filter(|w| {
w.status == "scheduled"
&& w.container_id.is_none()
})
.filter(|w| w.status == "scheduled" && w.container_id.is_none())
.collect())
}

pub async fn fetch_bootstrap_token(&self) -> Result<String> {
let url = format!("{}/api/registry/internal/bootstrap-token", self.gateway_url);

let resp = self
.client
.get(&url)
.send()
.await
.context("Failed to fetch bootstrap token")?;

if !resp.status().is_success() {
let status = resp.status();
anyhow::bail!("Bootstrap token fetch failed status={}", status);
}

let body: serde_json::Value = resp
.json()
.await
.context("Failed to parse bootstrap token response")?;

body["token"]
.as_str()
.map(|s| s.to_string())
.context("Bootstrap token response missing 'token' field")
}

pub async fn fetch_assigned_volumes(
&self,
_agent_id: Uuid,
api_key: &str,
) -> Result<Vec<AssignedVolume>> {
let url = format!("{}/api/volumes", self.gateway_url);
let url = format!("{}/api/agents/self/volumes", self.gateway_url);

let resp = self
.client
.get(&url)
.header("X-API-Key", api_key)
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await
.context("Failed to fetch volumes")?;

if !resp.status().is_success() {
let status = resp.status();
anyhow::bail!("Failed to fetch volumes status={}", status);
anyhow::bail!("Failed to fetch volumes status={} {}", status, resp.text().await.unwrap_or_default());
}

let all: Vec<AssignedVolume> = resp
Expand All @@ -253,10 +274,7 @@ impl ApiClient {

Ok(all
.into_iter()
.filter(|v| {
v.status == "in_use"
&& v.mapped_device.is_none()
})
.filter(|v| v.status == "in_use" && v.mapped_device.is_none())
.collect())
}
}
12 changes: 10 additions & 2 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,16 @@ async fn perform_registration(
heartbeat_interval_secs: u64,
agent_pki: &pki::AgentPki,
) -> Result<(uuid::Uuid, String)> {
let token = std::env::var("CSF_REGISTRATION_TOKEN")
.context("CSF_REGISTRATION_TOKEN is required for first-time registration")?;
let token = match std::env::var("CSF_REGISTRATION_TOKEN") {
Ok(t) => t,
Err(_) => {
info!("CSF_REGISTRATION_TOKEN not set, fetching bootstrap token from gateway");
client
.fetch_bootstrap_token()
.await
.context("Failed to fetch bootstrap token from gateway")?
}
};

let info = system::collect_info();

Expand Down
56 changes: 56 additions & 0 deletions control-plane/api-gateway/src/auth/agent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::AppState;
use axum::{
async_trait,
extract::FromRequestParts,
http::{request::Parts, StatusCode},
};
use entity::entities::{agent_api_keys, agents};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use sha2::{Digest, Sha256};
use uuid::Uuid;

pub struct AgentApiKey {
pub agent_id: Uuid,
}

fn hash_key(key: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(key.as_bytes());
format!("{:x}", hasher.finalize())
}

#[async_trait]
impl FromRequestParts<AppState> for AgentApiKey {
type Rejection = StatusCode;

async fn from_request_parts(
parts: &mut Parts,
state: &AppState,
) -> Result<Self, Self::Rejection> {
let raw_key = parts
.headers
.get("X-API-Key")
.and_then(|v| v.to_str().ok())
.ok_or(StatusCode::UNAUTHORIZED)?;

let key_hash = hash_key(raw_key);

let key_record = agent_api_keys::Entity::find()
.filter(agent_api_keys::Column::KeyHash.eq(&key_hash))
.filter(agent_api_keys::Column::IsActive.eq(true))
.one(&state.db_conn)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::UNAUTHORIZED)?;

agents::Entity::find_by_id(key_record.agent_id)
.one(&state.db_conn)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::UNAUTHORIZED)?;

Ok(AgentApiKey {
agent_id: key_record.agent_id,
})
}
}
1 change: 0 additions & 1 deletion control-plane/api-gateway/src/auth/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ impl FromRequestParts<AppState> for AuthenticatedUser {
parts: &mut Parts,
state: &AppState,
) -> Result<Self, Self::Rejection> {
tracing::info!("Request headers: {:?}", parts.headers);
let token = parts
.headers
.get("Authorization")
Expand Down
1 change: 1 addition & 0 deletions control-plane/api-gateway/src/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod agent;
pub mod crypto;
pub mod jwt;
pub mod middleware;
Expand Down
69 changes: 67 additions & 2 deletions control-plane/api-gateway/src/routes/agents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use axum::{
routing::{get, post},
Router,
};
use entity::entities::{agent_metrics, agents};
use entity::entities::{agent_metrics, agents, volumes, workloads};
use sea_orm::{
ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::auth::agent::AgentApiKey;
use crate::auth::rbac::CanViewAgents;
use crate::AppState;

Expand Down Expand Up @@ -248,6 +249,11 @@ pub async fn receive_metrics(
Ok(StatusCode::CREATED)
}

fn is_container_id(hostname: &str) -> bool {
let h = hostname.trim();
h.len() == 12 && h.chars().all(|c| c.is_ascii_hexdigit())
}

/// List all agents
pub async fn list_agents(
State(state): State<AppState>,
Expand All @@ -262,7 +268,11 @@ pub async fn list_agents(
StatusCode::INTERNAL_SERVER_ERROR
})?;

let response: Vec<AgentResponse> = agents.into_iter().map(Into::into).collect();
let response: Vec<AgentResponse> = agents
.into_iter()
.filter(|a| !is_container_id(&a.hostname))
.map(Into::into)
.collect();
Ok(Json(response))
}

Expand Down Expand Up @@ -304,14 +314,69 @@ pub async fn get_agent_metrics(
Ok(Json(metrics))
}

pub async fn get_agent_metrics_latest(
State(state): State<AppState>,
_perm: CanViewAgents,
axum::extract::Path(agent_id): axum::extract::Path<Uuid>,
) -> Result<impl IntoResponse, StatusCode> {
let metric = agent_metrics::Entity::find()
.filter(agent_metrics::Column::AgentId.eq(agent_id))
.order_by_desc(agent_metrics::Column::Timestamp)
.one(&state.db_conn)
.await
.map_err(|e| {
tracing::error!(error = %e, "failed to fetch latest agent metrics");
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;

Ok(Json(metric))
}

pub async fn get_self_workloads(
agent: AgentApiKey,
State(state): State<AppState>,
) -> Result<impl IntoResponse, StatusCode> {
let rows = workloads::Entity::find()
.filter(workloads::Column::AssignedAgentId.eq(agent.agent_id))
.all(&state.db_conn)
.await
.map_err(|e| {
tracing::error!(error = %e, "failed to fetch workloads for agent");
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(Json(rows))
}

pub async fn get_self_volumes(
agent: AgentApiKey,
State(state): State<AppState>,
) -> Result<impl IntoResponse, StatusCode> {
let rows = volumes::Entity::find()
.filter(volumes::Column::AttachedToAgent.eq(agent.agent_id))
.all(&state.db_conn)
.await
.map_err(|e| {
tracing::error!(error = %e, "failed to fetch volumes for agent");
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(Json(rows))
}

pub fn agents_routes() -> Router<AppState> {
Router::new()
// Public endpoints (for agents)
.route("/agents/register", post(register_agent))
.route("/agents/heartbeat", post(heartbeat))
.route("/agents/metrics", post(receive_metrics))
// Agent-authenticated endpoints (X-API-Key)
.route("/agents/self/workloads", get(get_self_workloads))
.route("/agents/self/volumes", get(get_self_volumes))
// Protected endpoints (for frontend)
.route("/agents", get(list_agents))
.route("/agents/:id", get(get_agent))
.route("/agents/:id/metrics", get(get_agent_metrics))
.route("/agents/:id/metrics/latest", get(get_agent_metrics_latest))
}
35 changes: 35 additions & 0 deletions control-plane/api-gateway/src/routes/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,43 @@ pub async fn registry_health(State(state): State<AppState>) -> impl IntoResponse
}
}

pub async fn internal_bootstrap_token(
State(state): State<AppState>,
axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
let ip = addr.ip();
let allowed = ip.is_loopback()
|| match ip {
std::net::IpAddr::V4(v4) => {
let octets = v4.octets();
octets[0] == 10
|| (octets[0] == 172 && octets[1] >= 16 && octets[1] <= 31)
|| (octets[0] == 192 && octets[1] == 168)
}
std::net::IpAddr::V6(_) => false,
};

if !allowed {
return Err((
StatusCode::FORBIDDEN,
Json(json!({"error": "internal endpoint not accessible from this network"})),
));
}

proxy_to_registry(
&state,
reqwest::Method::POST,
"/admin/bootstrap-tokens",
Some(json!({"description": "auto-issued for node bootstrap", "ttl_hours": 1, "max_uses": 1})),
None,
)
.await
}

pub fn registry_routes() -> Router<AppState> {
Router::new()
// Internal - node bootstrap (no auth, IP-restricted)
.route("/registry/internal/bootstrap-token", get(internal_bootstrap_token))
// Admin routes - Pre-Registration (NEW WORKFLOW)
.route(
"/registry/admin/agents/pre-register",
Expand Down
Loading
Loading