Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 129 additions & 22 deletions core-relations/src/containers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use std::{
any::{Any, TypeId},
hash::{Hash, Hasher},
ops::Deref,
sync::{Arc, Mutex},
};

use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id};
use crossbeam_queue::SegQueue;
use dashmap::SharedValue;
use hashbrown::HashSet;
use rayon::{
iter::{ParallelBridge, ParallelIterator},
prelude::*,
Expand Down Expand Up @@ -201,7 +203,116 @@ struct ContainerEnv<C: Eq + Hash> {
to_id: DashMap<C, Value>,
to_container: DashMap<Value, (usize /* hash code */, usize /* map */)>,
/// Map from a Value to the set of ids of containers that contain that value.
val_index: DashMap<Value, IndexSet<Value>>,
val_index: LazyContainerIdx,
}
/// A lazily-updated index from a `Value` to the set of container ids
/// (`Value`s) whose containers mention it.
///
/// Bulk updates that touch many keys are buffered in `pending_operations`
/// and replayed on demand, per key, the next time the index is read
/// (see `flush_pending_operations_for_key`). This keeps large bulk
/// insert/remove operations cheap at write time.
#[derive(Clone)]
struct LazyContainerIdx {
    /// The materialized portion of the index.
    val_index: DashMap<Value, HashSet<Value>>,
    /// Deferred bulk operations, in the order they were issued. When a bulk
    /// insert/remove covers at least `LAZY_BOUND` keys, the (key set, op)
    /// pair is queued here instead of being applied eagerly; reads replay
    /// the queued operations for the key being read.
    // NOTE(review): a `Mutex<Vec<..>>` serializes all writers and may be
    // slow under contention; a lock-free queue (e.g. crossbeam's SegQueue)
    // was suggested as an alternative — revisit if this shows up in profiles.
    pending_operations: Arc<Mutex<Vec<(HashSet<Value>, InsertOrRemove)>>>,
}
/// A deferred index mutation: insert or remove a single container id
/// (`Value`) for every key in the pending key set it is stored with.
enum InsertOrRemove {
    Insert(Value),
    Remove(Value),
}

/// Bulk operations touching fewer than this many keys are applied eagerly;
/// larger ones are queued in `pending_operations` and replayed on read.
const LAZY_BOUND: usize = 30;
use dashmap::mapref::one::Ref;
impl LazyContainerIdx {
/// Creates a new, empty `LazyMapOfIndexSet`.
pub fn new() -> Self {
Self {
val_index: DashMap::default(),
pending_operations: Default::default(),
}
}

/// Returns a reference to the set of container ids indexed under `key`,
/// after replaying any pending bulk operations that involve it.
///
/// Takes `&self` rather than `&mut self`: both the pending-operation flush
/// and the `DashMap` lookup only require shared access, so demanding an
/// exclusive borrow here needlessly restricted callers.
pub fn get(&self, key: &Value) -> Option<Ref<'_, Value, HashSet<Value>>> {
    self.flush_pending_operations_for_key(key);
    self.val_index.get(key)
}

/// Records `value` under every key in `keys`.
///
/// Small key sets (fewer than `LAZY_BOUND`) are applied eagerly; larger
/// ones are queued and replayed the next time an affected key is read.
pub fn insert_for_all_keys(&self, keys: HashSet<Value>, value: Value) {
    if keys.len() < LAZY_BOUND {
        for key in keys {
            // Replay queued operations on this key first so this eager
            // insert is ordered after them. Without this, a pending
            // `Remove(value)` flushed later would incorrectly undo the
            // insert we are about to perform.
            self.flush_pending_operations_for_key(&key);
            self.val_index.entry(key).or_default().insert(value);
        }
    } else {
        self.pending_operations
            .lock()
            .unwrap()
            .push((keys, InsertOrRemove::Insert(value)));
    }
}

/// Removes `value` from the index entry of every key in `keys`.
///
/// Small key sets (fewer than `LAZY_BOUND`) are applied eagerly; larger
/// ones are queued and replayed the next time an affected key is read.
pub fn remove_for_all_keys(&self, keys: HashSet<Value>, value: Value) {
    if keys.len() < LAZY_BOUND {
        for key in keys {
            // Replay queued operations on this key first so this eager
            // removal is ordered after them. Without this, a pending
            // `Insert(value)` flushed later would resurrect the id we are
            // removing here.
            self.flush_pending_operations_for_key(&key);
            if let Some(mut ids) = self.val_index.get_mut(&key) {
                ids.remove(&value);
            }
        }
    } else {
        self.pending_operations
            .lock()
            .unwrap()
            .push((keys, InsertOrRemove::Remove(value)));
    }
}

/// Flushes all pending lazy insertions to the underlying map.
fn flush_pending_operations_for_key(&self, key: &Value) {
let mut pending_ops = self.pending_operations.lock().unwrap();
let mut flush_whole_set = false;
for (keys, op) in pending_ops.iter_mut() {
if keys.contains(key) {
// if the length of keys set is less than LAZY_BOUND just flush the whole set.
if keys.len() < LAZY_BOUND {
// flush all keys in set
for key in keys.iter() {
match op {
InsertOrRemove::Insert(v) => {
self.val_index.entry(*key).or_default().insert(*v);
}
InsertOrRemove::Remove(v) => {
if let Some(mut pending_keys) = self.val_index.get_mut(key) {
pending_keys.remove(v);
}
}
}
}
flush_whole_set = true;
} else {
match op {
InsertOrRemove::Insert(v) => {
self.val_index.entry(*key).or_default().insert(*v);
}
InsertOrRemove::Remove(v) => {
if let Some(mut pending_keys) = self.val_index.get_mut(key) {
pending_keys.remove(v);
}
}
}
keys.remove(key);
}
}
}
if flush_whole_set {
pending_ops.retain(|(keys, _ops)| !keys.is_empty());
}
}
}

impl Default for LazyContainerIdx {
    /// Equivalent to [`LazyContainerIdx::new`]: an empty index with no
    /// pending operations.
    fn default() -> Self {
        Self::new()
    }
}

impl<C: ContainerValue> DynamicContainerEnv for ContainerEnv<C> {
Expand Down Expand Up @@ -242,7 +353,7 @@ impl<C: ContainerValue> ContainerEnv<C> {
counter,
to_id: DashMap::default(),
to_container: DashMap::default(),
val_index: DashMap::default(),
val_index: Default::default(),
}
}

Expand Down Expand Up @@ -274,9 +385,8 @@ impl<C: ContainerValue> ContainerEnv<C> {
dashmap::Entry::Vacant(vac) => {
// Common case: insert the mapping in to_id and update the index.
vac.insert(value);
for val in container.iter() {
self.val_index.entry(val).or_default().insert(value);
}
self.val_index
.insert_for_all_keys(container.iter().collect(), value);
value
}
dashmap::Entry::Occupied(occ) => {
Expand All @@ -302,18 +412,16 @@ impl<C: ContainerValue> ContainerEnv<C> {
self.to_container.remove(&old_val);
self.to_container.insert(result, (hc as usize, target_map));
*occ.get_mut() = result;
for val in occ.key().iter() {
let mut index = self.val_index.entry(val).or_default();
index.swap_remove(&old_val);
index.insert(result);
}
self.val_index
.remove_for_all_keys(occ.key().iter().collect(), old_val);
self.val_index
.insert_for_all_keys(occ.key().iter().collect(), result);
}
}
dashmap::Entry::Vacant(vacant_entry) => {
self.to_container.insert(value, (hc as usize, target_map));
for val in vacant_entry.key().iter() {
self.val_index.entry(val).or_default().insert(value);
}
self.val_index
.insert_for_all_keys(vacant_entry.key().iter().collect(), value);
vacant_entry.insert(value);
}
}
Expand Down Expand Up @@ -349,7 +457,7 @@ impl<C: ContainerValue> ContainerEnv<C> {
let Some(ids) = self.val_index.get(&row[0]) else {
continue;
};
to_rebuild.extend(&*ids);
to_rebuild.extend(ids.iter());
}
for id in to_rebuild {
let Some((hc, target_map)) = self.to_container.get(&id).map(|x| *x) else {
Expand Down Expand Up @@ -500,18 +608,17 @@ impl<C: ContainerValue> ContainerEnv<C> {
self.to_container.remove(&old_val);
self.to_container.insert(result, (hc as usize, target_map));
*val_slot.get_mut() = result;
for val in container.iter() {
let mut index = self.val_index.entry(val).or_default();
index.swap_remove(&old_val);
index.insert(result);
}
self.val_index
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

swap_remove is always used together with an insert, so maybe we can have an update_for_all_keys that takes both the old value and the new value. This way you don't need to materialize the container twice.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As another optimization, currently we always materialize the container (container.iter().collect()), but when the container only has a fewer entries than (LAZY_BOUND), the materialized container is immediately discarded. Can we make insert/remove_for_all_keys take an iterator instead of an IndexSet?

Copy link
Contributor Author

@MilkBlock MilkBlock Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 ways to implement this.

pub trait ContainerValue: Hash + Eq + Clone + Send + Sync + 'static {
    /// Rebuild an additional container in place according the the given [`Rebuilder`].
    ///
    /// If this method returns `false` then the container must not have been modified (i.e. it must
    /// hash to the same value, and compare equal to a copy of itself before the call).
    fn rebuild_contents(&mut self, rebuilder: &dyn Rebuilder) -> bool;

    /// Iterate over the contents of the container.
    ///
    /// Note that containers can be more structured than just a sequence of values. This iterator
    /// is used to populate an index that in turn is used to speed up rebuilds. If a value in the
    /// container is eligible for a rebuild and it is not mentioned by this iterator, the outer
    /// [`Containers`] registry may skip rebuilding this container.
    fn iter(&self) -> impl Iterator<Item = Value> + '_;
}
  1. trait revision: add fn len()to this trait
  2. make iterator sized_iterator

Maybe we could consider it in next PR this may require many changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just change the interface of iter to return an ExtractSizeIterator?

.remove_for_all_keys(container.iter().collect(), old_val);
self.val_index
.insert_for_all_keys(container.iter().collect(), result);
}
}
Err(slot) => {
self.to_container.insert(val, (hc as usize, target_map));
for v in container.iter() {
self.val_index.entry(v).or_default().insert(val);
}
self.val_index
.insert_for_all_keys(container.iter().collect(), val);

// SAFETY: We just got this slot from `find_or_find_insert_slot`
// and we have not mutated the map at all since then.
unsafe {
Expand Down