Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ jobs:
crates/omnigraph/src/storage.rs) run_rustfs_ci=true ;;
crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;;
crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;;
crates/omnigraph-server/tests/server.rs) run_rustfs_ci=true ;;
crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;;
crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;;
crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;;
crates/omnigraph-cli/tests/system_local.rs) run_rustfs_ci=true ;;
esac
done
Expand Down Expand Up @@ -351,10 +353,14 @@ jobs:
run: cargo test --locked -p omnigraph-engine --test s3_storage -- --nocapture

- name: Run RustFS server smoke
# The exact test name (not a loose substring): a filter that matches
# nothing passes vacuously, which silently ran zero tests here for a
# while (the old filter said s3_repo; the test says s3_graph).
run: cargo test --locked -p omnigraph-server --test s3 server_opens_s3_graph_directly_and_serves_snapshot_and_read -- --nocapture
# No name filter: every test in the s3 target is bucket-gated, and a
# filter matching nothing passes vacuously (which silently ran zero
# tests here for a while — the old filter said s3_repo, the test
# said s3_graph).
run: cargo test --locked -p omnigraph-server --test s3 -- --nocapture

- name: Run RustFS cluster e2e
run: cargo test --locked -p omnigraph-cluster --test s3_cluster -- --nocapture

- name: Run RustFS CLI smoke
run: cargo test --locked -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow -- --nocapture
Expand Down
2 changes: 1 addition & 1 deletion crates/omnigraph-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod store;
use store::{ClusterStore, StateLockGuard, StateSnapshot};
pub use types::*;
use types::*;
pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot};
pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot, read_serving_snapshot_from_storage};
use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source};
use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind};
use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars};
Expand Down
33 changes: 25 additions & 8 deletions crates/omnigraph-cluster/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ pub struct ServingQuery {
#[derive(Debug, Clone)]
pub struct ServingPolicy {
pub name: String,
pub blob_path: PathBuf,
/// The policy bundle CONTENT, digest-verified against the applied
/// revision at read time. Content, not a path: the catalog may live on
/// object storage, and the server must not re-read mutable state.
pub source: String,
pub applies_to: Vec<String>,
}

Expand All @@ -44,7 +47,6 @@ pub async fn read_serving_snapshot(
config_dir: impl AsRef<Path>,
) -> Result<ServingSnapshot, Vec<Diagnostic>> {
let config_dir = config_dir.as_ref().to_path_buf();
let mut diagnostics: Vec<Diagnostic> = Vec::new();
// The declared storage: root decides where the ledger/catalog/graphs
// live; config parse errors surface through the normal validation path.
let parsed = parse_cluster_config(&config_dir);
Expand All @@ -62,6 +64,25 @@ pub async fn read_serving_snapshot(
},
None => ClusterStore::for_config_dir(&config_dir),
};
read_snapshot_with_store(backend).await
}

/// Config-free serving entry point: build the serving snapshot from nothing
/// but a storage root URI (e.g. `--cluster s3://bucket/prefix`). No local
/// config files are consulted — the ledger and catalog under the root are
/// the deployment artifact.
///
/// # Errors
///
/// If the storage root cannot be turned into a store, the single diagnostic
/// from that failure is returned as a one-element list. Otherwise the result
/// of reading the snapshot through that store is returned as-is.
pub async fn read_serving_snapshot_from_storage(
    storage_root: &str,
) -> Result<ServingSnapshot, Vec<Diagnostic>> {
    match ClusterStore::for_storage_root(storage_root) {
        Ok(store) => read_snapshot_with_store(store).await,
        Err(diagnostic) => Err(vec![diagnostic]),
    }
}

async fn read_snapshot_with_store(
backend: ClusterStore,
) -> Result<ServingSnapshot, Vec<Diagnostic>> {
let mut diagnostics: Vec<Diagnostic> = Vec::new();

// A ledger a sweep is about to rewrite must not start serving.
let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await;
Expand Down Expand Up @@ -136,13 +157,9 @@ pub async fn read_serving_snapshot(
continue;
};
match backend.read_verified_payload(&kind, &entry.digest, address).await {
Ok(_) => policies.push(ServingPolicy {
Ok(source) => policies.push(ServingPolicy {
name: name.clone(),
blob_path: PathBuf::from(
backend
.payload_display(&kind, &entry.digest)
.expect("policy kind always has a payload path"),
),
source,
applies_to,
}),
Err(diagnostic) => diagnostics.push(diagnostic),
Expand Down
4 changes: 3 additions & 1 deletion crates/omnigraph-cluster/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2843,7 +2843,9 @@ policies:
assert!(snapshot.queries[0].source.contains("query find_person"));
assert_eq!(snapshot.policies.len(), 1);
assert_eq!(snapshot.policies[0].applies_to, vec!["graph.knowledge"]);
assert!(snapshot.policies[0].blob_path.exists());
// Content, not a path: the catalog may live on object storage.
// The fixture bundle is `rules: []` — assert the verified text.
assert!(snapshot.policies[0].source.contains("rules:"));
}

#[tokio::test]
Expand Down
162 changes: 162 additions & 0 deletions crates/omnigraph-cluster/tests/s3_cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//! Cluster-on-object-storage end-to-end (RFC-006): the full control-plane
//! lifecycle with `storage: s3://…` — import, apply (graph roots + catalog
//! on the bucket), serving snapshots from both the config dir and the bare
//! storage URI, schema evolution, and the approved delete (prefix removal).
//!
//! Gated like every S3 suite: skips unless `OMNIGRAPH_S3_TEST_BUCKET` is
//! set (CI runs it against containerized RustFS; locally use the RustFS
//! binary + `AWS_*` env, see docs/dev/testing.md).
//!
//! Runtime flavor is multi_thread on purpose: the state-lock guard's
//! drop-time release uses block_in_place on object stores, which is the
//! production (CLI) runtime shape — and the lock-release regression this
//! suite pins (a spawned delete dying with a short-lived runtime) only
//! reproduces realistically under it.

use std::env;
use std::fs;

use omnigraph_cluster::{
apply_config_dir, import_config_dir, read_serving_snapshot,
read_serving_snapshot_from_storage, status_config_dir, validate_config_dir,
};
use ulid::Ulid;

/// Initial schema written to `people.pg` by the fixture: a `Person` node
/// keyed by `name`.
const SCHEMA_V1: &str = "node Person {\n name: String @key\n}\n";
/// Evolved schema for the re-apply step: `SCHEMA_V1` plus an optional
/// `title` field on `Person`.
const SCHEMA_V2: &str = "node Person {\n name: String @key\n title: String?\n}\n";
/// The single stored query written to `queries/people.gq` by the fixture.
const FIND_PERSON_GQ: &str = "query find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n";
/// Policy bundle written to `intel.policy.yaml` by the fixture. The lifecycle
/// test checks that the served policy content contains the `act-admin` actor
/// id declared here.
const POLICY_YAML: &str = r#"
version: 1
actors:
- id: act-admin
roles: [admin]
rules:
- effect: permit
actions: [read, change, schema_apply, branch_create, branch_delete, branch_merge]
roles: [admin]
"#;

/// Build a fresh `s3://` storage root for one test run.
///
/// Returns `None` (the caller treats this as "skip") when
/// `OMNIGRAPH_S3_TEST_BUCKET` is not set. Otherwise the root lives under
/// `cluster-e2e/` in that bucket and ends in `{suite}-{ULID}`, so every run
/// gets its own prefix.
fn s3_storage_root(suite: &str) -> Option<String> {
    env::var("OMNIGRAPH_S3_TEST_BUCKET")
        .ok()
        .map(|bucket| format!("s3://{bucket}/cluster-e2e/{suite}-{}", Ulid::new()))
}

/// Write the declared cluster sources into `dir`: the schema (`people.pg`),
/// the stored query (`queries/people.gq`), the policy bundle
/// (`intel.policy.yaml`) and a `cluster.yaml` whose `storage:` key is
/// `storage_root`.
///
/// The lifecycle test calls this a second time with a different `schema` to
/// drive the evolution step. Any filesystem error panics via `unwrap`.
fn write_cluster_fixture(dir: &std::path::Path, storage_root: &str, schema: &str) {
fs::write(dir.join("people.pg"), schema).unwrap();
// The queries directory must exist before the query file is written into it.
fs::create_dir_all(dir.join("queries")).unwrap();
fs::write(dir.join("queries/people.gq"), FIND_PERSON_GQ).unwrap();
fs::write(dir.join("intel.policy.yaml"), POLICY_YAML).unwrap();
// cluster.yaml declares one graph (`knowledge`) and one policy (`intel`)
// bound to it; only the storage root is interpolated.
// NOTE(review): the nesting indentation inside the YAML literal is not
// visible in this view — confirm `knowledge:` stays nested under `graphs:`.
fs::write(
dir.join("cluster.yaml"),
format!(
r#"version: 1
storage: {storage_root}
graphs:
knowledge:
schema: people.pg
queries: queries/
policies:
intel:
file: intel.policy.yaml
applies_to: [graph.knowledge]
"#
),
)
.unwrap();
}

/// Full control-plane lifecycle against an `s3://` storage root, in order:
/// validate, import, confirm the state lock was released, apply, read the
/// serving snapshot through the config dir and through the bare storage URI,
/// re-apply with `SCHEMA_V2`, then plan + approve + apply a graph delete and
/// expect the final snapshot read to fail.
///
/// Returns early (logging a skip message) when no test bucket is configured.
#[tokio::test(flavor = "multi_thread")]
async fn s3_cluster_full_lifecycle_import_apply_serve_evolve_delete() {
let Some(root) = s3_storage_root("lifecycle") else {
eprintln!("skipping s3 cluster e2e: OMNIGRAPH_S3_TEST_BUCKET is not set");
return;
};
// Local config dir: receives only the declared sources from the fixture.
let dir = tempfile::tempdir().unwrap();
write_cluster_fixture(dir.path(), &root, SCHEMA_V1);

// validate is config-only and must pass before any bucket I/O.
let validate = validate_config_dir(dir.path());
assert!(validate.ok, "{:?}", validate.diagnostics);

let import = import_config_dir(dir.path()).await;
assert!(import.ok, "{:?}", import.diagnostics);

// The lock-release regression (caught live on the first smoke): the
// guard's drop must COMPLETE its bucket delete before the command
// returns — a follow-up command finding `state_lock_held` means the
// release was spawned into a dying runtime.
let status = status_config_dir(dir.path()).await;
assert!(status.ok, "{:?}", status.diagnostics);
assert!(
!status.state_observations.locked,
"import leaked the state lock on the bucket: {:?}",
status.state_observations
);

let apply = apply_config_dir(dir.path()).await;
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);

// Nothing stored locally: the config dir holds only declared sources.
assert!(!dir.path().join("__cluster").exists());
assert!(!dir.path().join("graphs").exists());

// Serving snapshot resolves through cluster.yaml's storage: key…
let via_config = read_serving_snapshot(dir.path()).await.unwrap();
assert_eq!(via_config.graphs.len(), 1);
let graph_root = via_config.graphs[0].root.to_string_lossy().to_string();
assert!(
graph_root.starts_with("s3://") && graph_root.ends_with("graphs/knowledge.omni"),
"{graph_root}"
);
assert_eq!(via_config.queries.len(), 1);
assert_eq!(via_config.policies.len(), 1);
assert!(
via_config.policies[0].source.contains("act-admin"),
"policy must carry verified content, not a path"
);

// …and config-free, straight from the bucket URI (the deployment
// payoff: a server needs only the URI and credentials).
let via_uri = read_serving_snapshot_from_storage(&root).await.unwrap();
assert_eq!(via_uri.graphs.len(), 1);
// Both read paths must report the same graph root.
assert_eq!(
via_uri.graphs[0].root.to_string_lossy(),
via_config.graphs[0].root.to_string_lossy()
);
assert_eq!(via_uri.policies.len(), 1);

// Schema evolution converges on the bucket.
write_cluster_fixture(dir.path(), &root, SCHEMA_V2);
let evolve = apply_config_dir(dir.path()).await;
assert!(evolve.ok && evolve.converged, "{:?}", evolve.diagnostics);

// Approved delete: drop the graph from the config; the plan demands an
// approval, the approved apply prefix-deletes the bucket root.
fs::write(
dir.path().join("cluster.yaml"),
format!("version: 1\nstorage: {root}\ngraphs: {{}}\n"),
)
.unwrap();
let plan = omnigraph_cluster::plan_config_dir(dir.path()).await;
assert!(plan.ok, "{:?}", plan.diagnostics);
// Only the first required approval is granted before the delete is applied.
let approval = plan
.approvals_required
.first()
.expect("graph delete requires approval");
let approve = omnigraph_cluster::approve_config_dir(
dir.path(),
&approval.resource,
"e2e-operator",
)
.await;
assert!(approve.ok, "{:?}", approve.diagnostics);
let delete = apply_config_dir(dir.path()).await;
assert!(delete.ok && delete.converged, "{:?}", delete.diagnostics);

// With no graphs left, reading a serving snapshot must be an error.
let after = read_serving_snapshot_from_storage(&root).await;
assert!(
after.is_err(),
"an empty cluster must refuse to serve, got {after:?}"
);
}
47 changes: 34 additions & 13 deletions crates/omnigraph-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,28 @@ pub enum ServerConfigMode {
config_path: PathBuf,
/// `server.policy.file` (server-level Cedar policy for the
/// management endpoints). Wired into `GET /graphs` authorization.
server_policy_file: Option<PathBuf>,
server_policy: Option<PolicySource>,
},
}

/// Where a Cedar policy bundle comes from at startup. File-based for
/// omnigraph.yaml deployments; inline (digest-verified catalog content)
/// for cluster-mode boots, where the catalog may live on object storage
/// and the server must not re-read mutable state after the snapshot.
#[derive(Debug, Clone)]
pub enum PolicySource {
/// Path to a policy bundle file; handed to the path-based `PolicyEngine`
/// loaders (`load_graph` / `load_server`).
File(PathBuf),
/// The policy bundle text itself; handed to the `*_from_source`
/// `PolicyEngine` loaders.
Inline(String),
}

/// One graph's startup-time configuration: id, opened URI, optional
/// per-graph policy file path. Constructed by `load_server_settings`
/// per-graph policy source. Constructed by `load_server_settings`
/// in multi mode; consumed by `serve`'s parallel open loop.
#[derive(Debug, Clone)]
pub struct GraphStartupConfig {
pub graph_id: String,
pub uri: String,
pub policy_file: Option<PathBuf>,
pub policy: Option<PolicySource>,
/// Per-graph stored-query registry, loaded and identity-checked at
/// settings-build time; type-checked against the schema when this
/// graph's engine opens.
Expand Down Expand Up @@ -994,9 +1004,9 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
ServerConfigMode::Multi {
graphs,
server_policy_file,
server_policy,
..
} => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()),
} => server_policy.is_some() || graphs.iter().any(|g| g.policy.is_some()),
};
let runtime_state = classify_server_runtime_state(
!tokens.is_empty(),
Expand Down Expand Up @@ -1045,7 +1055,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
ServerConfigMode::Multi {
graphs,
config_path,
server_policy_file,
server_policy,
} => {
info!(
bind = %bind,
Expand All @@ -1054,7 +1064,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
config = %config_path.display(),
"serving omnigraph"
);
open_multi_graph_state(graphs, tokens, server_policy_file.as_ref(), config_path).await?
open_multi_graph_state(graphs, tokens, server_policy.as_ref(), config_path).await?
}
};

Expand All @@ -1065,6 +1075,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
Ok(())
}

/// Build the policy engine for one graph, dispatching on where the bundle
/// comes from: a file path goes through the path-based loader, inline text
/// through the from-source loader. A loader failure is propagated to the
/// caller.
fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result<PolicyEngine> {
    let engine = match source {
        PolicySource::File(path) => PolicyEngine::load_graph(path, graph_id)?,
        PolicySource::Inline(text) => PolicyEngine::load_graph_from_source(text, graph_id)?,
    };
    Ok(engine)
}

/// Parallel open of every graph in the startup config, with bounded
/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error
/// aborts startup; other in-flight opens are dropped (their `Omnigraph`
Expand All @@ -1076,7 +1094,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
pub async fn open_multi_graph_state(
graphs: Vec<GraphStartupConfig>,
tokens: Vec<(String, String)>,
server_policy_file: Option<&PathBuf>,
server_policy_source: Option<&PolicySource>,
config_path: PathBuf,
) -> Result<AppState> {
use futures::{StreamExt, TryStreamExt};
Expand All @@ -1089,8 +1107,11 @@ pub async fn open_multi_graph_state(
// The placeholder graph_id `"server"` is the sentinel the Cedar
// resource-model refactor maps to the singleton
// `Omnigraph::Server::"root"` entity at evaluation time.
let server_policy = match server_policy_file {
Some(path) => Some(PolicyEngine::load_server(path)?),
let server_policy = match server_policy_source {
Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?),
Some(PolicySource::Inline(source)) => {
Some(PolicyEngine::load_server_from_source(source)?)
}
None => None,
};

Expand Down Expand Up @@ -1128,9 +1149,9 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>>
// owned `Arc`, so no borrow of `db` survives into the match.
let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?;

let (policy_arc, db) = match &cfg.policy_file {
Some(path) => {
let policy = PolicyEngine::load_graph(path, graph_id.as_str())?;
let (policy_arc, db) = match &cfg.policy {
Some(source) => {
let policy = load_graph_policy(source, graph_id.as_str())?;
let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
(Some(policy_arc), db.with_policy(checker))
Expand Down
Loading
Loading