Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion db4-storage/src/gen_ts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::ops::Range;

use itertools::Itertools;
use raphtory_api::core::entities::LayerId;
use raphtory_core::{
entities::{ELID, LayerIds, layers::Multiple},
Expand Down
136 changes: 136 additions & 0 deletions db4-storage/src/pages/edge_page/bulk_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use crate::{
LocalPOS, api::edges::EdgeSegmentOps, pages::edge_page::writer::EdgeWriter,
segments::edge::segment::MemEdgeSegment,
};
use raphtory_api::core::entities::{
EID, LayerId, VID,
properties::{meta::STATIC_GRAPH_LAYER_ID, prop::AsPropRef},
};
use raphtory_core::storage::timeindex::{AsTime, EventTime};
use std::ops::DerefMut;

pub struct BulkEdgeWriter<
'a,
MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug,
ES: EdgeSegmentOps,
> {
ew: EdgeWriter<'a, MP, ES>,
layers: Vec<usize>,
earliest: EventTime,
latest: EventTime,
}

impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmentOps>
From<EdgeWriter<'a, MP, ES>> for BulkEdgeWriter<'a, MP, ES>
{
fn from(value: EdgeWriter<'a, MP, ES>) -> Self {
Self {
ew: value,
layers: vec![0],
earliest: EventTime::MAX,
latest: EventTime::MIN,
}
}
}

impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmentOps>
BulkEdgeWriter<'a, MP, ES>
{
pub fn bulk_add_edge<P: AsPropRef>(
&mut self,
t: EventTime,
edge_pos: LocalPOS,
src: VID,
dst: VID,
edge_exists: bool,
layer_id: LayerId,
c_props: impl IntoIterator<Item = (usize, P)>,
t_props: impl IntoIterator<Item = (usize, P)>,
) {
if !edge_exists {
if self
.ew
.writer
.insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID)
{
self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID);
}
}

if self
.ew
.writer
.insert_edge_internal(t, edge_pos, src, dst, layer_id, t_props)
&& !self.ew.page.immut_has_edge(edge_pos, layer_id)
{
self.increment_layer_num_edges(layer_id);
}

self.update_time(t);

self.ew
.writer
.update_const_properties(edge_pos, src, dst, layer_id, c_props);
}

pub fn bulk_delete_edge(
&mut self,
t: EventTime,
edge_pos: LocalPOS,
src: VID,
dst: VID,
exists: bool,
layer_id: LayerId,
) {
if !exists {
if self
.ew
.writer
.insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID)
{
self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID);
}
}

self.update_time(t);
if self
.ew
.writer
.delete_edge_internal(t, edge_pos, src, dst, layer_id)
&& !self.ew.page.immut_has_edge(edge_pos, layer_id)
{
self.increment_layer_num_edges(layer_id);
}
}

#[inline]
fn increment_layer_num_edges(&mut self, layer_id: LayerId) {
if self.layers.len() <= layer_id.0 {
self.layers.resize_with(layer_id.0 + 1, Default::default);
}
self.layers[layer_id.0] += 1;
}

#[inline]
fn update_time(&mut self, t: EventTime) {
self.earliest = self.earliest.min(t);
self.latest = self.latest.max(t);
}

#[inline(always)]
pub fn resolve_pos(&self, edge_id: EID) -> Option<LocalPOS> {
self.ew.resolve_pos(edge_id)
}
}

impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmentOps> Drop
for BulkEdgeWriter<'a, MP, ES>
{
fn drop(&mut self) {
for (layer_id, count) in self.layers.iter().enumerate() {
self.ew.graph_stats.increment_by(LayerId(layer_id), *count);
}
self.ew.graph_stats.update_time(self.earliest.t());
self.ew.graph_stats.update_time(self.latest.t());
}
}
1 change: 1 addition & 0 deletions db4-storage/src/pages/edge_page/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod bulk_writer;
pub mod writer;
1 change: 1 addition & 0 deletions db4-storage/src/pages/edge_page/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmen
self.page.segment_id()
}

#[inline]
fn increment_layer_num_edges(&self, layer_id: LayerId) {
self.graph_stats.increment(layer_id);
}
Expand Down
8 changes: 7 additions & 1 deletion db4-storage/src/pages/layer_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,15 @@ impl GraphStats {
self.latest.get()
}

#[inline(always)]
pub fn increment(&self, layer_id: LayerId) -> usize {
self.increment_by(layer_id, 1)
}

#[inline(always)]
pub fn increment_by(&self, layer_id: LayerId, count: usize) -> usize {
let counter = self.get_or_create_layer(layer_id);
counter.fetch_add(1, std::sync::atomic::Ordering::Release)
counter.fetch_add(count, std::sync::atomic::Ordering::Release)
}

pub fn get(&self, layer_id: LayerId) -> usize {
Expand Down
11 changes: 10 additions & 1 deletion db4-storage/src/pages/locked/edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use crate::{
LocalPOS,
api::edges::EdgeSegmentOps,
error::StorageError,
pages::{edge_page::writer::EdgeWriter, layer_counter::GraphStats, resolve_pos},
pages::{
edge_page::{bulk_writer::BulkEdgeWriter, writer::EdgeWriter},
layer_counter::GraphStats,
resolve_pos,
},
persist::strategy::PersistenceStrategy,
segments::edge::segment::MemEdgeSegment,
};
Expand Down Expand Up @@ -44,6 +48,11 @@ impl<'a, ES: EdgeSegmentOps> LockedEdgePage<'a, ES> {
EdgeWriter::new(self.num_edges, self.page, self.lock.deref_mut())
}

#[inline(always)]
pub fn bulk_writer(&mut self) -> BulkEdgeWriter<'_, &mut MemEdgeSegment, ES> {
EdgeWriter::new(self.num_edges, self.page, self.lock.deref_mut()).into()
}

#[inline(always)]
pub fn page_id(&self) -> usize {
self.page_id
Expand Down
11 changes: 10 additions & 1 deletion db4-storage/src/pages/locked/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ use crate::{
LocalPOS,
api::nodes::NodeSegmentOps,
error::StorageError,
pages::{layer_counter::GraphStats, node_page::writer::NodeWriter, resolve_pos},
pages::{
layer_counter::GraphStats,
node_page::{bulk_writer::BulkNodeWriter, writer::NodeWriter},
resolve_pos,
},
persist::strategy::PersistenceStrategy,
segments::node::segment::MemNodeSegment,
};
Expand Down Expand Up @@ -47,6 +51,11 @@ impl<'a, NS: NodeSegmentOps> LockedNodePage<'a, NS> {
NodeWriter::new(self.page, self.layer_counter, self.lock.deref_mut())
}

#[inline(always)]
pub fn bulk_writer(&mut self) -> BulkNodeWriter<'_, &mut MemNodeSegment, NS> {
NodeWriter::new(self.page, self.layer_counter, self.lock.deref_mut()).into()
}

pub fn head(&mut self) -> &mut MemNodeSegment {
self.lock.deref_mut()
}
Expand Down
145 changes: 145 additions & 0 deletions db4-storage/src/pages/node_page/bulk_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::ops::DerefMut;

use raphtory_api::core::entities::properties::{
meta::{NODE_ID_IDX, STATIC_GRAPH_LAYER_ID},
prop::Prop,
};
use raphtory_core::{
entities::{EID, ELID, GID, LayerId, VID},
storage::timeindex::AsTime,
};

use crate::{
LocalPOS, api::nodes::NodeSegmentOps, pages::node_page::writer::NodeWriter,
segments::node::segment::MemNodeSegment,
};

#[derive(Debug)]
pub struct BulkNodeWriter<'a, MP: DerefMut<Target = MemNodeSegment> + 'a, NS: NodeSegmentOps> {
nw: NodeWriter<'a, MP, NS>,
layers: Vec<usize>,
}

impl<'a, MP: DerefMut<Target = MemNodeSegment> + 'a, NS: NodeSegmentOps>
From<NodeWriter<'a, MP, NS>> for BulkNodeWriter<'a, MP, NS>
{
fn from(value: NodeWriter<'a, MP, NS>) -> Self {
Self {
nw: value,
layers: Vec::new(),
}
}
}

impl<'a, MP: DerefMut<Target = MemNodeSegment> + 'a, NS: NodeSegmentOps>
BulkNodeWriter<'a, MP, NS>
{
#[inline]
pub fn get_out_edge(&self, pos: LocalPOS, dst: VID, layer_id: LayerId) -> Option<EID> {
self.nw.get_out_edge(pos, dst, layer_id)
}

#[inline(always)]
pub fn resolve_pos(&self, node_id: VID) -> Option<LocalPOS> {
self.nw.resolve_pos(node_id)
}

#[inline(always)]
pub fn add_static_outbound_edge(
&mut self,
src_pos: LocalPOS,
dst: impl Into<VID>,
e_id: impl Into<EID>,
) {
let e_id = e_id.into();
self.nw.add_outbound_edge_inner::<i64>(
None,
src_pos,
dst,
e_id.with_layer(STATIC_GRAPH_LAYER_ID),
|layer_id| {
Self::update_layer_count(layer_id, &mut self.layers);
},
);
}

pub fn add_static_inbound_edge(
&mut self,
dst_pos: LocalPOS,
src: impl Into<VID>,
e_id: impl Into<EID>,
) {
let e_id = e_id.into();
self.nw.add_inbound_edge_inner::<i64>(
None,
dst_pos,
src,
e_id.with_layer(STATIC_GRAPH_LAYER_ID),
|layer_id| {
Self::update_layer_count(layer_id, &mut self.layers);
},
);
}

#[inline(always)]
pub fn add_outbound_edge<T: AsTime>(
&mut self,
t: Option<T>,
src_pos: impl Into<LocalPOS>,
dst: impl Into<VID>,
e_id: impl Into<ELID>,
) {
self.nw
.add_outbound_edge_inner(t, src_pos, dst, e_id, |layer_id| {
Self::update_layer_count(layer_id, &mut self.layers);
});
}

pub fn add_inbound_edge<T: AsTime>(
&mut self,
t: Option<T>,
dst_pos: impl Into<LocalPOS>,
src: impl Into<VID>,
e_id: impl Into<ELID>,
) {
self.nw
.add_inbound_edge_inner(t, dst_pos, src, e_id, |layer_id| {
Self::update_layer_count(layer_id, &mut self.layers);
});
}

fn update_layer_count(layer_id: LayerId, layers: &mut Vec<usize>) {
if layers.len() <= layer_id.0 {
layers.resize_with(layer_id.0 + 1, Default::default);
}
layers[layer_id.0] += 1;
}

#[inline(always)]
pub fn update_timestamp<T: AsTime>(&mut self, t: T, pos: LocalPOS, e_id: ELID) {
self.nw.update_timestamp(t, pos, e_id);
}

#[inline(always)]
pub fn store_node_id(&mut self, pos: LocalPOS, layer_id: LayerId, gid: GID) {
let gid = match gid {
GID::U64(id) => Prop::U64(id),
GID::Str(s) => Prop::str(s),
};
let props = [(NODE_ID_IDX, gid)];
self.nw
.update_c_props_inner(pos, layer_id, props, |layer_id| {
Self::update_layer_count(layer_id, &mut self.layers);
});
}
}

impl<'a, MP: DerefMut<Target = MemNodeSegment>, ES: NodeSegmentOps> Drop
for BulkNodeWriter<'a, MP, ES>
{
fn drop(&mut self) {
for (layer_id, count) in self.layers.iter().enumerate() {
self.nw.l_counter.increment_by(LayerId(layer_id), *count);
}
}
}
1 change: 1 addition & 0 deletions db4-storage/src/pages/node_page/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod bulk_writer;
pub mod writer;
Loading