diff --git a/.github/workflows/vm-tests.yaml b/.github/workflows/vm-tests.yaml index 60fa326..042bbba 100644 --- a/.github/workflows/vm-tests.yaml +++ b/.github/workflows/vm-tests.yaml @@ -41,6 +41,8 @@ jobs: include: - name: P2P Discovery Test check: p2p-discovery + - name: Mesh Trust Test + check: mesh-trust - name: E2E Test check: e2e steps: diff --git a/Cargo.lock b/Cargo.lock index 47150ca..aefbed1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2148,10 +2148,12 @@ dependencies = [ name = "ncro-mesh" version = "2.2.2" dependencies = [ + "base64", "chrono", "ed25519-dalek", "hex", "ncro-db", + "ncro-narinfo", "rand 0.10.1", "rmp-serde", "serde", @@ -2182,8 +2184,10 @@ dependencies = [ name = "ncro-router" version = "2.2.2" dependencies = [ + "base64", "chrono", "dashmap", + "ed25519-dalek", "futures-util", "moka", "ncro-config", diff --git a/README.md b/README.md index 4816451..61650d0 100644 --- a/README.md +++ b/README.md @@ -115,8 +115,12 @@ further in the [architechture document]. - `GET /nar/.nar`: streamed NAR content from the chosen upstream - `GET /metrics`: Prometheus metrics - `GET /health`: JSON health summary of configured upstreams +- `GET /trust/.narinfo`: trust decision and matching claim count +- `GET /provenance/.narinfo`: stored signer claims for a narinfo hash -### Routing Notes +### Notes + +#### Routing - Route cache decisions are stored in SQLite and reused until their TTL expires (or they are evicted by the LRU policy when `max_entries` is reached). @@ -139,6 +143,24 @@ further in the [architechture document]. part of health probing, discovery, priority routing, filters, cooldown, or route persistence. +#### Trust + +[trust documentation]: ./docs/trust.md + +`trust.mode = "signed"` accepts only narinfos that verify against the selected +upstream's configured `public_key`. `trust.mode = "quorum"` records signed +claims and accepts a route only after enough distinct signer keys agree on the +same `StorePath`, `NarHash`, `NarSize`, and `References`. With +`mesh.gossip_trust_claims`, peers relay re-verified claims so a quorum can form +across a mesh where each node sees only one upstream. + +> [!NOTE] +> This is output consensus, not full source attestation, and the signature +> covers only the signed fingerprint; not the streamed NAR bytes. See +> [trust documentation] for the complete model: what is and is not verified, how +> a quorum is counted, the `fail_closed` open/closed behavior, and how mesh +> claim relay stays trustworthy. + ## Quick Start ```bash @@ -214,6 +236,13 @@ per_upstream_max_inflight = 8 # per-upstream narinfo head concurrency in_memory_negative_ttl = "5s" # short-lived miss suppression upstream_cooldown = "15s" # cooldown on transient upstream network errors +[trust] +mode = "off" # off | signed | quorum +threshold = 2 # signer agreement needed in quorum mode +require_distinct_signers = true # count signer keys, not upstream URLs +fail_closed = true # reject untrusted candidates when enabled +# claim_ttl = "30d" # optional: ignore claims older than this in quorum + [logging] level = "info" # debug | info | warn | error format = "json" # json | text @@ -232,6 +261,7 @@ bind_addr = "0.0.0.0:7946" peers = [] # list of {addr, public_key} peer entries private_key = "" # path to ed25519 key file; empty = ephemeral gossip_interval = "30s" +gossip_trust_claims = false # also gossip + re-verify trust claims across peers ``` ### Environment Overrides @@ -469,6 +499,15 @@ Each peer entry takes an address and an optional ed25519 public key. When a public key is provided, incoming gossip packets are verified against it; packets from unlisted senders or with invalid signatures are silently dropped. +> [!TIP] +> Setting `mesh.gossip_trust_claims = true` additionally gossips the trust +> claims this node has verified locally and accepts claims relayed by peers, +> letting a `quorum` policy form across the mesh. A relayed claim only counts if +> its signer key is trusted (a configured upstream key or one listed in +> `trust.trusted_keys`) **and** its narinfo re-verifies against that key, so a +> peer can only relay real signatures from trusted signers, never fabricate a +> quorum with throwaway keys. + If `mesh.private_key` is left empty, ncro generates an ephemeral identity on startup. That is fine for testing, but persistent gossip requires a stable key so peers can recognize the node across restarts. @@ -501,15 +540,16 @@ Prometheus metrics are available at `/metrics`. -| Metric | Type | Description | -| ----------------------------------------- | --------- | ---------------------------------------- | -| `ncro_narinfo_cache_hits_total` | counter | Narinfo requests served from route cache | -| `ncro_narinfo_cache_misses_total` | counter | Narinfo requests requiring upstream race | -| `ncro_narinfo_requests_total{status}` | counter | Narinfo requests by status (200/error) | -| `ncro_nar_requests_total` | counter | NAR streaming requests | -| `ncro_upstream_race_wins_total{upstream}` | counter | Race wins per upstream | -| `ncro_upstream_latency_seconds{upstream}` | histogram | Race latency per upstream | -| `ncro_route_entries` | gauge | Current route entries in SQLite | +| Metric | Type | Description | +| ----------------------------------------- | --------- | ------------------------------------------------------------- | +| `ncro_narinfo_cache_hits_total` | counter | Narinfo requests served from route cache | +| `ncro_narinfo_cache_misses_total` | counter | Narinfo requests requiring upstream race | +| `ncro_narinfo_requests_total{status}` | counter | Narinfo requests by status (200/error) | +| `ncro_nar_requests_total` | counter | NAR streaming requests | +| `ncro_upstream_race_wins_total{upstream}` | counter | Race wins per upstream | +| `ncro_upstream_latency_seconds{upstream}` | histogram | Race latency per upstream | +| `ncro_route_entries` | gauge | Current route entries in SQLite | +| `ncro_trust_bypass_total{reason}` | counter | Content served despite failed trust check (fail_closed=false) | diff --git a/config.example.toml b/config.example.toml index 74f4a9d..d42bbcb 100644 --- a/config.example.toml +++ b/config.example.toml @@ -51,6 +51,27 @@ max_concurrent_races = 64 per_upstream_max_inflight = 8 upstream_cooldown = "15s" +[trust] +# off | signed | quorum. In signed mode, accepted narinfos must verify against +# the selected upstream's public_key. In quorum mode, at least threshold +# distinct configured signer keys must agree on StorePath, NarHash, NarSize, +# and References before ncro stores a route. See docs/trust.md for the full +# model. +fail_closed = true +mode = "off" +require_distinct_signers = true +threshold = 2 + +# Optional maximum age for a claim to count toward a quorum. Unset = never +# expires. Prevents a long-dead signer from propping up a quorum forever. +# claim_ttl = "30d" +# Signer keys trusted to vouch for content in quorum mode, in addition to the +# public_key of every configured upstream. A quorum counts only claims signed +# by a key in this set; without it an attacker could self-sign forged content +# under throwaway keys and forge agreement. In a mesh, list the other nodes' +# upstream signer keys here so their relayed claims count. +# trusted_keys = ["cache-b-1:base64key=", "cache-c-1:base64key="] + [discovery] discovery_time = "5s" domain = "local" @@ -65,6 +86,12 @@ gossip_interval = "30s" peers = [ ] private_key = "/etc/ncro/node.key" +# When true, this node also gossips locally-verified trust claims to peers and +# accepts claims relayed by peers (each re-verified against its original Nix +# signer key before being trusted). Lets a quorum form across a mesh where each +# node sees only one upstream. Only meaningful with trust.mode = "quorum". +gossip_trust_claims = false + [logging] format = "json" level = "info" diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 376b132..445c4bb 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -294,6 +294,81 @@ url = "s3://fallback-cache?endpoint=s3.example.com" } Ok(()) } + + #[test] + fn parses_trust_config() -> Result<(), ConfigError> { + let cfg: Config = toml::from_str( + "[trust]\nmode = \"quorum\"\nthreshold = 2\nrequire_distinct_signers = \ + true\nfail_closed = true\n", + )?; + assert_eq!(cfg.trust.mode, TrustMode::Quorum); + assert_eq!(cfg.trust.threshold, 2); + cfg.validate()?; + Ok(()) + } + + #[test] + fn rejects_zero_trust_threshold_when_enabled() -> Result<(), toml::de::Error> + { + let cfg: Config = + toml::from_str("[trust]\nmode = \"signed\"\nthreshold = 0\n")?; + let result = cfg.validate(); + assert!(result.is_err(), "expected validation failure"); + Ok(()) + } + + #[test] + fn parses_trust_hardening_fields() -> Result<(), ConfigError> { + let cfg: Config = toml::from_str( + "[trust]\nmode = \"quorum\"\nthreshold = 2\nclaim_ttl = \"90s\"\n", + )?; + assert_eq!( + cfg.trust.claim_ttl.as_ref().map(|d| d.0), + Some(std::time::Duration::from_secs(90)) + ); + cfg.validate()?; + Ok(()) + } + + #[test] + fn trust_hardening_fields_default_off() { + let cfg = Config::default(); + assert!(cfg.trust.claim_ttl.is_none()); + assert!(!cfg.mesh.gossip_trust_claims); + } + + #[test] + fn parses_mesh_gossip_trust_claims() -> Result<(), toml::de::Error> { + let cfg: Config = toml::from_str( + "[mesh]\nenabled = true\ngossip_trust_claims = \ + true\n\n[[mesh.peers]]\naddr = \"10.0.0.2:7946\"\n", + )?; + assert!(cfg.mesh.gossip_trust_claims); + Ok(()) + } + + #[test] + fn parses_trusted_keys() -> Result<(), ConfigError> { + let cfg: Config = toml::from_str( + "[trust]\nmode = \"quorum\"\ntrusted_keys = [\"cache-b-1:YWJj\", \ + \"cache-c-1:ZGVm\"]\n", + )?; + assert_eq!(cfg.trust.trusted_keys.len(), 2); + cfg.validate()?; + Ok(()) + } + + #[test] + fn rejects_malformed_trusted_key() -> Result<(), toml::de::Error> { + let cfg: Config = toml::from_str( + "[trust]\nmode = \"quorum\"\ntrusted_keys = [\"missing-colon\"]\n", + )?; + assert!( + cfg.validate().is_err(), + "a trusted key without a ':' separator must be rejected" + ); + Ok(()) + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -314,6 +389,15 @@ impl<'de> Deserialize<'de> for HumanDuration { } } +impl serde::Serialize for HumanDuration { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + humantime_serde::serialize(&self.0, serializer) + } +} + #[derive(Debug, Clone, Copy, Default, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum FilterAction { @@ -484,22 +568,29 @@ pub struct PeerConfig { #[derive(Debug, Clone, Deserialize)] #[serde(default)] pub struct MeshConfig { - pub enabled: bool, - pub bind_addr: String, - pub peers: Vec, + pub enabled: bool, + pub bind_addr: String, + pub peers: Vec, #[serde(rename = "private_key")] - pub private_key_path: String, - pub gossip_interval: HumanDuration, + pub private_key_path: String, + pub gossip_interval: HumanDuration, + /// When enabled, this node also gossips the trust claims it has verified + /// locally to its peers, and accepts claims relayed by peers (after + /// re-verifying each claim's embedded narinfo signature). This lets a + /// quorum be satisfied across a mesh of nodes that each see only one + /// upstream. Off by default; only meaningful when `trust.mode = "quorum"`. + pub gossip_trust_claims: bool, } impl Default for MeshConfig { fn default() -> Self { Self { - enabled: false, - bind_addr: "0.0.0.0:7946".to_string(), - peers: Vec::new(), - private_key_path: String::new(), - gossip_interval: HumanDuration(Duration::from_secs(30)), + enabled: false, + bind_addr: "0.0.0.0:7946".to_string(), + peers: Vec::new(), + private_key_path: String::new(), + gossip_interval: HumanDuration(Duration::from_secs(30)), + gossip_trust_claims: false, } } } @@ -560,6 +651,63 @@ impl Default for LoggingConfig { } } +#[derive( + Debug, + Clone, + Copy, + Default, + PartialEq, + Eq, + serde::Serialize, + serde::Deserialize, +)] +#[serde(rename_all = "lowercase")] +pub enum TrustMode { + #[default] + Off, + Signed, + Quorum, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(default)] +pub struct TrustConfig { + pub mode: TrustMode, + pub threshold: u32, + pub require_distinct_signers: bool, + pub fail_closed: bool, + /// Optional maximum age for a trust claim to count toward a quorum. Claims + /// whose `last_seen` is older than this are ignored when evaluating the + /// policy, preventing a long-dead signer from propping up a quorum forever. + /// Unset (the default) means claims never expire. + pub claim_ttl: Option, + /// Nix public keys (`name:base64(key)`) that are trusted to vouch for + /// content in `quorum` mode, *in addition to* the `public_key` of every + /// configured upstream. A quorum counts only claims signed by a key in + /// this trusted set, and mesh-relayed claims signed by any other key are + /// dropped. + /// + /// This is the difference between a real quorum and security theatre: + /// without it, anyone could generate throwaway keypairs, self-sign forged + /// content under each, relay the claims over the mesh, and manufacture a + /// quorum. In a multi-node mesh, list here the signer keys of the *other* + /// nodes' trusted upstreams so their relayed claims count. + pub trusted_keys: Vec, +} + +impl Default for TrustConfig { + fn default() -> Self { + Self { + mode: TrustMode::Off, + threshold: 2, + require_distinct_signers: true, + fail_closed: true, + claim_ttl: None, + trusted_keys: Vec::new(), + } + } +} + #[derive(Debug, Clone, Deserialize)] #[serde(default)] pub struct Config { @@ -570,6 +718,7 @@ pub struct Config { pub mesh: MeshConfig, pub discovery: DiscoveryConfig, pub logging: LoggingConfig, + pub trust: TrustConfig, } impl Default for Config { @@ -587,6 +736,7 @@ impl Default for Config { mesh: MeshConfig::default(), discovery: DiscoveryConfig::default(), logging: LoggingConfig::default(), + trust: TrustConfig::default(), } } } @@ -706,6 +856,18 @@ impl Config { "cache.mass_query.upstream_cooldown must be positive".to_string(), )); } + if self.trust.mode != TrustMode::Off && self.trust.threshold == 0 { + return Err(ConfigError::Validation( + "trust.threshold must be >= 1 when trust is enabled".to_string(), + )); + } + for (i, key) in self.trust.trusted_keys.iter().enumerate() { + if !key.contains(':') { + return Err(ConfigError::Validation(format!( + "trust.trusted_keys[{i}] must be in 'name:base64(key)' Nix format" + ))); + } + } if self.mesh.enabled && self.mesh.peers.is_empty() { return Err(ConfigError::Validation( "mesh.enabled is true but no peers configured".to_string(), diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 16a022c..9503676 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -41,6 +41,25 @@ pub struct RouteEntry { pub narinfo_bytes: Option>, } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct TrustClaim { + pub narinfo_hash: String, + pub store_path: String, + pub upstream_url: String, + pub signer_name: String, + pub signer_key: String, + pub nar_hash: String, + pub nar_size: u64, + pub references: String, + pub deriver: String, + pub ca: String, + pub file_hash: String, + pub file_size: u64, + pub first_seen: DateTime, + pub last_seen: DateTime, + pub narinfo: Vec, +} + impl RouteEntry { #[must_use] pub fn is_valid(&self) -> bool { @@ -271,6 +290,96 @@ impl Db { Ok(()) } + /// # Errors + /// + /// Returns [`DbError`] on `SQLite` write failure. + pub async fn set_trust_claim( + &self, + claim: &TrustClaim, + ) -> Result<(), DbError> { + sqlx::query( + r"INSERT INTO trust_claims + (narinfo_hash, store_path, upstream_url, signer_name, signer_key, + nar_hash, nar_size, references_key, deriver, ca, file_hash, file_size, + first_seen, last_seen, narinfo) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(narinfo_hash, signer_key, nar_hash, nar_size, references_key) DO UPDATE SET + store_path = excluded.store_path, + upstream_url = excluded.upstream_url, + signer_name = excluded.signer_name, + nar_hash = excluded.nar_hash, + nar_size = excluded.nar_size, + references_key = excluded.references_key, + deriver = excluded.deriver, + ca = excluded.ca, + file_hash = excluded.file_hash, + file_size = excluded.file_size, + last_seen = excluded.last_seen, + narinfo = excluded.narinfo", + ) + .bind(&claim.narinfo_hash) + .bind(&claim.store_path) + .bind(&claim.upstream_url) + .bind(&claim.signer_name) + .bind(&claim.signer_key) + .bind(&claim.nar_hash) + .bind(i64::try_from(claim.nar_size).unwrap_or(i64::MAX)) + .bind(&claim.references) + .bind(&claim.deriver) + .bind(&claim.ca) + .bind(&claim.file_hash) + .bind(i64::try_from(claim.file_size).unwrap_or(i64::MAX)) + .bind(claim.first_seen.timestamp()) + .bind(claim.last_seen.timestamp()) + .bind(&claim.narinfo) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// # Errors + /// + /// Returns [`DbError`] on `SQLite` query failure or invalid stored data. + pub async fn trust_claims( + &self, + narinfo_hash: &str, + ) -> Result, DbError> { + let rows = sqlx::query( + r"SELECT narinfo_hash, store_path, upstream_url, signer_name, signer_key, + nar_hash, nar_size, references_key, deriver, ca, file_hash, file_size, + first_seen, last_seen, narinfo + FROM trust_claims WHERE narinfo_hash = ? + ORDER BY last_seen DESC", + ) + .bind(narinfo_hash) + .fetch_all(&self.pool) + .await?; + rows.iter().map(row_to_trust_claim).collect() + } + + /// Return the `n` most recently seen trust claims, newest first. + /// + /// Used by the mesh gossip loop to relay locally-verified claims to peers. + /// + /// # Errors + /// + /// Returns [`DbError`] on `SQLite` query failure or invalid stored data. + pub async fn list_recent_trust_claims( + &self, + n: i64, + ) -> Result, DbError> { + let rows = sqlx::query( + r"SELECT narinfo_hash, store_path, upstream_url, signer_name, signer_key, + nar_hash, nar_size, references_key, deriver, ca, file_hash, file_size, + first_seen, last_seen, narinfo + FROM trust_claims ORDER BY last_seen DESC LIMIT ?", + ) + .bind(n) + .fetch_all(&self.pool) + .await?; + rows.iter().map(row_to_trust_claim).collect() + } + /// # Errors /// /// Returns [`DbError`] on `SQLite` write failure. @@ -419,6 +528,34 @@ async fn migrate(pool: &SqlitePool) -> Result<(), DbError> { .execute(pool) .await?; add_column_if_missing(pool, "routes", "narinfo_bytes", "BLOB").await?; + sqlx::query( + r"CREATE TABLE IF NOT EXISTS trust_claims ( + narinfo_hash TEXT NOT NULL, + store_path TEXT NOT NULL, + upstream_url TEXT NOT NULL, + signer_name TEXT NOT NULL, + signer_key TEXT NOT NULL, + nar_hash TEXT NOT NULL, + nar_size INTEGER NOT NULL, + references_key TEXT NOT NULL, + deriver TEXT NOT NULL, + ca TEXT NOT NULL, + file_hash TEXT NOT NULL, + file_size INTEGER NOT NULL, + first_seen INTEGER NOT NULL, + last_seen INTEGER NOT NULL, + narinfo BLOB NOT NULL, + PRIMARY KEY (narinfo_hash, signer_key, nar_hash, nar_size, references_key) + )", + ) + .execute(pool) + .await?; + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_trust_claims_output ON \ + trust_claims(narinfo_hash, nar_hash, nar_size, references_key)", + ) + .execute(pool) + .await?; Ok(()) } @@ -450,6 +587,34 @@ fn row_to_route(row: &sqlx::sqlite::SqliteRow) -> Result { }) } +fn row_to_trust_claim( + row: &sqlx::sqlite::SqliteRow, +) -> Result { + let nar_size = row.get::("nar_size"); + let file_size = row.get::("file_size"); + Ok(TrustClaim { + narinfo_hash: row.get("narinfo_hash"), + store_path: row.get("store_path"), + upstream_url: row.get("upstream_url"), + signer_name: row.get("signer_name"), + signer_key: row.get("signer_key"), + nar_hash: row.get("nar_hash"), + nar_size: u64::try_from(nar_size).map_err(|_| { + DbError::InvalidData(format!("nar_size out of range: {nar_size}")) + })?, + references: row.get("references_key"), + deriver: row.get("deriver"), + ca: row.get("ca"), + file_hash: row.get("file_hash"), + file_size: u64::try_from(file_size).map_err(|_| { + DbError::InvalidData(format!("file_size out of range: {file_size}")) + })?, + first_seen: timestamp(row.get("first_seen"), "first_seen")?, + last_seen: timestamp(row.get("last_seen"), "last_seen")?, + narinfo: row.get("narinfo"), + }) +} + fn timestamp( value: i64, field: &'static str, @@ -521,6 +686,40 @@ mod tests { Ok(()) } + #[tokio::test] + async fn trust_claim_roundtrip() -> Result<(), DbError> { + let db = Db::open(":memory:", 100).await?; + let now = Utc::now(); + let claim = TrustClaim { + narinfo_hash: "abc123".into(), + store_path: "/nix/store/abc123-hello".into(), + upstream_url: "https://cache.example".into(), + signer_name: "cache.example-1".into(), + signer_key: "cache.example-1:pubkey".into(), + nar_hash: "sha256:abc".into(), + nar_size: 42, + references: "dep-one dep-two".into(), + deriver: "abc123-hello.drv".into(), + ca: String::new(), + file_hash: "sha256:file".into(), + file_size: 7, + first_seen: now, + last_seen: now, + narinfo: b"StorePath: /nix/store/abc123-hello\n".to_vec(), + }; + db.set_trust_claim(&claim).await?; + let claims = db.trust_claims("abc123").await?; + assert_eq!(claims.len(), 1); + assert_eq!(claims[0].signer_name, claim.signer_name); + assert_eq!(claims[0].nar_hash, claim.nar_hash); + + let recent = db.list_recent_trust_claims(10).await?; + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].narinfo_hash, claim.narinfo_hash); + assert_eq!(recent[0].narinfo, claim.narinfo); + Ok(()) + } + #[tokio::test] async fn concurrent_reads_do_not_deadlock() -> Result<(), Box> { diff --git a/crates/mesh/Cargo.toml b/crates/mesh/Cargo.toml index 3edbe3c..db40894 100644 --- a/crates/mesh/Cargo.toml +++ b/crates/mesh/Cargo.toml @@ -8,16 +8,20 @@ homepage.workspace = true repository.workspace = true [dependencies] -chrono.workspace = true -ed25519-dalek = { workspace = true, features = [ "rand_core" ] } -hex.workspace = true -ncro-db.workspace = true -rand.workspace = true -rmp-serde.workspace = true -serde = { workspace = true, features = [ "derive" ] } -thiserror.workspace = true -tokio = { workspace = true, features = [ "fs", "macros", "net", "rt", "sync", "time" ] } -tracing.workspace = true +chrono.workspace = true +ed25519-dalek = { workspace = true, features = [ "rand_core" ] } +hex.workspace = true +ncro-db.workspace = true +ncro-narinfo.workspace = true +rand.workspace = true +rmp-serde.workspace = true +serde = { workspace = true, features = [ "derive" ] } +thiserror.workspace = true +tokio = { workspace = true, features = [ "fs", "macros", "net", "rt", "sync", "time" ] } +tracing.workspace = true + +[dev-dependencies] +base64.workspace = true [lints] workspace = true diff --git a/crates/mesh/src/lib.rs b/crates/mesh/src/lib.rs index fca0fb8..1455c99 100644 --- a/crates/mesh/src/lib.rs +++ b/crates/mesh/src/lib.rs @@ -2,7 +2,8 @@ use std::{path::Path, sync::Arc}; use chrono::Utc; use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey}; -use ncro_db::{Db, RouteEntry}; +use ncro_db::{Db, RouteEntry, TrustClaim}; +use ncro_narinfo::NarInfo; use rand::RngExt; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -33,6 +34,7 @@ pub enum MeshError { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum MsgType { Announce = 1, + Claims = 2, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -40,7 +42,12 @@ pub struct Message { pub r#type: MsgType, pub node_id: String, pub timestamp: i64, + /// Route gossip carried by an `Announce` message (empty for `Claims`). + #[serde(default)] pub routes: Vec, + /// Trust claims carried by a `Claims` message (empty for `Announce`). + #[serde(default)] + pub claims: Vec, } #[derive(Clone)] @@ -127,13 +134,19 @@ pub fn verify(pubkey: &[u8], body: &[u8], sig: &[u8]) -> Result<(), MeshError> { /// # Errors /// /// Returns [`MeshError`] if the UDP socket cannot be bound to `addr`. +/// `trusted_keys` is the set of Nix signer public keys (`name:base64(key)`) +/// whose relayed trust claims may be accepted; claims signed by any other key +/// are dropped (see [`merge_claims`]). pub async fn listen_and_serve( addr: &str, db: Db, allowed_keys: Vec<[u8; 32]>, + trusted_keys: Vec, stop: tokio::sync::watch::Receiver, ) -> Result<(), MeshError> { let socket = UdpSocket::bind(addr).await?; + let trusted: std::collections::HashSet = + trusted_keys.into_iter().collect(); tokio::spawn(async move { let mut stop = stop; let mut buf = vec![0; MAX_PACKET_SIZE]; @@ -152,8 +165,14 @@ pub async fn listen_and_serve( tracing::warn!(?src, error = %err, "mesh: signature verification failed"); continue; } - if msg.r#type == MsgType::Announce && !msg.routes.is_empty() { - merge_routes(&db, msg.routes).await; + match msg.r#type { + MsgType::Announce if !msg.routes.is_empty() => { + merge_routes(&db, msg.routes).await; + } + MsgType::Claims if !msg.claims.is_empty() => { + merge_claims(&db, msg.claims, &trusted).await; + } + _ => {} } } Err(err) => tracing::warn!(?src, error = %err, "mesh: malformed packet"), @@ -188,6 +207,52 @@ async fn merge_routes(db: &Db, incoming: Vec) { } } +/// Merge trust claims relayed by a peer into the local store. +/// +/// A relayed claim is accepted only if **both** hold: +/// +/// 1. Its `signer_key` is in `trusted`, the set of Nix keys this node is +/// configured to trust. Without this gate, distinct-signer quorum is +/// security theatre: an attacker could generate any number of throwaway +/// keypairs, self-sign forged content under each, and relay the claims to +/// fabricate agreement. Dropping untrusted keys here also bounds how many +/// claims a hostile peer can write to the database. +/// 2. The embedded narinfo actually verifies against that `signer_key`. The +/// peer's packet signature only proves *who relayed* the claim, not that the +/// content was signed; re-verifying means a peer cannot forge a claim for a +/// trusted key it does not hold. +async fn merge_claims( + db: &Db, + incoming: Vec, + trusted: &std::collections::HashSet, +) { + for claim in incoming { + if !trusted.contains(&claim.signer_key) { + tracing::warn!( + store = claim.store_path, + signer = claim.signer_name, + "mesh: rejecting relayed trust claim from untrusted signer key" + ); + continue; + } + let verified = NarInfo::parse(claim.narinfo.as_slice()) + .ok() + .and_then(|parsed| parsed.verify(&claim.signer_key).ok()) + .unwrap_or(false); + if !verified { + tracing::warn!( + store = claim.store_path, + signer = claim.signer_name, + "mesh: rejecting relayed trust claim with invalid signature" + ); + continue; + } + if let Err(err) = db.set_trust_claim(&claim).await { + tracing::warn!(error = %err, store = claim.store_path, "mesh: trust claim merge failed"); + } + } +} + /// # Errors /// /// Returns [`MeshError`] if the message cannot be signed, the socket cannot @@ -202,6 +267,7 @@ pub async fn announce( node_id: node.id(), timestamp: Utc::now().timestamp_nanos_opt().unwrap_or_default(), routes, + claims: Vec::new(), }; let packet = encode_packet(node, &msg)?; let socket = UdpSocket::bind("0.0.0.0:0").await?; @@ -209,11 +275,73 @@ pub async fn announce( Ok(()) } +fn encode_claims( + node: &Node, + claims: &[TrustClaim], +) -> Result, MeshError> { + let msg = Message { + r#type: MsgType::Claims, + node_id: node.id(), + timestamp: Utc::now().timestamp_nanos_opt().unwrap_or_default(), + routes: Vec::new(), + claims: claims.to_vec(), + }; + encode_packet(node, &msg) +} + +/// Relay trust claims to a peer, batching them so no UDP packet exceeds +/// [`MAX_PACKET_SIZE`]. A single claim that cannot fit on its own (its raw +/// narinfo is too large) is logged and skipped rather than silently dropped. +/// +/// # Errors +/// +/// Returns [`MeshError`] if a claim cannot be encoded, the socket cannot be +/// bound, or a packet fails to send. +pub async fn announce_claims( + peer_addr: &str, + node: &Node, + claims: Vec, +) -> Result<(), MeshError> { + let socket = UdpSocket::bind("0.0.0.0:0").await?; + let mut batch: Vec = Vec::new(); + for claim in claims { + // A claim that cannot fit in an empty packet on its own can never be + // gossiped; log and skip it rather than dropping silently. + if encode_claims(node, std::slice::from_ref(&claim))?.len() + > MAX_PACKET_SIZE + { + tracing::warn!( + store = claim.store_path, + "mesh: trust claim exceeds packet size, skipping" + ); + continue; + } + batch.push(claim); + if encode_claims(node, &batch)?.len() <= MAX_PACKET_SIZE { + continue; + } + // Adding the last claim overflowed the packet: flush the batch without it, + // then start a fresh batch holding the claim that did not fit. + let overflow = batch.split_off(batch.len() - 1); + socket + .send_to(&encode_claims(node, &batch)?, peer_addr) + .await?; + batch = overflow; + } + if !batch.is_empty() { + socket + .send_to(&encode_claims(node, &batch)?, peer_addr) + .await?; + } + Ok(()) +} + pub async fn run_gossip_loop( node: Node, db: Db, peers: Vec, interval: Duration, + gossip_claims: bool, mut stop: tokio::sync::watch::Receiver, ) { let mut ticker = tokio::time::interval(interval); @@ -221,13 +349,22 @@ pub async fn run_gossip_loop( tokio::select! { _ = stop.changed() => return, _ = ticker.tick() => { - let Ok(routes) = db.list_recent_routes(MAX_GOSSIP_ROUTES).await else { continue; }; - if routes.is_empty() { continue; } + let routes = db.list_recent_routes(MAX_GOSSIP_ROUTES).await.unwrap_or_default(); + let claims = if gossip_claims { + db.list_recent_trust_claims(MAX_GOSSIP_ROUTES).await.unwrap_or_default() + } else { + Vec::new() + }; + if routes.is_empty() && claims.is_empty() { continue; } for peer in &peers { let peer = peer.clone(); let node = node.clone(); let routes = routes.clone(); - tokio::spawn(async move { let _ = announce(&peer, &node, routes).await; }); + let claims = claims.clone(); + tokio::spawn(async move { + if !routes.is_empty() { let _ = announce(&peer, &node, routes).await; } + if !claims.is_empty() { let _ = announce_claims(&peer, &node, claims).await; } + }); } } } @@ -256,9 +393,110 @@ fn decode_packet(packet: &[u8]) -> Result, MeshError> { #[cfg(test)] mod tests { - use ncro_db::{Db, RouteEntry}; + use base64::{Engine, engine::general_purpose::STANDARD}; + use ed25519_dalek::{Signer, SigningKey}; + use ncro_db::{Db, RouteEntry, TrustClaim}; + use ncro_narinfo::NarInfo; + use rand::RngExt; + + use super::{merge_claims, merge_routes}; - use super::merge_routes; + /// Build a trust claim carrying a narinfo signed by `signer_key` so it + /// re-verifies, unless `tamper` flips a content field after signing. + fn signed_claim(store_hash: &str, tamper: bool) -> TrustClaim { + let mut key_bytes = [0_u8; 32]; + rand::rng().fill(&mut key_bytes); + let signing = SigningKey::from_bytes(&key_bytes); + let store_path = format!("/nix/store/{store_hash}-pkg"); + let mut ni = NarInfo { + store_path: store_path.clone(), + nar_hash: "sha256:abc".into(), + nar_size: 12, + references: vec![format!("{store_hash}-pkg")], + ..Default::default() + }; + let sig = signing.sign(ni.fingerprint().as_bytes()); + let signer_key = format!( + "test:{}", + STANDARD.encode(signing.verifying_key().to_bytes()) + ); + ni.sig = vec![format!("test:{}", STANDARD.encode(sig.to_bytes()))]; + if tamper { + ni.nar_size = 999; + } + let now = chrono::Utc::now(); + let narinfo = format!( + "StorePath: {}\nNarHash: {}\nNarSize: {}\nReferences: {}\nSig: {}\n", + ni.store_path, + ni.nar_hash, + ni.nar_size, + ni.references.join(" "), + ni.sig[0], + ); + TrustClaim { + narinfo_hash: store_hash.into(), + store_path, + upstream_url: "https://cache.example".into(), + signer_name: "test".into(), + signer_key, + nar_hash: ni.nar_hash.clone(), + nar_size: ni.nar_size, + references: ni.references.join(" "), + deriver: String::new(), + ca: String::new(), + file_hash: String::new(), + file_size: 0, + first_seen: now, + last_seen: now, + narinfo: narinfo.into_bytes(), + } + } + + fn trust_set(claim: &TrustClaim) -> std::collections::HashSet { + std::iter::once(claim.signer_key.clone()).collect() + } + + #[tokio::test] + async fn merge_claims_accepts_valid_trusted_signature() + -> Result<(), Box> { + let db = Db::open(":memory:", 100).await?; + let claim = signed_claim("aaa111", false); + let trusted = trust_set(&claim); + merge_claims(&db, vec![claim], &trusted).await; + assert_eq!(db.trust_claims("aaa111").await?.len(), 1); + Ok(()) + } + + #[tokio::test] + async fn merge_claims_rejects_tampered_claim() + -> Result<(), Box> { + let db = Db::open(":memory:", 100).await?; + let claim = signed_claim("bbb222", true); + // Trust the signer key so we isolate the *signature* check. + let trusted = trust_set(&claim); + merge_claims(&db, vec![claim], &trusted).await; + assert!( + db.trust_claims("bbb222").await?.is_empty(), + "a claim whose narinfo signature does not verify must be dropped" + ); + Ok(()) + } + + #[tokio::test] + async fn merge_claims_rejects_untrusted_signer() + -> Result<(), Box> { + let db = Db::open(":memory:", 100).await?; + // A perfectly valid self-signed claim, but the signer key is not trusted: + // this is the attacker-generated-key case and must be dropped. + let claim = signed_claim("ccc333", false); + let trusted = std::collections::HashSet::new(); + merge_claims(&db, vec![claim], &trusted).await; + assert!( + db.trust_claims("ccc333").await?.is_empty(), + "a validly-signed claim from an untrusted key must be dropped" + ); + Ok(()) + } fn route(store_path: &str, latency_ema: f64, ttl_secs: i64) -> RouteEntry { let now = chrono::Utc::now(); diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 3e58b85..ac46091 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -26,6 +26,11 @@ pub struct Metrics { pub upstream_race_wins: IntCounterVec, pub route_entries: IntGauge, pub upstream_latency: HistogramVec, + /// Times content was served despite a failed or absent trust check because + /// `trust.fail_closed = false`. Labelled by `reason` (`unsigned`, + /// `below_quorum`). A non-zero value means the proxy is running in an open + /// mode and the trust policy is advisory rather than enforced. + pub trust_bypass: IntCounterVec, } static METRICS: OnceLock = OnceLock::new(); @@ -115,6 +120,15 @@ pub fn get() -> &'static Metrics { &["upstream"], ) .expect("valid metric"); + let trust_bypass = IntCounterVec::new( + Opts::new( + "ncro_trust_bypass_total", + "Content served despite a failed or absent trust check \ + (fail_closed=false).", + ), + &["reason"], + ) + .expect("valid metric"); for collector in [ Box::new(narinfo_cache_hits.clone()) @@ -130,6 +144,7 @@ pub fn get() -> &'static Metrics { Box::new(upstream_race_wins.clone()), Box::new(route_entries.clone()), Box::new(upstream_latency.clone()), + Box::new(trust_bypass.clone()), ] { registry.register(collector).expect("register metric"); } @@ -148,6 +163,7 @@ pub fn get() -> &'static Metrics { upstream_race_wins, route_entries, upstream_latency, + trust_bypass, } }) } diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index 8a1f344..9624b73 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -8,20 +8,22 @@ homepage.workspace = true repository.workspace = true [dependencies] -chrono.workspace = true -dashmap.workspace = true -futures-util.workspace = true -moka.workspace = true -ncro-config.workspace = true -ncro-db.workspace = true -ncro-health.workspace = true -ncro-metrics.workspace = true -ncro-narinfo.workspace = true -ncro-s3.workspace = true -reqwest = { workspace = true, features = [ "rustls" ] } -thiserror.workspace = true -tokio = { workspace = true, features = [ "macros", "sync", "time", "rt" ] } -tracing.workspace = true +base64.workspace = true +chrono.workspace = true +dashmap.workspace = true +ed25519-dalek.workspace = true +futures-util.workspace = true +moka.workspace = true +ncro-config.workspace = true +ncro-db.workspace = true +ncro-health.workspace = true +ncro-metrics.workspace = true +ncro-narinfo.workspace = true +ncro-s3.workspace = true +reqwest = { workspace = true, features = [ "rustls" ] } +thiserror.workspace = true +tokio = { workspace = true, features = [ "macros", "sync", "time", "rt" ] } +tracing.workspace = true [lints] workspace = true diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index 646a4e5..fcc39ba 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -1,10 +1,10 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, time::{Duration, Instant}, }; -use chrono::Utc; +use chrono::{DateTime, Utc}; use dashmap::{DashMap, mapref::entry::Entry}; use futures_util::{StreamExt, stream::FuturesUnordered}; use moka::future::Cache as MokaCache; @@ -14,9 +14,11 @@ use ncro_config::{ FilterRule, NarUrlMode, S3Config, + TrustConfig, + TrustMode, UpstreamConfig, }; -use ncro_db::{Db, DbError, RouteEntry}; +use ncro_db::{Db, DbError, RouteEntry, TrustClaim}; use ncro_health::{Prober, Status}; use ncro_narinfo::{NarInfo, NarInfoError, parse_public_key}; use ncro_s3::{S3ClientPool, S3Error}; @@ -33,6 +35,8 @@ pub enum RouterError { NoCandidates(String), #[error("narinfo signature verification failed")] SignatureVerificationFailed, + #[error("narinfo did not satisfy trust policy")] + TrustPolicyFailed, #[error("fetch narinfo: {0}")] FetchNarinfo(#[from] reqwest::Error), #[error("S3 request failed: {0}")] @@ -130,6 +134,7 @@ struct RouterInner { upstream_auth: RwLock)>>, upstream_filters: RwLock>>, upstream_nar_url_modes: RwLock>, + trust: TrustConfig, inflight: DashMap>>, lru: MokaCache>, miss_lru: MokaCache, @@ -146,6 +151,17 @@ struct RaceResult { latency_ms: f64, } +struct FetchedNarInfo { + body: Option>, + parsed: NarInfo, + signer: Option, +} + +struct VerifiedSigner { + name: String, + public_key: String, +} + enum CommitOutcome { Accepted(ResolveResult), Rejected, @@ -202,6 +218,31 @@ impl Router { race_timeout: Duration, negative_ttl: Duration, tuning: RouterTuning, + ) -> Result { + Self::new_with_trust( + db, + prober, + route_ttl, + race_timeout, + negative_ttl, + tuning, + TrustConfig::default(), + ) + } + + /// Create a router with an explicit trust policy. + /// + /// # Errors + /// + /// Returns an error if the HTTP client cannot be constructed. + pub fn new_with_trust( + db: Db, + prober: Prober, + route_ttl: Duration, + race_timeout: Duration, + negative_ttl: Duration, + tuning: RouterTuning, + trust: TrustConfig, ) -> Result { Ok(Self { inner: Arc::new(RouterInner { @@ -217,6 +258,7 @@ impl Router { upstream_auth: RwLock::new(HashMap::new()), upstream_filters: RwLock::new(HashMap::new()), upstream_nar_url_modes: RwLock::new(HashMap::new()), + trust, inflight: DashMap::new(), lru: MokaCache::builder() .max_capacity(1024) @@ -250,10 +292,7 @@ impl Router { self.inner.s3.register(url.clone(), s3.clone()); } self - .register_upstream_key( - url.clone(), - upstream.public_key().to_string(), - ) + .register_upstream_key(url.clone(), upstream.public_key().to_string()) .await?; self .register_upstream_auth( @@ -283,11 +322,12 @@ impl Router { url: String, public_key: String, ) -> Result<(), NarInfoError> { - let mut map = self.inner.upstream_keys.write().await; if public_key.is_empty() { + let mut map = self.inner.upstream_keys.write().await; map.remove(&url); } else { parse_public_key(&public_key)?; + let mut map = self.inner.upstream_keys.write().await; map.insert(url, public_key); } Ok(()) @@ -345,10 +385,12 @@ impl Router { url: String, timeout: Option, ) -> Result<(), reqwest::Error> { - let mut map = self.inner.upstream_clients.write().await; if let Some(timeout) = timeout { - map.insert(url, reqwest::Client::builder().timeout(timeout).build()?); + let client = reqwest::Client::builder().timeout(timeout).build()?; + let mut map = self.inner.upstream_clients.write().await; + map.insert(url, client); } else { + let mut map = self.inner.upstream_clients.write().await; map.remove(&url); } Ok(()) @@ -367,12 +409,23 @@ impl Router { upstream: &str, ) -> Result { let start = Instant::now(); - let (body, _) = self.fetch_narinfo(upstream, store_hash).await?; - let narinfo_bytes = - self.response_narinfo_bytes(upstream, body.as_deref()).await; + let fetched = self.fetch_narinfo(upstream, store_hash).await?; + let latency_ms = start.elapsed().as_secs_f64() * 1000.0; + if !self + .accepts_trust_claim(upstream, store_hash, &fetched, latency_ms) + .await? + { + if self.inner.trust.fail_closed { + return Err(RouterError::TrustPolicyFailed); + } + note_trust_bypass(upstream, &fetched); + } + let narinfo_bytes = self + .response_narinfo_bytes(upstream, fetched.body.as_deref()) + .await; Ok(ResolveResult { url: upstream.to_string(), - latency_ms: start.elapsed().as_secs_f64() * 1000.0, + latency_ms, cache_hit: false, narinfo_bytes, }) @@ -451,6 +504,18 @@ impl Router { self.inner.lru.invalidate(store_hash).await; return Ok(None); }; + if !self + .cached_route_satisfies_trust( + &cached.url, + store_hash, + narinfo_bytes.as_deref(), + cached.latency_ms, + ) + .await? + { + self.inner.lru.invalidate(store_hash).await; + return Ok(None); + } ncro_metrics::get().narinfo_cache_hits.inc(); let mut result = (*cached).clone(); if result.narinfo_bytes.is_none() { @@ -485,6 +550,17 @@ impl Router { else { return Ok(None); }; + if !self + .cached_route_satisfies_trust( + &entry.upstream_url, + store_hash, + narinfo_bytes.as_deref(), + entry.latency_ms, + ) + .await? + { + return Ok(None); + } if entry.narinfo_bytes.is_none() && narinfo_bytes.is_some() { entry.narinfo_bytes = narinfo_bytes; self.inner.db.set_route(&entry).await?; @@ -773,22 +849,43 @@ impl Router { winner: RaceResult, store_hash: &str, ) -> Result { - let (body, parsed) = self.fetch_narinfo(&winner.url, store_hash).await?; - if !self.upstream_allows_narinfo(&winner.url, &parsed).await { + let fetched = self.fetch_narinfo(&winner.url, store_hash).await?; + if !self + .upstream_allows_narinfo(&winner.url, &fetched.parsed) + .await + { tracing::debug!( upstream = &winner.url, - store_path = &parsed.store_path, + store_path = &fetched.parsed.store_path, "narinfo rejected by upstream filter" ); return Ok(CommitOutcome::Rejected); } + let trust_accepted = self + .accepts_trust_claim(&winner.url, store_hash, &fetched, winner.latency_ms) + .await?; + if !trust_accepted { + if self.inner.trust.fail_closed { + tracing::debug!( + upstream = &winner.url, + store_path = &fetched.parsed.store_path, + "narinfo rejected by trust policy" + ); + return Ok(CommitOutcome::Rejected); + } + note_trust_bypass(&winner.url, &fetched); + } // Strip leading slash and query string (harmonia appends ?hash=STORE_HASH) // so the DB key is just the path component for consistent lookups. - let nar_url = parsed + let nar_url = fetched + .parsed .url .trim_start_matches('/') .split_once('?') - .map_or_else(|| parsed.url.trim_start_matches('/'), |(path, _)| path) + .map_or_else( + || fetched.parsed.url.trim_start_matches('/'), + |(path, _)| path, + ) .to_string(); ncro_metrics::get() @@ -829,10 +926,10 @@ impl Router { ttl: now + chrono::Duration::from_std(self.inner.route_ttl) .unwrap_or_default(), - nar_hash: parsed.nar_hash, - nar_size: parsed.nar_size, + nar_hash: fetched.parsed.nar_hash, + nar_size: fetched.parsed.nar_size, nar_url, - narinfo_bytes: body.clone(), + narinfo_bytes: fetched.body.clone(), }) .await?; let result = ResolveResult { @@ -840,7 +937,7 @@ impl Router { latency_ms: winner.latency_ms, cache_hit: false, narinfo_bytes: self - .response_narinfo_bytes(&winner.url, body.as_deref()) + .response_narinfo_bytes(&winner.url, fetched.body.as_deref()) .await, }; self @@ -906,10 +1003,12 @@ impl Router { return CachedFilterCheck::Accepted(Some(bytes.to_vec())); } - if let Ok((body, parsed)) = self.fetch_narinfo(upstream, store_hash).await - && self.upstream_allows_narinfo(upstream, &parsed).await + if let Ok(fetched) = self.fetch_narinfo(upstream, store_hash).await + && self + .upstream_allows_narinfo(upstream, &fetched.parsed) + .await { - return CachedFilterCheck::Accepted(body); + return CachedFilterCheck::Accepted(fetched.body); } CachedFilterCheck::Rejected } @@ -918,7 +1017,7 @@ impl Router { &self, upstream: &str, store_hash: &str, - ) -> Result<(Option>, NarInfo), RouterError> { + ) -> Result { let body = if self.inner.s3.contains(upstream) { self .inner @@ -947,17 +1046,167 @@ impl Router { resp.bytes().await?.to_vec() }; let parsed = NarInfo::parse(body.as_slice())?; - if let Some(pubkey) = self.inner.upstream_keys.read().await.get(upstream) - && !parsed.verify(pubkey).unwrap_or(false) + let mut signer = None; + if let Some(pubkey) = self.inner.upstream_keys.read().await.get(upstream) { + let (name, _) = parse_public_key(pubkey)?; + if parsed.verify(pubkey).unwrap_or(false) { + signer = Some(VerifiedSigner { + name, + public_key: pubkey.clone(), + }); + } else { + tracing::warn!( + upstream, + store = store_hash, + "narinfo signature verification failed" + ); + return Err(RouterError::SignatureVerificationFailed); + } + } + Ok(FetchedNarInfo { + body: Some(body), + parsed, + signer, + }) + } + + async fn cached_route_satisfies_trust( + &self, + upstream: &str, + store_hash: &str, + narinfo_bytes: Option<&[u8]>, + latency_ms: f64, + ) -> Result { + if self.inner.trust.mode == TrustMode::Off { + return Ok(true); + } + if let Some(bytes) = narinfo_bytes + && let Ok(parsed) = NarInfo::parse(bytes) { - tracing::warn!( - upstream, - store = store_hash, - "narinfo signature verification failed" - ); + let signer = self.verified_signer(upstream, &parsed).await?; + let fetched = FetchedNarInfo { + body: Some(bytes.to_vec()), + parsed, + signer, + }; + return self + .accepts_trust_claim(upstream, store_hash, &fetched, latency_ms) + .await; + } + let fetched = self.fetch_narinfo(upstream, store_hash).await?; + self + .accepts_trust_claim(upstream, store_hash, &fetched, latency_ms) + .await + } + + async fn verified_signer( + &self, + upstream: &str, + narinfo: &NarInfo, + ) -> Result, RouterError> { + let Some(public_key) = + self.inner.upstream_keys.read().await.get(upstream).cloned() + else { + return Ok(None); + }; + let (name, _) = parse_public_key(&public_key)?; + if !narinfo.verify(&public_key).unwrap_or(false) { return Err(RouterError::SignatureVerificationFailed); } - Ok((Some(body), parsed)) + Ok(Some(VerifiedSigner { name, public_key })) + } + + async fn accepts_trust_claim( + &self, + upstream: &str, + store_hash: &str, + fetched: &FetchedNarInfo, + latency_ms: f64, + ) -> Result { + match self.inner.trust.mode { + TrustMode::Off => Ok(true), + TrustMode::Signed => { + if fetched.signer.is_none() { + return Ok(false); + } + self + .record_trust_claim(upstream, store_hash, fetched, latency_ms) + .await?; + Ok(true) + }, + TrustMode::Quorum => { + if fetched.signer.is_none() { + return Ok(false); + } + self + .record_trust_claim(upstream, store_hash, fetched, latency_ms) + .await?; + let claims = self.inner.db.trust_claims(store_hash).await?; + let cutoff = self.inner.trust.claim_ttl.as_ref().and_then(|ttl| { + chrono::Duration::from_std(ttl.0) + .ok() + .map(|d| Utc::now() - d) + }); + let trusted = self.trusted_signer_keys().await; + let matching = matching_claim_count( + &claims, + &fetched.parsed.nar_hash, + fetched.parsed.nar_size, + &references_key(&fetched.parsed.references), + self.inner.trust.require_distinct_signers, + cutoff, + &trusted, + ); + Ok(matching >= self.inner.trust.threshold) + }, + } + } + + /// The set of signer keys trusted to vouch for content in `quorum` mode: + /// the `public_key` of every configured upstream plus any explicit + /// `trust.trusted_keys`. Locally-recorded claims always use a configured + /// upstream key, so this never excludes a legitimate local observation; it + /// only bounds which *relayed* signers can contribute to a quorum. + async fn trusted_signer_keys(&self) -> HashSet { + let mut keys: HashSet = + self.inner.trust.trusted_keys.iter().cloned().collect(); + keys.extend(self.inner.upstream_keys.read().await.values().cloned()); + keys + } + + async fn record_trust_claim( + &self, + upstream: &str, + store_hash: &str, + fetched: &FetchedNarInfo, + _latency_ms: f64, + ) -> Result<(), RouterError> { + let Some(signer) = &fetched.signer else { + return Ok(()); + }; + let now = Utc::now(); + self + .inner + .db + .set_trust_claim(&TrustClaim { + narinfo_hash: store_hash.to_string(), + store_path: fetched.parsed.store_path.clone(), + upstream_url: upstream.to_string(), + signer_name: signer.name.clone(), + signer_key: signer.public_key.clone(), + nar_hash: fetched.parsed.nar_hash.clone(), + nar_size: fetched.parsed.nar_size, + references: references_key(&fetched.parsed.references), + deriver: fetched.parsed.deriver.clone(), + ca: fetched.parsed.ca.clone(), + file_hash: fetched.parsed.file_hash.clone(), + file_size: fetched.parsed.file_size, + first_seen: now, + last_seen: now, + narinfo: fetched.body.clone().unwrap_or_default(), + }) + .await?; + Ok(()) } async fn response_narinfo_bytes( @@ -1069,6 +1318,64 @@ fn store_path_name(store_path: &str) -> &str { base.split_once('-').map_or(base, |(_, name)| name) } +fn references_key(references: &[String]) -> String { + references.join(" ") +} + +/// Record that content was served despite a failed/absent trust check. +/// +/// Only reached when `trust.fail_closed = false`: the policy is advisory, so +/// make the bypass observable rather than silent via a warning and the +/// `ncro_trust_bypass_total` counter. +fn note_trust_bypass(upstream: &str, fetched: &FetchedNarInfo) { + let reason = if fetched.signer.is_none() { + "unsigned" + } else { + "below_quorum" + }; + tracing::warn!( + upstream, + reason, + store_path = %fetched.parsed.store_path, + "trust check failed but fail_closed=false; serving content anyway" + ); + ncro_metrics::get() + .trust_bypass + .with_label_values(&[reason]) + .inc(); +} + +fn matching_claim_count( + claims: &[TrustClaim], + nar_hash: &str, + nar_size: u64, + references: &str, + distinct_signers: bool, + cutoff: Option>, + trusted_keys: &HashSet, +) -> u32 { + let matches = |claim: &&TrustClaim| { + claim.nar_hash == nar_hash + && claim.nar_size == nar_size + && claim.references == references + && cutoff.is_none_or(|min| claim.last_seen >= min) + // Only keys in the configured trusted set count toward a quorum. + // Without this an attacker could self-sign forged content under any + // number of throwaway keys and manufacture agreement. + && trusted_keys.contains(&claim.signer_key) + }; + if distinct_signers { + let signers = claims + .iter() + .filter(matches) + .map(|claim| claim.signer_key.as_str()) + .collect::>(); + return u32::try_from(signers.len()).unwrap_or(u32::MAX); + } + let count = claims.iter().filter(matches).count(); + u32::try_from(count).unwrap_or(u32::MAX) +} + fn wildcard_match(pattern: &str, value: &str) -> bool { if pattern == "*" { return true; @@ -1102,10 +1409,12 @@ fn wildcard_match(pattern: &str, value: &str) -> bool { #[cfg(test)] mod tests { #![expect(clippy::unwrap_used, reason = "Fine in tests")] - use std::{sync::Arc, time::Duration}; + use std::{collections::HashSet, sync::Arc, time::Duration}; + use base64::{Engine, engine::general_purpose::STANDARD}; use chrono::Utc; - use ncro_config::{NarUrlMode, UpstreamConfig}; + use ed25519_dalek::{Signer, SigningKey}; + use ncro_config::{NarUrlMode, TrustConfig, TrustMode, UpstreamConfig}; use ncro_db::{Db, RouteEntry}; use ncro_health::Prober; use tokio::{ @@ -1135,11 +1444,19 @@ mod tests { async fn make_router_with_upstreams( cooldown: Duration, upstreams: &[UpstreamConfig], + ) -> Router { + make_router_with_trust(cooldown, upstreams, TrustConfig::default()).await + } + + async fn make_router_with_trust( + cooldown: Duration, + upstreams: &[UpstreamConfig], + trust: TrustConfig, ) -> Router { let db = Db::open(":memory:", 100).await.unwrap(); let prober = Prober::new(0.3).unwrap(); prober.init_upstreams(upstreams).await; - Router::new( + let router = Router::new_with_trust( db, prober, Duration::from_hours(1), @@ -1151,8 +1468,13 @@ mod tests { in_memory_negative_ttl: Duration::from_mins(5), upstream_cooldown: cooldown, }, + trust, ) - .unwrap() + .unwrap(); + for upstream in upstreams { + router.register_upstream(upstream).await.unwrap(); + } + router } async fn spawn_narinfo_server(get_status: u16, store_name: &str) -> String { @@ -1200,6 +1522,69 @@ mod tests { format!("http://{addr}") } + fn signing_key(seed: u8) -> SigningKey { + SigningKey::from_bytes(&[seed; 32]) + } + + fn public_key(name: &str, key: &SigningKey) -> String { + format!("{name}:{}", STANDARD.encode(key.verifying_key().to_bytes())) + } + + fn signed_narinfo_body( + key_name: &str, + key: &SigningKey, + store_name: &str, + ) -> String { + let store_path = format!("/nix/store/abc123-{store_name}"); + let fingerprint = format!("1;{store_path};sha256:abc;1;"); + let signature = key.sign(fingerprint.as_bytes()); + format!( + "StorePath: {store_path}\nURL: nar/test.nar.xz\nCompression: \ + xz\nNarHash: sha256:abc\nNarSize: 1\nReferences: \nSig: {key_name}:{}\n", + STANDARD.encode(signature.to_bytes()) + ) + } + + async fn spawn_signed_narinfo_server( + key_name: &'static str, + key: SigningKey, + store_name: &'static str, + ) -> String { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + return; + }; + let key = key.clone(); + tokio::spawn(async move { + let mut buf = [0_u8; 1024]; + let Ok(n) = stream.read(&mut buf).await else { + return; + }; + let request = String::from_utf8_lossy(&buf[..n]); + if request.starts_with("HEAD ") { + let _ = stream + .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .await; + return; + } + if request.starts_with("GET ") { + let body = signed_narinfo_body(key_name, &key, store_name); + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()).await; + } + }); + } + }); + format!("http://{addr}") + } + async fn spawn_head_ok_get_drop_server() -> String { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); @@ -1382,6 +1767,186 @@ mod tests { assert_eq!(result.url, accepted); } + #[tokio::test] + async fn signed_trust_rejects_unsigned_winner_and_uses_signed_candidate() { + let unsigned = spawn_narinfo_server(200, "zedless-0.1.0").await; + let key = signing_key(7); + let signed = + spawn_signed_narinfo_server("cache-a", key.clone(), "zedless-0.1.0") + .await; + let router = make_router_with_trust( + Duration::from_mins(1), + &[ + UpstreamConfig { + url: unsigned.clone(), + priority: 1, + ..Default::default() + }, + UpstreamConfig { + url: signed.clone(), + priority: 2, + public_key: public_key("cache-a", &key), + ..Default::default() + }, + ], + TrustConfig { + mode: TrustMode::Signed, + ..Default::default() + }, + ) + .await; + + let result = router + .resolve("abc123", &[unsigned, signed.clone()]) + .await + .unwrap(); + + assert_eq!(result.url, signed); + assert_eq!( + router.inner.db.trust_claims("abc123").await.unwrap().len(), + 1 + ); + } + + #[tokio::test] + async fn quorum_trust_requires_matching_claims_from_distinct_signers() { + let key_a = signing_key(8); + let key_b = signing_key(9); + let cache_a = + spawn_signed_narinfo_server("cache-a", key_a.clone(), "zedless-0.1.0") + .await; + let cache_b = + spawn_signed_narinfo_server("cache-b", key_b.clone(), "zedless-0.1.0") + .await; + let router = make_router_with_trust( + Duration::from_mins(1), + &[ + UpstreamConfig { + url: cache_a.clone(), + priority: 1, + public_key: public_key("cache-a", &key_a), + ..Default::default() + }, + UpstreamConfig { + url: cache_b.clone(), + priority: 2, + public_key: public_key("cache-b", &key_b), + ..Default::default() + }, + ], + TrustConfig { + mode: TrustMode::Quorum, + threshold: 2, + ..Default::default() + }, + ) + .await; + + let result = router + .resolve("abc123", &[cache_a, cache_b.clone()]) + .await + .unwrap(); + + assert_eq!(result.url, cache_b); + assert_eq!( + router.inner.db.trust_claims("abc123").await.unwrap().len(), + 2 + ); + } + + fn claim_for( + signer_key: &str, + last_seen: chrono::DateTime, + ) -> ncro_db::TrustClaim { + ncro_db::TrustClaim { + narinfo_hash: "abc123".into(), + store_path: "/nix/store/abc123-pkg".into(), + upstream_url: format!("https://{signer_key}.example"), + signer_name: signer_key.into(), + signer_key: signer_key.into(), + nar_hash: "sha256:abc".into(), + nar_size: 12, + references: "abc123-pkg".into(), + deriver: String::new(), + ca: String::new(), + file_hash: String::new(), + file_size: 0, + first_seen: last_seen, + last_seen, + narinfo: Vec::new(), + } + } + + #[test] + fn quorum_count_dedupes_repeated_signer() { + let now = Utc::now(); + // Same signer key seen twice (e.g. via two upstream URLs) must count once. + let claims = vec![ + claim_for("signer-a", now), + claim_for("signer-a", now), + claim_for("signer-b", now), + ]; + let trusted: HashSet = + ["signer-a".to_string(), "signer-b".to_string()] + .into_iter() + .collect(); + let distinct = super::matching_claim_count( + &claims, + "sha256:abc", + 12, + "abc123-pkg", + true, + None, + &trusted, + ); + assert_eq!(distinct, 2, "distinct signers must dedupe by signer_key"); + } + + #[test] + fn quorum_count_excludes_expired_claims() { + let now = Utc::now(); + let stale = now - chrono::Duration::days(30); + let claims = vec![claim_for("signer-a", now), claim_for("signer-b", stale)]; + let cutoff = Some(now - chrono::Duration::days(7)); + let trusted: HashSet = + ["signer-a".to_string(), "signer-b".to_string()] + .into_iter() + .collect(); + let fresh = super::matching_claim_count( + &claims, + "sha256:abc", + 12, + "abc123-pkg", + true, + cutoff, + &trusted, + ); + assert_eq!(fresh, 1, "claims older than the cutoff must not count"); + } + + #[test] + fn quorum_count_ignores_untrusted_signer() { + let now = Utc::now(); + // Two validly-distinct signers, but only one is in the trusted set: an + // attacker self-signing under a throwaway key must not count. + let claims = vec![ + claim_for("trusted-signer", now), + claim_for("attacker-key", now), + ]; + let trusted: HashSet = + std::iter::once("trusted-signer".to_string()).collect(); + let counted = super::matching_claim_count( + &claims, + "sha256:abc", + 12, + "abc123-pkg", + true, + None, + &trusted, + ); + assert_eq!(counted, 1, "claims from untrusted keys must not count"); + } + #[tokio::test] async fn fallback_resolve_ignores_filters_and_does_not_persist_route() { let fallback = spawn_narinfo_server(200, "unrelated-1.0").await; @@ -1427,11 +1992,13 @@ mod tests { assert_eq!(cached.url, previously_accepted); router - .register_upstream_filters(previously_accepted.clone(), vec![FilterRule { - action: FilterAction::Allow, - field: FilterField::Name, - pattern: "zedless*".to_string(), - }]) + .register_upstream_filters(previously_accepted.clone(), vec![ + FilterRule { + action: FilterAction::Allow, + field: FilterField::Name, + pattern: "zedless*".to_string(), + }, + ]) .await; let result = router diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 891fdec..f72a8dd 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, }; @@ -13,8 +13,8 @@ use axum::{ }; use bytes::Bytes; use futures_util::TryStreamExt; -use ncro_config::UpstreamConfig; -use ncro_db::Db; +use ncro_config::{TrustConfig, TrustMode, UpstreamConfig}; +use ncro_db::{Db, TrustClaim}; use ncro_health::{Prober, Status, UpstreamHealth}; use ncro_router::{Router, RouterError}; use ncro_s3::S3ClientPool; @@ -32,6 +32,7 @@ pub struct AppState { nar_clients: HashMap, default_client: reqwest::Client, cache_priority: i32, + trust: TrustConfig, } impl AppState { @@ -46,6 +47,7 @@ pub struct AppConfig { pub cache_priority: i32, pub read_timeout: std::time::Duration, pub write_timeout: std::time::Duration, + pub trust: TrustConfig, } /// Build the HTTP application router. @@ -65,6 +67,7 @@ pub fn app( cache_priority, read_timeout, write_timeout, + trust, } = config; let s3 = S3ClientPool::default(); for upstream in &upstreams { @@ -110,12 +113,15 @@ pub fn app( nar_clients, default_client, cache_priority, + trust, }; Ok( AxumRouter::new() .route("/nix-cache-info", get(cache_info).head(cache_info)) .route("/health", get(health)) .route("/metrics", get(metrics_endpoint)) + .route("/provenance/{hash_narinfo}", get(provenance)) + .route("/trust/{hash_narinfo}", get(trust_endpoint)) .route("/{hash_narinfo}", get(narinfo).head(narinfo)) .route("/nar/{*path}", get(nar).head(nar)) .layer(RequestBodyTimeoutLayer::new(read_timeout)) @@ -185,6 +191,139 @@ async fn metrics_endpoint() -> Response { .into_response() } +#[derive(Serialize)] +struct TrustResponse { + hash: String, + mode: TrustMode, + threshold: u32, + trusted: bool, + matching_claims: u32, + claims: Vec, +} + +async fn trust_endpoint( + State(state): State>, + Path(hash_narinfo): Path, +) -> Response { + let Some(hash) = hash_narinfo.strip_suffix(".narinfo") else { + return StatusCode::NOT_FOUND.into_response(); + }; + match state.db.trust_claims(hash).await { + Ok(claims) => { + axum::Json(trust_response( + hash, + &state.trust, + &trusted_signer_keys(&state), + claims, + )) + .into_response() + }, + Err(err) => { + tracing::warn!(hash, error = %err, "trust lookup failed"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + }, + } +} + +async fn provenance( + State(state): State>, + Path(hash_narinfo): Path, +) -> Response { + let Some(hash) = hash_narinfo.strip_suffix(".narinfo") else { + return StatusCode::NOT_FOUND.into_response(); + }; + match state.db.trust_claims(hash).await { + Ok(claims) => { + axum::Json(trust_response( + hash, + &state.trust, + &trusted_signer_keys(&state), + claims, + )) + .into_response() + }, + Err(err) => { + tracing::warn!(hash, error = %err, "provenance lookup failed"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + }, + } +} + +/// The signer keys whose claims count toward a quorum: explicit +/// `trust.trusted_keys` plus every configured upstream's `public_key`. Mirrors +/// the router's enforcement so the endpoint reports the same decision. +fn trusted_signer_keys(state: &AppState) -> HashSet { + state + .trust + .trusted_keys + .iter() + .cloned() + .chain( + state + .upstreams + .iter() + .map(|u| u.public_key.clone()) + .filter(|k| !k.is_empty()), + ) + .collect() +} + +fn trust_response( + hash: &str, + trust: &TrustConfig, + trusted_keys: &HashSet, + claims: Vec, +) -> TrustResponse { + let matching_claims = + max_matching_claims(&claims, trust.require_distinct_signers, trusted_keys); + let trusted = match trust.mode { + TrustMode::Off => true, + TrustMode::Signed => !claims.is_empty(), + TrustMode::Quorum => matching_claims >= trust.threshold, + }; + TrustResponse { + hash: hash.to_string(), + mode: trust.mode, + threshold: trust.threshold, + trusted, + matching_claims, + claims, + } +} + +fn max_matching_claims( + claims: &[TrustClaim], + distinct_signers: bool, + trusted_keys: &HashSet, +) -> u32 { + let mut groups: HashMap<(&str, u64, &str), Vec<&TrustClaim>> = HashMap::new(); + for claim in claims { + // Only claims from trusted signer keys count toward a quorum. + if !trusted_keys.contains(&claim.signer_key) { + continue; + } + groups + .entry((&claim.nar_hash, claim.nar_size, &claim.references)) + .or_default() + .push(claim); + } + groups + .values() + .map(|claims| { + if distinct_signers { + let signers = claims + .iter() + .map(|claim| claim.signer_key.as_str()) + .collect::>(); + u32::try_from(signers.len()).unwrap_or(u32::MAX) + } else { + u32::try_from(claims.len()).unwrap_or(u32::MAX) + } + }) + .max() + .unwrap_or_default() +} + async fn narinfo( State(state): State>, Path(hash_narinfo): Path, diff --git a/docs/architecture.md b/docs/architecture.md index d25fe8b..7de9e02 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -127,6 +127,18 @@ this prevents project-specific caches from becoming winners for unrelated paths. Fallback cache traffic bypasses these router features and does not update health or routing state. +Optional trust enforcement runs at the same acceptance point as filters. After a +candidate narinfo is fetched, ncro verifies the configured upstream signing key +when present and can record the signed narinfo as a local claim. In `signed` +mode, one valid configured signer is enough. In `quorum` mode, ncro only stores +a route after enough signer keys agree on the same `StorePath`, `NarHash`, +`NarSize`, and `References`. Rejected candidates are handled like filter +rejections: the route is not cached and ncro keeps trying remaining candidates. +When the policy is not satisfied and `trust.fail_closed = false`, the content is +served anyway but the bypass is logged and counted (`ncro_trust_bypass_total`). +The signature covers only the signed fingerprint, not the streamed NAR bytes; +the full model and that boundary are documented in [trust.md](trust.md). + > [!NOTE] > Persistence is intentionally narrow. SQLite stores two kinds of data so a > restart does not force ncro to relearn everything from scratch. @@ -135,13 +147,23 @@ First type of stored data is **route entries**, a mapping from narinfo hash to the winning upstream URL, stored with a creation timestamp and TTL. When the cache exceeds `max_entries`, the least recently used entry is evicted first. **Health snapshots** on another hand are per-upstream EMA latency estimates and -failure counts, refreshed by the background probe loop. Fallback-cache responses -are (intentionally) not stored as route entries, so normal upstreams become -active again as soon as they recover. +failure counts, refreshed by the background probe loop. **Trust claims** (when +trust is enabled) are a third kind of stored data: durable records of which +signer vouched for which content. Fallback-cache responses are (intentionally) +not stored as route entries, so normal upstreams become active again as soon as +they recover. + +Negative lookups are cached in two layers: a short-lived in-memory LRU +(`in_memory_negative_ttl`) absorbs rapid-fire duplicate misses, while a +longer-lived SQLite entry (`negative_ttl`) lets a known miss survive a restart. +The in-memory layer is the only routing state that does not live in SQLite. Discovery and mesh are optional extensions. Discovery can add peers from the -local network, while mesh gossip shares recent route decisions across trusted -nodes using signed UDP packets. Consider: +local network, while mesh gossip shares recent route decisions (and, when +`mesh.gossip_trust_claims` is enabled, re-verified trust claims) across trusted +nodes using signed UDP packets. Relayed claims are re-verified against their +original Nix signer key before being trusted, so a peer is a witness, not a +signing authority. Consider: ```mermaid flowchart LR diff --git a/docs/trust.md b/docs/trust.md new file mode 100644 index 0000000..8950022 --- /dev/null +++ b/docs/trust.md @@ -0,0 +1,216 @@ +# Trust + +ncro can optionally verify the provenance of the narinfo metadata it serves. The +system is experimental and therefore trust is _off by default_. ncro is a +_routing optimizer_, all things considered, and the trust layer is an opt-in +guard for deployments that do not want to blindly forward whatever an upstream +returns. + +This document describes the model end to end: what is verified, what is _not_, +how a quorum is counted, how `fail_closed` behaves, and how the mesh lets a +quorum form across several nodes. + +## Trust-Check Coverage + +Nix binary caches sign narinfos with an ed25519 key. The signature is computed +over a _fingerprint_: + +```plaintext +1;;;; +``` + +So in essence a valid signature cryptographically binds the **store path, the +NAR hash, the NAR size, and the references** to a signing key you trust. It does +**not** cover `FileHash`, `FileSize`, `Deriver`, `CA`, `Compression`, or `URL`. +Those travel in the narinfo but are outside the signed fingerprint. + +> [!NOTE] +> ncro verifies this signature against the `public_key` configured on the +> upstream that served the narinfo (Nix `name:base64(key)` format). The +> verification logic lives in [`crates/narinfo`](../crates/narinfo/src/lib.rs); +> the policy that consumes it lives in +> [`crates/router`](../crates/router/src/lib.rs). + +### Why Not Hash the NAR Bytes? + +A reasonable question is _"why not also verify the bytes streamed for +`/nar/*.nar`?"_ The answer is relatively simple: because it would not add a +cryptographic guarantee and would produce false alarms: + +- The signed `NarHash` is over the **uncompressed** NAR. The bytes ncro streams + are the **compressed** file, whose digest is `FileHash`. A"nd `FileHash` is + _not_ part of the signed fingerprint. Comparing streamed bytes to `FileHash` + proves nothing about trust. +- Verifying the signed `NarHash` would require decompressing every stream + (xz/zstd/...) in flight, which defeats ncro's zero-local-storage streaming + design (see [architecture](architecture.md)). + +So content trust stops at the signed metadata. If you need end-to-end NAR +content verification, Nix itself already does it on the client: the daemon +checks the NAR hash of every path it imports against the (signed) narinfo. ncro +sitting in front of Nix does not weaken that; a tampered NAR still fails in the +Nix daemon. + +## Modes + +`trust.mode` selects the policy. It is evaluated at the same acceptance point as +per-upstream filters: after a candidate narinfo is fetched but before the route +is cached. + + + +| Mode | Requirement to accept a candidate | +| -------- | ------------------------------------------------------------------------------- | +| `off` | No verification. Route-only behavior (default). | +| `signed` | The narinfo must verify against the serving upstream's configured `public_key`. | +| `quorum` | A verified signature **plus** at least `threshold` matching claims (below). | + + + +A candidate that fails the policy is treated exactly like a filter rejection: +the route is not cached and ncro keeps trying the remaining candidates. + +## Claims + +Every time ncro verifies a signature it records a **trust claim** in SQLite (see +`TrustClaim` in [`crates/db`](../crates/db/src/lib.rs)). A claim captures who +vouched for what: + +- `signer_key` / `signer_name` -> the Nix signing key that verified. +- `upstream_url` -> which upstream served this observation. +- the signed content tuple: `nar_hash`, `nar_size`, `references` (and the + `store_path` / `narinfo_hash`). +- provenance-only fields recorded for audit but **not** covered by the + signature: `deriver`, `ca`, `file_hash`, `file_size`, and the raw `narinfo` + bytes. + +Claims are what make `quorum` and the mesh integration possible: they are a +durable, queryable record of "signer X, via upstream Y, attests that path H has +this NAR hash." + +## Quorum counting + +In `quorum` mode a route is accepted only once `threshold` claims **agree on the +same content tuple**, which is identical `nar_hash`, `nar_size`, and +`references`. This defends against a single compromised or buggy cache: an +attacker would have to get the same forged content signed by enough independent +keys. + +`trust.require_distinct_signers` (default `true`) decides what "enough" means: + +- `true` - count **distinct `signer_key`s**. The same key seen via two upstream + URLs counts once. This is the meaningful setting: it counts independent + signers, not independent mirrors. +- `false` - count claims regardless of signer (rarely what you want). + +### The Trusted Signer Set + +Counting _distinct keys_ is only meaningful if the keys are restricted to ones +you trust. Anyone can generate an ed25519 keypair and validly sign forged +content under it, so if every validly-self-signed key counted, an attacker could +mint as many distinct keys as the threshold requires and manufacture a quorum. +The signature check alone would be theatre. + +ncro therefore counts a claim toward a quorum **only if its `signer_key` is in +the trusted set**: + +$$\text{trusted set} = \left\{ \text{upstream}_i.\text{public\_key} \right\} \cup + \text{trust.trusted\_keys}$$ + +- Locally-recorded claims always use a configured upstream key, so a single node + racing several independently-signed upstreams needs no extra config. +- For a **mesh**, each node must additionally list the signer keys of the other + nodes' trusted upstreams in `trust.trusted_keys`. Otherwise a relayed claim + signed by an unknown key is dropped (and never even stored). This is what + bounds a quorum to _trusted_ signers rather than _any_ signer. + +`trust.claim_ttl` (optional, e.g. `"30d"`) bounds how old a claim may be and +still count. Claims whose `last_seen` is older than the TTL are ignored when +evaluating the quorum, so a signer that has long stopped serving a path cannot +prop up a quorum forever. Unset means claims never expire. + +> [!WARNING] +> Quorum needs more than one independent signer to ever see the same path. On a +> single node that only races caches sharing one signing key, a `threshold > 1` +> can never be met. Either configure multiple independently-signed upstreams, or +> use the mesh (below) so peers contribute claims. + +## `fail_closed` + +`trust.fail_closed` (default `true`) decides what happens when the policy is not +satisfied: + +- `true` (**closed**) -> the candidate is rejected. If no candidate satisfies + the policy, the request ends in a 404 rather than serving unverified content. +- `false` (**open**) -> the content is served anyway, but the bypass is made + observable: ncro logs a warning and increments + `ncro_trust_bypass_total{reason="unsigned"|"below_quorum"}`. Use this to roll + trust out in "report-only" mode and watch the metric before enforcing. + +A non-zero `ncro_trust_bypass_total` means trust is advisory, not enforced. + +## Trust Over The Mesh + +When `trust.mode = "quorum"`, a lone node usually cannot reach a quorum because +it only sees the upstreams configured on it. The mesh closes this gap. + +With `mesh.gossip_trust_claims = true`, a node: + +1. **Gossips** the trust claims it has verified locally to its peers, alongside + the route gossip it already sends, over the same ed25519-signed UDP packets. + Claims are batched to stay within the UDP packet-size limit; a claim too + large to fit on its own is logged and skipped rather than dropped silently. +2. **Receives** claims relayed by peers and applies two gates before storing + each one: + - **Trusted signer.** The claim's `signer_key` must be in this node's trusted + set (above). A claim from any other key is dropped immediately, this is + what stops an attacker from minting throwaway keys to forge a quorum, and + it also bounds how many claims a hostile peer can write to the database. + - **Valid signature.** The claim carries the original narinfo bytes, and the + receiver re-verifies that narinfo against the claim's `signer_key`. The + peer's packet signature only proves _which peer relayed it_, not that the + content was signed; re-verifying means a peer cannot forge a claim under a + trusted key it does not actually hold. + +This means **a peer becomes a witness, never a substitute for a real Nix +signature.** A malicious or compromised peer cannot inflate a quorum: a claim +signed by an untrusted key is dropped, and a claim claiming a trusted key but +without a valid signature fails re-verification. A quorum therefore counts +distinct _trusted Nix signers_, whether those signatures were observed locally +or relayed by a peer. Peers are also allowlisted by their mesh public key +(`mesh.peers[].public_key`), so unknown senders are rejected before any claim is +even parsed. + +## Inspecting Trust (At Runtime) + +Two read endpoints expose the recorded claims: + +- `GET /trust/.narinfo` - the current trust decision for a path: the mode, + threshold, the count of matching claims, whether the path is currently + trusted, and the full list of claims. +- `GET /provenance/.narinfo` - the same claim list, framed as an audit of + who has vouched for the path (including the unsigned provenance fields). + +Both are useful for confirming that claims are propagating across a mesh: after +a path is resolved on one node, it should appear in `/trust` on a peer within a +gossip interval or two. + +## Configuration Summary + +```toml +[trust] +mode = "quorum" # off | signed | quorum +threshold = 2 # matching claims required in quorum mode +require_distinct_signers = true # count distinct signer keys, not mirrors +fail_closed = true # reject (true) vs serve+warn (false) +claim_ttl = "30d" # optional: ignore claims older than this + +# Signer keys trusted to vouch in quorum mode, in addition to upstream keys. +# In a mesh, list the OTHER nodes' upstream signer keys here. +trusted_keys = ["cache-b-1:bbbb...", "cache-c-1:cccc..."] + +[mesh] +enabled = true +gossip_trust_claims = true # gossip + re-verify claims across peers +peers = [{ addr = "10.0.0.2:7946", public_key = "" }] +``` diff --git a/flake.nix b/flake.nix index e7d4ec4..6de409a 100644 --- a/flake.nix +++ b/flake.nix @@ -32,6 +32,7 @@ pkgs = pkgsForEach system; in { p2p-discovery = pkgs.callPackage ./nix/tests/p2p.nix {inherit self;}; + mesh-trust = pkgs.callPackage ./nix/tests/mesh-trust.nix {inherit self;}; e2e = pkgs.callPackage ./nix/tests/e2e.nix {inherit self;}; s3 = pkgs.callPackage ./nix/tests/s3.nix {inherit self;}; socket-activation = pkgs.callPackage ./nix/tests/socket-activation.nix {inherit self;}; diff --git a/ncro/src/cli.rs b/ncro/src/cli.rs index 1aaca9b..41553be 100644 --- a/ncro/src/cli.rs +++ b/ncro/src/cli.rs @@ -124,7 +124,7 @@ pub async fn run() -> anyhow::Result<()> { }); } - let router = Router::new( + let router = Router::new_with_trust( db.clone(), prober.clone(), cfg.cache.ttl.0, @@ -136,6 +136,7 @@ pub async fn run() -> anyhow::Result<()> { in_memory_negative_ttl: cfg.cache.mass_query.in_memory_negative_ttl.0, upstream_cooldown: cfg.cache.mass_query.upstream_cooldown.0, }, + cfg.trust.clone(), )?; for upstream in &cfg.upstreams { @@ -193,10 +194,27 @@ pub async fn run() -> anyhow::Result<()> { .iter() .filter_map(|p| hex::decode(&p.public_key).ok()?.try_into().ok()) .collect::>(); + // Relayed trust claims are only honored if signed by a key in this set: + // the explicit trust.trusted_keys plus every configured upstream's + // public_key. This keeps a quorum a count of *trusted* signers. + let trusted_keys = cfg + .trust + .trusted_keys + .iter() + .cloned() + .chain( + cfg + .upstreams + .iter() + .map(|u| u.public_key.clone()) + .filter(|k| !k.is_empty()), + ) + .collect::>(); ncro_mesh::listen_and_serve( &cfg.mesh.bind_addr, db.clone(), allowed, + trusted_keys, stop_rx.clone(), ) .await?; @@ -211,6 +229,7 @@ pub async fn run() -> anyhow::Result<()> { db.clone(), peers, cfg.mesh.gossip_interval.0, + cfg.mesh.gossip_trust_claims, stop_rx.clone(), )); } @@ -224,6 +243,7 @@ pub async fn run() -> anyhow::Result<()> { cache_priority: cfg.server.cache_priority, read_timeout: cfg.server.read_timeout.0, write_timeout: cfg.server.write_timeout.0, + trust: cfg.trust, })?; let listener = match inherited_listener() { Some(std_listener) => { diff --git a/nix/tests/mesh-trust.nix b/nix/tests/mesh-trust.nix new file mode 100644 index 0000000..eef8dcf --- /dev/null +++ b/nix/tests/mesh-trust.nix @@ -0,0 +1,290 @@ +{ + pkgs, + self, +}: let + # Two deterministic payloads. Each is built identically wherever it appears, + # so a given payload has the same store path, NAR hash, NAR size, and + # references on every node. Two nodes signing the *same* payload with + # different keys therefore produce claims over the same content tuple, which + # is exactly what a quorum counts. + # + # payload1 is the legitimate artifact served by node1 and node2. + # payload2 is only ever served by the attacker node. + payload1 = pkgs.runCommand "ncro-mesh-trust-payload1" {} '' + mkdir -p "$out" + echo "ncro mesh trust legitimate payload" > "$out/data" + ''; + payload2 = pkgs.runCommand "ncro-mesh-trust-payload2" {} '' + mkdir -p "$out" + echo "ncro mesh trust attacker payload" > "$out/data" + ''; + + # Fixed Nix binary-cache key pairs, generated offline with + # `nix-store --generate-binary-cache-key`, embedded so the test is fully + # deterministic. A and B are the two legitimate signers; C is an attacker key + # that no node trusts. + secretKeyA = "ncro-mesh-a-1:m1ZXX1CzD1Z6bKrkFMhKK2oJp0G8cJJ/lVFtwt41b9dEbFtwAit6sKmpGJHf6NV3M44Mte7dFXvcT8+rGlzPNg=="; + publicKeyA = "ncro-mesh-a-1:RGxbcAIrerCpqRiR3+jVdzOODLXu3RV73E/PqxpczzY="; + secretKeyB = "ncro-mesh-b-1:fFG4duKzMAMfn+TZ3WLlOIKIBuClekB5+DNYhXFhMXFuVKfcgHlKGWsNU/0Ot3yAdMfI8pMLKVpVWkRxabU2QQ=="; + publicKeyB = "ncro-mesh-b-1:blSn3IB5ShlrDVP9Drd8gHTHyPKTCylaVVpEcWm1NkE="; + secretKeyC = "ncro-mesh-evil-1:1cB1EK64GyfEeozRh8b/uEDSrmh8qzxwa5pFdsdx7HdjIUhWt7AeIemVrTJXMIoocvZvkpPgpO3zdlUq5uP76A=="; + + meshPort = 7946; + + # Peers are addressed by IP rather than hostname: the ncro unit is hardened + # (no AF_UNIX), so glibc cannot reach nscd to resolve names, and the + # documented mesh config uses `IP:port` anyway. + # + # The test framework assigns 192.168.1. on eth1 by the *alphabetically + # sorted* node name, so the order is: evil=.1, node1=.2, node2=.3. Only node1 + # and node2 are referenced as peers (evil only sends, and node1 accepts any + # signed packet), so evil's own address never needs naming here. + node1Ip = "192.168.1.2"; + node2Ip = "192.168.1.3"; + + # Build a node that runs nix-serve (signing `payload` with `secretKey`) and + # ncro in quorum mode, trusting `trustedKeys` and meshing with `peerIp`. + mkNode = { + secretKey, + selfPublicKey, + trustedKeys, + payload, + peerIp, + }: {pkgs, ...}: { + imports = [self.nixosModules.ncro]; + + virtualisation.memorySize = 2048; + virtualisation.diskSize = 8192; + + networking.useNetworkd = true; + networking.firewall.enable = false; + + environment.systemPackages = [pkgs.curl pkgs.jq]; + + nix.settings.experimental-features = ["nix-command"]; + + # The payload must be in the local store so this node's nix-serve can serve + # and sign it. + system.extraDependencies = [payload]; + + # nix-serve adds a Sig line to every narinfo it + # serves using this key, so no separate `nix store sign` step is needed. + environment.etc."ncro-mesh/cache.sec" = { + text = secretKey; + mode = "0400"; + }; + + services.nix-serve = { + enable = true; + secretKeyFile = "/etc/ncro-mesh/cache.sec"; + port = 5000; + }; + + services.ncro = { + enable = true; + settings = { + server.listen = ":8080"; + + # This node's own nix-serve. With one upstream there is exactly one local + # signer, so a quorum of two can ONLY be reached by importing a second + # *trusted* signer's claim over the mesh. + upstreams = [ + { + url = "http://127.0.0.1:5000"; + priority = 1; + public_key = selfPublicKey; + } + ]; + + # Keep negative caching short: a quorum-rejected lookup is cached as a + # miss, and we want that miss to expire before the post-quorum fetch and + # between poke retries. + cache = { + ttl = "5m"; + negative_ttl = "1s"; + db_path = "/var/lib/ncro/routes.db"; + mass_query.in_memory_negative_ttl = "500ms"; + }; + + trust = { + mode = "quorum"; + threshold = 2; + require_distinct_signers = true; + fail_closed = true; + # The signer keys whose claims may count toward a quorum. Without this + # an attacker could self-sign forged content under throwaway keys and + # manufacture agreement, so claims from any other key are dropped. + trusted_keys = trustedKeys; + }; + + mesh = { + enabled = true; + bind_addr = "0.0.0.0:${toString meshPort}"; + gossip_interval = "3s"; + gossip_trust_claims = true; + private_key = "/var/lib/ncro/node.key"; + + # No peer public_key; the allowlist is empty, so any *signed* packet is + # accepted at the transport layer. Relayed claims are still gated on + # (a) a trusted signer key and (b) a valid narinfo signature, which is + # the guarantee under test. + peers = [{addr = "${peerIp}:${toString meshPort}";}]; + }; + }; + }; + }; +in + pkgs.testers.runNixOSTest { + name = "ncro-mesh-trust"; + + nodes = { + # node1/node2: the legitimate quorum. Each trusts both A and B, so a claim + # relayed from the other counts. + # Indexes 1 and 2 -> node1Ip / node2Ip. + node1 = mkNode { + secretKey = secretKeyA; + selfPublicKey = publicKeyA; + trustedKeys = [publicKeyA publicKeyB]; + payload = payload1; + peerIp = node2Ip; + }; + node2 = mkNode { + secretKey = secretKeyB; + selfPublicKey = publicKeyB; + trustedKeys = [publicKeyA publicKeyB]; + payload = payload1; + peerIp = node1Ip; + }; + + # An attacker that serves a *different* payload signed with an untrusted key (C) + # and relays the resulting claim to node1. node1 must drop it. + # Neither node1 nor node2 lists C in trusted_keys. + # XXX: I wanted to name this node '>:C' + evil = mkNode { + secretKey = secretKeyC; + selfPublicKey = "ncro-mesh-evil-1:YyFIVrewHiHpla0yVzCKKHL2b5KT4KTt83ZVKubj++g="; + trustedKeys = ["ncro-mesh-evil-1:YyFIVrewHiHpla0yVzCKKHL2b5KT4KTt83ZVKubj++g="]; + payload = payload2; + peerIp = node1Ip; + }; + }; + + testScript = '' + import json + + hash1 = "${payload1}".split("/")[3].split("-")[0] + hash2 = "${payload2}".split("/")[3].split("-")[0] + + def trust(node, store_hash): + """Parsed /trust JSON for a store hash on the given node.""" + out = node.succeed(f"curl -sf http://localhost:8080/trust/{store_hash}.narinfo") + return json.loads(out) + + def poke_until(node, store_hash, jq_cond, timeout=60): + """ + Repeatedly request the narinfo (so ncro fetches it from the local + nix-serve, verifies the signature, and records a claim) until the + /trust JSON satisfies jq_cond. Retrying absorbs nix-serve's cold + PreFork startup, where the first request can fail to connect. + """ + node.wait_until_succeeds( + f"curl -s -o /dev/null http://localhost:8080/{store_hash}.narinfo; " + f"curl -sf http://localhost:8080/trust/{store_hash}.narinfo | jq -e '{jq_cond}'", + timeout=timeout, + ) + + def wait_until_trusted(node, store_hash, timeout=90): + node.wait_until_succeeds( + f"curl -sf http://localhost:8080/trust/{store_hash}.narinfo " + "| jq -e '.trusted == true and .matching_claims >= 2'", + timeout=timeout, + ) + + with subtest("boot all nodes"): + start_all() + for node in (node1, node2, evil): + node.wait_for_unit("nix-serve.service") + node.wait_for_unit("ncro.service") + node.wait_for_open_port(5000) # nix-serve + node.wait_for_open_port(8080) # ncro HTTP + for node in (node1, node2, evil): + node.wait_until_succeeds( + "journalctl -u ncro --no-pager | grep -q 'mesh node identity'" + ) + + with subtest("a single trusted signer cannot satisfy a quorum of two"): + # node1 records its own claim (signer A). node2 has not observed the + # path yet, so it has no claim B to relay, and node1 stays at one + # matching claim and is not trusted. + poke_until(node1, hash1, ".matching_claims >= 1") + status1 = trust(node1, hash1) + assert status1["mode"] == "quorum", f"unexpected mode: {status1!r}" + assert status1["matching_claims"] == 1, \ + f"node1 should have exactly its own claim: {status1!r}" + assert status1["trusted"] is False, \ + f"node1 must not be trusted with a single signer: {status1!r}" + + with subtest("quorum forms across the mesh once a second signer is seen"): + # node2 observes the same payload from its own nix-serve, recording a + # claim from a distinct trusted signer (B). Both nodes gossip their + # verified claims; each re-verifies the relayed claim against its + # original Nix signer key before storing it. After convergence both + # hold two distinct trusted-signer claims for the same content tuple. + poke_until(node2, hash1, ".matching_claims >= 1") + wait_until_trusted(node1, hash1) + wait_until_trusted(node2, hash1) + + for node, name in ((node1, "node1"), (node2, "node2")): + status = trust(node, hash1) + signers = {c["signer_key"] for c in status["claims"]} + assert "${publicKeyA}" in signers, f"{name}: missing signer A: {signers!r}" + assert "${publicKeyB}" in signers, f"{name}: missing signer B: {signers!r}" + + with subtest("a quorum from untrusted keys is rejected (not security theatre)"): + # The attacker serves payload2 signed with key C and gossips the claim + # to node1. C is in nobody's trusted_keys, so node1 must drop it: the + # claim never reaches node1's store and payload2 is never trusted. + # XXX: This is the property that makes distinct-signer quorum meaningful. Without + # the trusted-key gate, an attacker could mint unlimited distinct keys and forge + # agreement. + poke_until(evil, hash2, ".claims | length >= 1") + + # Prove the attacker's gossip actually reached node1 and was rejected + # for the right reason, rather than the test passing because nothing + # arrived. + node1.wait_until_succeeds( + "journalctl -u ncro --no-pager " + "| grep -q 'rejecting relayed trust claim from untrusted signer key'", + timeout=60, + ) + + # Give any (wrongly accepted) claim ample time to land, then assert it + # => node1 holds no claim for payload2 and does not trust it. + node1.sleep(8) + evil_status = trust(node1, hash2) + assert evil_status["matching_claims"] == 0, \ + f"untrusted-key claims must not count: {evil_status!r}" + assert evil_status["trusted"] is False, \ + f"payload2 must never be trusted on node1: {evil_status!r}" + assert evil_status["claims"] == [], \ + f"untrusted-key claims must not be stored: {evil_status!r}" + + with subtest("a trusted, quorum-backed path is served end to end"): + # The quorum-rejected miss has expired (negative_ttl=1s), so a fresh + # request re-races, passes the quorum gate, and serves a signed + # narinfo. Fetch the closure through ncro to prove it end to end. + node1.succeed("nix store delete ${payload1} 2>/dev/null || true") + narinfo = node1.succeed(f"curl -sf http://localhost:8080/{hash1}.narinfo") + assert "Sig:" in narinfo, f"served narinfo is unsigned: {narinfo!r}" + + node1.succeed( + "nix copy --from http://localhost:8080 " + "--extra-trusted-public-keys '${publicKeyA} ${publicKeyB}' " + "${payload1} 2>&1" + ) + node1.succeed("test -f ${payload1}/data") + node1.succeed("grep -q 'legitimate payload' ${payload1}/data") + + print("ncro mesh trust + malicious-rejection test passed.") + ''; + }