diff --git a/core-relations/src/containers/mod.rs b/core-relations/src/containers/mod.rs index 63ca81f41..41c9b112c 100644 --- a/core-relations/src/containers/mod.rs +++ b/core-relations/src/containers/mod.rs @@ -12,11 +12,13 @@ use std::{ any::{Any, TypeId}, hash::{Hash, Hasher}, ops::Deref, + sync::{Arc, Mutex}, }; use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id}; use crossbeam_queue::SegQueue; use dashmap::SharedValue; +use hashbrown::HashSet; use rayon::{ iter::{ParallelBridge, ParallelIterator}, prelude::*, @@ -201,7 +203,116 @@ struct ContainerEnv { to_id: DashMap, to_container: DashMap, /// Map from a Value to the set of ids of containers that contain that value. - val_index: DashMap>, + val_index: LazyContainerIdx, +} +#[derive(Clone)] +struct LazyContainerIdx { + val_index: DashMap>, + // Queue of deferred bulk operations: each entry pairs a set of keys with the value to insert or remove. + // Bulk operations over at least LAZY_BOUND keys are pushed here instead of being applied to val_index immediately; + // they are applied when a key is read through get(). NOTE(review): #[derive(Clone)] clones the Arc, so clones share this queue; confirm that is intended. + pending_operations: Arc, InsertOrRemove)>>>, +} +enum InsertOrRemove { + Insert(Value), + Remove(Value), +} + +const LAZY_BOUND: usize = 30; +use dashmap::mapref::one::Ref; +impl LazyContainerIdx { + /// Creates a new, empty `LazyContainerIdx`. + pub fn new() -> Self { + Self { + val_index: DashMap::default(), + pending_operations: Default::default(), + } + } + + /// Applies any pending operations that mention `key`, then returns a reference to the entry stored for `key`, if any. + pub fn get(&mut self, key: &Value) -> Option>> { + self.flush_pending_operations_for_key(key); + self.val_index.get(key) + } + + /// Inserts `value` into the entry of every key in `keys`. + /// Key sets smaller than `LAZY_BOUND` are applied immediately; larger ones are queued and applied on a later `get`. 
+ pub fn insert_for_all_keys(&self, keys: HashSet, value: Value) { + if keys.len() < LAZY_BOUND { + for key in keys { + self.val_index.entry(key).or_default().insert(value); + } + } else { + self.pending_operations + .lock() + .unwrap() + .push((keys, InsertOrRemove::Insert(value))); + } + } + + /// Removes `value` from the entry of every key in `keys`: immediately when there are fewer than `LAZY_BOUND` keys, otherwise queued until a later `get`. + pub fn remove_for_all_keys(&self, keys: HashSet, value: Value) { + if keys.len() < LAZY_BOUND { + for key in keys { + if let Some(mut pending_keys) = self.val_index.get_mut(&key) { + pending_keys.remove(&value); + } + } + } else { + self.pending_operations + .lock() + .unwrap() + .push((keys, InsertOrRemove::Remove(value))); + } + } + + /// Applies the queued insertions and removals whose key set contains `key` to the underlying map. + fn flush_pending_operations_for_key(&self, key: &Value) { + let mut pending_ops = self.pending_operations.lock().unwrap(); + let mut flush_whole_set = false; + for (keys, op) in pending_ops.iter_mut() { + if keys.contains(key) { + // If the key set has fewer than LAZY_BOUND keys, flush the whole set. NOTE(review): this branch never removes the flushed keys from `keys`, so the `retain` below appears to keep the entry and re-apply it on later calls; confirm. 
+ if keys.len() < LAZY_BOUND { + // Apply this operation to every key still in the set, not only the requested one (the set itself is left unmodified here). + for key in keys.iter() { + match op { + InsertOrRemove::Insert(v) => { + self.val_index.entry(*key).or_default().insert(*v); + } + InsertOrRemove::Remove(v) => { + if let Some(mut pending_keys) = self.val_index.get_mut(key) { + pending_keys.remove(v); + } + } + } + } + flush_whole_set = true; + } else { + match op { + InsertOrRemove::Insert(v) => { + self.val_index.entry(*key).or_default().insert(*v); + } + InsertOrRemove::Remove(v) => { + if let Some(mut pending_keys) = self.val_index.get_mut(key) { + pending_keys.remove(v); + } + } + } + keys.remove(key); + } + } + } + if flush_whole_set { + pending_ops.retain(|(keys, _ops)| !keys.is_empty()); + } + } +} + +impl Default for LazyContainerIdx { + fn default() -> Self { + Self::new() + } } impl DynamicContainerEnv for ContainerEnv { @@ -242,7 +353,7 @@ impl ContainerEnv { counter, to_id: DashMap::default(), to_container: DashMap::default(), - val_index: DashMap::default(), + val_index: Default::default(), } } @@ -274,9 +385,8 @@ impl ContainerEnv { dashmap::Entry::Vacant(vac) => { // Common case: insert the mapping in to_id and update the index. 
vac.insert(value); - for val in container.iter() { - self.val_index.entry(val).or_default().insert(value); - } + self.val_index + .insert_for_all_keys(container.iter().collect(), value); value } dashmap::Entry::Occupied(occ) => { @@ -302,18 +412,16 @@ impl ContainerEnv { self.to_container.remove(&old_val); self.to_container.insert(result, (hc as usize, target_map)); *occ.get_mut() = result; - for val in occ.key().iter() { - let mut index = self.val_index.entry(val).or_default(); - index.swap_remove(&old_val); - index.insert(result); - } + self.val_index + .remove_for_all_keys(occ.key().iter().collect(), old_val); + self.val_index + .insert_for_all_keys(occ.key().iter().collect(), result); } } dashmap::Entry::Vacant(vacant_entry) => { self.to_container.insert(value, (hc as usize, target_map)); - for val in vacant_entry.key().iter() { - self.val_index.entry(val).or_default().insert(value); - } + self.val_index + .insert_for_all_keys(vacant_entry.key().iter().collect(), value); vacant_entry.insert(value); } } @@ -349,7 +457,7 @@ impl ContainerEnv { let Some(ids) = self.val_index.get(&row[0]) else { continue; }; - to_rebuild.extend(&*ids); + to_rebuild.extend(ids.iter()); } for id in to_rebuild { let Some((hc, target_map)) = self.to_container.get(&id).map(|x| *x) else { @@ -500,18 +608,17 @@ impl ContainerEnv { self.to_container.remove(&old_val); self.to_container.insert(result, (hc as usize, target_map)); *val_slot.get_mut() = result; - for val in container.iter() { - let mut index = self.val_index.entry(val).or_default(); - index.swap_remove(&old_val); - index.insert(result); - } + self.val_index + .remove_for_all_keys(container.iter().collect(), old_val); + self.val_index + .insert_for_all_keys(container.iter().collect(), result); } } Err(slot) => { self.to_container.insert(val, (hc as usize, target_map)); - for v in container.iter() { - self.val_index.entry(v).or_default().insert(val); - } + self.val_index + .insert_for_all_keys(container.iter().collect(), val); + 
// SAFETY: We just got this slot from `find_or_find_insert_slot` // and we have not mutated the map at all since then. unsafe {