From b6f7da0593bb8fa2a0c97e9ee41727323495b6c4 Mon Sep 17 00:00:00 2001 From: Luiz Spies Date: Sat, 16 May 2026 01:40:22 +0200 Subject: [PATCH 1/3] feat(core,py): bulk-insert primitives for nodes and edges Adds insert_*_bulk methods that batch multiple inserts inside a single transaction with a reused prepare_cached statement. Closes the 8x build-time gap downstream consumers see when loading large graphs from grounded-index DBs (Python->Rust FFI per add_node was the bottleneck). Core (sqlitegraph-core): - SqliteGraph::insert_entities_bulk and insert_edges_bulk: BEGIN - prepare_cached(INSERT) - loop execute + last_insert_rowid - COMMIT. Empty input returns Ok(vec![]) without opening a transaction. On any error mid-batch: ROLLBACK and return the error; the database is left untouched. Returns rowids in input order. - GraphBackend::insert_nodes_bulk and insert_edges_bulk: trait methods with default implementations that loop the single-insert path, so any existing GraphBackend consumer keeps working at 2.3 -> 2.4 with no source changes. The &B blanket forwarders are wired through. - SqliteGraphBackend overrides both, dispatching to the new SqliteGraph bulk paths. Publisher events fire per row after commit to preserve single-insert observer semantics; no new batched event type. Python (sqlitegraph-py): - Graph.add_nodes_bulk(items: list[dict]) and add_edges_bulk(items): each dict carries the same fields as the kwargs-style add_node/add_edge. Missing required fields raise; valid items go through in one FFI call. Tests: - 8 Rust integration cases in tests/bulk_insert_tests.rs: input-order IDs, empty input, validation rollback, edge bulk parity, observable state matches a per-item loop. - 10 Python cases in tests/test_bulk_insert.py: both bulk paths, missing-field validation, data/file_path round-trip, parity with the per-item loop. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- sqlitegraph-core/src/backend.rs | 37 ++++ sqlitegraph-core/src/backend/sqlite/impl_.rs | 56 ++++++ sqlitegraph-core/src/graph/edge_ops.rs | 60 +++++++ sqlitegraph-core/src/graph/entity_ops.rs | 57 ++++++ sqlitegraph-core/tests/bulk_insert_tests.rs | 173 +++++++++++++++++++ sqlitegraph-py/src/lib.rs | 90 ++++++++++ sqlitegraph-py/tests/test_bulk_insert.py | 127 ++++++++++++++ 7 files changed, 600 insertions(+) create mode 100644 sqlitegraph-core/tests/bulk_insert_tests.rs create mode 100644 sqlitegraph-py/tests/test_bulk_insert.py diff --git a/sqlitegraph-core/src/backend.rs b/sqlitegraph-core/src/backend.rs index c6394a50..2c2179d7 100644 --- a/sqlitegraph-core/src/backend.rs +++ b/sqlitegraph-core/src/backend.rs @@ -117,6 +117,35 @@ pub trait GraphBackend { fn insert_node(&self, node: NodeSpec) -> Result; fn insert_edge(&self, edge: EdgeSpec) -> Result; + /// Insert many nodes atomically. + /// + /// Default implementation loops single-insert for each item; backends + /// override with a single-transaction path. Returns the new node IDs in + /// the same order as `nodes`. All-or-nothing on the overriding bulk path; the + /// default fallback inherits whatever atomicity single `insert_node` provides. + fn insert_nodes_bulk(&self, nodes: &[NodeSpec]) -> Result, SqliteGraphError> { + let mut ids = Vec::with_capacity(nodes.len()); + for n in nodes { + ids.push(self.insert_node(n.clone())?); + } + Ok(ids) + } + + /// Insert many edges atomically. + /// + /// Default implementation loops single-insert for each item; backends + /// override with a single-transaction path. Returns the new edge IDs in + /// the same order as `edges`. All-or-nothing semantics on the bulk path; + /// the default fallback inherits whatever atomicity the single + /// `insert_edge` provides. 
+ fn insert_edges_bulk(&self, edges: &[EdgeSpec]) -> Result, SqliteGraphError> { + let mut ids = Vec::with_capacity(edges.len()); + for e in edges { + ids.push(self.insert_edge(e.clone())?); + } + Ok(ids) + } + /// Update an existing node in place without allocating a new node ID /// /// This modifies the data associated with an existing node while preserving @@ -565,6 +594,14 @@ where (*self).entity_ids() } + fn insert_nodes_bulk(&self, nodes: &[NodeSpec]) -> Result, SqliteGraphError> { + (*self).insert_nodes_bulk(nodes) + } + + fn insert_edges_bulk(&self, edges: &[EdgeSpec]) -> Result, SqliteGraphError> { + (*self).insert_edges_bulk(edges) + } + fn neighbors( &self, snapshot_id: SnapshotId, diff --git a/sqlitegraph-core/src/backend/sqlite/impl_.rs b/sqlitegraph-core/src/backend/sqlite/impl_.rs index 9619a745..04da27fa 100644 --- a/sqlitegraph-core/src/backend/sqlite/impl_.rs +++ b/sqlitegraph-core/src/backend/sqlite/impl_.rs @@ -296,6 +296,62 @@ impl crate::backend::GraphBackend for SqliteGraphBackend { Ok(id) } + fn insert_nodes_bulk(&self, nodes: &[NodeSpec]) -> Result, SqliteGraphError> { + let entities: Vec = nodes + .iter() + .map(|node| GraphEntity { + id: 0, + kind: node.kind.clone(), + name: node.name.clone(), + file_path: node.file_path.clone(), + data: node.data.clone(), + }) + .collect(); + let ids = self.graph.insert_entities_bulk(&entities)?; + + // Emit per-row events after the commit, matching single-insert + // observer semantics. 
+ let pub_guard = self.publisher.read(); + if let Some(ref publisher) = *pub_guard { + for id in &ids { + publisher.emit(PubSubEvent::NodeChanged { + node_id: *id, + snapshot_id: 0, + }); + } + } + + Ok(ids) + } + + fn insert_edges_bulk(&self, edges: &[EdgeSpec]) -> Result, SqliteGraphError> { + let graph_edges: Vec = edges + .iter() + .map(|edge| GraphEdge { + id: 0, + from_id: edge.from, + to_id: edge.to, + edge_type: edge.edge_type.clone(), + data: edge.data.clone(), + }) + .collect(); + let ids = self.graph.insert_edges_bulk(&graph_edges)?; + + let pub_guard = self.publisher.read(); + if let Some(ref publisher) = *pub_guard { + for (id, edge) in ids.iter().zip(edges.iter()) { + publisher.emit(PubSubEvent::EdgeChanged { + from_node: edge.from, + to_node: edge.to, + edge_id: *id, + snapshot_id: 0, + }); + } + } + + Ok(ids) + } + fn delete_entity(&self, id: i64) -> Result<(), SqliteGraphError> { self.graph.delete_entity(id) } diff --git a/sqlitegraph-core/src/graph/edge_ops.rs b/sqlitegraph-core/src/graph/edge_ops.rs index d164ab03..e122e302 100644 --- a/sqlitegraph-core/src/graph/edge_ops.rs +++ b/sqlitegraph-core/src/graph/edge_ops.rs @@ -29,6 +29,66 @@ impl SqliteGraph { Ok(self.connection().last_insert_rowid()) } + /// Insert many edges atomically inside a single transaction. + /// + /// Returns the rowids of the inserted edges in the same order as the + /// input. Empty input returns an empty vector without opening a + /// transaction. On any error, the transaction is rolled back and the + /// database is left untouched. + pub fn insert_edges_bulk(&self, edges: &[GraphEdge]) -> Result, SqliteGraphError> { + if edges.is_empty() { + return Ok(Vec::new()); + } + for edge in edges { + validate_edge(edge)?; + if !self.entity_exists(edge.from_id)? || !self.entity_exists(edge.to_id)? 
{ + return Err(SqliteGraphError::invalid_input( + "edge endpoints must reference existing entities", + )); + } + } + let conn = self.connection(); + conn.underlying() + .execute_batch("BEGIN") + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + + let mut ids = Vec::with_capacity(edges.len()); + let insert_result: Result<(), SqliteGraphError> = (|| { + let mut stmt = conn + .prepare_cached( + "INSERT INTO graph_edges(from_id, to_id, edge_type, data) VALUES(?1, ?2, ?3, ?4)", + ) + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + for edge in edges { + let data = serde_json::to_string(&edge.data) + .map_err(|e| SqliteGraphError::invalid_input(e.to_string()))?; + stmt.execute(params![ + edge.from_id, + edge.to_id, + edge.edge_type.as_str(), + data, + ]) + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + ids.push(conn.last_insert_rowid()); + } + Ok(()) + })(); + + match insert_result { + Ok(()) => { + conn.underlying() + .execute_batch("COMMIT") + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + self.invalidate_caches(); + Ok(ids) + } + Err(err) => { + let _ = conn.underlying().execute_batch("ROLLBACK"); + Err(err) + } + } + } + pub fn get_edge(&self, id: i64) -> Result { self.connection() .query_row( diff --git a/sqlitegraph-core/src/graph/entity_ops.rs b/sqlitegraph-core/src/graph/entity_ops.rs index f0d42c2c..b0e5579a 100644 --- a/sqlitegraph-core/src/graph/entity_ops.rs +++ b/sqlitegraph-core/src/graph/entity_ops.rs @@ -28,6 +28,63 @@ impl SqliteGraph { Ok(self.connection().last_insert_rowid()) } + /// Insert many entities atomically inside a single transaction. + /// + /// Returns the rowids of the inserted entities in the same order as the + /// input. Empty input returns an empty vector without opening a + /// transaction. On any error, the transaction is rolled back and the + /// database is left untouched. 
+ pub fn insert_entities_bulk( + &self, + entities: &[GraphEntity], + ) -> Result, SqliteGraphError> { + if entities.is_empty() { + return Ok(Vec::new()); + } + for entity in entities { + validate_entity(entity)?; + } + let conn = self.connection(); + conn.underlying() + .execute_batch("BEGIN") + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + + let mut ids = Vec::with_capacity(entities.len()); + let insert_result: Result<(), SqliteGraphError> = (|| { + let mut stmt = conn + .prepare_cached( + "INSERT INTO graph_entities(kind, name, file_path, data) VALUES(?1, ?2, ?3, ?4)", + ) + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + for entity in entities { + let data = serde_json::to_string(&entity.data) + .map_err(|e| SqliteGraphError::invalid_input(e.to_string()))?; + stmt.execute(params![ + entity.kind.as_str(), + entity.name.as_str(), + entity.file_path.as_deref(), + data, + ]) + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + ids.push(conn.last_insert_rowid()); + } + Ok(()) + })(); + + match insert_result { + Ok(()) => { + conn.underlying() + .execute_batch("COMMIT") + .map_err(|e| SqliteGraphError::query(e.to_string()))?; + Ok(ids) + } + Err(err) => { + let _ = conn.underlying().execute_batch("ROLLBACK"); + Err(err) + } + } + } + pub fn get_entity(&self, id: i64) -> Result { self.connection() .query_row( diff --git a/sqlitegraph-core/tests/bulk_insert_tests.rs b/sqlitegraph-core/tests/bulk_insert_tests.rs new file mode 100644 index 00000000..37ec3915 --- /dev/null +++ b/sqlitegraph-core/tests/bulk_insert_tests.rs @@ -0,0 +1,173 @@ +//! Tests for bulk insert primitives on `SqliteGraph` and `GraphBackend`. 
+ +use serde_json::json; +use sqlitegraph::{ + GraphEdge, GraphEntity, SqliteGraph, SqliteGraphBackend, + backend::{EdgeSpec, GraphBackend, NodeSpec}, +}; + +fn entity(kind: &str, name: &str) -> GraphEntity { + GraphEntity { + id: 0, + kind: kind.to_string(), + name: name.to_string(), + file_path: None, + data: json!({}), + } +} + +fn edge(from: i64, to: i64, kind: &str) -> GraphEdge { + GraphEdge { + id: 0, + from_id: from, + to_id: to, + edge_type: kind.to_string(), + data: json!({}), + } +} + +fn node_spec(kind: &str, name: &str) -> NodeSpec { + NodeSpec { + kind: kind.to_string(), + name: name.to_string(), + file_path: None, + data: json!({}), + } +} + +fn edge_spec(from: i64, to: i64, kind: &str) -> EdgeSpec { + EdgeSpec { + from: from, + to: to, + edge_type: kind.to_string(), + data: json!({}), + } +} + +#[test] +fn insert_entities_bulk_returns_ids_in_input_order() { + let graph = SqliteGraph::open_in_memory().expect("graph"); + let entities = vec![ + entity("Function", "a"), + entity("Function", "b"), + entity("Function", "c"), + ]; + let ids = graph + .insert_entities_bulk(&entities) + .expect("bulk insert entities"); + assert_eq!(ids.len(), 3); + assert!(ids[0] < ids[1]); + assert!(ids[1] < ids[2]); + // Verify roundtrip + let stored = graph.get_entity(ids[1]).expect("get"); + assert_eq!(stored.name, "b"); +} + +#[test] +fn insert_entities_bulk_empty_input_returns_empty_vec() { + let graph = SqliteGraph::open_in_memory().expect("graph"); + let ids = graph.insert_entities_bulk(&[]).expect("bulk empty"); + assert!(ids.is_empty()); +} + +#[test] +fn insert_entities_bulk_rolls_back_on_error() { + let graph = SqliteGraph::open_in_memory().expect("graph"); + // Insert one valid entity to anchor a baseline count. + graph + .insert_entity(&entity("Function", "baseline")) + .unwrap(); + + // Build a batch where the second entity has an invalid (empty) name. 
+ let entities = vec![ + entity("Function", "valid_one"), + entity("Function", ""), // validate_entity rejects empty name + entity("Function", "valid_two"), + ]; + let result = graph.insert_entities_bulk(&entities); + assert!(result.is_err(), "expected error for invalid entity"); + + // Count must remain 1 — the partial inserts in this batch were rolled back. + let ids = graph.list_entity_ids().unwrap(); + assert_eq!(ids.len(), 1, "expected rollback to undo partial inserts"); +} + +#[test] +fn insert_edges_bulk_returns_ids_in_input_order() { + let graph = SqliteGraph::open_in_memory().expect("graph"); + let a = graph.insert_entity(&entity("Node", "a")).unwrap(); + let b = graph.insert_entity(&entity("Node", "b")).unwrap(); + let c = graph.insert_entity(&entity("Node", "c")).unwrap(); + + let edges = vec![ + edge(a, b, "CALL"), + edge(b, c, "CALL"), + edge(a, c, "IMPORTS"), + ]; + let ids = graph.insert_edges_bulk(&edges).expect("bulk insert edges"); + assert_eq!(ids.len(), 3); + assert!(ids[0] < ids[1]); + assert!(ids[1] < ids[2]); +} + +#[test] +fn insert_edges_bulk_empty_input_returns_empty_vec() { + let graph = SqliteGraph::open_in_memory().expect("graph"); + let ids = graph.insert_edges_bulk(&[]).expect("bulk empty"); + assert!(ids.is_empty()); +} + +#[test] +fn graph_backend_insert_nodes_bulk_via_sqlite_backend() { + let backend = SqliteGraphBackend::in_memory().expect("backend"); + let specs = vec![ + node_spec("Function", "alpha"), + node_spec("Function", "beta"), + node_spec("Function", "gamma"), + ]; + let ids = backend + .insert_nodes_bulk(&specs) + .expect("bulk insert nodes"); + assert_eq!(ids.len(), 3); + assert!(ids[0] < ids[1]); +} + +#[test] +fn graph_backend_insert_edges_bulk_via_sqlite_backend() { + let backend = SqliteGraphBackend::in_memory().expect("backend"); + let node_specs = vec![node_spec("Node", "a"), node_spec("Node", "b")]; + let ids = backend.insert_nodes_bulk(&node_specs).expect("nodes"); + let edge_specs = vec![edge_spec(ids[0], 
ids[1], "LINK")]; + let edge_ids = backend + .insert_edges_bulk(&edge_specs) + .expect("bulk insert edges"); + assert_eq!(edge_ids.len(), 1); +} + +#[test] +fn bulk_insert_matches_single_insert_observable_state() { + let single = SqliteGraph::open_in_memory().expect("single"); + let bulk = SqliteGraph::open_in_memory().expect("bulk"); + + let entities = vec![ + entity("Function", "a"), + entity("Function", "b"), + entity("Function", "c"), + ]; + for e in &entities { + single.insert_entity(e).unwrap(); + } + let bulk_ids = bulk.insert_entities_bulk(&entities).unwrap(); + + // Same observable state: same names, same kinds, same file_paths, + // same id ordering, same count. + assert_eq!(bulk_ids.len(), 3); + for (i, id) in bulk_ids.iter().enumerate() { + let stored = bulk.get_entity(*id).unwrap(); + assert_eq!(stored.name, entities[i].name); + assert_eq!(stored.kind, entities[i].kind); + } + let bulk_count = bulk.list_entity_ids().unwrap().len(); + let single_count = single.list_entity_ids().unwrap().len(); + assert_eq!(bulk_count, single_count); +} diff --git a/sqlitegraph-py/src/lib.rs b/sqlitegraph-py/src/lib.rs index 4c2ecef6..ba54f019 100644 --- a/sqlitegraph-py/src/lib.rs +++ b/sqlitegraph-py/src/lib.rs @@ -133,6 +133,51 @@ impl Graph { self.backend.insert_node(spec).map_err(into_pyerr) } + /// Insert many nodes atomically inside a single transaction. + /// + /// Args: + /// items: List of dicts, each with `kind` and `name` (required) plus + /// optional `data` (dict) and `file_path` (str). + /// + /// Returns: + /// List of new node IDs in the same order as ``items``. + /// + /// Raises: + /// InvalidArgumentError if any item is missing required fields or + /// fails validation. On error the transaction is rolled back; no + /// nodes are inserted. + fn add_nodes_bulk(&self, items: Vec>) -> PyResult> { + let mut specs = Vec::with_capacity(items.len()); + for item in items.iter() { + let kind: String = item + .get_item("kind")? 
+ .ok_or_else(|| PyException::new_err("each item must have a 'kind' field"))? + .extract()?; + let name: String = item + .get_item("name")? + .ok_or_else(|| PyException::new_err("each item must have a 'name' field"))? + .extract()?; + let file_path: Option = match item.get_item("file_path")? { + Some(v) if !v.is_none() => Some(v.extract()?), + _ => None, + }; + let data = match item.get_item("data")? { + Some(v) if !v.is_none() => { + let dict = v.cast::()?; + dict_to_json(dict)? + } + _ => serde_json::json!({}), + }; + specs.push(NodeSpec { + kind, + name, + file_path, + data, + }); + } + self.backend.insert_nodes_bulk(&specs).map_err(into_pyerr) + } + /// Get a node by ID. Returns a dict with keys: id, kind, name, data. fn get_node<'py>(&self, py: Python<'py>, id: i64) -> PyResult> { let entity = self @@ -230,6 +275,51 @@ impl Graph { self.backend.insert_edge(spec).map_err(into_pyerr) } + /// Insert many edges atomically inside a single transaction. + /// + /// Args: + /// items: List of dicts, each with `from_id`, `to_id`, and + /// `edge_type` (all required) plus optional `data` (dict). + /// + /// Returns: + /// List of new edge IDs in the same order as ``items``. + /// + /// Raises: + /// InvalidArgumentError if any item is missing required fields or + /// references non-existent endpoints. On error the transaction is + /// rolled back; no edges are inserted. + fn add_edges_bulk(&self, items: Vec>) -> PyResult> { + let mut specs = Vec::with_capacity(items.len()); + for item in items.iter() { + let from_id: i64 = item + .get_item("from_id")? + .ok_or_else(|| PyException::new_err("each item must have a 'from_id' field"))? + .extract()?; + let to_id: i64 = item + .get_item("to_id")? + .ok_or_else(|| PyException::new_err("each item must have a 'to_id' field"))? + .extract()?; + let edge_type: String = item + .get_item("edge_type")? + .ok_or_else(|| PyException::new_err("each item must have an 'edge_type' field"))? 
+ .extract()?; + let data = match item.get_item("data")? { + Some(v) if !v.is_none() => { + let dict = v.cast::()?; + dict_to_json(dict)? + } + _ => serde_json::json!({}), + }; + specs.push(EdgeSpec { + from: from_id, + to: to_id, + edge_type, + data, + }); + } + self.backend.insert_edges_bulk(&specs).map_err(into_pyerr) + } + /// Get neighbors of a node. /// /// Args: diff --git a/sqlitegraph-py/tests/test_bulk_insert.py b/sqlitegraph-py/tests/test_bulk_insert.py new file mode 100644 index 00000000..fb331341 --- /dev/null +++ b/sqlitegraph-py/tests/test_bulk_insert.py @@ -0,0 +1,127 @@ +"""Tests for the bulk insert primitives: add_nodes_bulk, add_edges_bulk.""" + +import pytest +import sqlitegraph + + +def _g(): + return sqlitegraph.Graph.open_in_memory() + + +def test_add_nodes_bulk_returns_ids_in_input_order(): + g = _g() + items = [ + {"kind": "Function", "name": "alpha"}, + {"kind": "Function", "name": "beta"}, + {"kind": "Function", "name": "gamma"}, + ] + ids = g.add_nodes_bulk(items) + assert len(ids) == 3 + assert ids[0] < ids[1] < ids[2] + + # Verify they actually round-trip. 
+ middle = g.get_node(ids[1]) + assert middle["name"] == "beta" + + +def test_add_nodes_bulk_empty_returns_empty(): + g = _g() + assert g.add_nodes_bulk([]) == [] + + +def test_add_nodes_bulk_accepts_data_and_file_path(): + g = _g() + items = [ + { + "kind": "File", + "name": "main.rs", + "file_path": "src/main.rs", + "data": {"loc": 42, "tags": ["entry"]}, + }, + ] + ids = g.add_nodes_bulk(items) + assert len(ids) == 1 + node = g.get_node(ids[0]) + assert node["kind"] == "File" + assert node["name"] == "main.rs" + assert node["data"]["loc"] == 42 + assert node["data"]["tags"] == ["entry"] + + +def test_add_nodes_bulk_missing_kind_raises(): + g = _g() + with pytest.raises(Exception): + g.add_nodes_bulk([{"name": "alpha"}]) + + +def test_add_nodes_bulk_missing_name_raises(): + g = _g() + with pytest.raises(Exception): + g.add_nodes_bulk([{"kind": "Function"}]) + + +def test_add_edges_bulk_returns_ids_in_input_order(): + g = _g() + node_ids = g.add_nodes_bulk( + [ + {"kind": "N", "name": "a"}, + {"kind": "N", "name": "b"}, + {"kind": "N", "name": "c"}, + ] + ) + a, b, c = node_ids + items = [ + {"from_id": a, "to_id": b, "edge_type": "CALL"}, + {"from_id": b, "to_id": c, "edge_type": "CALL"}, + ] + edge_ids = g.add_edges_bulk(items) + assert len(edge_ids) == 2 + assert edge_ids[0] < edge_ids[1] + + +def test_add_edges_bulk_empty_returns_empty(): + g = _g() + assert g.add_edges_bulk([]) == [] + + +def test_add_edges_bulk_accepts_data(): + g = _g() + a, b = g.add_nodes_bulk( + [{"kind": "N", "name": "a"}, {"kind": "N", "name": "b"}] + ) + edge_ids = g.add_edges_bulk( + [{"from_id": a, "to_id": b, "edge_type": "CALL", "data": {"line": 17}}] + ) + edge = g.get_edge(edge_ids[0]) + assert edge["edge_type"] == "CALL" + assert edge["data"]["line"] == 17 + + +def test_add_edges_bulk_unknown_endpoint_raises(): + g = _g() + a, _ = g.add_nodes_bulk( + [{"kind": "N", "name": "a"}, {"kind": "N", "name": "b"}] + ) + with pytest.raises(Exception): + g.add_edges_bulk( + [{"from_id": 
a, "to_id": 999_999, "edge_type": "CALL"}] + ) + + +def test_bulk_matches_single_observable_state(): + """A bulk call produces the same observable graph as a per-item loop.""" + g_bulk = _g() + g_single = _g() + + items = [ + {"kind": "N", "name": f"node_{i}"} for i in range(50) + ] + bulk_ids = g_bulk.add_nodes_bulk(items) + single_ids = [ + g_single.add_node(kind=item["kind"], name=item["name"]) for item in items + ] + assert len(bulk_ids) == len(single_ids) + + # Round-trip names match + for nid_bulk, nid_single in zip(bulk_ids, single_ids): + assert g_bulk.get_node(nid_bulk)["name"] == g_single.get_node(nid_single)["name"] From dfecc16d175463e2d42f2018e9abcca1f0aaa1d1 Mon Sep 17 00:00:00 2001 From: Luiz Spies Date: Sat, 16 May 2026 01:40:41 +0200 Subject: [PATCH 2/3] release: bump versions and changelogs for 2.4.0 / 0.3.0 - sqlitegraph-core: 2.3.0 -> 2.4.0 (new GraphBackend::insert_*_bulk trait methods with default impls; SqliteGraph::insert_*_bulk transactional bulk paths; SqliteGraphBackend overrides). SemVer minor. - sqlitegraph-py: 0.2.0 -> 0.3.0 (Graph.add_nodes_bulk and add_edges_bulk Python methods). SemVer minor. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 12 ++++++++++++ sqlitegraph-core/Cargo.toml | 2 +- sqlitegraph-py/CHANGELOG.md | 21 +++++++++++++++++++++ sqlitegraph-py/Cargo.toml | 2 +- sqlitegraph-py/pyproject.toml | 2 +- 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4452f29d..fcf8765f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # SQLiteGraph Changelog +## [2.4.0] - 2026-05-16 + +### Added +- **`SqliteGraph::insert_entities_bulk` and `insert_edges_bulk`** — Atomic transactional bulk insert with a single `prepare_cached` INSERT statement reused across rows. Empty input returns `Ok(vec![])` without opening a transaction; on any error the transaction is rolled back and the database is left unchanged. Returns rowids in input order. 
+- **`GraphBackend::insert_nodes_bulk` and `insert_edges_bulk`** — Trait methods with default implementations that loop the single-insert path (so existing implementations remain source-compatible). `&B` blanket forwarders included. +- **`SqliteGraphBackend` overrides** that dispatch to the new `SqliteGraph` bulk paths. Publisher events fire per row after commit, matching single-insert observer semantics. +- **`bulk_insert_tests.rs`** — 8 integration tests: input-order IDs, empty input, transactional rollback on validation error, edge-bulk parity, observable state matches per-item loop. + +### Notes +- The default trait impl keeps third-party `GraphBackend` implementations working unmodified: they get a functioning bulk method at single-insert performance, with only the atomicity of their single-insert path (the default loop is not wrapped in a transaction). Override for speed and all-or-nothing behavior. +- V3Backend inherits the default loop impl; a future patch can route through `WriteBatchGuard` for native batched writes. + ## [2.3.0] - 2026-05-15 ### Added diff --git a/sqlitegraph-core/Cargo.toml b/sqlitegraph-core/Cargo.toml index a52a10a2..d68fc345 100644 --- a/sqlitegraph-core/Cargo.toml +++ b/sqlitegraph-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlitegraph" -version = "2.3.0" +version = "2.4.0" edition = "2024" description = "Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library" license = "GPL-3.0" diff --git a/sqlitegraph-py/CHANGELOG.md b/sqlitegraph-py/CHANGELOG.md index 8d8a8a23..912ff1eb 100644 --- a/sqlitegraph-py/CHANGELOG.md +++ b/sqlitegraph-py/CHANGELOG.md @@ -3,6 +3,27 @@ This file tracks releases of the `sqlitegraph` package on PyPI. The Rust crate of the same name has its own changelog at the repository root. +## [0.3.0] - 2026-05-16 + +### Added +- **`Graph.add_nodes_bulk(items: list[dict])`** — Insert many nodes in a + single FFI call inside one transaction. Each dict must have `kind` and + `name`; `data` (dict) and `file_path` (str) are optional. 
Returns + IDs in input order. +- **`Graph.add_edges_bulk(items: list[dict])`** — Insert many edges in a + single FFI call inside one transaction. Each dict must have `from_id`, + `to_id`, and `edge_type`; `data` (dict) is optional. Returns IDs in + input order. +- **10 new pytest cases** in `tests/test_bulk_insert.py` covering both + bulk paths, missing-field validation, data round-trip, and parity + with per-item single-insert. + +### Notes +- Built against `sqlitegraph` (Rust) **v2.4.0**, which adds the + underlying `GraphBackend::insert_nodes_bulk` and `insert_edges_bulk` + trait methods. +- All existing `add_node`/`add_edge` signatures are unchanged. + ## [0.2.0] - 2026-05-15 ### Added diff --git a/sqlitegraph-py/Cargo.toml b/sqlitegraph-py/Cargo.toml index 7c86a199..477ed0d7 100644 --- a/sqlitegraph-py/Cargo.toml +++ b/sqlitegraph-py/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlitegraph-py" -version = "0.2.0" +version = "0.3.0" edition = "2021" description = "Python bindings for sqlitegraph via PyO3" license = "GPL-3.0-only" diff --git a/sqlitegraph-py/pyproject.toml b/sqlitegraph-py/pyproject.toml index 15937808..8d39d8e8 100644 --- a/sqlitegraph-py/pyproject.toml +++ b/sqlitegraph-py/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "sqlitegraph" -version = "0.2.0" +version = "0.3.0" description = "Embedded graph database with HNSW vector search — Python bindings to the sqlitegraph Rust crate." license = { text = "GPL-3.0-only" } authors = [{ name = "Luiz Spies" }] From 76bde83cd8362631470d4b452d7ad491bac0078b Mon Sep 17 00:00:00 2001 From: Luiz Spies Date: Sat, 16 May 2026 01:51:04 +0200 Subject: [PATCH 3/3] fix(py): typed exceptions + ruff format on bulk-insert path Self-heals the python CI step on PR #5: - Replace bare PyException::new_err with InvalidArgumentError::new_err for the missing-field validators on add_nodes_bulk/add_edges_bulk so callers see a sqlitegraph-typed exception instead of a generic one. 
- Update test_bulk_insert.py to assert InvalidArgumentError specifically (silences ruff B017) and pass strict=True to zip (silences ruff B905). - Apply ruff format to the new test file. Co-Authored-By: Claude Opus 4.7 (1M context) --- sqlitegraph-py/src/lib.rs | 16 +++++++++---- sqlitegraph-py/tests/test_bulk_insert.py | 29 ++++++++---------------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/sqlitegraph-py/src/lib.rs b/sqlitegraph-py/src/lib.rs index ba54f019..631d8243 100644 --- a/sqlitegraph-py/src/lib.rs +++ b/sqlitegraph-py/src/lib.rs @@ -151,11 +151,11 @@ impl Graph { for item in items.iter() { let kind: String = item .get_item("kind")? - .ok_or_else(|| PyException::new_err("each item must have a 'kind' field"))? + .ok_or_else(|| InvalidArgumentError::new_err("each item must have a 'kind' field"))? .extract()?; let name: String = item .get_item("name")? - .ok_or_else(|| PyException::new_err("each item must have a 'name' field"))? + .ok_or_else(|| InvalidArgumentError::new_err("each item must have a 'name' field"))? .extract()?; let file_path: Option = match item.get_item("file_path")? { Some(v) if !v.is_none() => Some(v.extract()?), @@ -293,15 +293,21 @@ impl Graph { for item in items.iter() { let from_id: i64 = item .get_item("from_id")? - .ok_or_else(|| PyException::new_err("each item must have a 'from_id' field"))? + .ok_or_else(|| { + InvalidArgumentError::new_err("each item must have a 'from_id' field") + })? .extract()?; let to_id: i64 = item .get_item("to_id")? - .ok_or_else(|| PyException::new_err("each item must have a 'to_id' field"))? + .ok_or_else(|| { + InvalidArgumentError::new_err("each item must have a 'to_id' field") + })? .extract()?; let edge_type: String = item .get_item("edge_type")? - .ok_or_else(|| PyException::new_err("each item must have an 'edge_type' field"))? + .ok_or_else(|| { + InvalidArgumentError::new_err("each item must have an 'edge_type' field") + })? .extract()?; let data = match item.get_item("data")? 
{ Some(v) if !v.is_none() => { diff --git a/sqlitegraph-py/tests/test_bulk_insert.py b/sqlitegraph-py/tests/test_bulk_insert.py index fb331341..2822ae13 100644 --- a/sqlitegraph-py/tests/test_bulk_insert.py +++ b/sqlitegraph-py/tests/test_bulk_insert.py @@ -2,6 +2,7 @@ import pytest import sqlitegraph +from sqlitegraph import InvalidArgumentError def _g(): @@ -50,13 +51,13 @@ def test_add_nodes_bulk_accepts_data_and_file_path(): def test_add_nodes_bulk_missing_kind_raises(): g = _g() - with pytest.raises(Exception): + with pytest.raises(InvalidArgumentError): g.add_nodes_bulk([{"name": "alpha"}]) def test_add_nodes_bulk_missing_name_raises(): g = _g() - with pytest.raises(Exception): + with pytest.raises(InvalidArgumentError): g.add_nodes_bulk([{"kind": "Function"}]) @@ -86,9 +87,7 @@ def test_add_edges_bulk_empty_returns_empty(): def test_add_edges_bulk_accepts_data(): g = _g() - a, b = g.add_nodes_bulk( - [{"kind": "N", "name": "a"}, {"kind": "N", "name": "b"}] - ) + a, b = g.add_nodes_bulk([{"kind": "N", "name": "a"}, {"kind": "N", "name": "b"}]) edge_ids = g.add_edges_bulk( [{"from_id": a, "to_id": b, "edge_type": "CALL", "data": {"line": 17}}] ) @@ -99,13 +98,9 @@ def test_add_edges_bulk_accepts_data(): def test_add_edges_bulk_unknown_endpoint_raises(): g = _g() - a, _ = g.add_nodes_bulk( - [{"kind": "N", "name": "a"}, {"kind": "N", "name": "b"}] - ) - with pytest.raises(Exception): - g.add_edges_bulk( - [{"from_id": a, "to_id": 999_999, "edge_type": "CALL"}] - ) + a, _ = g.add_nodes_bulk([{"kind": "N", "name": "a"}, {"kind": "N", "name": "b"}]) + with pytest.raises(InvalidArgumentError): + g.add_edges_bulk([{"from_id": a, "to_id": 999_999, "edge_type": "CALL"}]) def test_bulk_matches_single_observable_state(): @@ -113,15 +108,11 @@ def test_bulk_matches_single_observable_state(): g_bulk = _g() g_single = _g() - items = [ - {"kind": "N", "name": f"node_{i}"} for i in range(50) - ] + items = [{"kind": "N", "name": f"node_{i}"} for i in range(50)] bulk_ids = 
g_bulk.add_nodes_bulk(items) - single_ids = [ - g_single.add_node(kind=item["kind"], name=item["name"]) for item in items - ] + single_ids = [g_single.add_node(kind=item["kind"], name=item["name"]) for item in items] assert len(bulk_ids) == len(single_ids) # Round-trip names match - for nid_bulk, nid_single in zip(bulk_ids, single_ids): + for nid_bulk, nid_single in zip(bulk_ids, single_ids, strict=True): assert g_bulk.get_node(nid_bulk)["name"] == g_single.get_node(nid_single)["name"]