From 14b84ecfc596da974caaf6e45a7e73c93b4bb80e Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Sun, 3 May 2026 19:15:01 +0100 Subject: [PATCH 01/10] counts for edges in and out --- db4-storage/src/segments/mod.rs | 20 ++++++++++++++++ db4-storage/src/segments/node/segment.rs | 29 ++++++++++++++++-------- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/db4-storage/src/segments/mod.rs b/db4-storage/src/segments/mod.rs index 222c8b2d71..a77ed3e4f3 100644 --- a/db4-storage/src/segments/mod.rs +++ b/db4-storage/src/segments/mod.rs @@ -160,6 +160,8 @@ pub struct SegmentContainer { max_page_len: u32, properties: Properties, meta: Arc, + out_count: usize, // used to count num edges + inb_count: usize, // used to count num edges } pub trait HasRow: Default + Send + Sync + Sized { @@ -178,6 +180,8 @@ impl SegmentContainer { max_page_len, properties: Default::default(), meta, + out_count: 0, + inb_count: 0, } } @@ -190,6 +194,22 @@ impl SegmentContainer { ) } + pub fn inc_out_count(&mut self, i: usize) { + self.out_count += i; + } + + pub fn inc_inb_count(&mut self, i: usize) { + self.inb_count += i; + } + + pub fn out_count(&self) -> usize { + self.out_count + } + + pub fn inb_count(&self) -> usize { + self.inb_count + } + #[inline] pub fn est_size(&self) -> usize { // TODO: this is a rough estimate and should be improved diff --git a/db4-storage/src/segments/node/segment.rs b/db4-storage/src/segments/node/segment.rs index 53c27d4899..ac200fdeb7 100644 --- a/db4-storage/src/segments/node/segment.rs +++ b/db4-storage/src/segments/node/segment.rs @@ -261,11 +261,15 @@ impl MemNodeSegment { let layer = self.get_or_create_layer(layer_id); let est_size = layer.est_size(); - let add_out = layer.reserve_local_row(src_pos); - let new_entry = add_out.is_new(); - let add_out = add_out.inner(); - let is_new_edge = add_out.adj.add_edge_out(dst, e_id.edge); - let row = add_out.row; + let (is_new_edge, row, new_entry) = { + let add_out = layer.reserve_local_row(src_pos); + let new_entry = add_out.is_new(); + let add_out = add_out.inner(); + let is_new_edge = add_out.adj.add_edge_out(dst, e_id.edge); + let row = add_out.row; + (is_new_edge, row, new_entry) + }; + layer.inc_out_count(is_new_edge as usize); if let Some(t) = t { self.update_timestamp_inner(t, row, e_id); } @@ -290,11 +294,16 @@ impl MemNodeSegment { let layer = self.get_or_create_layer(layer_id); let est_size = layer.est_size(); - let add_in = layer.reserve_local_row(dst_pos); - let new_entry = add_in.is_new(); - let add_in = add_in.inner(); - let is_new_edge = add_in.adj.add_edge_into(src, e_id.edge); - let row = add_in.row; + let (is_new_edge, row, new_entry) = { + let add_in = layer.reserve_local_row(dst_pos); + let new_entry = add_in.is_new(); + let add_in = add_in.inner(); + let is_new_edge = add_in.adj.add_edge_into(src, e_id.edge); + let row = add_in.row; + (is_new_edge, row, new_entry) + }; + + layer.inc_inb_count(is_new_edge as usize); if let Some(t) = t { self.update_timestamp_inner(t, row, e_id); From e24330b1d4026f474dd4a4a62bc7e0d02d62022e Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Tue, 5 May 2026 13:51:42 +0100 Subject: [PATCH 02/10] EventTime repr(C) --- raphtory-api/src/core/storage/timeindex.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/raphtory-api/src/core/storage/timeindex.rs b/raphtory-api/src/core/storage/timeindex.rs index 13defb156e..68c12f7809 100644 --- a/raphtory-api/src/core/storage/timeindex.rs +++ b/raphtory-api/src/core/storage/timeindex.rs @@ -1,3 +1,4 @@ +use bytemuck::{Pod, Zeroable}; use chrono::{DateTime, Utc}; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -24,7 +25,10 @@ impl fmt::Display for TimeError { impl std::error::Error for TimeError {} -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq, Hash)] +#[derive( + Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq, Hash, Pod, Zeroable, +)] +#[repr(C)] pub struct EventTime(pub i64, pub usize); impl PartialEq for EventTime { @@ -181,11 +185,11 @@ impl<'a, L: TimeIndexOps<'a>, R: TimeIndexOps<'a, IndexType = L::IndexType>> Tim } fn first(&self) -> Option { - self.0.first().into_iter().chain(self.1.first()).min() + Iterator::min(self.0.first().into_iter().chain(self.1.first())) // rust-analyzer } fn last(&self) -> Option { - self.0.last().into_iter().chain(self.1.last()).max() + Iterator::max(self.0.last().into_iter().chain(self.1.last())) // rust-analyzer } fn iter(self) -> impl Iterator + Send + Sync + 'a { From d1acfdfcb5a59fc9c97ccd08a769c236314be0af Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Fri, 8 May 2026 14:18:46 +0100 Subject: [PATCH 03/10] minor changes to support persistent-matrix --- db4-storage/src/gen_ts.rs | 1 - db4-storage/src/properties/mod.rs | 6 ++++++ db4-storage/src/segments/mod.rs | 4 ++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/db4-storage/src/gen_ts.rs b/db4-storage/src/gen_ts.rs index f74ebfb059..1c2a13c007 100644 --- a/db4-storage/src/gen_ts.rs +++ b/db4-storage/src/gen_ts.rs @@ -1,6 +1,5 @@ use std::ops::Range; -use itertools::Itertools; use raphtory_api::core::entities::LayerId; use raphtory_core::{ entities::{ELID, LayerIds, layers::Multiple}, diff --git a/db4-storage/src/properties/mod.rs b/db4-storage/src/properties/mod.rs index 39b361b3ae..6482dffc61 100644 --- a/db4-storage/src/properties/mod.rs +++ b/db4-storage/src/properties/mod.rs @@ -38,6 +38,7 @@ pub struct Properties { has_properties: bool, has_deletions: bool, pub additions_count: usize, + pub deletions_count: usize, } pub(crate) struct PropMutEntry<'a> { @@ -264,6 +265,10 @@ impl Properties { pub fn t_len(&self) -> usize { self.t_properties.len() } + + pub fn deletions_count(&self) -> usize { + self.deletions_count + } } impl<'a> PropMutEntry<'a> { @@ -325,6 +330,7 @@ impl<'a> PropMutEntry<'a> { } self.properties.has_deletions = true; + self.properties.deletions_count += 1; let prop_timestamps = &mut self.properties.deletions[self.row]; prop_timestamps.set(t, edge_id.unwrap_or_default()); diff --git a/db4-storage/src/segments/mod.rs b/db4-storage/src/segments/mod.rs index a77ed3e4f3..9efd11bc84 100644 --- a/db4-storage/src/segments/mod.rs +++ b/db4-storage/src/segments/mod.rs @@ -246,6 +246,10 @@ impl SegmentContainer { self.properties.t_len() } + pub fn deletions_len(&self) -> usize { + self.properties.deletions_count() + } + /// Reserves a local row for the given item position. /// If the item position already exists, it returns a mutable reference to the existing item. /// Left variant indicates that the item was already present, From eccad93284806656f05cb5bace077f2b56e75a20 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Fri, 8 May 2026 17:07:08 +0100 Subject: [PATCH 04/10] update Cargo.lock --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 2bcaf031e6..df4bbc2ff7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7533,7 +7533,7 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "snb" -version = "0.18.0" +version = "0.17.0" dependencies = [ "chrono", "flate2", From 81975bda9bcae907442bf47d495529bb6eb935d7 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Mon, 11 May 2026 16:05:42 +0100 Subject: [PATCH 05/10] minor fixes --- raphtory/src/db/api/view/graph.rs | 1 - raphtory/src/db/api/view/internal/filter_ops.rs | 1 - .../src/db/api/view/internal/time_semantics/filtered_edge.rs | 1 - raphtory/src/parquet_encoder/mod.rs | 1 + 4 files changed, 1 insertion(+), 3 deletions(-) diff --git a/raphtory/src/db/api/view/graph.rs b/raphtory/src/db/api/view/graph.rs index e265dcf910..78c5905e92 100644 --- a/raphtory/src/db/api/view/graph.rs +++ b/raphtory/src/db/api/view/graph.rs @@ -96,7 +96,6 @@ pub trait GraphViewOps<'graph>: BoxableGraphView + Sized + Clone + 'graph { /// If a path is provided, it will be used to store the new graph /// (assuming the storage feature is enabled). Sets a new config. #[cfg(feature = "io")] - #[cfg(feature = "io")] fn materialize_at_with_config( &self, path: &(impl GraphPaths + ?Sized), diff --git a/raphtory/src/db/api/view/internal/filter_ops.rs b/raphtory/src/db/api/view/internal/filter_ops.rs index a7bf61c624..c03fffec97 100644 --- a/raphtory/src/db/api/view/internal/filter_ops.rs +++ b/raphtory/src/db/api/view/internal/filter_ops.rs @@ -52,7 +52,6 @@ pub trait FilterOps { fn node_and_edge_filters_independent(&self) -> bool; fn filtered(&self) -> bool; - #[inline] fn filtered_excluding_layers(&self) -> bool; fn node_list_trusted(&self) -> bool; diff --git a/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs b/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs index a97d4e4fa5..737ad7c4a5 100644 --- a/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs +++ b/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs @@ -334,7 +334,6 @@ impl<'a> FilteredEdgeStorageOps<'a> for EdgeEntryRef<'a> { ) -> impl Iterator + 'a { self.layer_ids_iter(layer_ids) .filter(move |layer_id| view.internal_filter_edge_layer(self, *layer_id)) - .map(|layer_id| layer_id) } fn filtered_additions_iter( diff --git a/raphtory/src/parquet_encoder/mod.rs b/raphtory/src/parquet_encoder/mod.rs index 73b280b2f1..97134b02a3 100644 --- a/raphtory/src/parquet_encoder/mod.rs +++ b/raphtory/src/parquet_encoder/mod.rs @@ -93,6 +93,7 @@ pub(crate) fn run_encode_indexed< encode_fn: impl Fn(II, &G, &mut Decoder, &mut S) -> Result<(), GraphError> + Sync, ) -> Result<(), GraphError> { let schema = derive_schema(meta, g.id_type(), default_fields_fn)?; + let num_digits = 8; items.try_for_each(|(chunk, items)| { From dda24786dc931e60bd9b12989cde63f6f5a0f43d Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Wed, 13 May 2026 12:41:28 +0000 Subject: [PATCH 06/10] add bulk writer to avoid calling atomics on every event --- .../src/pages/edge_page/bulk_writer.rs | 136 +++ db4-storage/src/pages/edge_page/mod.rs | 1 + db4-storage/src/pages/edge_page/writer.rs | 1 + db4-storage/src/pages/layer_counter.rs | 8 +- db4-storage/src/pages/locked/edges.rs | 7 +- db4-storage/src/pages/locked/nodes.rs | 7 +- .../src/pages/node_page/bulk_writer.rs | 145 +++ db4-storage/src/pages/node_page/mod.rs | 1 + db4-storage/src/pages/node_page/writer.rs | 39 +- raphtory/examples/snb_loader.rs | 966 ++++++++---------- raphtory/src/arrow_loader/df_loaders/edges.rs | 8 +- raphtory/src/io/parquet_loaders.rs | 82 +- 12 files changed, 819 insertions(+), 582 deletions(-) create mode 100644 db4-storage/src/pages/edge_page/bulk_writer.rs create mode 100644 db4-storage/src/pages/node_page/bulk_writer.rs diff --git a/db4-storage/src/pages/edge_page/bulk_writer.rs b/db4-storage/src/pages/edge_page/bulk_writer.rs new file mode 100644 index 0000000000..575a7b22a5 --- /dev/null +++ b/db4-storage/src/pages/edge_page/bulk_writer.rs @@ -0,0 +1,136 @@ +use crate::{ + LocalPOS, api::edges::EdgeSegmentOps, pages::edge_page::writer::EdgeWriter, + segments::edge::segment::MemEdgeSegment, +}; +use raphtory_api::core::entities::{ + EID, LayerId, VID, + properties::{meta::STATIC_GRAPH_LAYER_ID, prop::AsPropRef}, +}; +use raphtory_core::storage::timeindex::{AsTime, EventTime}; +use std::ops::DerefMut; + +pub struct BulkEdgeWriter< + 'a, + MP: DerefMut + std::fmt::Debug, + ES: EdgeSegmentOps, +> { + ew: EdgeWriter<'a, MP, ES>, + layers: Vec, + earliest: EventTime, + latest: EventTime, +} + +impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmentOps> + From> for BulkEdgeWriter<'a, MP, ES> +{ + fn from(value: EdgeWriter<'a, MP, ES>) -> Self { + Self { + ew: value, + layers: vec![0], + earliest: EventTime::MAX, + latest: EventTime::MIN, + } + } +} + +impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmentOps> + BulkEdgeWriter<'a, MP, ES> +{ + pub fn bulk_add_edge( + &mut self, + t: EventTime, + edge_pos: LocalPOS, + src: VID, + dst: VID, + edge_exists: bool, + layer_id: LayerId, + c_props: impl IntoIterator, + t_props: impl IntoIterator, + ) { + if !edge_exists { + if self + .ew + .writer + .insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID) + { + self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID); + } + } + + if self + .ew + .writer + .insert_edge_internal(t, edge_pos, src, dst, layer_id, t_props) + && !self.ew.page.immut_has_edge(edge_pos, layer_id) + { + self.increment_layer_num_edges(layer_id); + } + + self.update_time(t); + + self.ew + .writer + .update_const_properties(edge_pos, src, dst, layer_id, c_props); + } + + pub fn bulk_delete_edge( + &mut self, + t: EventTime, + edge_pos: LocalPOS, + src: VID, + dst: VID, + exists: bool, + layer_id: LayerId, + ) { + if !exists { + if self + .ew + .writer + .insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID) + { + self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID); + } + } + + self.update_time(t); + if self + .ew + .writer + .delete_edge_internal(t, edge_pos, src, dst, layer_id) + && !self.ew.page.immut_has_edge(edge_pos, layer_id) + { + self.increment_layer_num_edges(layer_id); + } + } + + #[inline] + fn increment_layer_num_edges(&mut self, layer_id: LayerId) { + if self.layers.len() <= layer_id.0 { + self.layers.resize_with(layer_id.0 + 1, Default::default); + } + self.layers[layer_id.0] += 1; + } + + #[inline] + fn update_time(&mut self, t: EventTime) { + self.earliest = self.earliest.min(t); + self.latest = self.latest.max(t); + } + + #[inline(always)] + pub fn resolve_pos(&self, edge_id: EID) -> Option { + self.ew.resolve_pos(edge_id) + } +} + +impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmentOps> Drop + for BulkEdgeWriter<'a, MP, ES> +{ + fn drop(&mut self) { + for (layer_id, count) in self.layers.iter().enumerate() { + self.ew.graph_stats.increment_by(LayerId(layer_id), *count); + } + self.ew.graph_stats.update_time(self.earliest.t()); + self.ew.graph_stats.update_time(self.latest.t()); + } +} diff --git a/db4-storage/src/pages/edge_page/mod.rs b/db4-storage/src/pages/edge_page/mod.rs index d3baa81782..69fc99e0c2 100644 --- a/db4-storage/src/pages/edge_page/mod.rs +++ b/db4-storage/src/pages/edge_page/mod.rs @@ -1 +1,2 @@ pub mod writer; +pub mod bulk_writer; diff --git a/db4-storage/src/pages/edge_page/writer.rs b/db4-storage/src/pages/edge_page/writer.rs index 03c96cefce..d02a5f1242 100644 --- a/db4-storage/src/pages/edge_page/writer.rs +++ b/db4-storage/src/pages/edge_page/writer.rs @@ -177,6 +177,7 @@ impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmen self.page.segment_id() } + #[inline] fn increment_layer_num_edges(&self, layer_id: LayerId) { self.graph_stats.increment(layer_id); } diff --git a/db4-storage/src/pages/layer_counter.rs b/db4-storage/src/pages/layer_counter.rs index e94f8567b7..3342fdd14a 100644 --- a/db4-storage/src/pages/layer_counter.rs +++ b/db4-storage/src/pages/layer_counter.rs @@ -70,9 +70,15 @@ impl GraphStats { self.latest.get() } + #[inline(always)] pub fn increment(&self, layer_id: LayerId) -> usize { + self.increment_by(layer_id, 1) + } + + #[inline(always)] + pub fn increment_by(&self, layer_id: LayerId, count: usize) -> usize { let counter = self.get_or_create_layer(layer_id); - counter.fetch_add(1, std::sync::atomic::Ordering::Release) + counter.fetch_add(count, std::sync::atomic::Ordering::Release) } pub fn get(&self, layer_id: LayerId) -> usize { diff --git a/db4-storage/src/pages/locked/edges.rs b/db4-storage/src/pages/locked/edges.rs index 709816f637..393fdb31d2 100644 --- a/db4-storage/src/pages/locked/edges.rs +++ b/db4-storage/src/pages/locked/edges.rs @@ -4,7 +4,7 @@ use crate::{ LocalPOS, api::edges::EdgeSegmentOps, error::StorageError, - pages::{edge_page::writer::EdgeWriter, layer_counter::GraphStats, resolve_pos}, + pages::{edge_page::{bulk_writer::BulkEdgeWriter, writer::EdgeWriter}, layer_counter::GraphStats, resolve_pos}, persist::strategy::PersistenceStrategy, segments::edge::segment::MemEdgeSegment, }; @@ -44,6 +44,11 @@ impl<'a, ES: EdgeSegmentOps> LockedEdgePage<'a, ES> { EdgeWriter::new(self.num_edges, self.page, self.lock.deref_mut()) } + #[inline(always)] + pub fn bulk_writer(&mut self) -> BulkEdgeWriter<'_, &mut MemEdgeSegment, ES> { + EdgeWriter::new(self.num_edges, self.page, self.lock.deref_mut()).into() + } + #[inline(always)] pub fn page_id(&self) -> usize { self.page_id diff --git a/db4-storage/src/pages/locked/nodes.rs b/db4-storage/src/pages/locked/nodes.rs index 0cb296d27e..3ec70f1f8f 100644 --- a/db4-storage/src/pages/locked/nodes.rs +++ b/db4-storage/src/pages/locked/nodes.rs @@ -2,7 +2,7 @@ use crate::{ LocalPOS, api::nodes::NodeSegmentOps, error::StorageError, - pages::{layer_counter::GraphStats, node_page::writer::NodeWriter, resolve_pos}, + pages::{layer_counter::GraphStats, node_page::{bulk_writer::BulkNodeWriter, writer::NodeWriter}, resolve_pos}, persist::strategy::PersistenceStrategy, segments::node::segment::MemNodeSegment, }; @@ -47,6 +47,11 @@ impl<'a, NS: NodeSegmentOps> LockedNodePage<'a, NS> { NodeWriter::new(self.page, self.layer_counter, self.lock.deref_mut()) } + #[inline(always)] + pub fn bulk_writer(&mut self) -> BulkNodeWriter<'_, &mut MemNodeSegment, NS> { + NodeWriter::new(self.page, self.layer_counter, self.lock.deref_mut()).into() + } + pub fn head(&mut self) -> &mut MemNodeSegment { self.lock.deref_mut() } diff --git a/db4-storage/src/pages/node_page/bulk_writer.rs b/db4-storage/src/pages/node_page/bulk_writer.rs new file mode 100644 index 0000000000..d5e0b41eac --- /dev/null +++ b/db4-storage/src/pages/node_page/bulk_writer.rs @@ -0,0 +1,145 @@ +use std::ops::DerefMut; + +use raphtory_api::core::entities::properties::{ + meta::{NODE_ID_IDX, STATIC_GRAPH_LAYER_ID}, + prop::Prop, +}; +use raphtory_core::{ + entities::{EID, ELID, GID, LayerId, VID}, + storage::timeindex::AsTime, +}; + +use crate::{ + LocalPOS, api::nodes::NodeSegmentOps, pages::node_page::writer::NodeWriter, + segments::node::segment::MemNodeSegment, +}; + +#[derive(Debug)] +pub struct BulkNodeWriter<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> { + nw: NodeWriter<'a, MP, NS>, + layers: Vec, +} + +impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> + From> for BulkNodeWriter<'a, MP, NS> +{ + fn from(value: NodeWriter<'a, MP, NS>) -> Self { + Self { + nw: value, + layers: Vec::new(), + } + } +} + +impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> + BulkNodeWriter<'a, MP, NS> +{ + #[inline] + pub fn get_out_edge(&self, pos: LocalPOS, dst: VID, layer_id: LayerId) -> Option { + self.nw.get_out_edge(pos, dst, layer_id) + } + + #[inline(always)] + pub fn resolve_pos(&self, node_id: VID) -> Option { + self.nw.resolve_pos(node_id) + } + + #[inline(always)] + pub fn add_static_outbound_edge( + &mut self, + src_pos: LocalPOS, + dst: impl Into, + e_id: impl Into, + ) { + let e_id = e_id.into(); + self.nw.add_outbound_edge_inner::( + None, + src_pos, + dst, + e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }, + ); + } + + pub fn add_static_inbound_edge( + &mut self, + dst_pos: LocalPOS, + src: impl Into, + e_id: impl Into, + ) { + let e_id = e_id.into(); + self.nw.add_inbound_edge_inner::( + None, + dst_pos, + src, + e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }, + ); + } + + #[inline(always)] + pub fn add_outbound_edge( + &mut self, + t: Option, + src_pos: impl Into, + dst: impl Into, + e_id: impl Into, + ) { + self.nw + .add_outbound_edge_inner(t, src_pos, dst, e_id, |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }); + } + + pub fn add_inbound_edge( + &mut self, + t: Option, + dst_pos: impl Into, + src: impl Into, + e_id: impl Into, + ) { + self.nw + .add_inbound_edge_inner(t, dst_pos, src, e_id, |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }); + } + + fn update_layer_count(layer_id: LayerId, layers: &mut Vec) { + if layers.len() <= layer_id.0 { + layers.resize_with(layer_id.0 + 1, Default::default); + } + layers[layer_id.0] += 1; + } + + #[inline(always)] + pub fn update_timestamp(&mut self, t: T, pos: LocalPOS, e_id: ELID) { + self.nw.update_timestamp(t, pos, e_id); + } + + #[inline(always)] + pub fn store_node_id(&mut self, pos: LocalPOS, layer_id: LayerId, gid: GID) { + let gid = match gid { + GID::U64(id) => Prop::U64(id), + GID::Str(s) => Prop::str(s), + }; + let props = [(NODE_ID_IDX, gid)]; + self.nw + .update_c_props_inner(pos, layer_id, props, |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }); + } +} + +impl<'a, MP: DerefMut , ES: NodeSegmentOps> Drop + for BulkNodeWriter<'a, MP, ES> +{ + fn drop(&mut self) { + for (layer_id, count) in self.layers.iter().enumerate() { + self.nw.l_counter.increment_by(LayerId(layer_id), *count); + } + } +} diff --git a/db4-storage/src/pages/node_page/mod.rs b/db4-storage/src/pages/node_page/mod.rs index d3baa81782..69fc99e0c2 100644 --- a/db4-storage/src/pages/node_page/mod.rs +++ b/db4-storage/src/pages/node_page/mod.rs @@ -1 +1,2 @@ pub mod writer; +pub mod bulk_writer; diff --git a/db4-storage/src/pages/node_page/writer.rs b/db4-storage/src/pages/node_page/writer.rs index 5c462bc638..6bb8d8a0fa 100644 --- a/db4-storage/src/pages/node_page/writer.rs +++ b/db4-storage/src/pages/node_page/writer.rs @@ -56,7 +56,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri dst: impl Into, e_id: impl Into, ) { - self.add_outbound_edge_inner(t, src_pos, dst, e_id); + self.add_outbound_edge_inner(t, src_pos, dst, e_id, |layer_id| { + self.l_counter.increment(layer_id); + }); } pub fn add_static_outbound_edge( @@ -71,15 +73,19 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri src_pos, dst, e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer_id| { + self.l_counter.increment(layer_id); + }, ); } - fn add_outbound_edge_inner( + pub(crate) fn add_outbound_edge_inner( &mut self, t: Option, src_pos: impl Into, dst: impl Into, e_id: impl Into, + mut layer_counter: impl FnMut(LayerId) -> (), ) { let src_pos = src_pos.into(); let dst = dst.into(); @@ -93,7 +99,8 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri self.mut_segment.increment_est_size(add); if is_new_node && !self.page.has_node(src_pos, layer_id) { - self.l_counter.increment(layer_id); + // self.l_counter.increment(layer_id); + layer_counter(layer_id); } } @@ -104,7 +111,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri src: impl Into, e_id: impl Into, ) { - self.add_inbound_edge_inner(t, dst_pos, src, e_id); + self.add_inbound_edge_inner(t, dst_pos, src, e_id, |layer| { + self.l_counter.increment(layer); + }); } pub fn add_static_inbound_edge( @@ -119,15 +128,19 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri dst_pos, src, e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer| { + self.l_counter.increment(layer); + }, ); } - fn add_inbound_edge_inner( + pub(crate) fn add_inbound_edge_inner( &mut self, t: Option, dst_pos: impl Into, src: impl Into, e_id: impl Into, + mut layer_counter: impl FnMut(LayerId) -> (), ) { let e_id = e_id.into(); let src = src.into(); @@ -141,7 +154,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri self.mut_segment.increment_est_size(add); if is_new_node && !self.page.has_node(dst_pos, layer) { - self.l_counter.increment(layer); + layer_counter(layer); } } @@ -175,11 +188,23 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri pos: LocalPOS, layer_id: LayerId, props: impl IntoIterator, + ) { + self.update_c_props_inner(pos, layer_id, props, |layer_id| { + self.l_counter.increment(layer_id); + }); + } + + pub(crate) fn update_c_props_inner( + &mut self, + pos: LocalPOS, + layer_id: LayerId, + props: impl IntoIterator, + mut layer_counter: impl FnMut(LayerId) -> (), ) { let (is_new_node, add) = self.mut_segment.update_metadata(pos, layer_id, props); self.mut_segment.increment_est_size(add); if is_new_node && !self.page.has_node(pos, layer_id) { - self.l_counter.increment(layer_id); + layer_counter(layer_id); } } diff --git a/raphtory/examples/snb_loader.rs b/raphtory/examples/snb_loader.rs index 26584b1860..b8ca0c0c10 100644 --- a/raphtory/examples/snb_loader.rs +++ b/raphtory/examples/snb_loader.rs @@ -1,9 +1,15 @@ #[cfg(feature = "io")] use raphtory::io::parquet_loaders::{load_edges_from_parquet, load_nodes_from_parquet}; +#[cfg(feature = "io")] use raphtory::{arrow_loader::df_loaders::edges::ColumnNames, errors::GraphError, prelude::*}; +use serde::Deserialize; +#[cfg(feature = "io")] use std::path::{Path, PathBuf}; +#[cfg(feature = "io")] +use storage::persist::node; /// Construct the path to a named Parquet file inside `parquet_dir`. +#[cfg(feature = "io")] fn pq(parquet_dir: &Path, name: &str) -> PathBuf { parquet_dir.join(format!("{}.parquet", name)) } @@ -15,540 +21,435 @@ use tikv_jemallocator::Jemalloc; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; -/// Load SNB data from Parquet files into a Raphtory Graph. #[cfg(feature = "io")] -fn load_snb_graph(parquet_dir: &Path, graph: &Graph) -> Result<(), GraphError> { - // ── Static Nodes ────────────────────────────────────────────────────── - - // println!("Loading Places..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "place"), - // "_time", - // None, - // "_node_id", - // None, - // Some("type"), - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ places"); - - // let fp = pq(parquet_dir, "place_IS_PART_OF_place"); - // if fp.exists() { - // load_edges_from_parquet( - // graph, - // &fp, - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_PART_OF"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ IS_PART_OF edges"); - // } - - // println!("Loading Organisations..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "organisation"), - // "_time", - // None, - // "_node_id", - // None, - // Some("type"), - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ organisations"); - - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "organisation_IS_LOCATED_IN_place"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_LOCATED_IN"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ Organisation IS_LOCATED_IN edges"); - - // println!("Loading Tags..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "tag"), - // "_time", - // None, - // "_node_id", - // Some("Tag"), - // None, - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ tags"); - - // let fp = pq(parquet_dir, "tagclass"); - // if fp.exists() { - // println!("Loading TagClasses..."); - // load_nodes_from_parquet( - // graph, - // &fp, - // "_time", - // None, - // "_node_id", - // Some("TagClass"), - // None, - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ tag classes"); - // } - - // let fp = pq(parquet_dir, "tag_HAS_TYPE_tagclass"); - // if fp.exists() { - // load_edges_from_parquet( - // graph, - // &fp, - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("HAS_TYPE"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ HAS_TYPE edges"); - // } - - // let fp = pq(parquet_dir, "tagclass_IS_SUBCLASS_OF_tagclass"); - // if fp.exists() { - // load_edges_from_parquet( - // graph, - // &fp, - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_SUBCLASS_OF"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ IS_SUBCLASS_OF edges"); - // } - - // // ── Dynamic Nodes ───────────────────────────────────────────────────── - - // println!("Loading Persons..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "person"), - // "creationDate", - // None, - // "_node_id", - // Some("Person"), - // None, - // &[ - // "firstName", - // "lastName", - // "gender", - // "birthday", - // "locationIP", - // "browserUsed", - // "language", - // "email", - // "id", - // "creationDate", - // ], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ persons"); - - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "person_IS_LOCATED_IN_place"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_LOCATED_IN"), - // None, - // None, - // )?; - // graph.flush()?; - - // println!("Loading Forums..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "forum"), - // "creationDate", - // None, - // "_node_id", - // Some("Forum"), - // None, - // &["title", "id", "creationDate"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ forums"); - - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "forum_HAS_MODERATOR_person"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("HAS_MODERATOR"), - // None, - // None, - // )?; - // graph.flush()?; - - // println!("Loading Posts..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "post"), - // "creationDate", - // None, - // "_node_id", - // Some("Post"), - // None, - // &[ - // "imageFile", - // "locationIP", - // "browserUsed", - // "language", - // "content", - // "length", - // "id", - // "creationDate", - // ], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ posts"); +struct NodeParquetInput { + path: PathBuf, + time_col: String, + id_col: String, + node_type: Option, + node_type_col: Option, + property_cols: Vec, +} - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "post_HAS_CREATOR_person"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("HAS_CREATOR"), - // None, - // None, - // )?; - // graph.flush()?; +#[cfg(feature = "io")] +impl NodeParquetInput { + fn new<'a>( + path: impl AsRef, + time_col: &str, + id_col: &str, + node_type: Option<&str>, + node_type_col: Option<&str>, + property_cols: Vec<&str>, + ) -> NodeParquetInput { + NodeParquetInput { + path: path.as_ref().into(), + time_col: time_col.into(), + id_col: id_col.into(), + node_type: node_type.map(Into::into), + node_type_col: node_type_col.map(Into::into), + property_cols: property_cols.into_iter().map(|s| s.into()).collect(), + } + } + + fn path_as_string(&self) -> &str { + self.path.iter().last().and_then(|p| p.to_str()).unwrap() + } +} +#[cfg(feature = "io")] +struct EdgeParquetInput { + path: PathBuf, + time_col: String, + src_col: String, + dst_col: String, + layer: Option, + property_cols: Vec, +} - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "post_IS_LOCATED_IN_place"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_LOCATED_IN"), - // None, - // None, - // )?; - // graph.flush()?; +#[cfg(feature = "io")] +impl EdgeParquetInput { + fn new( + path: impl AsRef, + time_col: &str, + src_col: &str, + dst_col: &str, + layer: Option<&str>, + property_cols: Vec<&str>, + ) -> EdgeParquetInput { + EdgeParquetInput { + path: path.as_ref().into(), + time_col: time_col.into(), + src_col: src_col.into(), + dst_col: dst_col.into(), + layer: layer.map(Into::into), + property_cols: property_cols.into_iter().map(Into::into).collect(), + } + } + + fn path_as_string(&self) -> &str { + self.path.iter().last().and_then(|p| p.to_str()).unwrap() + } +} - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "forum_CONTAINER_OF_post"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("CONTAINER_OF"), - // None, - // None, - // )?; - // graph.flush()?; +#[cfg(feature = "io")] +fn load_snb_graph_v2( + nodes: impl IntoIterator, + edges: impl IntoIterator, + graph: &Graph, +) -> Result<(), GraphError> { + for node in nodes { + println!("Loading nodes from Parquet file with time column '{}', id column '{}', label column '{:?}', and property columns {:?}...", node.time_col, node.id_col, node.node_type, node.property_cols); + load_nodes_from_parquet( + graph, + &node.path, + &node.time_col, + None, + &node.id_col, + node.node_type.as_deref(), + node.node_type_col.as_deref(), + &node + .property_cols + .iter() + .map(|s| s.as_ref()) + .collect::>(), + &[], + None, + None, + None, + None, + true, + None, + )?; + println!( + " ✓ Finished loading nodes from Parquet file with id column '{}'", + node.id_col + ); + } + + for edge in edges { + println!("Loading edges from Parquet file with time column '{}', src column '{}', dst column '{}', label column '{:?}', and property columns {:?}...", edge.time_col, edge.src_col, edge.dst_col, edge.layer, edge.property_cols); + load_edges_from_parquet( + graph, + &edge.path, + ColumnNames::new(&edge.time_col, None, &edge.src_col, &edge.dst_col, None), + true, + &edge + .property_cols + .iter() + .map(|s| s.as_ref()) + .collect::>(), + &[], + None, + edge.layer.as_deref(), + None, + None, + )?; + println!( + " ✓ Finished loading edges from Parquet file with src column '{}'", + edge.src_col + ); + } + Ok(()) +} - println!("Loading Comments..."); - load_nodes_from_parquet( - graph, - &pq(parquet_dir, "comment"), - "creationDate", - None, - "_node_id", - Some("Comment"), - None, - &[ - "locationIP", - "browserUsed", - "content", - "length", - "id", +/// Load SNB data from Parquet files into a Raphtory Graph. +#[cfg(feature = "io")] +fn load_snb_graph( + parquet_dir: &Path, + filter: Option, + graph: &Graph, +) -> Result<(), GraphError> { + let node_inputs = [ + NodeParquetInput::new( + pq(parquet_dir, "place"), + "_time", + "_node_id", + None, + Some("type"), + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "organisation"), + "_time", + "_node_id", + None, + Some("type"), + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "tag"), + "_time", + "_node_id", + Some("Tag"), + None, + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "tagclass"), + "_time", + "_node_id", + Some("TagClass"), + None, + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "person"), "creationDate", - ], - &[], - None, - None, - None, - None, - true, - None, - )?; - println!(" ✓ comments"); - // graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_HAS_CREATOR_person"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_CREATOR"), - None, - None, - )?; - // graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_IS_LOCATED_IN_place"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("IS_LOCATED_IN"), - None, - None, - )?; - // graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_REPLY_OF_post"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("REPLY_OF"), - None, - None, - )?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_REPLY_OF_comment"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("REPLY_OF"), - None, - None, - )?; - // graph.flush()?; - - // ── Edge-only relationships ─────────────────────────────────────────── - - println!("Loading KNOWS edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_KNOWS_person"), - ColumnNames::new("creationDate", None, "START_ID", "END_ID", None), - true, - &["creationDate"], - &[], - None, - Some("KNOWS"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ KNOWS edges"); - - println!("Loading LIKES edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_LIKES_post"), - ColumnNames::new("creationDate", None, "START_ID", "END_ID", None), - true, - &["creationDate"], - &[], - None, - Some("LIKES"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ LIKES (Post) edges"); - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_LIKES_comment"), - ColumnNames::new("creationDate", None, "START_ID", "END_ID", None), - true, - &["creationDate"], - &[], - None, - Some("LIKES"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ LIKES (Comment) edges"); - - println!("Loading HAS_MEMBER edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "forum_HAS_MEMBER_person"), - ColumnNames::new("joinDate", None, "START_ID", "END_ID", None), - true, - &["joinDate"], - &[], - None, - Some("HAS_MEMBER"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ HAS_MEMBER edges"); - - println!("Loading STUDY_AT edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_STUDY_AT_organisation"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &["classYear"], - &[], - None, - Some("STUDY_AT"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ STUDY_AT edges"); - - println!("Loading WORK_AT edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_WORK_AT_organisation"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &["workFrom"], - &[], - None, - Some("WORK_AT"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ WORK_AT edges"); - - println!("Loading HAS_TAG edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "post_HAS_TAG_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_TAG"), - None, - None, - )?; - graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_HAS_TAG_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_TAG"), - None, - None, - )?; - graph.flush()?; + "_node_id", + Some("Person"), + None, + vec![ + "firstName", + "lastName", + "gender", + "birthday", + "locationIP", + "browserUsed", + "language", + "email", + "id", + "creationDate", + ], + ), + NodeParquetInput::new( + pq(parquet_dir, "forum"), + "creationDate", + "_node_id", + Some("Forum"), + None, + vec!["title", "id", "creationDate"], + ), + NodeParquetInput::new( + pq(parquet_dir, "post"), + "creationDate", + "_node_id", + Some("Post"), + None, + vec![ + "imageFile", + "locationIP", + "browserUsed", + "language", + "content", + "length", + "id", + "creationDate", + ], + ), + NodeParquetInput::new( + pq(parquet_dir, "comment"), + "creationDate", + "_node_id", + Some("Comment"), + None, + vec![ + "locationIP", + "browserUsed", + "content", + "length", + "id", + "creationDate", + ], + ), + ]; + + let edge_inputs = [ + EdgeParquetInput::new( + pq(parquet_dir, "place_IS_PART_OF_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_PART_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "organisation_IS_LOCATED_IN_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_LOCATED_IN"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_IS_LOCATED_IN_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_LOCATED_IN"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "post_IS_LOCATED_IN_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_LOCATED_IN"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_HAS_MODERATOR_person"), + "_time", + "START_ID", + "END_ID", + Some("HAS_MODERATOR"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "post_HAS_CREATOR_person"), + "_time", + "START_ID", + "END_ID", + Some("HAS_CREATOR"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_HAS_CREATOR_person"), + "_time", + "START_ID", + "END_ID", + Some("HAS_CREATOR"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_CONTAINER_OF_post"), + "_time", + "START_ID", + "END_ID", + Some("CONTAINER_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_REPLY_OF_post"), + "_time", + "START_ID", + "END_ID", + Some("REPLY_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_REPLY_OF_comment"), + "_time", + "START_ID", + "END_ID", + Some("REPLY_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_KNOWS_person"), + "creationDate", + "START_ID", + "END_ID", + Some("KNOWS"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_LIKES_post"), + "creationDate", + "START_ID", + "END_ID", + Some("LIKES"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_LIKES_comment"), + "creationDate", + "START_ID", + "END_ID", + Some("LIKES"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_HAS_MEMBER_person"), + "joinDate", + "START_ID", + "END_ID", + Some("HAS_MEMBER"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_STUDY_AT_organisation"), + "_time", + "START_ID", + "END_ID", + Some("STUDY_AT"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_WORK_AT_organisation"), + "_time", + "START_ID", + "END_ID", + Some("WORK_AT"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "post_HAS_TAG_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_TAG"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_HAS_TAG_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_TAG"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_HAS_TAG_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_TAG"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_HAS_INTEREST_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_INTEREST"), + vec![], + ), + ]; + + let edge_inputs = edge_inputs + .into_iter() + .filter(|edge| { + filter + .as_ref() + .and_then(|filter| filter.edges.as_ref()) + .map(|e_f| e_f.iter().any(|name| edge.path_as_string().contains(name))) + .unwrap_or(true) + }) + .collect::>(); + + let node_inputs = node_inputs + .into_iter() + .filter(|node| { + filter + .as_ref() + .and_then(|filter| filter.nodes.as_ref()) + .map(|e_f| e_f.iter().any(|name| node.path_as_string().contains(name))) + .unwrap_or(true) + }) + .collect::>(); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "forum_HAS_TAG_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_TAG"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ HAS_TAG edges"); + println!( + "edge_inputs: {:?}, node_inputs: {:?}", + edge_inputs + .iter() + .map(|e| e.path_as_string()) + .collect::>(), + node_inputs + .iter() + .map(|e| e.path_as_string()) + .collect::>(), + ); - println!("Loading HAS_INTEREST edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_HAS_INTEREST_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_INTEREST"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ HAS_INTEREST edges"); + load_snb_graph_v2(node_inputs, edge_inputs, graph)?; println!( "\n✅ Graph loaded: {} nodes, {} edges", @@ -558,6 +459,12 @@ fn load_snb_graph(parquet_dir: &Path, graph: &Graph) -> Result<(), GraphError> { Ok(()) } +#[derive(Deserialize)] +struct Filter { + nodes: Option>, + edges: Option>, +} + #[cfg(feature = "io")] fn main() { let parquet_dir = std::env::args() @@ -568,8 +475,13 @@ fn main() { .nth(2) .map(|graph| PathBuf::from(graph)) .unwrap_or_else(|| parquet_dir.join("..").join("graph")); + let filter = std::env::args() + .nth(3) + .map(|s| serde_json::from_str::(&s)) + .transpose() + .unwrap(); let graph = Graph::new_at_path(&graph_path).unwrap(); - load_snb_graph(&parquet_dir, &graph).unwrap() + load_snb_graph(&parquet_dir, filter, &graph).unwrap() } #[cfg(not(feature = "io"))] diff --git a/raphtory/src/arrow_loader/df_loaders/edges.rs b/raphtory/src/arrow_loader/df_loaders/edges.rs index 70a51deb5e..62f47721e9 100644 --- a/raphtory/src/arrow_loader/df_loaders/edges.rs +++ b/raphtory/src/arrow_loader/df_loaders/edges.rs @@ -530,7 +530,7 @@ fn update_edge_properties>( ) { let mut t_props = vec![]; let mut c_props = vec![]; - let mut writer = shard.writer(); + let mut writer = shard.bulk_writer(); for (row, src, dst, time, secondary_index, eid, layer, exists) in zip { if let Some(eid_pos) = writer.resolve_pos(eid) { @@ -570,7 +570,7 @@ fn update_inbound_edges>( zip: impl Iterator, delete: bool, ) { - let mut writer = shard.writer(); + let mut writer = shard.bulk_writer(); for ( _row, src, @@ -626,7 +626,7 @@ fn add_and_resolve_outbound_edges< locked_page: &mut LockedNodePage<'_, NS>, delete: bool, ) { - let mut writer = locked_page.writer(); + let mut writer = locked_page.bulk_writer(); for (row, src, dst, time, secondary_index, layer) in zip { if let Some(src_pos) = writer.resolve_pos(src) { let t = EventTime(time, secondary_index); @@ -714,7 +714,7 @@ pub fn store_node_ids>( gid_str_cache: &[(GidRef<'_>, VID)], locked_page: &mut LockedNodePage<'_, NS>, ) { - let mut writer = locked_page.writer(); + let mut writer = locked_page.bulk_writer(); for (src_gid, vid) in gid_str_cache.iter() { if let Some(src_pos) = writer.resolve_pos(*vid) { writer.store_node_id(src_pos, STATIC_GRAPH_LAYER_ID, (*src_gid).into()); diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs index f43647bb2f..a18043f550 100644 --- a/raphtory/src/io/parquet_loaders.rs +++ b/raphtory/src/io/parquet_loaders.rs @@ -60,47 +60,47 @@ pub fn load_nodes_from_parquet< resolve_nodes: bool, schema: Option>>, ) -> Result<(), GraphError> { - let mut cols_to_check = vec![id, time]; - - cols_to_check.extend_from_slice(properties); - cols_to_check.extend_from_slice(metadata); - - if let Some(ref node_type_col) = node_type_col { - cols_to_check.push(node_type_col.as_ref()); - } - - if let Some(ref secondary_index) = secondary_index { - cols_to_check.push(secondary_index.as_ref()); - } - - if let Some(ref layer_col) = layer_col { - cols_to_check.push(layer_col.as_ref()); - } - - for path in get_parquet_file_paths(parquet_path)? { - let df_view = process_parquet_file_to_df( - path.as_path(), - Some(&cols_to_check), - batch_size, - schema.clone(), - )?; - df_view.check_cols_exist(&cols_to_check)?; - load_nodes_from_df( - df_view, - time, - secondary_index, - id, - properties, - metadata, - shared_metadata, - node_type, - node_type_col, - graph, - resolve_nodes, - layer, - layer_col, - )?; - } + // let mut cols_to_check = vec![id, time]; + + // cols_to_check.extend_from_slice(properties); + // cols_to_check.extend_from_slice(metadata); + + // if let Some(ref node_type_col) = node_type_col { + // cols_to_check.push(node_type_col.as_ref()); + // } + + // if let Some(ref secondary_index) = secondary_index { + // cols_to_check.push(secondary_index.as_ref()); + // } + + // if let Some(ref layer_col) = layer_col { + // cols_to_check.push(layer_col.as_ref()); + // } + + // for path in get_parquet_file_paths(parquet_path)? { + // let df_view = process_parquet_file_to_df( + // path.as_path(), + // Some(&cols_to_check), + // batch_size, + // schema.clone(), + // )?; + // df_view.check_cols_exist(&cols_to_check)?; + // load_nodes_from_df( + // df_view, + // time, + // secondary_index, + // id, + // properties, + // metadata, + // shared_metadata, + // node_type, + // node_type_col, + // graph, + // resolve_nodes, + // layer, + // layer_col, + // )?; + // } Ok(()) } From 11dc50539755e2702f1851cc552955c586333175 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Wed, 13 May 2026 15:34:12 +0100 Subject: [PATCH 07/10] uncomment load nodes --- Cargo.lock | 2 +- db4-storage/src/pages/node_page/writer.rs | 1 - raphtory/src/io/parquet_loaders.rs | 82 +++++++++++------------ 3 files changed, 42 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df4bbc2ff7..2bcaf031e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7533,7 +7533,7 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "snb" -version = "0.17.0" +version = "0.18.0" dependencies = [ "chrono", "flate2", diff --git a/db4-storage/src/pages/node_page/writer.rs b/db4-storage/src/pages/node_page/writer.rs index 6bb8d8a0fa..134aae4158 100644 --- a/db4-storage/src/pages/node_page/writer.rs +++ b/db4-storage/src/pages/node_page/writer.rs @@ -99,7 +99,6 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri self.mut_segment.increment_est_size(add); if is_new_node && !self.page.has_node(src_pos, layer_id) { - // self.l_counter.increment(layer_id); layer_counter(layer_id); } } diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs index a18043f550..f43647bb2f 100644 --- a/raphtory/src/io/parquet_loaders.rs +++ b/raphtory/src/io/parquet_loaders.rs @@ -60,47 +60,47 @@ pub fn load_nodes_from_parquet< resolve_nodes: bool, schema: Option>>, ) -> Result<(), GraphError> { - // let mut cols_to_check = vec![id, time]; - - // cols_to_check.extend_from_slice(properties); - // cols_to_check.extend_from_slice(metadata); - - // if let Some(ref node_type_col) = node_type_col { - // cols_to_check.push(node_type_col.as_ref()); - // } - - // if let Some(ref secondary_index) = secondary_index { - // cols_to_check.push(secondary_index.as_ref()); - // } - - // if let Some(ref layer_col) = layer_col { - // cols_to_check.push(layer_col.as_ref()); - // } - - // for path in get_parquet_file_paths(parquet_path)? { - // let df_view = process_parquet_file_to_df( - // path.as_path(), - // Some(&cols_to_check), - // batch_size, - // schema.clone(), - // )?; - // df_view.check_cols_exist(&cols_to_check)?; - // load_nodes_from_df( - // df_view, - // time, - // secondary_index, - // id, - // properties, - // metadata, - // shared_metadata, - // node_type, - // node_type_col, - // graph, - // resolve_nodes, - // layer, - // layer_col, - // )?; - // } + let mut cols_to_check = vec![id, time]; + + cols_to_check.extend_from_slice(properties); + cols_to_check.extend_from_slice(metadata); + + if let Some(ref node_type_col) = node_type_col { + cols_to_check.push(node_type_col.as_ref()); + } + + if let Some(ref secondary_index) = secondary_index { + cols_to_check.push(secondary_index.as_ref()); + } + + if let Some(ref layer_col) = layer_col { + cols_to_check.push(layer_col.as_ref()); + } + + for path in get_parquet_file_paths(parquet_path)? { + let df_view = process_parquet_file_to_df( + path.as_path(), + Some(&cols_to_check), + batch_size, + schema.clone(), + )?; + df_view.check_cols_exist(&cols_to_check)?; + load_nodes_from_df( + df_view, + time, + secondary_index, + id, + properties, + metadata, + shared_metadata, + node_type, + node_type_col, + graph, + resolve_nodes, + layer, + layer_col, + )?; + } Ok(()) } From 2dcb8f495c37db54904ed6be897ddabca4a052c6 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Sat, 16 May 2026 16:38:49 +0100 Subject: [PATCH 08/10] add API for total memory --- db4-storage/src/pages/edge_page/mod.rs | 2 +- db4-storage/src/pages/locked/edges.rs | 6 +++++- db4-storage/src/pages/locked/nodes.rs | 6 +++++- .../src/pages/node_page/bulk_writer.rs | 2 +- db4-storage/src/pages/node_page/mod.rs | 2 +- db4-storage/src/segments/edge/segment.rs | 16 +++++++++------ db4-storage/src/segments/node/segment.rs | 20 +++++++++++-------- raphtory-storage/src/graph/graph.rs | 8 ++++++-- raphtory/examples/snb_loader.rs | 6 +++++- 9 files changed, 46 insertions(+), 22 deletions(-) diff --git a/db4-storage/src/pages/edge_page/mod.rs b/db4-storage/src/pages/edge_page/mod.rs index 69fc99e0c2..a1f6628c62 100644 --- a/db4-storage/src/pages/edge_page/mod.rs +++ b/db4-storage/src/pages/edge_page/mod.rs @@ -1,2 +1,2 @@ -pub mod writer; pub mod bulk_writer; +pub mod writer; diff --git a/db4-storage/src/pages/locked/edges.rs b/db4-storage/src/pages/locked/edges.rs index 393fdb31d2..65b77908e9 100644 --- a/db4-storage/src/pages/locked/edges.rs +++ b/db4-storage/src/pages/locked/edges.rs @@ -4,7 +4,11 @@ use crate::{ LocalPOS, api::edges::EdgeSegmentOps, error::StorageError, - pages::{edge_page::{bulk_writer::BulkEdgeWriter, writer::EdgeWriter}, layer_counter::GraphStats, resolve_pos}, + pages::{ + edge_page::{bulk_writer::BulkEdgeWriter, writer::EdgeWriter}, + layer_counter::GraphStats, + resolve_pos, + }, persist::strategy::PersistenceStrategy, segments::edge::segment::MemEdgeSegment, }; diff --git a/db4-storage/src/pages/locked/nodes.rs b/db4-storage/src/pages/locked/nodes.rs index 3ec70f1f8f..3b394b4edf 100644 --- a/db4-storage/src/pages/locked/nodes.rs +++ b/db4-storage/src/pages/locked/nodes.rs @@ -2,7 +2,11 @@ use crate::{ LocalPOS, api::nodes::NodeSegmentOps, error::StorageError, - pages::{layer_counter::GraphStats, node_page::{bulk_writer::BulkNodeWriter, writer::NodeWriter}, resolve_pos}, + pages::{ + layer_counter::GraphStats, + node_page::{bulk_writer::BulkNodeWriter, writer::NodeWriter}, + resolve_pos, + }, persist::strategy::PersistenceStrategy, segments::node::segment::MemNodeSegment, }; diff --git a/db4-storage/src/pages/node_page/bulk_writer.rs b/db4-storage/src/pages/node_page/bulk_writer.rs index d5e0b41eac..7c1b484bf7 100644 --- a/db4-storage/src/pages/node_page/bulk_writer.rs +++ b/db4-storage/src/pages/node_page/bulk_writer.rs @@ -134,7 +134,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> } } -impl<'a, MP: DerefMut , ES: NodeSegmentOps> Drop +impl<'a, MP: DerefMut, ES: NodeSegmentOps> Drop for BulkNodeWriter<'a, MP, ES> { fn drop(&mut self) { diff --git a/db4-storage/src/pages/node_page/mod.rs b/db4-storage/src/pages/node_page/mod.rs index 69fc99e0c2..a1f6628c62 100644 --- a/db4-storage/src/pages/node_page/mod.rs +++ b/db4-storage/src/pages/node_page/mod.rs @@ -1,2 +1,2 @@ -pub mod writer; pub mod bulk_writer; +pub mod writer; diff --git a/db4-storage/src/segments/edge/segment.rs b/db4-storage/src/segments/edge/segment.rs index 61adc1d71f..c857d91e92 100644 --- a/db4-storage/src/segments/edge/segment.rs +++ b/db4-storage/src/segments/edge/segment.rs @@ -1,30 +1,30 @@ use crate::{ - LocalPOS, api::edges::{EdgeSegmentOps, LockedESegment}, error::StorageError, persist::{config::ConfigOps, strategy::PersistenceStrategy}, properties::PropMutEntry, segments::{ - HasRow, SegmentContainer, edge::entry::{MemEdgeEntry, MemEdgeRef}, + HasRow, SegmentContainer, }, utils::Iter4, wal::LSN, + LocalPOS, }; use parking_lot::lock_api::ArcRwLockReadGuard; use raphtory_api::core::{ entities::{ - LayerId, VID, properties::{ meta::{Meta, STATIC_GRAPH_LAYER_ID}, prop::AsPropRef, }, + LayerId, VID, }, storage::dict_mapper::MaybeNew, }; use raphtory_api_macros::box_on_debug_lifetime; use raphtory_core::{ - entities::{LayerIds, edges::edge_ref::EdgeRef}, + entities::{edges::edge_ref::EdgeRef, LayerIds}, storage::timeindex::{AsTime, EventTime}, }; use rayon::prelude::*; @@ -32,8 +32,8 @@ use std::{ ops::{Deref, DerefMut}, path::PathBuf, sync::{ - Arc, atomic::{self, AtomicU32, AtomicUsize, Ordering}, + Arc, }, }; @@ -94,6 +94,10 @@ impl MemEdgeSegment { .fetch_add(increment, Ordering::Relaxed); } + pub fn memory_tracker(&self) -> &Arc { + &self.global_memory_tracker + } + pub fn edge_meta(&self) -> &Arc { self.layers[0].meta() } @@ -630,9 +634,9 @@ impl>> EdgeSegmentOps for EdgeSeg mod test { use super::*; use crate::{ - Config, pages::{edge_page::writer::EdgeWriter, layer_counter::GraphStats}, persist::strategy::NoOpStrategy, + Config, }; use raphtory_api::core::entities::properties::{ meta::{Meta, STATIC_GRAPH_LAYER_ID}, diff --git a/db4-storage/src/segments/node/segment.rs b/db4-storage/src/segments/node/segment.rs index ac200fdeb7..576985325c 100644 --- a/db4-storage/src/segments/node/segment.rs +++ b/db4-storage/src/segments/node/segment.rs @@ -1,37 +1,37 @@ use crate::{ - LocalPOS, api::nodes::{LockedNSSegment, NodeSegmentOps}, error::StorageError, loop_lock_write, persist::{config::ConfigOps, strategy::PersistenceStrategy}, segments::{ - HasRow, SegmentContainer, node::entry::{MemNodeEntry, MemNodeRef}, + HasRow, SegmentContainer, }, wal::LSN, + LocalPOS, }; use either::Either; -use parking_lot::{RwLock, lock_api::ArcRwLockReadGuard}; +use parking_lot::{lock_api::ArcRwLockReadGuard, RwLock}; use raphtory_api::core::{ - Direction, entities::{ - EID, LayerId, VID, properties::{ meta::Meta, prop::{AsPropRef, Prop}, }, + LayerId, EID, VID, }, + Direction, }; use raphtory_core::{ - entities::{ELID, nodes::structure::adj::Adj}, + entities::{nodes::structure::adj::Adj, ELID}, storage::timeindex::{AsTime, EventTime}, }; use std::{ ops::{Deref, DerefMut}, path::PathBuf, sync::{ - Arc, atomic::{AtomicU32, AtomicUsize, Ordering}, + Arc, }, }; @@ -103,6 +103,10 @@ impl MemNodeSegment { self.est_size } + pub fn memory_tracker(&self) -> &Arc { + &self.global_mem_tracker + } + pub(crate) fn increment_global_est_size(&self, increment: usize) { self.global_mem_tracker .fetch_add(increment, Ordering::Relaxed); @@ -624,13 +628,13 @@ impl>> NodeSegmentOps for NodeSeg #[cfg(test)] mod test { use crate::{ - LocalPOS, NodeSegmentView, api::nodes::NodeSegmentOps, pages::{layer_counter::GraphStats, node_page::writer::NodeWriter}, persist::{ config::BaseConfig, strategy::{NoOpStrategy, PersistenceStrategy}, }, + LocalPOS, NodeSegmentView, }; use raphtory_api::core::entities::properties::{ meta::{Meta, STATIC_GRAPH_LAYER_ID}, diff --git a/raphtory-storage/src/graph/graph.rs b/raphtory-storage/src/graph/graph.rs index 0526415cb4..1dc73c6994 100644 --- a/raphtory-storage/src/graph/graph.rs +++ b/raphtory-storage/src/graph/graph.rs @@ -18,8 +18,8 @@ use raphtory_api::core::entities::{ use raphtory_core::entities::{edges::edge_ref::EdgeRef, nodes::node_ref::NodeRef}; use std::{fmt::Debug, iter, path::Path, sync::Arc}; use storage::{ - error::StorageError, pages::SegmentCounts, state::StateIndex, Extension, GIDResolver, - GraphPropEntry, + error::StorageError, pages::SegmentCounts, persist::strategy::PersistenceStrategy, + state::StateIndex, Extension, GIDResolver, GraphPropEntry, }; use thiserror::Error; @@ -307,6 +307,10 @@ impl GraphStorage { } } + pub fn total_allocated_memory(&self) -> usize { + self.extension().estimated_size() + } + pub fn node_segment_counts(&self) -> SegmentCounts { match self { GraphStorage::Mem(storage) => storage.nodes.segment_counts(), diff --git a/raphtory/examples/snb_loader.rs b/raphtory/examples/snb_loader.rs index b8ca0c0c10..83a1951269 100644 --- a/raphtory/examples/snb_loader.rs +++ b/raphtory/examples/snb_loader.rs @@ -480,7 +480,11 @@ fn main() { .map(|s| serde_json::from_str::(&s)) .transpose() .unwrap(); - let graph = Graph::new_at_path(&graph_path).unwrap(); + let graph = if !graph_path.exists() { + Graph::new_at_path(&graph_path).unwrap() + } else { + Graph::load(&graph_path).unwrap() + }; load_snb_graph(&parquet_dir, filter, &graph).unwrap() } From 66a0e00661d7e2ab3a6d3c1536f753b4de388f16 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Mon, 18 May 2026 15:11:31 +0100 Subject: [PATCH 09/10] minor additions to PropType --- .../entities/properties/prop/prop_type.rs | 123 +++++++++++++++++- raphtory-core/src/storage/mod.rs | 8 +- 2 files changed, 126 insertions(+), 5 deletions(-) diff --git a/raphtory-api/src/core/entities/properties/prop/prop_type.rs b/raphtory-api/src/core/entities/properties/prop/prop_type.rs index 1807084aa5..b1b17c3131 100644 --- a/raphtory-api/src/core/entities/properties/prop/prop_type.rs +++ b/raphtory-api/src/core/entities/properties/prop/prop_type.rs @@ -208,7 +208,7 @@ pub struct InvalidPropertyTypeErr(pub DataType); pub mod arrow { use crate::core::entities::properties::prop::{PropType, EMPTY_MAP_FIELD_NAME}; - use arrow_schema::{DataType, TimeUnit}; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; impl From<&DataType> for PropType { fn from(value: &DataType) -> Self { @@ -244,6 +244,55 @@ pub mod arrow { } } } + + impl From<&PropType> for DataType { + fn from(value: &PropType) -> Self { + match value { + PropType::Str => DataType::Utf8View, + PropType::U8 => DataType::UInt8, + PropType::U16 => DataType::UInt16, + PropType::I32 => DataType::Int32, + PropType::I64 => DataType::Int64, + PropType::U32 => DataType::UInt32, + PropType::U64 => DataType::UInt64, + PropType::F32 => DataType::Float32, + PropType::F64 => DataType::Float64, + PropType::Decimal { scale } => { + DataType::Decimal128(38, (*scale).try_into().unwrap()) + } + PropType::Bool => DataType::Boolean, + PropType::NDTime => DataType::Timestamp(TimeUnit::Millisecond, None), + PropType::DTime => DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), + PropType::List(p_type) => DataType::LargeList( + Field::new("data", DataType::from(p_type.as_ref()), true).into(), + ), + PropType::Map(p_type) => { + let mut fields = p_type + .iter() + .map(|(name, p_type)| Field::new(name, DataType::from(p_type), true)) + .collect::>(); + fields.sort_by(|l, r| l.name().cmp(r.name())); + + if fields.is_empty() { + DataType::Struct(Fields::from_iter([Field::new( + EMPTY_MAP_FIELD_NAME, + DataType::Null, + true, + )])) + } else { + DataType::Struct(fields.into()) + } + } + PropType::Empty => DataType::Null, + } + } + } + + impl From for DataType { + fn from(value: PropType) -> Self { + DataType::from(&value) + } + } } // step through these types trees and check they are structurally the same @@ -369,6 +418,8 @@ pub fn check_for_unification(l: &PropType, r: &PropType) -> Option { #[cfg(test)] mod test { use super::*; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + use proptest::{collection::btree_map, prelude::*}; #[test] fn test_unify_types_ne() { @@ -501,4 +552,74 @@ mod test { let size = size_of::(); println!("PropError = {size}") } + + fn field_name() -> impl Strategy { + proptest::string::string_regex("[a-z][a-z0-9_]{0,6}") + .unwrap() + .prop_filter("not the empty map sentinel", |name| { + name != crate::core::entities::properties::prop::EMPTY_MAP_FIELD_NAME + }) + } + + fn canonical_data_type() -> impl Strategy { + let leaf = prop_oneof![ + Just(DataType::Boolean), + Just(DataType::Int32), + Just(DataType::Int64), + Just(DataType::UInt8), + Just(DataType::UInt16), + Just(DataType::UInt32), + Just(DataType::UInt64), + Just(DataType::Float32), + Just(DataType::Float64), + Just(DataType::Utf8View), + Just(DataType::Timestamp(TimeUnit::Millisecond, None)), + Just(DataType::Timestamp( + TimeUnit::Millisecond, + Some("UTC".into()) + )), + (0i8..=38).prop_map(|scale| DataType::Decimal128(38, scale)), + Just(DataType::Null), + ]; + + leaf.prop_recursive(4, 64, 4, |inner| { + prop_oneof![ + inner.clone().prop_map(|data_type| DataType::LargeList( + Field::new("data", data_type, true).into() + )), + btree_map(field_name(), inner, 0..4).prop_map(|fields| { + if fields.is_empty() { + DataType::Struct(Fields::from_iter([Field::new( + crate::core::entities::properties::prop::EMPTY_MAP_FIELD_NAME, + DataType::Null, + true, + )])) + } else { + DataType::Struct( + fields + .into_iter() + .map(|(name, data_type)| Field::new(name, data_type, true)) + .collect::>() + .into(), + ) + } + }), + ] + }) + } + + proptest! { + #[test] + fn data_type_to_prop_type_to_data_type_is_transitive(data_type in canonical_data_type()) { + prop_assert_eq!(DataType::from(PropType::from(&data_type)), data_type); + } + + #[test] + fn prop_type_to_data_type_to_prop_type_is_transitive(data_type in canonical_data_type()) { + let prop_type = PropType::from(&data_type); + let round_tripped: DataType = (&prop_type).into(); + + prop_assert_eq!(PropType::from(&round_tripped), prop_type); + } + } } diff --git a/raphtory-core/src/storage/mod.rs b/raphtory-core/src/storage/mod.rs index 6aaf77501a..ca4d9f1dfa 100644 --- a/raphtory-core/src/storage/mod.rs +++ b/raphtory-core/src/storage/mod.rs @@ -165,7 +165,7 @@ impl PropColumn { col } - pub(crate) fn dtype(&self) -> PropType { + pub(crate) fn dtype_for_error_report(&self) -> PropType { match self { PropColumn::Empty(_) => PropType::Empty, PropColumn::Bool(_) => PropType::Bool, @@ -230,7 +230,7 @@ impl PropColumn { } (col, prop) => { Err(IllegalPropType { - expected: col.dtype(), + expected: col.dtype_for_error_report(), actual: prop.into_prop().dtype(), })?; } @@ -261,7 +261,7 @@ impl PropColumn { )?, (col, prop) => { Err(IllegalPropType { - expected: col.dtype(), + expected: col.dtype_for_error_report(), actual: prop.clone().into_prop().dtype(), })?; } @@ -308,7 +308,7 @@ impl PropColumn { } (col, prop) => { Err(IllegalPropType { - expected: col.dtype(), + expected: col.dtype_for_error_report(), actual: prop.into_prop().dtype(), })?; } From e25a0fd5a3a5ef3b2080e07c74559057a13f1564 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Tue, 19 May 2026 17:41:38 +0100 Subject: [PATCH 10/10] fmt --- db4-storage/src/segments/edge/segment.rs | 12 ++++++------ db4-storage/src/segments/node/segment.rs | 16 ++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/db4-storage/src/segments/edge/segment.rs b/db4-storage/src/segments/edge/segment.rs index c857d91e92..28834e22c2 100644 --- a/db4-storage/src/segments/edge/segment.rs +++ b/db4-storage/src/segments/edge/segment.rs @@ -1,30 +1,30 @@ use crate::{ + LocalPOS, api::edges::{EdgeSegmentOps, LockedESegment}, error::StorageError, persist::{config::ConfigOps, strategy::PersistenceStrategy}, properties::PropMutEntry, segments::{ - edge::entry::{MemEdgeEntry, MemEdgeRef}, HasRow, SegmentContainer, + edge::entry::{MemEdgeEntry, MemEdgeRef}, }, utils::Iter4, wal::LSN, - LocalPOS, }; use parking_lot::lock_api::ArcRwLockReadGuard; use raphtory_api::core::{ entities::{ + LayerId, VID, properties::{ meta::{Meta, STATIC_GRAPH_LAYER_ID}, prop::AsPropRef, }, - LayerId, VID, }, storage::dict_mapper::MaybeNew, }; use raphtory_api_macros::box_on_debug_lifetime; use raphtory_core::{ - entities::{edges::edge_ref::EdgeRef, LayerIds}, + entities::{LayerIds, edges::edge_ref::EdgeRef}, storage::timeindex::{AsTime, EventTime}, }; use rayon::prelude::*; @@ -32,8 +32,8 @@ use std::{ ops::{Deref, DerefMut}, path::PathBuf, sync::{ - atomic::{self, AtomicU32, AtomicUsize, Ordering}, Arc, + atomic::{self, AtomicU32, AtomicUsize, Ordering}, }, }; @@ -634,9 +634,9 @@ impl>> EdgeSegmentOps for EdgeSeg mod test { use super::*; use crate::{ + Config, pages::{edge_page::writer::EdgeWriter, layer_counter::GraphStats}, persist::strategy::NoOpStrategy, - Config, }; use raphtory_api::core::entities::properties::{ meta::{Meta, STATIC_GRAPH_LAYER_ID}, diff --git a/db4-storage/src/segments/node/segment.rs b/db4-storage/src/segments/node/segment.rs index 576985325c..6f9e565b79 100644 --- a/db4-storage/src/segments/node/segment.rs +++ b/db4-storage/src/segments/node/segment.rs @@ -1,37 +1,37 @@ use crate::{ + LocalPOS, api::nodes::{LockedNSSegment, NodeSegmentOps}, error::StorageError, loop_lock_write, persist::{config::ConfigOps, strategy::PersistenceStrategy}, segments::{ - node::entry::{MemNodeEntry, MemNodeRef}, HasRow, SegmentContainer, + node::entry::{MemNodeEntry, MemNodeRef}, }, wal::LSN, - LocalPOS, }; use either::Either; -use parking_lot::{lock_api::ArcRwLockReadGuard, RwLock}; +use parking_lot::{RwLock, lock_api::ArcRwLockReadGuard}; use raphtory_api::core::{ + Direction, entities::{ + EID, LayerId, VID, properties::{ meta::Meta, prop::{AsPropRef, Prop}, }, - LayerId, EID, VID, }, - Direction, }; use raphtory_core::{ - entities::{nodes::structure::adj::Adj, ELID}, + entities::{ELID, nodes::structure::adj::Adj}, storage::timeindex::{AsTime, EventTime}, }; use std::{ ops::{Deref, DerefMut}, path::PathBuf, sync::{ - atomic::{AtomicU32, AtomicUsize, Ordering}, Arc, + atomic::{AtomicU32, AtomicUsize, Ordering}, }, }; @@ -628,13 +628,13 @@ impl>> NodeSegmentOps for NodeSeg #[cfg(test)] mod test { use crate::{ + LocalPOS, NodeSegmentView, api::nodes::NodeSegmentOps, pages::{layer_counter::GraphStats, node_page::writer::NodeWriter}, persist::{ config::BaseConfig, strategy::{NoOpStrategy, PersistenceStrategy}, }, - LocalPOS, NodeSegmentView, }; use raphtory_api::core::entities::properties::{ meta::{Meta, STATIC_GRAPH_LAYER_ID},