From e5ef168a6b4a6e5cb757cff1cd9e08b061f25234 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 14 Mar 2024 17:25:44 +0900 Subject: [PATCH 1/5] Using an unrolled linked list to halve memory usage --- src/lib.rs | 51 ++++ src/mem/arena.rs | 201 +++++++++++++ src/mem/mod.rs | 4 + src/mem/queue.rs | 144 +++------- src/mem/queues.rs | 46 +-- src/mem/rolling_buffer.rs | 266 ++++++++++++++++++ src/multi_record_log.rs | 9 +- .../{file_number.rs => file_tracker.rs} | 53 +--- src/rolling/mod.rs | 5 +- src/tests.rs | 1 + 10 files changed, 606 insertions(+), 174 deletions(-) create mode 100644 src/mem/arena.rs create mode 100644 src/mem/rolling_buffer.rs rename src/rolling/{file_number.rs => file_tracker.rs} (72%) diff --git a/src/lib.rs b/src/lib.rs index 0a6d1ce..f3a6c67 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use std::sync::Arc; mod block_read_write; @@ -28,6 +29,56 @@ impl<'a> Record<'a> { } } +#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub struct FileNumber { + file_number: Arc, +} + +impl FileNumber { + fn new(file_number: u64) -> Self { + FileNumber { + file_number: Arc::new(file_number), + } + } + + /// Returns whether there is no clone of this FileNumber in existance. + /// + /// /!\ care should be taken to not have some other code store a &FileNumber which could alias + /// with self as it might then be sementically incorrect to delete content based only on this + /// returning `true`. 
+ pub fn can_be_deleted(&self) -> bool { + Arc::strong_count(&self.file_number) == 1 + } + + #[cfg(test)] + pub fn unroll(&self, tracker: &crate::rolling::FileTracker) -> Vec { + let mut file = self.clone(); + let mut file_numbers = Vec::new(); + loop { + file_numbers.push(file.file_number()); + if let Some(next_file) = tracker.next(&file) { + file = next_file; + } else { + return file_numbers; + } + } + } + + pub fn filename(&self) -> String { + format!("wal-{:020}", self.file_number) + } + + #[cfg(test)] + pub fn file_number(&self) -> u64 { + *self.file_number + } + + #[cfg(test)] + pub fn for_test(file_number: u64) -> Self { + FileNumber::new(file_number) + } +} + /// Resources used by mrecordlog #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ResourceUsage { diff --git a/src/mem/arena.rs b/src/mem/arena.rs new file mode 100644 index 0000000..2a093bc --- /dev/null +++ b/src/mem/arena.rs @@ -0,0 +1,201 @@ +use std::time::{Duration, Instant}; + +#[cfg(not(test))] +pub const PAGE_SIZE: usize = 1 << 20; + +#[cfg(test)] +pub const PAGE_SIZE: usize = 7; + +// TODO make it an array once we get a way to allocate array on the heap. +pub type Page = Box<[u8]>; + +#[derive(Clone, Copy, Eq, PartialEq, Debug)] +pub struct PageId(usize); + +/// An arena of fixed sized pages. +#[derive(Default)] +pub struct Arena { + /// We use an array to store the list of pages. + /// It can be seen as an efficient map from page id to pages. + /// + /// This map's len can-only grows. Its size is therefore the maximum number of pages + /// that was ever allocated. One page being 1MB long, this is not a problem. + /// + /// If a page is not allocated, the corresponding entry is `None`. + pages: Vec>, + /// `free_slots` slots keeps track of the pages that are not allocated. + free_slots: Vec, + /// `free_page_ids` keeps track of the allocated pages that are + /// available. + free_page_ids: Vec, + /// Arena stats used to track how many pages should be freed. 
+ stats: ArenaStats, +} + +// The idea here is that we keep track of the maximum number of pages used through time. +// To identify if it is worth deallocating pages, we look at the maximum number of pages +// in use in the last few minutes. +// +// We then allow ourselves to free memory down to this value. +// Tracking exactly the maximum number of pages in use in the last 5 minutes is unnecessarily +// complicated. +// +// For instance, we could run an extra task or thread. +// +// Instead, we just run a routine whenever someone interacts with the GC. +// This routine only checks time 1 out of 256 calls. +// +// Pitfall: If pages are requested way less often than 256 times per minute, +// this arena may take way too much time to release its memory. +struct ArenaStats { + max_num_used_pages_former: usize, + max_num_used_pages_current: usize, + call_counter: u8, + next_window_start: Instant, +} + +const WINDOW: Duration = Duration::from_secs(60); + +impl Default for ArenaStats { + fn default() -> ArenaStats { + ArenaStats { + // We arbitrarily initialize num used pages former to 100. + max_num_used_pages_former: 100, + max_num_used_pages_current: 0, + call_counter: 0u8, + next_window_start: Instant::now(), + } + } +} + +impl ArenaStats { + /// This method is called when we move on to a new time window. + fn roll(&mut self, now: Instant) { + self.max_num_used_pages_former = self.max_num_used_pages_current; + self.max_num_used_pages_current = 0; + self.next_window_start = now + WINDOW; + } + + pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize { + // The only function of the call counter is to avoid calling `Instant::now()` + // at every single call. 
+ self.call_counter = self.call_counter.wrapping_add(1); + if self.call_counter == 0u8 { + let now = Instant::now(); + if now > self.next_window_start { + self.roll(now); + } + } + self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages); + self.target_num_pages() + } + + // This method returns a target number of pages. + // + // If we currently have a number of allocated pages higher than this, we need to free + // pages until we reach this number. + fn target_num_pages(&self) -> usize { + let max_over_both_windows = self + .max_num_used_pages_former + .max(self.max_num_used_pages_current); + (max_over_both_windows + 10).max(max_over_both_windows * 105 / 100) + } +} + +impl Arena { + /// Returns an allocated page id. + pub fn get_page_id(&mut self) -> PageId { + if let Some(page_id) = self.free_page_ids.pop() { + assert!(self.pages[page_id.0].is_some()); + return page_id; + } + let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice(); + if let Some(free_slot) = self.free_slots.pop() { + let slot = &mut self.pages[free_slot.0]; + assert!(slot.is_none()); + *slot = Some(page); + return free_slot; + } else { + let new_page_id = self.pages.len(); + self.pages.push(Some(page)); + PageId(new_page_id) + } + } + + #[inline] + pub fn page(&self, page_id: PageId) -> &[u8] { + self.pages[page_id.0].as_ref().unwrap() + } + + #[inline] + pub fn page_mut(&mut self, page_id: PageId) -> &mut [u8] { + self.pages[page_id.0].as_mut().unwrap() + } + + pub fn release_page(&mut self, page_id: PageId) { + self.free_page_ids.push(page_id); + assert!(self.pages[page_id.0].is_some()); + self.gc(); + } + + /// `gc` releases memory by deallocating free pages until the allocated page count is down to the target. 
+ pub fn gc(&mut self) { + let num_used_pages = self.num_used_pages(); + let target_num_pages = self.stats.record_num_used_page(num_used_pages); + let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages); + assert!(num_pages_to_free <= self.free_page_ids.len()); + for _ in 0..num_pages_to_free { + let page_id = self.free_page_ids.pop().unwrap(); + self.pages[page_id.0] = None; + self.free_slots.push(page_id); + } + } + + /// Returns the number of pages that are allocated + /// (regardless of whether they are in use or not). + pub fn num_allocated_pages(&self) -> usize { + self.pages.len() - self.free_slots.len() + } + + /// Returns the number of pages that are allocated AND actually used. + pub fn num_used_pages(&self) -> usize { + self.pages.len() - self.free_slots.len() - self.free_page_ids.len() + } + + pub fn capacity(&self) -> usize { + self.num_allocated_pages() * PAGE_SIZE + } + + pub fn unused_capacity(&self) -> usize { + self.free_page_ids.len() * PAGE_SIZE + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_arena_simple() { + let mut arena = Arena::default(); + assert_eq!(arena.capacity(), 0); + assert_eq!(arena.get_page_id(), PageId(0)); + assert_eq!(arena.get_page_id(), PageId(1)); + arena.release_page(PageId(0)); + assert_eq!(arena.get_page_id(), PageId(0)); + } + + #[test] + fn test_arena_gc() { + let mut arena = Arena::default(); + assert_eq!(arena.capacity(), 0); + assert_eq!(arena.get_page_id(), PageId(0)); + assert_eq!(arena.get_page_id(), PageId(1)); + arena.release_page(PageId(1)); + assert_eq!(arena.num_allocated_pages(), 2); + arena.gc(); + assert_eq!(arena.num_allocated_pages(), 1); + assert_eq!(arena.get_page_id(), PageId(1)); + assert_eq!(arena.num_allocated_pages(), 2); + } +} diff --git a/src/mem/mod.rs b/src/mem/mod.rs index 11db7db..b08aa67 100644 --- a/src/mem/mod.rs +++ b/src/mem/mod.rs @@ -1,8 +1,12 @@ +mod arena; mod queue; mod queues; +mod rolling_buffer; +use self::arena::{Arena, 
PAGE_SIZE}; pub(crate) use self::queue::MemQueue; pub(crate) use self::queues::MemQueues; +use self::rolling_buffer::RollingBuffer; #[cfg(test)] mod tests; diff --git a/src/mem/queue.rs b/src/mem/queue.rs index 74b66a2..cc1d7b3 100644 --- a/src/mem/queue.rs +++ b/src/mem/queue.rs @@ -1,94 +1,13 @@ use std::borrow::Cow; -use std::collections::VecDeque; use std::ops::{Bound, RangeBounds}; -use crate::error::AppendError; -use crate::rolling::FileNumber; -use crate::Record; - -#[derive(Default)] -struct RollingBuffer { - buffer: VecDeque, -} - -impl RollingBuffer { - fn new() -> Self { - RollingBuffer { - buffer: VecDeque::new(), - } - } - - fn len(&self) -> usize { - self.buffer.len() - } - - fn capacity(&self) -> usize { - self.buffer.capacity() - } - - fn clear(&mut self) { - self.buffer.clear(); - self.buffer.shrink_to_fit(); - } - - fn drain_start(&mut self, pos: usize) { - let target_capacity = self.len() * 9 / 8; - self.buffer.drain(..pos); - // In order to avoid leaking memory we shrink the buffer. - // The last maximum length (= the length before drain) - // is a good estimate of what we will need in the future. - // - // We add 1/8 to that in order to make sure that we don't end up - // shrinking / allocating for small variations. 
- - if self.buffer.capacity() > target_capacity { - self.buffer.shrink_to(target_capacity); - } - } +use bytes::buf::Buf; - fn extend(&mut self, slice: &[u8]) { - self.buffer.extend(slice.iter().copied()); - } - - fn get_range(&self, bounds: impl RangeBounds) -> Cow<[u8]> { - let start = match bounds.start_bound() { - Bound::Included(pos) => *pos, - Bound::Excluded(pos) => pos + 1, - Bound::Unbounded => 0, - }; - - let end = match bounds.end_bound() { - Bound::Included(pos) => pos + 1, - Bound::Excluded(pos) => *pos, - Bound::Unbounded => self.len(), - }; - - let (left_part_of_queue, right_part_of_queue) = self.buffer.as_slices(); - - if end < left_part_of_queue.len() { - Cow::Borrowed(&left_part_of_queue[start..end]) - } else if start >= left_part_of_queue.len() { - let start = start - left_part_of_queue.len(); - let end = end - left_part_of_queue.len(); - - Cow::Borrowed(&right_part_of_queue[start..end]) - } else { - // VecDeque is a rolling buffer. As a result, we do not have - // access to a continuous buffer. - // - // Here the requested slice cross the boundary and we need to allocate and copy the data - // in a new buffer. 
- let mut res = Vec::with_capacity(end - start); - res.extend_from_slice(&left_part_of_queue[start..]); - let end = end - left_part_of_queue.len(); - res.extend_from_slice(&right_part_of_queue[..end]); - - Cow::Owned(res) - } - } -} +use crate::error::AppendError; +use crate::mem::{Arena, RollingBuffer}; +use crate::{FileNumber, Record}; -#[derive(Clone)] +#[derive(Clone, Debug)] struct RecordMeta { start_offset: usize, // in a vec of RecordMeta, this field should be set only on the last record @@ -105,6 +24,22 @@ pub(crate) struct MemQueue { record_metas: Vec, } +fn concatenate_buffers<'a>(mut buffers: impl Iterator) -> Cow<'a, [u8]> { + let Some(first) = buffers.next() else { + return Cow::Borrowed(&[]); + }; + let Some(second) = buffers.next() else { + return Cow::Borrowed(first); + }; + let mut concatenated_buffer: Vec = Vec::with_capacity(first.len() + second.len()); + concatenated_buffer.extend_from_slice(first); + concatenated_buffer.extend_from_slice(second); + for buffer in buffers { + concatenated_buffer.extend_from_slice(buffer); + } + Cow::Owned(concatenated_buffer) +} + impl MemQueue { pub fn with_next_position(next_position: u64) -> Self { MemQueue { @@ -124,10 +59,15 @@ impl MemQueue { } /// Returns the last record stored in the queue. 
- pub fn last_record(&self) -> Option { - self.record_metas.last().map(|record| Record { + pub fn last_record<'a>(&self, arena: &'a Arena) -> Option> { + let record = self.record_metas.last()?; + let buf_iter = self + .concatenated_records + .get_range(record.start_offset.., arena); + let payload = concatenate_buffers(buf_iter); + Some(Record { position: record.position, - payload: self.concatenated_records.get_range(record.start_offset..), + payload, }) } @@ -148,6 +88,7 @@ impl MemQueue { file_number: &FileNumber, target_position: u64, payload: &[u8], + arena: &mut Arena, ) -> Result<(), AppendError> { let next_position = self.next_position(); if target_position < next_position { @@ -174,7 +115,7 @@ impl MemQueue { position: target_position, }; self.record_metas.push(record_meta); - self.concatenated_records.extend(payload); + self.concatenated_records.extend_from_slice(payload, arena); Ok(()) } @@ -187,7 +128,7 @@ impl MemQueue { .binary_search_by_key(&position, |record| record.position) } - pub fn range(&self, range: R) -> impl Iterator + '_ + pub fn range<'a, R>(&'a self, range: R, arena: &'a Arena) -> impl Iterator + 'a where R: RangeBounds + 'static { let start_idx: usize = match range.start_bound() { Bound::Included(&start_from) => { @@ -209,14 +150,16 @@ impl MemQueue { .map(move |idx| { let record = &self.record_metas[idx]; let position = record.position; - let start_offset = record.start_offset; - let payload = if let Some(next_record_meta) = self.record_metas.get(idx + 1) { - let end_offset = next_record_meta.start_offset; - self.concatenated_records - .get_range(start_offset..end_offset) + let start_bound = Bound::Included(record.start_offset); + let end_bound = if let Some(next_record_meta) = self.record_metas.get(idx + 1) { + Bound::Excluded(next_record_meta.start_offset) } else { - self.concatenated_records.get_range(start_offset..) 
+ Bound::Unbounded }; + let payload_iter = self + .concatenated_records + .get_range((start_bound, end_bound), arena); + let payload = concatenate_buffers(payload_iter); Record { position, payload } }) } @@ -225,13 +168,13 @@ impl MemQueue { /// /// If truncating to a future position, make the queue go forward to that position. /// Return the number of record removed. - pub fn truncate(&mut self, truncate_up_to_pos: u64) -> usize { + pub fn truncate(&mut self, truncate_up_to_pos: u64, arena: &mut Arena) -> usize { if self.start_position > truncate_up_to_pos { return 0; } if truncate_up_to_pos + 1 >= self.next_position() { self.start_position = truncate_up_to_pos + 1; - self.concatenated_records.clear(); + self.concatenated_records.clear(arena); let record_count = self.record_metas.len(); self.record_metas.clear(); return record_count; @@ -245,7 +188,8 @@ impl MemQueue { for record_meta in &mut self.record_metas { record_meta.start_offset -= start_offset_to_keep; } - self.concatenated_records.drain_start(start_offset_to_keep); + self.concatenated_records + .truncate_to(start_offset_to_keep, arena); self.start_position = truncate_up_to_pos + 1; first_record_to_keep } diff --git a/src/mem/queues.rs b/src/mem/queues.rs index 3adbbac..d8d95fb 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -4,14 +4,15 @@ use std::ops::RangeBounds; use tracing::{info, warn}; use crate::error::{AlreadyExists, AppendError, MissingQueue}; -use crate::mem::MemQueue; -use crate::rolling::FileNumber; -use crate::Record; +use crate::mem::{Arena, MemQueue}; +use crate::{FileNumber, Record}; #[derive(Default)] pub(crate) struct MemQueues { queues: HashMap, + pub(crate) arena: Arena, } + impl MemQueues { /// The file number argument is here unused. Its point is just to make sure we /// flushed the file before updating the in memory queue. 
@@ -52,7 +53,7 @@ impl MemQueues { R: RangeBounds + 'static, { if let Some(queue) = self.queues.get(queue) { - Ok(queue.range(range)) + Ok(queue.range(range, &self.arena)) } else { Err(MissingQueue(queue.to_string())) } @@ -66,12 +67,17 @@ impl MemQueues { .ok_or_else(|| MissingQueue(queue.to_string())) } - pub(crate) fn get_queue_mut(&mut self, queue: &str) -> Result<&mut MemQueue, MissingQueue> { + pub(crate) fn get_queue_mut( + &mut self, + queue: &str, + ) -> Result<(&mut MemQueue, &mut Arena), MissingQueue> { // We do not rely on `entry` in order to avoid // the allocation. - self.queues + let queue = self + .queues .get_mut(queue) - .ok_or_else(|| MissingQueue(queue.to_string())) + .ok_or_else(|| MissingQueue(queue.to_string()))?; + Ok((queue, &mut self.arena)) } pub fn append_record( @@ -81,8 +87,11 @@ impl MemQueues { target_position: u64, payload: &[u8], ) -> Result<(), AppendError> { - self.get_queue_mut(queue)? - .append_record(file_number, target_position, payload) + let queue = self + .queues + .get_mut(queue) + .ok_or_else(|| MissingQueue(queue.to_string()))?; + queue.append_record(file_number, target_position, payload, &mut self.arena) } pub fn contains_queue(&self, queue: &str) -> bool { @@ -134,7 +143,7 @@ impl MemQueues { /// Returns the last record stored in the queue. pub fn last_record(&self, queue: &str) -> Result, MissingQueue> { - Ok(self.get_queue(queue)?.last_record()) + Ok(self.get_queue(queue)?.last_record(&self.arena)) } pub fn next_position(&self, queue: &str) -> Result { @@ -146,12 +155,9 @@ impl MemQueues { /// /// If there are no records `<= position`, the method will /// not do anything. 
- pub fn truncate(&mut self, queue: &str, position: u64) -> Option { - if let Ok(queue) = self.get_queue_mut(queue) { - Some(queue.truncate(position)) - } else { - None - } + pub fn truncate(&mut self, queue_id: &str, position: u64) -> Option { + let queue = self.queues.get_mut(queue_id)?; + Some(queue.truncate(position, &mut self.arena)) } /// Return a tuple of (size, capacity) of memory used by the memqueues @@ -159,14 +165,18 @@ impl MemQueues { let size = self .queues .iter() - .map(|(name, queue)| name.len() + queue.size()) + .map(|(name, queue)| { + dbg!(queue.size()); + name.len() + queue.size() + }) .sum(); let capacity = self .queues .iter() .map(|(name, queue)| name.capacity() + queue.capacity()) - .sum(); + .sum::() + + self.arena.unused_capacity(); (size, capacity) } diff --git a/src/mem/rolling_buffer.rs b/src/mem/rolling_buffer.rs new file mode 100644 index 0000000..5ce066a --- /dev/null +++ b/src/mem/rolling_buffer.rs @@ -0,0 +1,266 @@ +use std::ops::{Bound, Range, RangeBounds}; + +use bytes::Buf; + +use crate::mem::arena::{Arena, PageId, PAGE_SIZE}; + +/// A rolling buffer stores a drainable buffer. +/// +/// On +#[derive(Default)] +pub struct RollingBuffer { + page_ids: Vec, + range: Range, +} + +fn num_pages_required(range: Range) -> usize { + let Range { start, end } = range; + let first_page = start / PAGE_SIZE; + let last_page = (end - 1) / PAGE_SIZE; + last_page - first_page + 1 +} + +impl RollingBuffer { + pub fn new() -> Self { + RollingBuffer { + page_ids: Vec::new(), + range: 0..0, + } + } + + pub fn len(&self) -> usize { + self.range.len() + } + + pub fn capacity(&self) -> usize { + self.page_ids.len() * PAGE_SIZE + } + + pub fn clear(&mut self, arena: &mut Arena) { + for page_id in self.page_ids.drain(..) { + arena.release_page(page_id); + } + self.range = 0..0; + } + + /// Truncate the buffer, dropping the first `truncate_len` bytes. 
+ pub fn truncate_to(&mut self, truncate_len: usize, arena: &mut Arena) { + if truncate_len >= self.range.len() { + self.clear(arena); + return; + } + let num_pages = num_pages_required((self.range.start + truncate_len)..self.range.end); + assert!(num_pages <= self.page_ids.len()); + let num_pages_to_drop = self.page_ids.len() - num_pages; + let new_start = self.range.start + truncate_len - num_pages_to_drop * PAGE_SIZE; + let new_end = self.range.end - num_pages_to_drop * PAGE_SIZE; + self.range = new_start..new_end; + if num_pages_to_drop > 0 { + for page_id in self.page_ids.drain(..(self.page_ids.len() - num_pages)) { + arena.release_page(page_id); + } + } + } + + fn get_page_with_room<'a>(&mut self, arena: &'a mut Arena) -> &'a mut [u8] { + let start_offset = self.range.end % PAGE_SIZE; + if start_offset == 0 { + // The page is entirely used. + let new_page_id = arena.get_page_id(); + self.page_ids.push(new_page_id); + let page = arena.page_mut(new_page_id); + &mut page[..] + } else { + let page_id = self.page_ids.last().copied().unwrap(); + let page = arena.page_mut(page_id); + &mut page[start_offset..] 
+ } + } + + pub fn extend_from_slice(&mut self, mut slice: &[u8], arena: &mut Arena) { + while !slice.is_empty() { + let page = self.get_page_with_room(arena); + let len = page.len().min(slice.len()); + let (head, queue) = slice.split_at(len); + page[..len].copy_from_slice(head); + slice = queue; + self.range.end += len; + } + } + + fn get_range_buf_aux<'slf, 'a: 'slf>( + &'slf self, + range: Range, + arena: &'a Arena, + ) -> impl Buf + 'slf { + let start = (self.range.start + range.start).min(self.range.end); + let end = (self.range.start + range.end).clamp(start, self.range.end); + let len = end - start; + let skip_pages = start / PAGE_SIZE; + let start_offset = start % PAGE_SIZE; + PagesBuf { + arena, + start_offset, + page_ids: &self.page_ids[skip_pages..], + remaining_len: len, + } + .take(len) + } + + pub fn get_range_buf<'slf, 'a: 'slf, R>( + &'slf self, + range_bounds: R, + arena: &'a Arena, + ) -> impl Buf + 'slf + where + R: RangeBounds + 'static, + { + let start = match range_bounds.start_bound() { + Bound::Included(pos) => *pos, + Bound::Excluded(pos) => pos + 1, + Bound::Unbounded => 0, + }; + let end = match range_bounds.end_bound() { + Bound::Included(pos) => pos + 1, + Bound::Excluded(pos) => *pos, + Bound::Unbounded => self.len(), + }; + self.get_range_buf_aux(start..end, arena) + } + + pub fn get_range<'slf, 'a: 'slf, R>( + &'slf self, + range_bounds: R, + arena: &'a Arena, + ) -> impl Iterator + 'slf + where + R: RangeBounds + 'static, + { + let start = match range_bounds.start_bound() { + Bound::Included(pos) => *pos, + Bound::Excluded(pos) => pos + 1, + Bound::Unbounded => 0, + }; + let end = match range_bounds.end_bound() { + Bound::Included(pos) => pos + 1, + Bound::Excluded(pos) => *pos, + Bound::Unbounded => self.len(), + }; + self.get_range_aux(start..end, arena) + } + + pub fn get_range_aux<'slf, 'a: 'slf>( + &'slf self, + range: Range, + arena: &'a Arena, + ) -> impl Iterator + 'slf { + let start = (self.range.start + 
range.start).min(self.range.end); + let end = (self.range.start + range.end).clamp(start, self.range.end); + let mut remaining_len = end - start; + let skip_pages = start / PAGE_SIZE; + let mut start_in_page = start % PAGE_SIZE; + self.page_ids[skip_pages..] + .iter() + .copied() + .map(move |page_id| { + let page = &arena.page(page_id)[start_in_page..]; + start_in_page = 0; + let page_len = page.len().min(remaining_len); + remaining_len -= page_len; + &page[..page_len] + }) + .take_while(|page| !page.is_empty()) + } +} + +struct PagesBuf<'a> { + arena: &'a Arena, + page_ids: &'a [PageId], + start_offset: usize, + remaining_len: usize, +} + +impl<'a> Buf for PagesBuf<'a> { + fn remaining(&self) -> usize { + self.remaining_len + } + + fn chunk(&self) -> &[u8] { + let Some(first_page_id) = self.page_ids.first().copied() else { + return &[]; + }; + let current_page = &self.arena.page(first_page_id)[self.start_offset..]; + if current_page.len() > self.remaining_len { + &current_page[..self.remaining_len] + } else { + current_page + } + } + + fn advance(&mut self, mut cnt: usize) { + while cnt > 0 { + if self.page_ids.is_empty() { + return; + } + let page = self.chunk(); + if page.len() > cnt { + self.start_offset += cnt; + self.remaining_len -= cnt; + } else { + cnt -= page.len(); + self.remaining_len -= page.len(); + self.start_offset = 0; + self.page_ids = &self.page_ids[1..]; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn to_vec(mut buf: B) -> Vec { + let mut output = Vec::with_capacity(buf.remaining()); + while buf.has_remaining() { + let chunk = buf.chunk(); + output.extend_from_slice(chunk); + buf.advance(chunk.len()); + } + output + } + + #[test] + fn test_rolling_buffer() { + let mut arena = Arena::default(); + let text = b"hello happy tax payer"; + for truncate_len in 0..text.len() { + let expected = &text[truncate_len..]; + let mut rolling_buffer = RollingBuffer::new(); + rolling_buffer.extend_from_slice(&b"hello"[..], &mut arena); + 
rolling_buffer.extend_from_slice(&b" happy"[..], &mut arena); + rolling_buffer.extend_from_slice(&b" tax payer"[..], &mut arena); + rolling_buffer.truncate_to(truncate_len, &mut arena); + for start in 0..expected.len() { + for end in start..expected.len() { + let bytes: Vec = to_vec(rolling_buffer.get_range_buf(start..end, &arena)); + assert_eq!(&expected[start..end], &bytes[..]); + } + } + } + } + + #[test] + fn test_rolling_buffer_clear() { + let mut arena = Arena::default(); + let mut rolling_buffer = RollingBuffer::new(); + rolling_buffer.clear(&mut arena); + assert_eq!(rolling_buffer.len(), 0); + rolling_buffer.extend_from_slice(&b"abcdefghik"[..], &mut arena); + assert_eq!(rolling_buffer.len(), 10); + assert_eq!(arena.num_used_pages(), 2); + rolling_buffer.clear(&mut arena); + assert_eq!(rolling_buffer.len(), 0); + assert_eq!(arena.num_used_pages(), 0); + } +} diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 44963bc..2e09787 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -242,12 +242,12 @@ impl MultiRecordLog { self.record_log_writer.write_record(record)?; self.sync_on_policy()?; - let mem_queue = self.in_mem_queues.get_queue_mut(queue)?; + let (mem_queue, arena) = self.in_mem_queues.get_queue_mut(queue)?; let mut max_position = position; for record in records { // we just serialized it, we know it's valid let (position, payload) = record.unwrap(); - mem_queue.append_record(&file_number, position, payload)?; + mem_queue.append_record(&file_number, position, payload, arena)?; max_position = position; } @@ -314,7 +314,10 @@ impl MultiRecordLog { if event_enabled!(Level::DEBUG) { for queue in self.list_queues() { let queue: &MemQueue = self.in_mem_queues.get_queue(queue).unwrap(); - let first_pos = queue.range(..).next().map(|record| record.position); + let first_pos = queue + .range(.., &self.in_mem_queues.arena) + .next() + .map(|record| record.position); let last_pos = queue.last_position(); 
debug!(first_pos=?first_pos, last_pos=?last_pos, "queue positions after gc"); } diff --git a/src/rolling/file_number.rs b/src/rolling/file_tracker.rs similarity index 72% rename from src/rolling/file_number.rs rename to src/rolling/file_tracker.rs index 6f697ad..1623e55 100644 --- a/src/rolling/file_number.rs +++ b/src/rolling/file_tracker.rs @@ -1,5 +1,6 @@ use std::collections::BTreeSet; -use std::sync::Arc; + +use crate::FileNumber; /// RefCount a set of ordered files. Always track at least one file. pub struct FileTracker { @@ -79,56 +80,6 @@ impl FileTracker { } } -#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub struct FileNumber { - file_number: Arc, -} - -impl FileNumber { - fn new(file_number: u64) -> Self { - FileNumber { - file_number: Arc::new(file_number), - } - } - - /// Returns whether there is no clone of this FileNumber in existance. - /// - /// /!\ care should be taken to not have some other code store a &FileNumber which could alias - /// with self as it might then be sementically incorrect to delete content based only on this - /// returning `true`. 
- pub fn can_be_deleted(&self) -> bool { - Arc::strong_count(&self.file_number) == 1 - } - - #[cfg(test)] - pub fn unroll(&self, tracker: &FileTracker) -> Vec { - let mut file = self.clone(); - let mut file_numbers = Vec::new(); - loop { - file_numbers.push(file.file_number()); - if let Some(next_file) = tracker.next(&file) { - file = next_file; - } else { - return file_numbers; - } - } - } - - pub fn filename(&self) -> String { - format!("wal-{:020}", self.file_number) - } - - #[cfg(test)] - pub fn file_number(&self) -> u64 { - *self.file_number - } - - #[cfg(test)] - pub fn for_test(file_number: u64) -> Self { - FileNumber::new(file_number) - } -} - impl std::borrow::Borrow for FileNumber { fn borrow(&self) -> &u64 { &self.file_number diff --git a/src/rolling/mod.rs b/src/rolling/mod.rs index 5cca8d0..2b27ff2 100644 --- a/src/rolling/mod.rs +++ b/src/rolling/mod.rs @@ -1,8 +1,9 @@ mod directory; -mod file_number; +mod file_tracker; pub use self::directory::{Directory, RollingReader, RollingWriter}; -pub use self::file_number::{FileNumber, FileTracker}; +pub use self::file_tracker::FileTracker; +pub use crate::FileNumber; const FRAME_NUM_BYTES: usize = 1 << 15; diff --git a/src/tests.rs b/src/tests.rs index 7617f45..110ba32 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -336,6 +336,7 @@ fn test_multi_record_size() { .unwrap(); let size_mem_append = multi_record_log.resource_usage(); assert!(size_mem_append.memory_used_bytes > size_mem_create.memory_used_bytes); + dbg!(&size_mem_append); assert!(size_mem_append.memory_allocated_bytes >= size_mem_append.memory_used_bytes); assert!(size_mem_append.memory_allocated_bytes >= size_mem_create.memory_allocated_bytes); From e69fcc6f028f62586cf21cdc3c25e82d49a4ce1b Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 16 Mar 2024 10:44:43 +0900 Subject: [PATCH 2/5] blop --- src/lib.rs | 36 ++++++++-- src/mem/arena.rs | 2 +- src/mem/mod.rs | 1 + src/mem/queue.rs | 57 +++++++--------- src/mem/queues.rs | 2 +- 
src/mem/rolling_buffer.rs | 140 ++++++++++++++++++++------------------ src/mem/tests.rs | 88 +++++++++++++----------- src/proptests.rs | 25 ++++--- src/tests.rs | 53 +++++++++------ 9 files changed, 224 insertions(+), 180 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f3a6c67..066d828 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::sync::Arc; mod block_read_write; @@ -12,23 +11,46 @@ mod record; mod recordlog; mod rolling; +pub use mem::PagesBuf; pub use multi_record_log::{MultiRecordLog, SyncPolicy}; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] pub struct Record<'a> { pub position: u64, - pub payload: Cow<'a, [u8]>, + payload: PagesBuf<'a>, } + impl<'a> Record<'a> { - pub fn new(position: u64, payload: &'a [u8]) -> Self { - Record { - position, - payload: Cow::Borrowed(payload), + #[cfg(test)] + pub fn payload_equal(&self, mut payload: &[u8]) -> bool { + use bytes::Buf; + let mut self_payload = self.payload; + if self_payload.remaining() != payload.len() { + return false; } + while self_payload.has_remaining() { + let chunk = self_payload.chunk(); + let chunk_len = chunk.len(); + if chunk != &payload[..chunk_len] { + return false; + } + self_payload.advance(chunk_len); + payload = &payload[chunk_len..]; + } + true } } +// impl<'a> Record<'a> { +// pub fn new(position: u64, payload: &'a [u8]) -> Self { +// Record { +// position, +// payload: Cow::Borrowed(payload), +// } +// } +// } + #[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)] pub struct FileNumber { file_number: Arc, diff --git a/src/mem/arena.rs b/src/mem/arena.rs index 2a093bc..473999a 100644 --- a/src/mem/arena.rs +++ b/src/mem/arena.rs @@ -194,7 +194,7 @@ mod tests { arena.release_page(PageId(1)); assert_eq!(arena.num_allocated_pages(), 2); arena.gc(); - assert_eq!(arena.num_allocated_pages(), 1); + assert_eq!(arena.num_allocated_pages(), 2); assert_eq!(arena.get_page_id(), PageId(1)); assert_eq!(arena.num_allocated_pages(), 2); } 
diff --git a/src/mem/mod.rs b/src/mem/mod.rs index b08aa67..79ef6ca 100644 --- a/src/mem/mod.rs +++ b/src/mem/mod.rs @@ -7,6 +7,7 @@ use self::arena::{Arena, PAGE_SIZE}; pub(crate) use self::queue::MemQueue; pub(crate) use self::queues::MemQueues; use self::rolling_buffer::RollingBuffer; +pub use self::rolling_buffer::PagesBuf; #[cfg(test)] mod tests; diff --git a/src/mem/queue.rs b/src/mem/queue.rs index cc1d7b3..20167b1 100644 --- a/src/mem/queue.rs +++ b/src/mem/queue.rs @@ -1,8 +1,4 @@ -use std::borrow::Cow; use std::ops::{Bound, RangeBounds}; - -use bytes::buf::Buf; - use crate::error::AppendError; use crate::mem::{Arena, RollingBuffer}; use crate::{FileNumber, Record}; @@ -24,21 +20,19 @@ pub(crate) struct MemQueue { record_metas: Vec, } -fn concatenate_buffers<'a>(mut buffers: impl Iterator) -> Cow<'a, [u8]> { - let Some(first) = buffers.next() else { - return Cow::Borrowed(&[]); - }; - let Some(second) = buffers.next() else { - return Cow::Borrowed(first); - }; - let mut concatenated_buffer: Vec = Vec::with_capacity(first.len() + second.len()); - concatenated_buffer.extend_from_slice(first); - concatenated_buffer.extend_from_slice(second); - for buffer in buffers { - concatenated_buffer.extend_from_slice(buffer); - } - Cow::Owned(concatenated_buffer) -} +// fn concatenate_buffers<'a>(mut buf: impl Buf + 'a) -> Cow<'a, [u8]> { +// let first_chunk: &'a buf = buf.chunk(); +// if buf.remaining() == first_chunk.len() { +// return Cow::Borrowed(first_chunk); +// } +// let mut concatenated_buffer: Vec = Vec::with_capacity(buf.remaining()); +// while buf.has_remaining() { +// let chunk = buf.chunk(); +// concatenated_buffer.extend_from_slice(chunk); +// buf.advance(chunk.len()); +// } +// Cow::Owned(concatenated_buffer) +// } impl MemQueue { pub fn with_next_position(next_position: u64) -> Self { @@ -59,15 +53,14 @@ impl MemQueue { } /// Returns the last record stored in the queue. 
- pub fn last_record<'a>(&self, arena: &'a Arena) -> Option> { + pub fn last_record<'a>(&'a self, arena: &'a Arena) -> Option> { let record = self.record_metas.last()?; - let buf_iter = self + let record_payload = self .concatenated_records - .get_range(record.start_offset.., arena); - let payload = concatenate_buffers(buf_iter); + .get_range_buf(record.start_offset.., arena); Some(Record { position: record.position, - payload, + payload: record_payload, }) } @@ -156,10 +149,10 @@ impl MemQueue { } else { Bound::Unbounded }; - let payload_iter = self + let payload= self .concatenated_records - .get_range((start_bound, end_bound), arena); - let payload = concatenate_buffers(payload_iter); + .get_range_buf((start_bound, end_bound), arena); + // let payload = concatenate_buffers(payload_buf); Record { position, payload } }) } @@ -168,7 +161,7 @@ impl MemQueue { /// /// If truncating to a future position, make the queue go forward to that position. /// Return the number of record removed. - pub fn truncate(&mut self, truncate_up_to_pos: u64, arena: &mut Arena) -> usize { + pub fn truncate_up_to_included(&mut self, truncate_up_to_pos: u64, arena: &mut Arena) -> usize { if self.start_position > truncate_up_to_pos { return 0; } @@ -185,11 +178,11 @@ impl MemQueue { let start_offset_to_keep: usize = self.record_metas[first_record_to_keep].start_offset; self.record_metas.drain(..first_record_to_keep); - for record_meta in &mut self.record_metas { - record_meta.start_offset -= start_offset_to_keep; - } + // for record_meta in &mut self.record_metas { + // record_meta.start_offset -= start_offset_to_keep; + // } self.concatenated_records - .truncate_to(start_offset_to_keep, arena); + .truncate_up_to_included(start_offset_to_keep, arena); self.start_position = truncate_up_to_pos + 1; first_record_to_keep } diff --git a/src/mem/queues.rs b/src/mem/queues.rs index d8d95fb..e0da9da 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -157,7 +157,7 @@ impl MemQueues { /// not 
do anything. pub fn truncate(&mut self, queue_id: &str, position: u64) -> Option { let queue = self.queues.get_mut(queue_id)?; - Some(queue.truncate(position, &mut self.arena)) + Some(queue.truncate_up_to_included(position, &mut self.arena)) } /// Return a tuple of (size, capacity) of memory used by the memqueues diff --git a/src/mem/rolling_buffer.rs b/src/mem/rolling_buffer.rs index 5ce066a..9f31960 100644 --- a/src/mem/rolling_buffer.rs +++ b/src/mem/rolling_buffer.rs @@ -44,7 +44,7 @@ impl RollingBuffer { } /// Truncate the buffer, dropping the first `truncate_len` bytes. - pub fn truncate_to(&mut self, truncate_len: usize, arena: &mut Arena) { + pub fn truncate_up_to_included(&mut self, truncate_len: usize, arena: &mut Arena) { if truncate_len >= self.range.len() { self.clear(arena); return; @@ -52,9 +52,7 @@ impl RollingBuffer { let num_pages = num_pages_required((self.range.start + truncate_len)..self.range.end); assert!(num_pages <= self.page_ids.len()); let num_pages_to_drop = self.page_ids.len() - num_pages; - let new_start = self.range.start + truncate_len - num_pages_to_drop * PAGE_SIZE; - let new_end = self.range.end - num_pages_to_drop * PAGE_SIZE; - self.range = new_start..new_end; + self.range.start = self.range.start + truncate_len; if num_pages_to_drop > 0 { for page_id in self.page_ids.drain(..(self.page_ids.len() - num_pages)) { arena.release_page(page_id); @@ -92,26 +90,35 @@ impl RollingBuffer { &'slf self, range: Range, arena: &'a Arena, - ) -> impl Buf + 'slf { - let start = (self.range.start + range.start).min(self.range.end); - let end = (self.range.start + range.end).clamp(start, self.range.end); - let len = end - start; - let skip_pages = start / PAGE_SIZE; + ) -> PagesBuf<'slf> { + let Range {start, end} = range; + assert!(start >= self.range.start); + assert!(end <= self.range.end); + if end <= start { + return PagesBuf { + arena, + start_offset: 0, + page_ids: &[], + remaining_len: 0, + }; + } + let start_page_id = start / PAGE_SIZE; 
+ let start_inner_page_id = self.range.start / PAGE_SIZE; + let skip_pages = start_page_id - start_inner_page_id; let start_offset = start % PAGE_SIZE; PagesBuf { arena, start_offset, page_ids: &self.page_ids[skip_pages..], - remaining_len: len, + remaining_len: end - start, } - .take(len) } pub fn get_range_buf<'slf, 'a: 'slf, R>( &'slf self, range_bounds: R, arena: &'a Arena, - ) -> impl Buf + 'slf + ) -> PagesBuf<'slf> where R: RangeBounds + 'static, { @@ -123,69 +130,49 @@ impl RollingBuffer { let end = match range_bounds.end_bound() { Bound::Included(pos) => pos + 1, Bound::Excluded(pos) => *pos, - Bound::Unbounded => self.len(), + Bound::Unbounded => self.range.end }; self.get_range_buf_aux(start..end, arena) } - - pub fn get_range<'slf, 'a: 'slf, R>( - &'slf self, - range_bounds: R, - arena: &'a Arena, - ) -> impl Iterator + 'slf - where - R: RangeBounds + 'static, - { - let start = match range_bounds.start_bound() { - Bound::Included(pos) => *pos, - Bound::Excluded(pos) => pos + 1, - Bound::Unbounded => 0, - }; - let end = match range_bounds.end_bound() { - Bound::Included(pos) => pos + 1, - Bound::Excluded(pos) => *pos, - Bound::Unbounded => self.len(), - }; - self.get_range_aux(start..end, arena) - } - - pub fn get_range_aux<'slf, 'a: 'slf>( - &'slf self, - range: Range, - arena: &'a Arena, - ) -> impl Iterator + 'slf { - let start = (self.range.start + range.start).min(self.range.end); - let end = (self.range.start + range.end).clamp(start, self.range.end); - let mut remaining_len = end - start; - let skip_pages = start / PAGE_SIZE; - let mut start_in_page = start % PAGE_SIZE; - self.page_ids[skip_pages..] 
- .iter() - .copied() - .map(move |page_id| { - let page = &arena.page(page_id)[start_in_page..]; - start_in_page = 0; - let page_len = page.len().min(remaining_len); - remaining_len -= page_len; - &page[..page_len] - }) - .take_while(|page| !page.is_empty()) - } } -struct PagesBuf<'a> { +#[derive(Clone, Copy)] +pub struct PagesBuf<'a> { arena: &'a Arena, page_ids: &'a [PageId], start_offset: usize, remaining_len: usize, } -impl<'a> Buf for PagesBuf<'a> { - fn remaining(&self) -> usize { - self.remaining_len +impl<'a> std::fmt::Debug for PagesBuf<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let mut buf = self.clone(); + let mut vec = Vec::with_capacity(self.remaining_len); + while buf.has_remaining() { + let chunk = buf.chunk(); + vec.extend_from_slice(chunk); + buf.advance(chunk.len()); + } + vec.as_slice().fmt(f) } +} - fn chunk(&self) -> &[u8] { +impl<'a> PagesBuf<'a> { + pub fn to_cow(mut self) -> std::borrow::Cow<'a, [u8]> { + if self.page_ids.len() <= 1 { + let chunk = self.chunk_with_lifetime(); + return std::borrow::Cow::Borrowed(chunk); + } + let mut buf = Vec::with_capacity(self.remaining_len); + while self.has_remaining() { + let chunk = self.chunk_with_lifetime(); + buf.extend_from_slice(chunk); + self.advance(chunk.len()); + } + std::borrow::Cow::Owned(buf) + } + + fn chunk_with_lifetime(&self) -> &'a [u8] { let Some(first_page_id) = self.page_ids.first().copied() else { return &[]; }; @@ -197,6 +184,26 @@ impl<'a> Buf for PagesBuf<'a> { } } +} + +impl<'a> Buf for PagesBuf<'a> { + fn remaining(&self) -> usize { + self.remaining_len + } + + fn chunk(&self) -> &[u8] { + // let Some(first_page_id) = self.page_ids.first().copied() else { + // return &[]; + // }; + // let current_page = &self.arena.page(first_page_id)[self.start_offset..]; + // if current_page.len() > self.remaining_len { + // ¤t_page[..self.remaining_len] + // } else { + // current_page + // } + self.chunk_with_lifetime() + } + fn advance(&mut self, mut cnt: 
usize) { while cnt > 0 { if self.page_ids.is_empty() { @@ -235,16 +242,15 @@ mod tests { let mut arena = Arena::default(); let text = b"hello happy tax payer"; for truncate_len in 0..text.len() { - let expected = &text[truncate_len..]; let mut rolling_buffer = RollingBuffer::new(); rolling_buffer.extend_from_slice(&b"hello"[..], &mut arena); rolling_buffer.extend_from_slice(&b" happy"[..], &mut arena); rolling_buffer.extend_from_slice(&b" tax payer"[..], &mut arena); - rolling_buffer.truncate_to(truncate_len, &mut arena); - for start in 0..expected.len() { - for end in start..expected.len() { + rolling_buffer.truncate_up_to_included(truncate_len, &mut arena); + for start in truncate_len..text.len() { + for end in start..text.len() { let bytes: Vec = to_vec(rolling_buffer.get_range_buf(start..end, &arena)); - assert_eq!(&expected[start..end], &bytes[..]); + assert_eq!(&text[start..end], &bytes[..]); } } } diff --git a/src/mem/tests.rs b/src/mem/tests.rs index aca9c28..a3908bb 100644 --- a/src/mem/tests.rs +++ b/src/mem/tests.rs @@ -1,6 +1,6 @@ use super::*; use crate::error::{AlreadyExists, AppendError}; -use crate::rolling::FileNumber; +use crate::FileNumber; use crate::Record; #[test] @@ -43,22 +43,23 @@ fn test_mem_queues() { assert!(mem_queues .append_record("droopy", &FileNumber::for_test(1), 3, b"payer") .is_ok()); - assert_eq!( - mem_queues.range("droopy", 0..).unwrap().next(), - Some(Record::new(0, b"hello")) - ); + let record = mem_queues.range("droopy", 0..).unwrap().next().unwrap(); + assert_eq!( record.position, 0); + assert!(record.payload_equal(b"hello")); let droopy: Vec = mem_queues.range("droopy", 1..).unwrap().collect(); - assert_eq!( - &droopy, - &[ - Record::new(1, b"happy"), - Record::new(2, b"tax"), - Record::new(3, b"payer"), - ], - ); + assert_eq!(droopy.len(), 3); + assert_eq!(droopy[0].position, 1); + assert_eq!(droopy[1].position, 2); + assert_eq!(droopy[2].position, 3); + assert!(droopy[0].payload_equal(b"happy")); + 
assert!(droopy[1].payload_equal(b"tax")); + assert!(droopy[2].payload_equal(b"payer")); } let fable: Vec = mem_queues.range("fable", 1..).unwrap().collect(); - assert_eq!(&fable, &[Record::new(1, b"corbeau")]); + assert_eq!(fable.len(), 1); + let fable_record = fable.into_iter().next().unwrap(); + assert_eq!(fable_record.position, 1); + assert!(fable_record.payload_equal(b"corbeau")); } #[test] @@ -82,15 +83,19 @@ fn test_mem_queues_truncate() { .append_record("droopy", &1.into(), 4, b"!") .is_ok()); mem_queues - .append_record("droopy", &1.into(), 5, b"payer") + .append_record("droopy", &1.into(), 5, b"payer2") .unwrap(); } mem_queues.truncate("droopy", 3); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[Record::new(4, b"!"), Record::new(5, b"payer"),] - ); + dbg!(&droopy); + assert_eq!(droopy.len(), 2); + assert_eq!(droopy[0].position, 4); + assert!(droopy[0].payload_equal(b"!")); + + assert_eq!(droopy[1].position, 5); + dbg!(&droopy[1]); + assert!(droopy[1].payload_equal(b"payer2")); } #[test] @@ -110,26 +115,29 @@ fn test_mem_queues_skip_advance() { .append_record("droopy", &1.into(), 1, b"happy") .is_err()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[ - Record::new(0, b"hello"), - Record::new(2, b"happy"), - Record::new(3, b"happy"), - ] - ); + assert_eq!(droopy.len(), 3); + assert_eq!(droopy[0].position, 0); + assert!(droopy[0].payload_equal(b"hello")); + assert_eq!(droopy[1].position, 2); + assert!(droopy[1].payload_equal(b"happy")); + assert_eq!(droopy[2].position, 3); + assert!(droopy[2].payload_equal(b"happy")); let droopy: Vec = mem_queues.range("droopy", 1..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[Record::new(2, b"happy"), Record::new(3, b"happy"),] - ); + assert_eq!(droopy.len(), 2); + assert_eq!(droopy[0].position, 2); + assert!(droopy[0].payload_equal(b"happy")); + assert_eq!(droopy[1].position, 3); + 
assert!(droopy[1].payload_equal(b"happy")); let droopy: Vec = mem_queues.range("droopy", 2..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[Record::new(2, b"happy"), Record::new(3, b"happy"),] - ); + assert_eq!(droopy.len(), 2); + assert_eq!(droopy[0].position, 2); + assert!(droopy[0].payload_equal(b"happy")); + assert_eq!(droopy[1].position, 3); + assert!(droopy[1].payload_equal(b"happy")); let droopy: Vec = mem_queues.range("droopy", 3..).unwrap().collect(); - assert_eq!(&droopy[..], &[Record::new(3, b"happy")]); + assert_eq!(droopy.len(), 1); + assert_eq!(droopy[0].position, 3); + assert!(droopy[0].payload_equal(b"happy")); } #[test] @@ -162,7 +170,9 @@ fn test_mem_queues_append_idempotence() { AppendError::Past )); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!(&droopy, &[Record::new(0, b"hello")]); + assert_eq!(droopy.len(), 1); + assert_eq!(droopy[0].position, 0); + assert!(droopy[0].payload_equal(b"hello")); } #[test] @@ -173,7 +183,9 @@ fn test_mem_queues_non_zero_first_el() { .append_record("droopy", &1.into(), 5, b"hello") .is_ok()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!(droopy, &[Record::new(5, b"hello")]); + assert_eq!(droopy.len(), 1); + assert_eq!(droopy[0].position, 5); + assert!(droopy[0].payload_equal(b"hello")); } #[test] diff --git a/src/proptests.rs b/src/proptests.rs index a523a7a..63fed94 100644 --- a/src/proptests.rs +++ b/src/proptests.rs @@ -357,16 +357,13 @@ fn test_multi_record() { { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); multi_record_log.truncate("queue", 0).unwrap(); - assert_eq!( - multi_record_log + let records: Vec = multi_record_log .range("queue", ..) 
.unwrap() - .collect::>(), - [Record { - position: 1, - payload: Cow::Borrowed(&b"22"[..]) - }], - ); + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 1); + assert_eq!(records[0].payload.to_cow(), b"22".as_slice()); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); @@ -379,13 +376,15 @@ fn test_multi_record() { } { let multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); - assert_eq!( - multi_record_log + let records = multi_record_log .range("queue", ..) .unwrap() - .collect::>(), - [Record::new(1, b"22"), Record::new(2, b"hello"),] - ); + .collect::>(); + assert_eq!(records.len(), 2); + assert_eq!(records[0].position, 1); + assert_eq!(records[0].payload.to_cow(), b"22".as_slice()); + assert_eq!(records[1].position, 2); + assert_eq!(records[1].payload.to_cow(), b"hello".as_slice()); } } diff --git a/src/tests.rs b/src/tests.rs index 110ba32..bae589c 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -9,7 +9,7 @@ fn read_all_records<'a>(multi_record_log: &'a MultiRecordLog, queue: &str) -> Ve let mut next_pos = u64::default(); for Record { position, payload } in multi_record_log.range(queue, next_pos..).unwrap() { assert_eq!(position, next_pos); - records.push(payload); + records.push(payload.to_cow()); next_pos += 1; } records @@ -56,6 +56,7 @@ fn test_multi_record_log_simple() { multi_record_log .append_record("queue", None, &b"happy"[..]) .unwrap(); + let payload_bufs = read_all_records(&multi_record_log, "queue"); assert_eq!( &read_all_records(&multi_record_log, "queue"), &[b"hello".as_slice(), b"happy".as_slice()] @@ -235,7 +236,7 @@ fn test_multi_insert_truncate() { &multi_record_log .range("queue", ..) .unwrap() - .map(|record| record.payload) + .map(|record| record.payload.to_cow()) .collect::>(), &[b"2".as_slice(), b"3".as_slice(), b"4".as_slice()] ) @@ -248,7 +249,7 @@ fn test_multi_insert_truncate() { &multi_record_log .range("queue", ..) 
.unwrap() - .map(|record| record.payload) + .map(|record| record.payload.to_cow()) .collect::>(), &[b"3".as_slice(), b"4".as_slice()] ) @@ -259,7 +260,7 @@ fn test_multi_insert_truncate() { &multi_record_log .range("queue", ..) .unwrap() - .map(|record| record.payload) + .map(|record| record.payload.to_cow()) .collect::>(), &[b"3".as_slice(), b"4".as_slice()] ) @@ -291,30 +292,40 @@ fn test_truncate_range_correct_pos() { .unwrap(), Some(2) ); - assert_eq!( - multi_record_log + { + let records = multi_record_log .range("queue", ..) .unwrap() - .collect::>(), - &[Record::new(2u64, b"3")] + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!( + records[0].payload.to_cow(), + b"3".as_slice() ); + } - assert_eq!( - multi_record_log + + { + let records = multi_record_log .range("queue", 2..) .unwrap() - .collect::>(), - &[Record::new(2, b"3")] - ); + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); + } - use std::ops::Bound; - assert_eq!( - multi_record_log + { + use std::ops::Bound; + let records = multi_record_log .range("queue", (Bound::Excluded(1), Bound::Unbounded)) .unwrap() - .collect::>(), - &[Record::new(2, b"3")] - ); + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); + } } } @@ -383,7 +394,7 @@ fn test_open_corrupted() { let mut count = 0; for Record { position, payload } in multi_record_log.range("queue", ..).unwrap() { - assert_eq!(payload, format!("{position:08}").as_bytes()); + assert_eq!(payload.to_cow(), format!("{position:08}").as_bytes()); count += 1; } assert!(count > 4096); @@ -449,7 +460,7 @@ fn test_last_record() { let Record { position, payload } = multi_record_log.last_record("queue1").unwrap().unwrap(); assert_eq!(position, 0); - assert_eq!(payload, &b"hello"[..]); + assert_eq!(payload.to_cow(), &b"hello"[..]); 
multi_record_log.truncate("queue1", 0).unwrap(); From 2d8c989b216a75c548236513661439daef4a0115 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 16 Mar 2024 11:43:53 +0900 Subject: [PATCH 3/5] simplified (hopefully) --- Cargo.toml | 1 + src/lib.rs | 30 +--------- src/mem/arena.rs | 87 ++++++++++++++++++---------- src/mem/mod.rs | 4 +- src/mem/queue.rs | 42 +++++--------- src/mem/queues.rs | 5 +- src/mem/rolling_buffer.rs | 115 +++++++++++++++++++++----------------- src/mem/tests.rs | 7 +-- src/proptests.rs | 15 +++-- src/tests.rs | 97 +++++++++++++++----------------- 10 files changed, 195 insertions(+), 208 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6e3723d..85a2675 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ thiserror = "1" tracing = "0.1.37" [dev-dependencies] +mock_instant = "0.3" criterion = "0.4" futures = "0.3" proptest = "1" diff --git a/src/lib.rs b/src/lib.rs index 066d828..169630d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,40 +17,16 @@ pub use multi_record_log::{MultiRecordLog, SyncPolicy}; #[derive(Debug)] pub struct Record<'a> { pub position: u64, - payload: PagesBuf<'a>, + pub payload: PagesBuf<'a>, } - impl<'a> Record<'a> { #[cfg(test)] - pub fn payload_equal(&self, mut payload: &[u8]) -> bool { - use bytes::Buf; - let mut self_payload = self.payload; - if self_payload.remaining() != payload.len() { - return false; - } - while self_payload.has_remaining() { - let chunk = self_payload.chunk(); - let chunk_len = chunk.len(); - if chunk != &payload[..chunk_len] { - return false; - } - self_payload.advance(chunk_len); - payload = &payload[chunk_len..]; - } - true + pub fn payload_equal(&self, payload: &[u8]) -> bool { + self.payload.to_cow() == payload } } -// impl<'a> Record<'a> { -// pub fn new(position: u64, payload: &'a [u8]) -> Self { -// Record { -// position, -// payload: Cow::Borrowed(payload), -// } -// } -// } - #[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)] pub struct FileNumber { 
file_number: Arc, diff --git a/src/mem/arena.rs b/src/mem/arena.rs index 473999a..7a6800d 100644 --- a/src/mem/arena.rs +++ b/src/mem/arena.rs @@ -1,4 +1,9 @@ -use std::time::{Duration, Instant}; +use std::time::Duration; +#[cfg(not(test))] +use std::time::Instant; + +#[cfg(test)] +use mock_instant::Instant; #[cfg(not(test))] pub const PAGE_SIZE: usize = 1 << 20; @@ -60,7 +65,7 @@ impl Default for ArenaStats { fn default() -> ArenaStats { ArenaStats { // We arbitrarily initialize num used pages former to 100. - max_num_used_pages_former: 100, + max_num_used_pages_former: 0, max_num_used_pages_current: 0, call_counter: 0u8, next_window_start: Instant::now(), @@ -76,10 +81,12 @@ impl ArenaStats { self.next_window_start = now + WINDOW; } + /// Records the number of used pages, and returns an estimation of the maximum number of pages + /// in the last 5 minutes. pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize { // The only function of the call counter is to avoid calling `Instant::now()` // at every single call. - self.call_counter = self.call_counter.wrapping_add(1); + self.call_counter = (self.call_counter + 1) % 64; if self.call_counter == 0u8 { let now = Instant::now(); if now > self.next_window_start { @@ -87,26 +94,17 @@ impl ArenaStats { } } self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages); - self.target_num_pages() - } - - // This method returns a target number of pages. - // - // If we currently have a number of allocated pages higher than this, we need to free - // pages until we reach this number. - fn target_num_pages(&self) -> usize { - let max_over_both_windows = self - .max_num_used_pages_former - .max(self.max_num_used_pages_current); - (max_over_both_windows + 10).max(max_over_both_windows * 105 / 100) + self.max_num_used_pages_former + .max(self.max_num_used_pages_current) } } impl Arena { /// Returns an allocated page id. 
- pub fn get_page_id(&mut self) -> PageId { + pub fn acquire_page(&mut self) -> PageId { if let Some(page_id) = self.free_page_ids.pop() { assert!(self.pages[page_id.0].is_some()); + self.gc(); return page_id; } let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice(); @@ -114,10 +112,12 @@ impl Arena { let slot = &mut self.pages[free_slot.0]; assert!(slot.is_none()); *slot = Some(page); - return free_slot; + self.gc(); + free_slot } else { let new_page_id = self.pages.len(); self.pages.push(Some(page)); + self.gc(); PageId(new_page_id) } } @@ -141,7 +141,11 @@ impl Arena { /// `gc` releases memory by deallocating ALL of the free pages. pub fn gc(&mut self) { let num_used_pages = self.num_used_pages(); - let target_num_pages = self.stats.record_num_used_page(num_used_pages); + let max_used_num_pages_in_last_5_min = self.stats.record_num_used_page(num_used_pages); + // We pick a target slightly higher than the maximum number of pages used in the last 5 + // minutes to avoid needless allocations when we are experience a general increase + // in memory usage. 
+ let target_num_pages = (max_used_num_pages_in_last_5_min * 105 / 100).max(10); let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages); assert!(num_pages_to_free <= self.free_page_ids.len()); for _ in 0..num_pages_to_free { @@ -162,10 +166,6 @@ impl Arena { self.pages.len() - self.free_slots.len() - self.free_page_ids.len() } - pub fn capacity(&self) -> usize { - self.num_allocated_pages() * PAGE_SIZE - } - pub fn unused_capacity(&self) -> usize { self.free_page_ids.len() * PAGE_SIZE } @@ -173,29 +173,56 @@ impl Arena { #[cfg(test)] mod tests { + use mock_instant::MockClock; + use super::*; #[test] fn test_arena_simple() { let mut arena = Arena::default(); - assert_eq!(arena.capacity(), 0); - assert_eq!(arena.get_page_id(), PageId(0)); - assert_eq!(arena.get_page_id(), PageId(1)); + assert_eq!(arena.num_allocated_pages(), 0); + assert_eq!(arena.acquire_page(), PageId(0)); + assert_eq!(arena.acquire_page(), PageId(1)); arena.release_page(PageId(0)); - assert_eq!(arena.get_page_id(), PageId(0)); + assert_eq!(arena.acquire_page(), PageId(0)); } #[test] fn test_arena_gc() { let mut arena = Arena::default(); - assert_eq!(arena.capacity(), 0); - assert_eq!(arena.get_page_id(), PageId(0)); - assert_eq!(arena.get_page_id(), PageId(1)); + assert_eq!(arena.num_allocated_pages(), 0); + assert_eq!(arena.acquire_page(), PageId(0)); + assert_eq!(arena.acquire_page(), PageId(1)); arena.release_page(PageId(1)); assert_eq!(arena.num_allocated_pages(), 2); arena.gc(); assert_eq!(arena.num_allocated_pages(), 2); - assert_eq!(arena.get_page_id(), PageId(1)); + assert_eq!(arena.acquire_page(), PageId(1)); assert_eq!(arena.num_allocated_pages(), 2); } + + #[test] + fn test_arena_stats() { + let mut arena_stats = ArenaStats::default(); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(10), 10); + } + MockClock::advance(WINDOW.mul_f32(1.1f32)); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(1), 10); + } + 
MockClock::advance(WINDOW.mul_f32(1.1f32)); + for _ in 0..256 { + arena_stats.record_num_used_page(1); + } + assert_eq!(arena_stats.record_num_used_page(1), 1); + assert_eq!(arena_stats.record_num_used_page(2), 2); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(1), 2); + } + MockClock::advance(WINDOW); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(1), 2); + } + } } diff --git a/src/mem/mod.rs b/src/mem/mod.rs index 79ef6ca..9587e60 100644 --- a/src/mem/mod.rs +++ b/src/mem/mod.rs @@ -3,11 +3,11 @@ mod queue; mod queues; mod rolling_buffer; -use self::arena::{Arena, PAGE_SIZE}; +use self::arena::Arena; pub(crate) use self::queue::MemQueue; pub(crate) use self::queues::MemQueues; -use self::rolling_buffer::RollingBuffer; pub use self::rolling_buffer::PagesBuf; +use self::rolling_buffer::RollingBuffer; #[cfg(test)] mod tests; diff --git a/src/mem/queue.rs b/src/mem/queue.rs index 20167b1..e292a25 100644 --- a/src/mem/queue.rs +++ b/src/mem/queue.rs @@ -1,4 +1,6 @@ +use std::collections::VecDeque; use std::ops::{Bound, RangeBounds}; + use crate::error::AppendError; use crate::mem::{Arena, RollingBuffer}; use crate::{FileNumber, Record}; @@ -16,30 +18,17 @@ struct RecordMeta { pub(crate) struct MemQueue { // Concatenated records concatenated_records: RollingBuffer, + // If `record_metas` is not empty, `start_position` should be the position of the first record. 
start_position: u64, - record_metas: Vec, + record_metas: VecDeque, } -// fn concatenate_buffers<'a>(mut buf: impl Buf + 'a) -> Cow<'a, [u8]> { -// let first_chunk: &'a buf = buf.chunk(); -// if buf.remaining() == first_chunk.len() { -// return Cow::Borrowed(first_chunk); -// } -// let mut concatenated_buffer: Vec = Vec::with_capacity(buf.remaining()); -// while buf.has_remaining() { -// let chunk = buf.chunk(); -// concatenated_buffer.extend_from_slice(chunk); -// buf.advance(chunk.len()); -// } -// Cow::Owned(concatenated_buffer) -// } - impl MemQueue { pub fn with_next_position(next_position: u64) -> Self { MemQueue { concatenated_records: RollingBuffer::new(), start_position: next_position, - record_metas: Vec::new(), + record_metas: Default::default(), } } @@ -54,10 +43,10 @@ impl MemQueue { /// Returns the last record stored in the queue. pub fn last_record<'a>(&'a self, arena: &'a Arena) -> Option> { - let record = self.record_metas.last()?; + let record = self.record_metas.back()?; let record_payload = self .concatenated_records - .get_range_buf(record.start_offset.., arena); + .get_range(record.start_offset.., arena); Some(Record { position: record.position, payload: record_payload, @@ -67,7 +56,7 @@ impl MemQueue { /// Returns what the next position should be. 
pub fn next_position(&self) -> u64 { self.record_metas - .last() + .back() .map(|record| record.position + 1) .unwrap_or(self.start_position) } @@ -92,7 +81,7 @@ impl MemQueue { self.start_position = target_position; } - let file_number = if let Some(record_meta) = self.record_metas.last_mut() { + let file_number = if let Some(record_meta) = self.record_metas.back_mut() { if record_meta.file_number.as_ref() == Some(file_number) { record_meta.file_number.take().unwrap() } else { @@ -103,11 +92,11 @@ impl MemQueue { }; let record_meta = RecordMeta { - start_offset: self.concatenated_records.len(), + start_offset: self.concatenated_records.end_offset(), file_number: Some(file_number), position: target_position, }; - self.record_metas.push(record_meta); + self.record_metas.push_back(record_meta); self.concatenated_records.extend_from_slice(payload, arena); Ok(()) } @@ -149,9 +138,9 @@ impl MemQueue { } else { Bound::Unbounded }; - let payload= self + let payload = self .concatenated_records - .get_range_buf((start_bound, end_bound), arena); + .get_range((start_bound, end_bound), arena); // let payload = concatenate_buffers(payload_buf); Record { position, payload } }) @@ -178,11 +167,8 @@ impl MemQueue { let start_offset_to_keep: usize = self.record_metas[first_record_to_keep].start_offset; self.record_metas.drain(..first_record_to_keep); - // for record_meta in &mut self.record_metas { - // record_meta.start_offset -= start_offset_to_keep; - // } self.concatenated_records - .truncate_up_to_included(start_offset_to_keep, arena); + .truncate_up_to_excluded(start_offset_to_keep, arena); self.start_position = truncate_up_to_pos + 1; first_record_to_keep } diff --git a/src/mem/queues.rs b/src/mem/queues.rs index e0da9da..c6ca444 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -165,10 +165,7 @@ impl MemQueues { let size = self .queues .iter() - .map(|(name, queue)| { - dbg!(queue.size()); - name.len() + queue.size() - }) + .map(|(name, queue)| name.len() + 
queue.size()) .sum(); let capacity = self diff --git a/src/mem/rolling_buffer.rs b/src/mem/rolling_buffer.rs index 9f31960..240f291 100644 --- a/src/mem/rolling_buffer.rs +++ b/src/mem/rolling_buffer.rs @@ -4,9 +4,11 @@ use bytes::Buf; use crate::mem::arena::{Arena, PageId, PAGE_SIZE}; -/// A rolling buffer stores a drainable buffer. +/// `RollingBuffer` stores a short slice of an seemingly infinite buffer with offset spawning from +/// [0..u64::MAX). /// -/// On +/// It is possible to append bytes to the buffer with `.extend_from_slice(..)`, +/// or drop the bytes up to a given offset with `truncate_up_to()`. #[derive(Default)] pub struct RollingBuffer { page_ids: Vec, @@ -15,8 +17,13 @@ pub struct RollingBuffer { fn num_pages_required(range: Range) -> usize { let Range { start, end } = range; + if start >= end { + // This is an important non-trivial edge case. + // If the range is empty, we retain no pages. + return 0; + } let first_page = start / PAGE_SIZE; - let last_page = (end - 1) / PAGE_SIZE; + let last_page = end.saturating_sub(1) / PAGE_SIZE; last_page - first_page + 1 } @@ -32,47 +39,50 @@ impl RollingBuffer { self.range.len() } + pub fn end_offset(&self) -> usize { + self.range.end + } + pub fn capacity(&self) -> usize { self.page_ids.len() * PAGE_SIZE } pub fn clear(&mut self, arena: &mut Arena) { - for page_id in self.page_ids.drain(..) { - arena.release_page(page_id); - } - self.range = 0..0; + self.truncate_up_to_excluded(self.range.end, arena); } - /// Truncate the buffer, dropping the first `truncate_len` bytes. - pub fn truncate_up_to_included(&mut self, truncate_len: usize, arena: &mut Arena) { - if truncate_len >= self.range.len() { - self.clear(arena); - return; - } - let num_pages = num_pages_required((self.range.start + truncate_len)..self.range.end); + fn check_invariants(&self) { + debug_assert_eq!(num_pages_required(self.range.clone()), self.page_ids.len()); + } + + /// Truncate the buffer, all of the bytes striclty below `new_start``. 
+ pub fn truncate_up_to_excluded(&mut self, new_start: usize, arena: &mut Arena) { + assert!(new_start <= self.range.end); + let num_pages = num_pages_required(new_start..self.range.end); assert!(num_pages <= self.page_ids.len()); let num_pages_to_drop = self.page_ids.len() - num_pages; - self.range.start = self.range.start + truncate_len; + self.range.start = new_start; if num_pages_to_drop > 0 { for page_id in self.page_ids.drain(..(self.page_ids.len() - num_pages)) { arena.release_page(page_id); } } + self.check_invariants(); } + /// Returns a chunk of available memory, either remaining from the last page, + /// or acquires a new page from the arena. fn get_page_with_room<'a>(&mut self, arena: &'a mut Arena) -> &'a mut [u8] { let start_offset = self.range.end % PAGE_SIZE; - if start_offset == 0 { - // The page is entirely used. - let new_page_id = arena.get_page_id(); + if start_offset == 0 || self.page_ids.is_empty() { + // The page is entirely used, or there are no pages at all. + // Let's allocate a new page. + let new_page_id = arena.acquire_page(); self.page_ids.push(new_page_id); - let page = arena.page_mut(new_page_id); - &mut page[..] - } else { - let page_id = self.page_ids.last().copied().unwrap(); - let page = arena.page_mut(page_id); - &mut page[start_offset..] } + let page_id = self.page_ids.last().copied().unwrap(); + let page = arena.page_mut(page_id); + &mut page[start_offset..] 
} pub fn extend_from_slice(&mut self, mut slice: &[u8], arena: &mut Arena) { @@ -84,14 +94,15 @@ impl RollingBuffer { slice = queue; self.range.end += len; } + self.check_invariants(); } - fn get_range_buf_aux<'slf, 'a: 'slf>( + fn get_range_aux<'slf, 'a: 'slf>( &'slf self, range: Range, arena: &'a Arena, ) -> PagesBuf<'slf> { - let Range {start, end} = range; + let Range { start, end } = range; assert!(start >= self.range.start); assert!(end <= self.range.end); if end <= start { @@ -114,7 +125,7 @@ impl RollingBuffer { } } - pub fn get_range_buf<'slf, 'a: 'slf, R>( + pub fn get_range<'slf, 'a: 'slf, R>( &'slf self, range_bounds: R, arena: &'a Arena, @@ -130,9 +141,9 @@ impl RollingBuffer { let end = match range_bounds.end_bound() { Bound::Included(pos) => pos + 1, Bound::Excluded(pos) => *pos, - Bound::Unbounded => self.range.end + Bound::Unbounded => self.range.end, }; - self.get_range_buf_aux(start..end, arena) + self.get_range_aux(start..end, arena) } } @@ -146,14 +157,7 @@ pub struct PagesBuf<'a> { impl<'a> std::fmt::Debug for PagesBuf<'a> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let mut buf = self.clone(); - let mut vec = Vec::with_capacity(self.remaining_len); - while buf.has_remaining() { - let chunk = buf.chunk(); - vec.extend_from_slice(chunk); - buf.advance(chunk.len()); - } - vec.as_slice().fmt(f) + self.to_cow().fmt(f) } } @@ -172,6 +176,7 @@ impl<'a> PagesBuf<'a> { std::borrow::Cow::Owned(buf) } + // Contrary to Buf::chunk, this method returns a slice with a `'a` lifetime (so it can outlive 'self). 
fn chunk_with_lifetime(&self) -> &'a [u8] { let Some(first_page_id) = self.page_ids.first().copied() else { return &[]; @@ -183,7 +188,6 @@ impl<'a> PagesBuf<'a> { current_page } } - } impl<'a> Buf for PagesBuf<'a> { @@ -191,20 +195,13 @@ impl<'a> Buf for PagesBuf<'a> { self.remaining_len } + #[inline] fn chunk(&self) -> &[u8] { - // let Some(first_page_id) = self.page_ids.first().copied() else { - // return &[]; - // }; - // let current_page = &self.arena.page(first_page_id)[self.start_offset..]; - // if current_page.len() > self.remaining_len { - // &current_page[..self.remaining_len] - // } else { - // current_page - // } self.chunk_with_lifetime() } fn advance(&mut self, mut cnt: usize) { + cnt = cnt.min(self.remaining_len); while cnt > 0 { if self.page_ids.is_empty() { return; @@ -241,15 +238,15 @@ mod tests { fn test_rolling_buffer() { let mut arena = Arena::default(); let text = b"hello happy tax payer"; - for truncate_len in 0..text.len() { + for new_start in 0..text.len() { let mut rolling_buffer = RollingBuffer::new(); rolling_buffer.extend_from_slice(&b"hello"[..], &mut arena); rolling_buffer.extend_from_slice(&b" happy"[..], &mut arena); rolling_buffer.extend_from_slice(&b" tax payer"[..], &mut arena); - rolling_buffer.truncate_up_to_included(truncate_len, &mut arena); - for start in truncate_len..text.len() { + rolling_buffer.truncate_up_to_excluded(new_start, &mut arena); + for start in new_start..text.len() { for end in start..text.len() { - let bytes: Vec<u8> = to_vec(rolling_buffer.get_range_buf(start..end, &arena)); + let bytes: Vec<u8> = to_vec(rolling_buffer.get_range(start..end, &arena)); assert_eq!(&text[start..end], &bytes[..]); } } @@ -269,4 +266,20 @@ mod tests { assert_eq!(rolling_buffer.len(), 0); assert_eq!(arena.num_used_pages(), 0); } + + #[test] + fn test_num_pages_required() { + assert_eq!(num_pages_required(0..0), 0); + assert_eq!(num_pages_required(2..2), 0); + assert_eq!(num_pages_required(2..1), 0); + assert_eq!(num_pages_required(0..1), 1); +
assert_eq!(num_pages_required(0..PAGE_SIZE), 1); + assert_eq!(num_pages_required(0..PAGE_SIZE + 1), 2); + assert_eq!(num_pages_required(0..2 * PAGE_SIZE), 2); + assert_eq!(num_pages_required(0..2 * PAGE_SIZE + 1), 3); + assert_eq!(num_pages_required(PAGE_SIZE - 1..2 * PAGE_SIZE), 2); + assert_eq!(num_pages_required(PAGE_SIZE - 1..2 * PAGE_SIZE + 1), 3); + assert_eq!(num_pages_required(PAGE_SIZE..2 * PAGE_SIZE), 1); + assert_eq!(num_pages_required(PAGE_SIZE..2 * PAGE_SIZE + 1), 2); + } } diff --git a/src/mem/tests.rs b/src/mem/tests.rs index a3908bb..1f0704b 100644 --- a/src/mem/tests.rs +++ b/src/mem/tests.rs @@ -1,7 +1,6 @@ use super::*; use crate::error::{AlreadyExists, AppendError}; -use crate::FileNumber; -use crate::Record; +use crate::{FileNumber, Record}; #[test] fn test_mem_queues_already_exists() { @@ -44,7 +43,7 @@ fn test_mem_queues() { .append_record("droopy", &FileNumber::for_test(1), 3, b"payer") .is_ok()); let record = mem_queues.range("droopy", 0..).unwrap().next().unwrap(); - assert_eq!( record.position, 0); + assert_eq!(record.position, 0); assert!(record.payload_equal(b"hello")); let droopy: Vec = mem_queues.range("droopy", 1..).unwrap().collect(); assert_eq!(droopy.len(), 3); @@ -88,13 +87,11 @@ fn test_mem_queues_truncate() { } mem_queues.truncate("droopy", 3); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - dbg!(&droopy); assert_eq!(droopy.len(), 2); assert_eq!(droopy[0].position, 4); assert!(droopy[0].payload_equal(b"!")); assert_eq!(droopy[1].position, 5); - dbg!(&droopy[1]); assert!(droopy[1].payload_equal(b"payer2")); } diff --git a/src/proptests.rs b/src/proptests.rs index 63fed94..d488600 100644 --- a/src/proptests.rs +++ b/src/proptests.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::collections::HashMap; use std::ops::Range; @@ -357,10 +356,10 @@ fn test_multi_record() { { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); multi_record_log.truncate("queue", 0).unwrap(); - let records: 
Vec = multi_record_log - .range("queue", ..) - .unwrap() - .collect::>(); + let records: Vec = multi_record_log + .range("queue", ..) + .unwrap() + .collect::>(); assert_eq!(records.len(), 1); assert_eq!(records[0].position, 1); assert_eq!(records[0].payload.to_cow(), b"22".as_slice()); @@ -377,9 +376,9 @@ fn test_multi_record() { { let multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); let records = multi_record_log - .range("queue", ..) - .unwrap() - .collect::>(); + .range("queue", ..) + .unwrap() + .collect::>(); assert_eq!(records.len(), 2); assert_eq!(records[0].position, 1); assert_eq!(records[0].payload.to_cow(), b"22".as_slice()); diff --git a/src/tests.rs b/src/tests.rs index bae589c..fc51b72 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -56,7 +56,6 @@ fn test_multi_record_log_simple() { multi_record_log .append_record("queue", None, &b"happy"[..]) .unwrap(); - let payload_bufs = read_all_records(&multi_record_log, "queue"); assert_eq!( &read_all_records(&multi_record_log, "queue"), &[b"hello".as_slice(), b"happy".as_slice()] @@ -244,7 +243,6 @@ fn test_multi_insert_truncate() { { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); multi_record_log.truncate("queue", 1).unwrap(); - assert_eq!( &multi_record_log .range("queue", ..) 
@@ -270,62 +268,56 @@ fn test_multi_insert_truncate() { #[test] fn test_truncate_range_correct_pos() { let tempdir = tempfile::tempdir().unwrap(); + let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); + multi_record_log.create_queue("queue").unwrap(); + assert_eq!( + multi_record_log + .append_record("queue", None, &b"1"[..]) + .unwrap(), + Some(0) + ); + assert_eq!( + multi_record_log + .append_record("queue", None, &b"2"[..]) + .unwrap(), + Some(1) + ); + multi_record_log.truncate("queue", 1).unwrap(); + assert_eq!( + multi_record_log + .append_record("queue", None, &b"3"[..]) + .unwrap(), + Some(2) + ); { - let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); - multi_record_log.create_queue("queue").unwrap(); - assert_eq!( - multi_record_log - .append_record("queue", None, &b"1"[..]) - .unwrap(), - Some(0) - ); - assert_eq!( - multi_record_log - .append_record("queue", None, &b"2"[..]) - .unwrap(), - Some(1) - ); - multi_record_log.truncate("queue", 1).unwrap(); - assert_eq!( - multi_record_log - .append_record("queue", None, &b"3"[..]) - .unwrap(), - Some(2) - ); - { let records = multi_record_log - .range("queue", ..) - .unwrap() - .collect::>(); + .range("queue", ..) + .unwrap() + .collect::>(); assert_eq!(records.len(), 1); assert_eq!(records[0].position, 2); - assert_eq!( - records[0].payload.to_cow(), - b"3".as_slice() - ); - } - + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); + } - { - let records = multi_record_log - .range("queue", 2..) - .unwrap() - .collect::>(); - assert_eq!(records.len(), 1); - assert_eq!(records[0].position, 2); - assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); - } + { + let records = multi_record_log + .range("queue", 2..) 
+ .unwrap() + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); + } - { - use std::ops::Bound; - let records = multi_record_log - .range("queue", (Bound::Excluded(1), Bound::Unbounded)) - .unwrap() - .collect::>(); - assert_eq!(records.len(), 1); - assert_eq!(records[0].position, 2); - assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); - } + { + use std::ops::Bound; + let records = multi_record_log + .range("queue", (Bound::Excluded(1), Bound::Unbounded)) + .unwrap() + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); } } @@ -347,7 +339,6 @@ fn test_multi_record_size() { .unwrap(); let size_mem_append = multi_record_log.resource_usage(); assert!(size_mem_append.memory_used_bytes > size_mem_create.memory_used_bytes); - dbg!(&size_mem_append); assert!(size_mem_append.memory_allocated_bytes >= size_mem_append.memory_used_bytes); assert!(size_mem_append.memory_allocated_bytes >= size_mem_create.memory_allocated_bytes); From bdd759f9b0cbba8c118ea991aa530f99c0306f77 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 16 Mar 2024 15:21:44 +0900 Subject: [PATCH 4/5] Reducing page size to 512KB --- src/mem/arena.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mem/arena.rs b/src/mem/arena.rs index 7a6800d..fb0571f 100644 --- a/src/mem/arena.rs +++ b/src/mem/arena.rs @@ -5,8 +5,9 @@ use std::time::Instant; #[cfg(test)] use mock_instant::Instant; +/// 256 KiB. #[cfg(not(test))] -pub const PAGE_SIZE: usize = 1 << 20; +pub const PAGE_SIZE: usize = 1 << 18; #[cfg(test)] pub const PAGE_SIZE: usize = 7; @@ -145,7 +146,7 @@ impl Arena { // We pick a target slightly higher than the maximum number of pages used in the last 5 // minutes to avoid needless allocations when we are experience a general increase // in memory usage. 
- let target_num_pages = (max_used_num_pages_in_last_5_min * 105 / 100).max(10); + let target_num_pages = max_used_num_pages_in_last_5_min + 10; let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages); assert!(num_pages_to_free <= self.free_page_ids.len()); for _ in 0..num_pages_to_free { From 94b7b874693c46b97686cd3a7ec01e1ba4e7a641 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 16 Mar 2024 18:54:25 +0900 Subject: [PATCH 5/5] roll is done externally --- src/mem/arena.rs | 52 +++++++++++---------------------------- src/mem/queues.rs | 4 +++ src/mem/rolling_buffer.rs | 3 ++- src/multi_record_log.rs | 4 +++ 4 files changed, 25 insertions(+), 38 deletions(-) diff --git a/src/mem/arena.rs b/src/mem/arena.rs index fb0571f..086cbb3 100644 --- a/src/mem/arena.rs +++ b/src/mem/arena.rs @@ -1,10 +1,3 @@ -use std::time::Duration; -#[cfg(not(test))] -use std::time::Instant; - -#[cfg(test)] -use mock_instant::Instant; - /// 256 KiB. #[cfg(not(test))] pub const PAGE_SIZE: usize = 1 << 18; @@ -56,44 +49,27 @@ pub struct Arena { struct ArenaStats { max_num_used_pages_former: usize, max_num_used_pages_current: usize, - call_counter: u8, - next_window_start: Instant, } -const WINDOW: Duration = Duration::from_secs(60); - impl Default for ArenaStats { fn default() -> ArenaStats { ArenaStats { - // We arbitrarily initialize num used pages former to 100. max_num_used_pages_former: 0, max_num_used_pages_current: 0, - call_counter: 0u8, - next_window_start: Instant::now(), } } } impl ArenaStats { /// This method happens when we are changing time window. - fn roll(&mut self, now: Instant) { + fn roll(&mut self) { self.max_num_used_pages_former = self.max_num_used_pages_current; self.max_num_used_pages_current = 0; - self.next_window_start = now + WINDOW; } /// Records the number of used pages, and returns an estimation of the maximum number of pages /// in the last 5 minutes. 
pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize { - // The only function of the call counter is to avoid calling `Instant::now()` - // at every single call. - self.call_counter = (self.call_counter + 1) % 64; - if self.call_counter == 0u8 { - let now = Instant::now(); - if now > self.next_window_start { - self.roll(now); - } - } self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages); self.max_num_used_pages_former .max(self.max_num_used_pages_current) @@ -105,7 +81,6 @@ impl Arena { pub fn acquire_page(&mut self) -> PageId { if let Some(page_id) = self.free_page_ids.pop() { assert!(self.pages[page_id.0].is_some()); - self.gc(); return page_id; } let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice(); @@ -113,12 +88,10 @@ impl Arena { let slot = &mut self.pages[free_slot.0]; assert!(slot.is_none()); *slot = Some(page); - self.gc(); free_slot } else { let new_page_id = self.pages.len(); self.pages.push(Some(page)); - self.gc(); PageId(new_page_id) } } @@ -139,20 +112,25 @@ impl Arena { self.gc(); } - /// `gc` releases memory by deallocating ALL of the free pages. - pub fn gc(&mut self) { + /// Clients are expected to roll the stats regularly. + pub fn roll_and_gc(&mut self) { + self.stats.roll(); + self.gc(); + } + + /// `gc` releases memory by deallocating some of the free pages. + fn gc(&mut self) { let num_used_pages = self.num_used_pages(); let max_used_num_pages_in_last_5_min = self.stats.record_num_used_page(num_used_pages); - // We pick a target slightly higher than the maximum number of pages used in the last 5 - // minutes to avoid needless allocations when we are experience a general increase + // We pick a target slightly higher than the maximum number of pages to avoid needless + // allocations when we are experiencing a general increase // in memory usage.
let target_num_pages = max_used_num_pages_in_last_5_min + 10; let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages); - assert!(num_pages_to_free <= self.free_page_ids.len()); - for _ in 0..num_pages_to_free { - let page_id = self.free_page_ids.pop().unwrap(); - self.pages[page_id.0] = None; - self.free_slots.push(page_id); + let num_free_pages_to_keep = self.free_page_ids.len() - num_pages_to_free; + for free_page_id in self.free_page_ids.drain(num_free_pages_to_keep..) { + self.pages[free_page_id.0] = None; + self.free_slots.push(free_page_id); } } diff --git a/src/mem/queues.rs b/src/mem/queues.rs index c6ca444..e116745 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -160,6 +160,10 @@ impl MemQueues { Some(queue.truncate_up_to_included(position, &mut self.arena)) } + pub fn roll_and_gc(&mut self) { + self.arena.roll_and_gc(); + } + /// Return a tuple of (size, capacity) of memory used by the memqueues pub fn size(&self) -> (usize, usize) { let size = self diff --git a/src/mem/rolling_buffer.rs b/src/mem/rolling_buffer.rs index 240f291..007b062 100644 --- a/src/mem/rolling_buffer.rs +++ b/src/mem/rolling_buffer.rs @@ -176,7 +176,8 @@ impl<'a> PagesBuf<'a> { std::borrow::Cow::Owned(buf) } - // Contrary to Buf::chunk, this method returns a slice with a `'a` lifetime (so it can outlive 'self). + // Contrary to Buf::chunk, this method returns a slice with a `'a` lifetime (so it can outlive + // 'self). fn chunk_with_lifetime(&self) -> &'a [u8] { let Some(first_page_id) = self.page_ids.first().copied() else { return &[]; diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 2e09787..b7bc935 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -293,6 +293,10 @@ impl MultiRecordLog { Ok(removed_count) } + pub fn memory_gc(&mut self) { + self.in_mem_queues.roll_and_gc(); + } + fn run_gc_if_necessary(&mut self) -> io::Result<()> { debug!("run_gc_if_necessary"); if self