Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 158 additions & 26 deletions collator/src/collator/messages_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, VecDeque, btree_map};
use std::collections::{BTreeMap, VecDeque, btree_map, hash_map};

use rayon::iter::IntoParallelIterator;
use tycho_block_util::queue::QueueKey;
Expand All @@ -18,6 +18,8 @@ type SlotId = u16;
type FastIndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
pub type FastIndexSet<K> = indexmap::IndexSet<K, ahash::RandomState>;

type SortedIndexUpdates = FastHashMap<HashBytes, (usize, usize)>;

#[derive(Clone, Copy)]
pub struct MessagesBufferLimits {
pub max_count: usize,
Expand Down Expand Up @@ -102,6 +104,23 @@ impl MessagesBuffer {
}

pub fn remove_messages_by_accounts(&mut self, addresses_to_remove: &FastHashSet<HashBytes>) {
// update sorted by count index
for account_id in addresses_to_remove {
if let Some(account_msgs) = self.msgs.get(account_id) {
let old = account_msgs.len();
if old > 0
&& let Some(set) = self.sorted_index.get_mut(&old)
{
set.remove(account_id);
if set.is_empty() {
self.sorted_index.remove(&old);
}
}
}
}

// remove messages
// NOTE: this method is called only for internal messages
self.msgs.retain(|k, v| {
if addresses_to_remove.contains(k) {
self.int_count -= v.len();
Expand Down Expand Up @@ -132,10 +151,10 @@ impl MessagesBuffer {
let mut slots_info = std::mem::take(&mut msg_group.slots_info);

// we will collect updates for slots index and apply them at the end
let mut slots_index_updates = BTreeMap::new();
// we will collect updates for sorted index and apply them at the end
// we will track collected queue messages
let mut fill_cx = FillMessageGroupContext::default();

// track collected messages
let mut collected_int_msgs = vec![];
let mut collected_count = 0;

// track accounts whose messages were not used to fill group
Expand Down Expand Up @@ -172,7 +191,11 @@ impl MessagesBuffer {
while let Some(account_id) = buffer_accounts.pop_front() {
if msg_group.msgs.contains_key(&account_id) {
// try skip messages from skipped account: maybe they expired
self.try_skip_account_msgs(&account_id, &mut msg_filter);
self.try_skip_account_msgs(
&account_id,
&mut fill_cx.sorted_index_updates,
&mut msg_filter,
);

continue;
}
Expand All @@ -182,7 +205,11 @@ impl MessagesBuffer {
ops_count.saturating_add_assign(check_ops_count);
if skip_account {
// try skip messages from skipped account: maybe they expired
self.try_skip_account_msgs(&account_id, &mut msg_filter);
self.try_skip_account_msgs(
&account_id,
&mut fill_cx.sorted_index_updates,
&mut msg_filter,
);

continue;
}
Expand All @@ -191,8 +218,7 @@ impl MessagesBuffer {
account_id,
msg_group,
&mut slot_cx,
&mut collected_int_msgs,
&mut slots_index_updates,
&mut fill_cx,
&mut msg_filter,
);
ops_count.saturating_add_assign(move_res.ops_count);
Expand Down Expand Up @@ -265,7 +291,11 @@ impl MessagesBuffer {
ops_count.saturating_add_assign(check_ops_count);
if skip_account {
// try skip messages from skipped account: maybe they expired
self.try_skip_account_msgs(&account_id, &mut msg_filter);
self.try_skip_account_msgs(
&account_id,
&mut fill_cx.sorted_index_updates,
&mut msg_filter,
);

continue;
}
Expand All @@ -274,8 +304,7 @@ impl MessagesBuffer {
account_id,
msg_group,
&mut slot_cx,
&mut collected_int_msgs,
&mut slots_index_updates,
&mut fill_cx,
&mut msg_filter,
);
ops_count.saturating_add_assign(move_res.ops_count);
Expand All @@ -296,7 +325,11 @@ impl MessagesBuffer {
while let Some(account_id) = buffer_accounts.pop_front() {
if msg_group.msgs.contains_key(&account_id) {
// try skip messages from skipped account: maybe they expired
self.try_skip_account_msgs(&account_id, &mut msg_filter);
self.try_skip_account_msgs(
&account_id,
&mut fill_cx.sorted_index_updates,
&mut msg_filter,
);

continue;
}
Expand All @@ -306,7 +339,11 @@ impl MessagesBuffer {
ops_count.saturating_add_assign(check_ops_count);
if skip_account {
// try skip messages from skipped account: maybe they expired
self.try_skip_account_msgs(&account_id, &mut msg_filter);
self.try_skip_account_msgs(
&account_id,
&mut fill_cx.sorted_index_updates,
&mut msg_filter,
);

continue;
}
Expand All @@ -315,8 +352,7 @@ impl MessagesBuffer {
account_id,
msg_group,
&mut slot_cx,
&mut collected_int_msgs,
&mut slots_index_updates,
&mut fill_cx,
&mut msg_filter,
);
ops_count.saturating_add_assign(move_res.ops_count);
Expand All @@ -332,7 +368,11 @@ impl MessagesBuffer {
// check and skip messages in remaning accounts if required
if msg_filter.can_skip() {
while let Some(account_id) = buffer_accounts.pop_front() {
self.try_skip_account_msgs(&account_id, &mut msg_filter);
self.try_skip_account_msgs(
&account_id,
&mut fill_cx.sorted_index_updates,
&mut msg_filter,
);
}
}

Expand All @@ -342,7 +382,7 @@ impl MessagesBuffer {

// 3. update slots index
let mut remove_slot_ids = BTreeMap::<usize, FastHashSet<u16>>::new();
for (slot_id, index_update) in slots_index_updates {
for (slot_id, index_update) in fill_cx.slots_index_updates {
// remove slot from old count basket
if index_update.old_count() != 0 {
remove_slot_ids
Expand Down Expand Up @@ -372,8 +412,30 @@ impl MessagesBuffer {
// put slots info into group
std::mem::swap(&mut slots_info, &mut msg_group.slots_info);

// 4. update sorted index
for (account_id, (old_count, new_count)) in fill_cx.sorted_index_updates {
if old_count > 0
&& let Some(set) = self.sorted_index.get_mut(&old_count)
{
set.remove(&account_id);
if set.is_empty() {
self.sorted_index.remove(&old_count);
}
}
if new_count > 0 {
match self.sorted_index.entry(new_count) {
btree_map::Entry::Vacant(vacant) => {
vacant.insert([account_id].into_iter().collect());
}
btree_map::Entry::Occupied(mut occupied) => {
occupied.get_mut().insert(account_id);
}
}
}
}

FillMessageGroupResult {
collected_int_msgs,
collected_int_msgs: fill_cx.collected_int_msgs,
collected_count,
ops_count,
}
Expand All @@ -384,8 +446,7 @@ impl MessagesBuffer {
account_id: HashBytes,
msg_group: &mut MessageGroup,
slot_cx: &mut SlotContext<'_>,
collected_int_msgs: &mut Vec<QueueKey>,
slots_index_updates: &mut BTreeMap<SlotId, SlotIndexUpdate>,
fill_cx: &mut FillMessageGroupContext,
mut msg_filter: FM,
) -> MoveMessagesResult
where
Expand All @@ -405,6 +466,8 @@ impl MessagesBuffer {
if let Some(account_msgs) = self.msgs.get_mut(&account_id) {
res.ops_count.saturating_add_assign(1);

let prev_len = account_msgs.len();

amount = account_msgs.len().min(slot_cx.remaning_capacity);
if amount == 0 {
return res;
Expand Down Expand Up @@ -435,7 +498,7 @@ impl MessagesBuffer {
// collect message if it was not skipped
match &msg.info {
MsgInfo::Int(info) => {
collected_int_msgs.push(QueueKey {
fill_cx.collected_int_msgs.push(QueueKey {
lt: info.created_lt,
hash: *msg.cell.repr_hash(),
});
Expand All @@ -449,6 +512,17 @@ impl MessagesBuffer {
slot_account_msgs.push(msg);
}

// accum sorted index updates
let new_len = prev_len - amount;
if new_len != prev_len {
note_sorted_index_update(
&mut fill_cx.sorted_index_updates,
account_id,
prev_len,
new_len,
);
}

if slot_account_msgs.is_empty() {
msg_group.msgs.remove(&account_id);
res.ops_count.saturating_add_assign(1);
Expand Down Expand Up @@ -485,7 +559,8 @@ impl MessagesBuffer {
}

// collect slot index updates
slots_index_updates
fill_cx
.slots_index_updates
.entry(slot_cx.slot_id)
.and_modify(|index_update| {
index_update.new_int_count = slot_cx.slot.int_count;
Expand All @@ -501,29 +576,67 @@ impl MessagesBuffer {
res
}

fn try_skip_account_msgs<FM>(&mut self, account_id: &HashBytes, mut filter: FM)
where
fn try_skip_account_msgs<FM>(
&mut self,
account_id: &HashBytes,
sorted_index_updates: &mut SortedIndexUpdates,
mut filter: FM,
) where
FM: MessageFilter,
{
if !filter.can_skip() {
return;
}

if let Some(account_msgs) = self.msgs.get_mut(account_id) {
let prev_len = account_msgs.len();
let mut skip_count = 0;

account_msgs.retain(|msg| {
let skip = filter.should_skip(msg);
debug_assert!(
!skip || msg.info.is_external_in(),
"we should only skip external messages"
);
self.ext_count -= skip as usize;
if skip {
skip_count += 1;
}
!skip
});

let new_len = prev_len - skip_count;
if new_len != prev_len {
note_sorted_index_update(sorted_index_updates, *account_id, prev_len, new_len);
}
}
}
}

#[inline]
fn note_sorted_index_update(
index_updates: &mut SortedIndexUpdates,
account_id: HashBytes,
old_count: usize,
new_count: usize,
) {
match index_updates.entry(account_id) {
hash_map::Entry::Occupied(mut occupied) => {
occupied.get_mut().1 = new_count;
}
hash_map::Entry::Vacant(vacant) => {
vacant.insert((old_count, new_count));
}
}
}

#[derive(Default)]
struct FillMessageGroupContext {
pub collected_int_msgs: Vec<QueueKey>,
pub slots_index_updates: BTreeMap<SlotId, SlotIndexUpdate>,
pub sorted_index_updates: SortedIndexUpdates,
}

pub struct FillMessageGroupResult {
pub collected_int_msgs: Vec<QueueKey>,
pub collected_count: usize,
Expand Down Expand Up @@ -653,8 +766,27 @@ impl MessagesBuffer {
BufferFillStateByCount::NotFull
};

// TODO: msgs-v3: check if we can already fill required slots
let by_slots = BufferFillStateBySlots::CanNotFill;
// check if we can already fill required slots count
let mut by_slots = BufferFillStateBySlots::CanNotFill;

if self.msgs.len() >= limits.slots_count {
let mut full_slots_count = 0;
let mut current_slot_msgs_count = 0;
for (&account_msgs_count, account_ids) in self.sorted_index.iter().rev() {
for _ in account_ids {
current_slot_msgs_count += account_msgs_count;
if current_slot_msgs_count >= limits.slot_vert_size * 3 {
full_slots_count += 1;
current_slot_msgs_count = 0;

if full_slots_count >= limits.slots_count {
by_slots = BufferFillStateBySlots::CanFill;
break;
}
}
}
}
}

(by_count, by_slots)
}
Expand Down