Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 58 additions & 2 deletions crates/omnigraph-policy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,14 @@ pub struct PolicyEngine {

impl PolicyConfig {
pub fn load(path: &Path) -> Result<Self> {
let config: Self = serde_yaml::from_str(&fs::read_to_string(path)?)?;
Self::from_source(&fs::read_to_string(path)?)
}

/// Parse + validate a policy from YAML source. The from-content twin of
/// `load` for callers whose policies don't live on the local filesystem
/// (e.g. a cluster catalog on object storage).
pub fn from_source(source: &str) -> Result<Self> {
let config: Self = serde_yaml::from_str(source)?;
config.validate()?;
Ok(config)
}
Expand Down Expand Up @@ -465,13 +472,26 @@ impl PolicyEngine {
PolicyCompiler::compile(&config, graph_id)
}

/// `load_graph` from YAML content instead of a file path — for policies
/// that live in a non-filesystem catalog (cluster object storage).
pub fn load_graph_from_source(source: &str, graph_id: &str) -> Result<Self> {
let config = PolicyConfig::from_source(source)?;
validate_kind_alignment(&config, PolicyEngineKind::Graph)?;
PolicyCompiler::compile(&config, graph_id)
}

/// Load a server-level policy file. Rejects rules whose actions
/// are per-graph (e.g. `read`, `change`) — those belong in a
/// per-graph policy file, not the server one. Takes no `graph_id`:
/// server-scoped actions resolve against the singleton
/// `Omnigraph::Server::"root"` entity, never a Graph.
pub fn load_server(path: &Path) -> Result<Self> {
let config = PolicyConfig::load(path)?;
Self::load_server_from_source(&fs::read_to_string(path)?)
}

/// `load_server` from YAML content instead of a file path.
pub fn load_server_from_source(source: &str) -> Result<Self> {
let config = PolicyConfig::from_source(source)?;
validate_kind_alignment(&config, PolicyEngineKind::Server)?;
// The Graph entity created by the compiler is never referenced
// by a server-scoped rule, so the label below is purely a
Expand Down Expand Up @@ -1002,6 +1022,42 @@ impl PolicyChecker for PolicyEngine {

#[cfg(test)]
mod tests {

#[test]
fn from_source_twins_match_path_loaders() {
let yaml = r#"
version: 1
groups:
readers: ["act-r"]
protected_branches: [main]
rules:
- id: r1
allow:
actors: { group: readers }
actions: [read]
branch_scope: any
"#;
let config = PolicyConfig::from_source(yaml).unwrap();
assert_eq!(config.version, 1);
let engine = PolicyEngine::load_graph_from_source(yaml, "g1").unwrap();
drop(engine);

let server_yaml = r#"
version: 1
kind: server
groups:
admins: ["act-a"]
rules:
- id: s1
allow:
actors: { group: admins }
actions: [graph_list]
"#;
PolicyEngine::load_server_from_source(server_yaml).unwrap();
// Kind misalignment stays loud through the from-source path.
assert!(PolicyEngine::load_graph_from_source(server_yaml, "g1").is_err());
assert!(PolicyEngine::load_server_from_source(yaml).is_err());
}
use super::{
PolicyAction, PolicyCompiler, PolicyConfig, PolicyEngine, PolicyExpectation, PolicyRequest,
PolicyTestCase, PolicyTestConfig,
Expand Down
1 change: 1 addition & 0 deletions crates/omnigraph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ serde_json = { workspace = true }
reqwest = { workspace = true }
object_store = { workspace = true }
ulid = { workspace = true }
sha2 = { workspace = true }
base64 = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
Expand Down
38 changes: 38 additions & 0 deletions crates/omnigraph/src/db/omnigraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2029,6 +2029,25 @@ edge WorksAt: Person -> Company
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}

async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
self.inner.read_text_versioned(uri).await
}

async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
self.inner
.write_text_if_match(uri, contents, expected_version)
.await
}

async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
self.inner.delete_prefix(prefix_uri).await
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -2071,6 +2090,25 @@ edge WorksAt: Person -> Company
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}

async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
self.inner.read_text_versioned(uri).await
}

async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
self.inner
.write_text_if_match(uri, contents, expected_version)
.await
}

async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
self.inner.delete_prefix(prefix_uri).await
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down
203 changes: 203 additions & 0 deletions crates/omnigraph/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,39 @@ pub trait StorageAdapter: Debug + Send + Sync {
/// Returns full URIs (same scheme as `dir_uri`). The result is unordered.
/// Returns Ok(empty) if the directory does not exist or is empty.
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
/// Read a text object together with its backend version token (S3: the
/// object's ETag; local: sha256 of the content). The token is opaque —
/// valid only for `write_text_if_match` against the same adapter.
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)>;
/// Replace the object at `uri` only if its current version still matches
/// `expected_version` (obtained from a prior versioned read/write on this
/// adapter). Returns `Ok(Some(new_version))` on success and `Ok(None)`
/// when the precondition failed (a concurrent writer won — the CAS-lost
/// case callers must surface, never swallow). S3 uses a conditional put
/// (If-Match); local compares content then replaces via temp + rename —
/// the same single-machine semantics the callers had before this trait,
/// safe under the callers' own lock protocol but not a cross-process
/// barrier by itself.
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>>;
/// Recursively delete every object under `prefix_uri`. Returns Ok(())
/// when nothing exists there (idempotent). Local: `remove_dir_all`;
/// S3: list + delete (NOT atomic — callers must tolerate partial
/// prefixes on crash, which the cluster delete protocol does by retry).
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()>;
}

/// Version token for local files: content identity. ETags are unavailable
/// on the filesystem; sha256 is stable, cheap at these object sizes, and
/// already the cluster ledger's CAS vocabulary.
fn local_version_token(bytes: &[u8]) -> String {
use sha2::{Digest, Sha256};
let digest = Sha256::digest(bytes);
digest.iter().map(|byte| format!("{byte:02x}")).collect()
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -149,6 +182,49 @@ impl StorageAdapter for LocalStorageAdapter {
}
Ok(out)
}

async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
let path = local_path_from_uri(uri)?;
let bytes = tokio::fs::read(&path).await?;
let version = local_version_token(&bytes);
let text = String::from_utf8(bytes).map_err(|err| {
OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
})?;
Ok((text, version))
}

async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
let path = local_path_from_uri(uri)?;
let current = match tokio::fs::read(&path).await {
Ok(bytes) => bytes,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
};
if local_version_token(&current) != expected_version {
return Ok(None);
}
let tmp = path.with_extension(format!("tmp.{}", ulid::Ulid::new()));
tokio::fs::write(&tmp, contents.as_bytes()).await?;
if let Err(err) = tokio::fs::rename(&tmp, &path).await {
let _ = tokio::fs::remove_file(&tmp).await;
return Err(err.into());
}
Ok(Some(local_version_token(contents.as_bytes())))
}

async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
let path = local_path_from_uri(prefix_uri)?;
match tokio::fs::remove_dir_all(&path).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err.into()),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -276,6 +352,84 @@ impl StorageAdapter for S3StorageAdapter {
}
Ok(out)
}

async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
let location = self.object_path(uri)?;
let result = self
.store
.get(&location)
.await
.map_err(|err| storage_backend_error("read", uri, err))?;
let etag = result.meta.e_tag.clone();
let bytes = result
.bytes()
.await
.map_err(|err| storage_backend_error("read", uri, err))?;
// Every S3-compatible store we target returns ETags; fall back to a
// content token rather than failing if one ever omits it.
let version = etag.unwrap_or_else(|| local_version_token(&bytes));
let text = String::from_utf8(bytes.to_vec()).map_err(|err| {
OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
})?;
Ok((text, version))
}

async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
let location = self.object_path(uri)?;
let mode = PutMode::Update(object_store::UpdateVersion {
e_tag: Some(expected_version.to_string()),
version: None,
});
match self
.store
.put_opts(
&location,
PutPayload::from(contents.as_bytes().to_vec()),
mode.into(),
)
.await
{
Ok(result) => Ok(Some(
result
.e_tag
.unwrap_or_else(|| local_version_token(contents.as_bytes())),
)),
Err(object_store::Error::Precondition { .. })
| Err(object_store::Error::NotFound { .. }) => Ok(None),
Err(err) => Err(storage_backend_error("write_if_match", uri, err)),
}
}

async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
let dir_with_slash = if prefix_uri.ends_with('/') {
prefix_uri.to_string()
} else {
format!("{}/", prefix_uri)
};
let prefix_loc = self.object_path(&dir_with_slash)?;
let mut entries = self.store.list(Some(&prefix_loc));
let mut locations = Vec::new();
while let Some(meta) = entries
.try_next()
.await
.map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))?
{
locations.push(meta.location);
}
for location in locations {
match self.store.delete(&location).await {
Ok(()) => {}
Err(object_store::Error::NotFound { .. }) => {}
Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)),
}
}
Ok(())
}
}

impl S3StorageAdapter {
Expand Down Expand Up @@ -444,6 +598,55 @@ fn env_var_truthy(key: &str) -> bool {

#[cfg(test)]
mod tests {

#[tokio::test]
async fn local_versioned_cas_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/state.json", dir.path().display());
let adapter = LocalStorageAdapter;
adapter.write_text(&uri, "v1").await.unwrap();
let (text, version) = adapter.read_text_versioned(&uri).await.unwrap();
assert_eq!(text, "v1");

// Matching token replaces and returns the next token.
let next = adapter
.write_text_if_match(&uri, "v2", &version)
.await
.unwrap()
.expect("fresh token must win");
assert_ne!(next, version);
// The stale token must lose (CAS-lost is Ok(None), never silent).
assert!(
adapter
.write_text_if_match(&uri, "v3", &version)
.await
.unwrap()
.is_none()
);
let (text, _) = adapter.read_text_versioned(&uri).await.unwrap();
assert_eq!(text, "v2");
// Missing object: precondition can't hold.
let missing = format!("{}/absent.json", dir.path().display());
assert!(
adapter
.write_text_if_match(&missing, "x", &version)
.await
.unwrap()
.is_none()
);
}

#[tokio::test]
async fn local_delete_prefix_is_recursive_and_idempotent() {
let dir = tempfile::tempdir().unwrap();
let root = format!("{}/tree", dir.path().display());
let adapter = LocalStorageAdapter;
adapter.write_text(&format!("{root}/a.txt"), "a").await.unwrap();
adapter.write_text(&format!("{root}/sub/b.txt"), "b").await.unwrap();
adapter.delete_prefix(&root).await.unwrap();
assert!(!adapter.exists(&format!("{root}/a.txt")).await.unwrap());
adapter.delete_prefix(&root).await.unwrap(); // absent -> Ok
}
use super::*;

#[test]
Expand Down
Loading
Loading