Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/omnigraph-cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,13 @@ impl GraphClient {
token,
} => {
let data = std::fs::read_to_string(data)?;
// RFC-009 Phase 5: the canonical `load` verb targets the
// canonical `/load` route (the deprecated `ingest` verb below
// still rides `/ingest`).
let output = remote_json::<IngestOutput>(
http,
Method::POST,
remote_url(base_url, "/ingest"),
remote_url(base_url, "/load"),
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.to_string()),
from: from.map(ToOwned::to_owned),
Expand Down
6 changes: 3 additions & 3 deletions crates/omnigraph-cli/tests/parity_matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ fn parity_errors_share_exit_codes() {
//
// - `graphs list`: server-only today; becomes Both-capability when the
// embedded arm enumerates the cluster catalog (RFC-009 open Q3, answered).
// - `ingest`: deprecated alias of load; the remote `load` arm itself rides
// the deprecated /ingest route today (RFC-009 Phase 5 flips it to /load —
// this matrix's `parity_load` row is where that flip becomes visible).
// - `ingest`: deprecated alias of load; its remote arm rides the deprecated
// /ingest route. The canonical `load` verb targets `/load` (RFC-009 Phase 5,
// landed) — `parity_load` exercises it on the remote arm.
// - `init`, `optimize`, `repair`, `cleanup`, `cluster *`: storage-plane by
// design (must work with the server down); Phase 4 declares this.
#[allow(dead_code)]
Expand Down
140 changes: 101 additions & 39 deletions crates/omnigraph-server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,46 +1183,22 @@ pub(crate) async fn server_schema_apply(
Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
}

#[utoipa::path(
post,
path = "/ingest",
tag = "mutations",
operation_id = "ingest",
request_body = IngestRequest,
responses(
(status = 200, description = "Ingest results", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Bulk-load NDJSON data into a branch.
///
/// `data` is NDJSON with one record per line. `mode` controls behavior on
/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
/// `overwrite` replaces table contents. Branch creation is opt-in by
/// presence of `from`: with `from` set, a missing `branch` is created from
/// it; without `from`, `branch` must already exist — a missing branch is a
/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite`
/// or when the load produces conflicting writes.
pub(crate) async fn server_ingest(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
/// Shared body for `POST /load` (canonical) and `POST /ingest` (deprecated):
/// branch-exists / fork-if-`from` check, Cedar authorization, admission, the
/// bulk `load_as`, and the `IngestOutput` mapping.
async fn run_ingest(
state: AppState,
handle: Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
request: IngestRequest,
) -> std::result::Result<IngestOutput, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from;
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.map(|actor| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
let actor_id = actor.map(|actor| actor.actor_id.as_ref());

let branch_exists = {
let db = &handle.engine;
Expand All @@ -1244,7 +1220,7 @@ pub(crate) async fn server_ingest(
)));
}
Some(from) => authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
Expand All @@ -1255,7 +1231,7 @@ pub(crate) async fn server_ingest(
}
}
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Change,
Expand All @@ -1276,12 +1252,98 @@ pub(crate) async fn server_ingest(
.map_err(ApiError::from_omni)?
};

Ok(Json(ingest_output(
Ok(ingest_output(
handle.uri.as_str(),
&result,
mode,
actor_id.map(str::to_string),
)))
))
}

#[utoipa::path(
post,
path = "/load",
tag = "mutations",
operation_id = "load",
request_body = IngestRequest,
responses(
(status = 200, description = "Load results", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Bulk-load NDJSON data into a branch (canonical load endpoint).
///
/// `data` is NDJSON with one record per line. `mode` controls behavior on
/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
/// `overwrite` replaces table contents. Branch creation is opt-in by
/// presence of `from`: with `from` set, a missing `branch` is created from
/// it; without `from`, `branch` must already exist — a missing branch is a
/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite`
/// or when the load produces conflicting writes.
///
/// The legacy `POST /ingest` route has identical semantics and is kept as a
/// deprecated alias.
pub(crate) async fn server_load(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
Ok(Json(
run_ingest(
state,
handle,
actor.as_ref().map(|Extension(actor)| actor),
request,
)
.await?,
))
}

#[utoipa::path(
post,
path = "/ingest",
tag = "mutations",
operation_id = "ingest",
request_body = IngestRequest,
responses(
(status = 200, description = "Load results (response includes `Deprecation: true` + `Link: </load>; rel=\"successor-version\"`)", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
#[deprecated(note = "use POST /load instead; /ingest is kept indefinitely for back-compat")]
/// **Deprecated** — use [`POST /load`](#tag/mutations/operation/load) instead.
///
/// Bulk-load NDJSON data into a branch. Behavior is unchanged; the route is
/// kept indefinitely for back-compat. New integrations should target
/// `POST /load`, which has identical semantics. Responses from this route
/// include `Deprecation: true` and `Link: </load>; rel="successor-version"`
/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the signal.
pub(crate) async fn server_ingest(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<IngestOutput>), ApiError> {
let output = run_ingest(
state,
handle,
actor.as_ref().map(|Extension(actor)| actor),
request,
)
.await?;
Ok((
deprecation_headers("</load>; rel=\"successor-version\""),
Json(output),
))
}
Comment on lines +1330 to 1347

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 TypeScript SDK still targets /ingest with no /load method

omnigraph-ts's client.ts hardcodes POST /ingest in its ingest() method, and the SDK has no load() method for POST /load. Starting with this deployment, every ingest() call the SDK makes will receive Deprecation: true and Link: </load>; rel="successor-version" response headers with no corresponding canonical path for SDK consumers to migrate to. The spec/openapi.json pinned in that repo also predates this change and won't reflect the deprecation or the new /load operation. Since /ingest is kept indefinitely this isn't blocking, but a companion PR to regenerate the SDK types and add a load() method is the intended completion of this migration for TypeScript consumers.

Fix in Claude Code


#[utoipa::path(
Expand Down
18 changes: 16 additions & 2 deletions crates/omnigraph-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ fn hash_bearer_token(token: &str) -> BearerTokenHash {
handlers::server_invoke_query,
handlers::server_schema_apply,
handlers::server_schema_get,
handlers::server_ingest,
handlers::server_load,
// deprecated; the #[deprecated] attribute on the handler surfaces as
// `deprecated: true` on the OpenAPI operation.
#[allow(deprecated)] handlers::server_ingest,
handlers::server_branch_list,
handlers::server_branch_create,
handlers::server_branch_delete,
Expand Down Expand Up @@ -934,9 +937,20 @@ pub fn build_app(state: AppState) -> Router {
.route("/queries/{name}", post(server_invoke_query))
.route("/schema", get(server_schema_get))
.route("/schema/apply", post(server_schema_apply))
.route(
"/load",
post(server_load).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
)
// /ingest is the deprecated alias of /load; its handler carries
// #[deprecated] (OpenAPI operation flagged) and emits RFC 9745
// Deprecation + RFC 8288 Link headers. Suppress the call-site warning.
.route(
"/ingest",
post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
post({
#[allow(deprecated)]
server_ingest
})
.layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
)
.route(
"/branches",
Expand Down
77 changes: 77 additions & 0 deletions crates/omnigraph-server/tests/data_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,83 @@ async fn change_endpoint_emits_deprecation_headers() {
);
}

#[tokio::test(flavor = "multi_thread")]
async fn load_endpoint_loads_into_existing_branch() {
// Canonical bulk-load endpoint (RFC-009 Phase 5). Same wire shape as
// /ingest, no deprecation signal.
let (_temp, app) = app_for_loaded_graph().await;
let request = IngestRequest {
branch: Some("main".to_string()),
from: None,
mode: Some(LoadMode::Merge),
data: r#"{"type":"Person","data":{"name":"Loaded","age":7}}"#.to_string(),
};
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/load")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert!(
response.headers().get("deprecation").is_none(),
"POST /load must not advertise itself as deprecated"
);
let body_bytes = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let body: Value = serde_json::from_slice(&body_bytes).unwrap();
assert_eq!(body["branch"], "main");
assert_eq!(body["tables"][0]["table_key"], "node:Person");
}

#[tokio::test(flavor = "multi_thread")]
async fn ingest_endpoint_emits_deprecation_headers() {
// `/ingest` is the deprecated alias of `/load` (RFC-009 Phase 5): flagged
// at runtime per RFC 9745 (`Deprecation: true`) + RFC 8288 (`Link: </load>;
// rel="successor-version"`). The OpenAPI side is covered by
// `openapi_ingest_is_deprecated` in tests/openapi.rs.
let (_temp, app) = app_for_loaded_graph().await;
let request = IngestRequest {
branch: Some("main".to_string()),
from: None,
mode: Some(LoadMode::Merge),
data: r#"{"type":"Person","data":{"name":"Legacyer","age":33}}"#.to_string(),
};
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/ingest")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers()
.get("deprecation")
.and_then(|v| v.to_str().ok()),
Some("true"),
"POST /ingest must advertise `Deprecation: true` (RFC 9745)"
);
assert_eq!(
response.headers().get("link").and_then(|v| v.to_str().ok()),
Some("</load>; rel=\"successor-version\""),
"POST /ingest must point at /load via `Link` rel=successor-version (RFC 8288)"
);
}

#[tokio::test(flavor = "multi_thread")]
async fn read_endpoint_emits_deprecation_headers() {
// `/read` is kept indefinitely for byte-stable back-compat but flagged
Expand Down
28 changes: 28 additions & 0 deletions crates/omnigraph-server/tests/openapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ const EXPECTED_PATHS: &[&str] = &[
"/queries/{name}",
"/schema",
"/schema/apply",
"/load",
"/ingest",
"/branches",
"/branches/{branch}",
Expand Down Expand Up @@ -300,6 +301,32 @@ fn openapi_ingest_is_post() {
assert!(doc["paths"]["/ingest"]["post"].is_object());
}

#[test]
fn openapi_load_is_not_deprecated() {
// RFC-009 Phase 5: /load is the canonical bulk-load endpoint.
let doc = openapi_json();
assert!(doc["paths"]["/load"]["post"].is_object());
let deprecated = doc["paths"]["/load"]["post"]
.get("deprecated")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
assert!(
!deprecated,
"/load is the canonical load endpoint and must not be deprecated"
);
}

#[test]
fn openapi_ingest_is_deprecated() {
// RFC-009 Phase 5: /ingest is now the deprecated alias of /load.
let doc = openapi_json();
assert_eq!(
doc["paths"]["/ingest"]["post"]["deprecated"],
serde_json::Value::Bool(true),
"/ingest must be flagged deprecated now that /load is canonical"
);
}

#[test]
fn openapi_branches_supports_get_and_post() {
let doc = openapi_json();
Expand Down Expand Up @@ -705,6 +732,7 @@ fn protected_endpoints_reference_bearer_token_security() {
("/schema/apply", "post"),
("/queries", "get"),
("/queries/{name}", "post"),
("/load", "post"),
("/ingest", "post"),
("/export", "post"),
("/snapshot", "get"),
Expand Down
Loading
Loading