Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# SQLiteGraph Changelog

## [2.4.0] - 2026-05-16

### Added
- **`SqliteGraph::insert_entities_bulk` and `insert_edges_bulk`** — Atomic transactional bulk insert with a single `prepare_cached` INSERT statement reused across rows. Empty input returns `Ok(vec![])` without opening a transaction; on any error the transaction is rolled back and the database is left unchanged. Returns rowids in input order.
- **`GraphBackend::insert_nodes_bulk` and `insert_edges_bulk`** — Trait methods with default implementations that loop the single-insert path (so existing implementations remain source-compatible). `&B` blanket forwarders included.
- **`SqliteGraphBackend` overrides** that dispatch to the new `SqliteGraph` bulk paths. Publisher events fire per row after commit, matching single-insert observer semantics.
- **`bulk_insert_tests.rs`** — 8 integration tests: input-order IDs, empty input, transactional rollback on validation error, edge-bulk parity, observable state matches per-item loop.

### Notes
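- The default trait impl keeps third-party `GraphBackend` implementors source-compatible: they get a working bulk method without modifications, at single-insert performance and with only the atomicity of the individual single inserts (no cross-row rollback). Override for speed and all-or-nothing behavior.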
- V3Backend inherits the default loop impl; a future patch can route through `WriteBatchGuard` for native batched writes.

## [2.3.0] - 2026-05-15

### Added
Expand Down
2 changes: 1 addition & 1 deletion sqlitegraph-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sqlitegraph"
version = "2.3.0"
version = "2.4.0"
edition = "2024"
description = "Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library"
license = "GPL-3.0"
Expand Down
37 changes: 37 additions & 0 deletions sqlitegraph-core/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,35 @@ pub trait GraphBackend {
fn insert_node(&self, node: NodeSpec) -> Result<i64, SqliteGraphError>;
fn insert_edge(&self, edge: EdgeSpec) -> Result<i64, SqliteGraphError>;

/// Insert many nodes, returning the new node IDs in the same order as `nodes`.
///
/// This default implementation calls [`insert_node`](Self::insert_node) once
/// per item and stops at the first failure. It performs no rollback of its
/// own, so it is only as atomic as the single `insert_node` path: nodes
/// inserted before a failing item are not undone here. Backends that can do
/// so should override this with a single-transaction, all-or-nothing path.
///
/// # Errors
///
/// Returns the first error produced by `insert_node`; items after the
/// failing one are not attempted.
fn insert_nodes_bulk(&self, nodes: &[NodeSpec]) -> Result<Vec<i64>, SqliteGraphError> {
    // Collecting into `Result<Vec<_>, _>` short-circuits on the first `Err`
    // while preserving input order for the successful IDs.
    nodes
        .iter()
        .map(|node| self.insert_node(node.clone()))
        .collect()
}

/// Bulk-insert `edges`, yielding the new edge IDs in input order.
///
/// This fallback feeds each edge through the single `insert_edge` call and
/// stops at the first failure, so its atomicity is whatever `insert_edge`
/// itself provides. Backends are expected to override it with a
/// single-transaction path that has all-or-nothing semantics.
fn insert_edges_bulk(&self, edges: &[EdgeSpec]) -> Result<Vec<i64>, SqliteGraphError> {
    edges
        .iter()
        .map(|spec| self.insert_edge(spec.clone()))
        .collect()
}

/// Update an existing node in place without allocating a new node ID
///
/// This modifies the data associated with an existing node while preserving
Expand Down Expand Up @@ -565,6 +594,14 @@ where
(*self).entity_ids()
}

/// Forwards the bulk node insert to the backend behind this reference, so
/// that backend's own `insert_nodes_bulk` (override or trait default) runs.
fn insert_nodes_bulk(&self, nodes: &[NodeSpec]) -> Result<Vec<i64>, SqliteGraphError> {
// Dereference once, following the same forwarding form as the sibling
// methods of this impl, so the call targets the inner backend.
(*self).insert_nodes_bulk(nodes)
}

/// Forwards the bulk edge insert to the backend behind this reference, so
/// that backend's own `insert_edges_bulk` (override or trait default) runs.
fn insert_edges_bulk(&self, edges: &[EdgeSpec]) -> Result<Vec<i64>, SqliteGraphError> {
// Dereference once, following the same forwarding form as the sibling
// methods of this impl, so the call targets the inner backend.
(*self).insert_edges_bulk(edges)
}

fn neighbors(
&self,
snapshot_id: SnapshotId,
Expand Down
56 changes: 56 additions & 0 deletions sqlitegraph-core/src/backend/sqlite/impl_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,62 @@ impl crate::backend::GraphBackend for SqliteGraphBackend {
Ok(id)
}

fn insert_nodes_bulk(&self, nodes: &[NodeSpec]) -> Result<Vec<i64>, SqliteGraphError> {
let entities: Vec<GraphEntity> = nodes
.iter()
.map(|node| GraphEntity {
id: 0,
kind: node.kind.clone(),
name: node.name.clone(),
file_path: node.file_path.clone(),
data: node.data.clone(),
})
.collect();
let ids = self.graph.insert_entities_bulk(&entities)?;

// Emit per-row events after the commit, matching single-insert
// observer semantics.
let pub_guard = self.publisher.read();
if let Some(ref publisher) = *pub_guard {
for id in &ids {
publisher.emit(PubSubEvent::NodeChanged {
node_id: *id,
snapshot_id: 0,
});
}
}

Ok(ids)
}

fn insert_edges_bulk(&self, edges: &[EdgeSpec]) -> Result<Vec<i64>, SqliteGraphError> {
let graph_edges: Vec<GraphEdge> = edges
.iter()
.map(|edge| GraphEdge {
id: 0,
from_id: edge.from,
to_id: edge.to,
edge_type: edge.edge_type.clone(),
data: edge.data.clone(),
})
.collect();
let ids = self.graph.insert_edges_bulk(&graph_edges)?;

let pub_guard = self.publisher.read();
if let Some(ref publisher) = *pub_guard {
for (id, edge) in ids.iter().zip(edges.iter()) {
publisher.emit(PubSubEvent::EdgeChanged {
from_node: edge.from,
to_node: edge.to,
edge_id: *id,
snapshot_id: 0,
});
}
}

Ok(ids)
}

/// Deletes the entity identified by `id` by delegating to the underlying
/// graph's `delete_entity`, returning its result unchanged.
///
/// This wrapper itself emits no publisher event.
fn delete_entity(&self, id: i64) -> Result<(), SqliteGraphError> {
self.graph.delete_entity(id)
}
Expand Down
60 changes: 60 additions & 0 deletions sqlitegraph-core/src/graph/edge_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,66 @@ impl SqliteGraph {
Ok(self.connection().last_insert_rowid())
}

/// Insert many edges atomically inside a single transaction.
///
/// Every edge is validated, and both of its endpoints are checked for
/// existence, before the transaction is opened. All rows are then written
/// through one cached `INSERT` statement.
///
/// Returns the rowids of the inserted edges in the same order as the
/// input. Empty input returns an empty vector without opening a
/// transaction. Caches are invalidated only after a successful commit.
///
/// # Errors
///
/// Returns the first validation, missing-endpoint, serialization, or SQL
/// error. If the failure happens after `BEGIN` (including a failed
/// `COMMIT`), a `ROLLBACK` is issued so the connection is not left inside
/// an open transaction.
pub fn insert_edges_bulk(&self, edges: &[GraphEdge]) -> Result<Vec<i64>, SqliteGraphError> {
    if edges.is_empty() {
        return Ok(Vec::new());
    }
    for edge in edges {
        validate_edge(edge)?;
        if !self.entity_exists(edge.from_id)? || !self.entity_exists(edge.to_id)? {
            return Err(SqliteGraphError::invalid_input(
                "edge endpoints must reference existing entities",
            ));
        }
    }
    let conn = self.connection();
    conn.underlying()
        .execute_batch("BEGIN")
        .map_err(|e| SqliteGraphError::query(e.to_string()))?;

    let mut ids = Vec::with_capacity(edges.len());
    // The closure scopes the cached statement so it is dropped before COMMIT
    // or ROLLBACK runs.
    let insert_result: Result<(), SqliteGraphError> = (|| {
        let mut stmt = conn
            .prepare_cached(
                "INSERT INTO graph_edges(from_id, to_id, edge_type, data) VALUES(?1, ?2, ?3, ?4)",
            )
            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
        for edge in edges {
            let data = serde_json::to_string(&edge.data)
                .map_err(|e| SqliteGraphError::invalid_input(e.to_string()))?;
            stmt.execute(params![
                edge.from_id,
                edge.to_id,
                edge.edge_type.as_str(),
                data,
            ])
            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
            ids.push(conn.last_insert_rowid());
        }
        Ok(())
    })();

    // Fold the COMMIT into the same result so that a failed commit takes the
    // rollback path below instead of returning with the transaction open.
    let committed = insert_result.and_then(|()| {
        conn.underlying()
            .execute_batch("COMMIT")
            .map_err(|e| SqliteGraphError::query(e.to_string()))
    });

    match committed {
        Ok(()) => {
            self.invalidate_caches();
            Ok(ids)
        }
        Err(err) => {
            // Best-effort: the original error is what the caller needs, and
            // ROLLBACK itself errors harmlessly if SQLite already rolled back.
            let _ = conn.underlying().execute_batch("ROLLBACK");
            Err(err)
        }
    }
}

pub fn get_edge(&self, id: i64) -> Result<GraphEdge, SqliteGraphError> {
self.connection()
.query_row(
Expand Down
57 changes: 57 additions & 0 deletions sqlitegraph-core/src/graph/entity_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,63 @@ impl SqliteGraph {
Ok(self.connection().last_insert_rowid())
}

/// Insert many entities atomically inside a single transaction.
///
/// Every entity is validated before the transaction is opened; all rows
/// are then written through one cached `INSERT` statement.
///
/// Returns the rowids of the inserted entities in the same order as the
/// input. Empty input returns an empty vector without opening a
/// transaction.
///
/// # Errors
///
/// Returns the first validation, serialization, or SQL error. If the
/// failure happens after `BEGIN` (including a failed `COMMIT`), a
/// `ROLLBACK` is issued so the connection is not left inside an open
/// transaction.
pub fn insert_entities_bulk(
    &self,
    entities: &[GraphEntity],
) -> Result<Vec<i64>, SqliteGraphError> {
    if entities.is_empty() {
        return Ok(Vec::new());
    }
    for entity in entities {
        validate_entity(entity)?;
    }
    let conn = self.connection();
    conn.underlying()
        .execute_batch("BEGIN")
        .map_err(|e| SqliteGraphError::query(e.to_string()))?;

    let mut ids = Vec::with_capacity(entities.len());
    // The closure scopes the cached statement so it is dropped before COMMIT
    // or ROLLBACK runs.
    let insert_result: Result<(), SqliteGraphError> = (|| {
        let mut stmt = conn
            .prepare_cached(
                "INSERT INTO graph_entities(kind, name, file_path, data) VALUES(?1, ?2, ?3, ?4)",
            )
            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
        for entity in entities {
            let data = serde_json::to_string(&entity.data)
                .map_err(|e| SqliteGraphError::invalid_input(e.to_string()))?;
            stmt.execute(params![
                entity.kind.as_str(),
                entity.name.as_str(),
                entity.file_path.as_deref(),
                data,
            ])
            .map_err(|e| SqliteGraphError::query(e.to_string()))?;
            ids.push(conn.last_insert_rowid());
        }
        Ok(())
    })();

    // Fold the COMMIT into the same result so that a failed commit takes the
    // rollback path below instead of returning with the transaction open.
    let committed = insert_result.and_then(|()| {
        conn.underlying()
            .execute_batch("COMMIT")
            .map_err(|e| SqliteGraphError::query(e.to_string()))
    });

    match committed {
        Ok(()) => Ok(ids),
        Err(err) => {
            // Best-effort: the original error is what the caller needs, and
            // ROLLBACK itself errors harmlessly if SQLite already rolled back.
            let _ = conn.underlying().execute_batch("ROLLBACK");
            Err(err)
        }
    }
}

pub fn get_entity(&self, id: i64) -> Result<GraphEntity, SqliteGraphError> {
self.connection()
.query_row(
Expand Down
Loading
Loading