From edb70493fb3ab02ee616b0679f781e9ea83e34ad Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 14 May 2026 08:32:55 -0400 Subject: [PATCH] Return outcome of write functions (bytes written, records gc'ed, etc.) --- src/block_read_write.rs | 3 ++ src/frame/writer.rs | 15 +++++-- src/lib.rs | 30 ++++++++++++++ src/multi_record_log.rs | 92 ++++++++++++++++++++++++++--------------- src/proptests.rs | 21 +++++++--- src/recordlog/writer.rs | 15 ++++--- src/tests.rs | 21 ++++++---- 7 files changed, 141 insertions(+), 56 deletions(-) diff --git a/src/block_read_write.rs b/src/block_read_write.rs index c259283..64235b6 100644 --- a/src/block_read_write.rs +++ b/src/block_read_write.rs @@ -33,11 +33,13 @@ pub trait BlockWrite { fn num_bytes_remaining_in_block(&self) -> usize; } +#[cfg(test)] pub struct ArrayReader<'a> { block: [u8; BLOCK_NUM_BYTES], data: &'a [u8], } +#[cfg(test)] impl<'a> From<&'a [u8]> for ArrayReader<'a> { fn from(data: &'a [u8]) -> Self { assert!(data.len() >= BLOCK_NUM_BYTES); @@ -48,6 +50,7 @@ impl<'a> From<&'a [u8]> for ArrayReader<'a> { } } +#[cfg(test)] impl BlockRead for ArrayReader<'_> { fn next_block(&mut self) -> io::Result { if self.data.len() < BLOCK_NUM_BYTES { diff --git a/src/frame/writer.rs b/src/frame/writer.rs index 69d23c0..487b762 100644 --- a/src/frame/writer.rs +++ b/src/frame/writer.rs @@ -19,21 +19,28 @@ impl FrameWriter { } /// Writes a frame. The payload has to be lower than the - /// remaining space in the frame as defined - /// by `max_writable_frame_length`. - pub fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result<()> { + /// remaining space in the frame as defined by `max_writable_frame_length`. + /// + /// Returns the number of bytes pushed to the underlying + /// writer (header + payload, plus any zero-padding written to close out the current block). + pub fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result { + let mut num_bytes_written = 0; let num_bytes_remaining_in_block = self.wrt.num_bytes_remaining_in_block(); + if num_bytes_remaining_in_block < HEADER_LEN { let zero_bytes = [0u8; HEADER_LEN]; self.wrt .write(&zero_bytes[..num_bytes_remaining_in_block])?; + num_bytes_written += num_bytes_remaining_in_block; } let record_len = HEADER_LEN + payload.len(); let (buffer_header, buffer_record) = self.buffer[..record_len].split_at_mut(HEADER_LEN); buffer_record.copy_from_slice(payload); Header::for_payload(frame_type, payload).serialize(buffer_header); self.wrt.write(&self.buffer[..record_len])?; - Ok(()) + + num_bytes_written += record_len; + Ok(num_bytes_written) } /// Flush the buffered writer used in the FrameWriter. diff --git a/src/lib.rs b/src/lib.rs index 41ce3f4..7ecbbdd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,36 @@ pub struct ResourceUsage { pub disk_used_bytes: usize, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AppendOutcome { + /// Position of the last record appended, or `None` for an idempotent no-op + /// (empty payloads, or `position_opt` already past the queue's head). + pub last_position: Option, + /// Bytes appended to the WAL: frame headers + payload + any end-of-block padding + /// written ahead of the record. Two identical calls may report different values + /// depending on where the write cursor sat within the current block (a frame crossing + /// a block boundary incurs padding). `0` for an idempotent no-op. + pub wal_bytes_written: u64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TruncateOutcome { + /// Number of records evicted from the in-memory queue by this call. This is a delta: + /// a truncate at a position already covered by a previous truncate reports `0`. + pub evicted_records: usize, + pub wal_bytes_written: u64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct DeleteQueueOutcome { + pub wal_bytes_written: u64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CreateQueueOutcome { + pub wal_bytes_written: u64, +} + #[cfg(test)] mod tests; diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 2a04590..5f67989 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -12,7 +12,10 @@ use crate::mem::{MemQueue, QueuesSummary}; use crate::record::{MultiPlexedRecord, MultiRecord}; use crate::recordlog::RecordWriter; use crate::rolling::RollingWriter; -use crate::{mem, PersistAction, PersistPolicy, PersistState, Record, ResourceUsage}; +use crate::{ + mem, AppendOutcome, CreateQueueOutcome, DeleteQueueOutcome, PersistAction, PersistPolicy, + PersistState, Record, ResourceUsage, TruncateOutcome, +}; pub struct MultiRecordLog { record_log_writer: crate::recordlog::RecordWriter, @@ -99,7 +102,8 @@ impl MultiRecordLog { next_persist: persist_policy.into(), multi_record_spare_buffer: Vec::new(), }; - multi_record_log.run_gc_if_necessary()?; + // Bytes written by recovery-time GC are not surfaced to any user-facing API. + let _ = multi_record_log.run_gc_if_necessary()?; Ok(multi_record_log) } @@ -112,27 +116,31 @@ impl MultiRecordLog { /// Creates a new queue. /// /// Returns an error if the queue already exists. - pub fn create_queue(&mut self, queue: &str) -> Result<(), CreateQueueError> { + pub fn create_queue(&mut self, queue: &str) -> Result { info!(queue = queue, "create queue"); if self.queue_exists(queue) { return Err(CreateQueueError::AlreadyExists); } let record = MultiPlexedRecord::RecordPosition { queue, position: 0 }; - self.record_log_writer.write_record(record)?; + let num_bytes_written = self.record_log_writer.write_record(record)?; self.persist(PersistAction::FlushAndFsync)?; self.in_mem_queues.create_queue(queue)?; - Ok(()) + Ok(CreateQueueOutcome { + wal_bytes_written: num_bytes_written, + }) } - pub fn delete_queue(&mut self, queue: &str) -> Result<(), DeleteQueueError> { + pub fn delete_queue(&mut self, queue: &str) -> Result { info!(queue = queue, "delete queue"); let position = self.in_mem_queues.next_position(queue)?; let record = MultiPlexedRecord::DeleteQueue { queue, position }; - self.record_log_writer.write_record(record)?; + let mut num_bytes_written = self.record_log_writer.write_record(record)?; self.in_mem_queues.delete_queue(queue)?; - self.run_gc_if_necessary()?; + num_bytes_written += self.run_gc_if_necessary()?; self.persist(PersistAction::FlushAndFsync)?; - Ok(()) + Ok(DeleteQueueOutcome { + wal_bytes_written: num_bytes_written, + }) } pub fn queue_exists(&self, queue: &str) -> bool { @@ -153,7 +161,7 @@ impl MultiRecordLog { queue: &str, position_opt: Option, payload: impl Buf, - ) -> Result, AppendError> { + ) -> Result { self.append_records(queue, position_opt, std::iter::once(payload)) } @@ -168,12 +176,15 @@ impl MultiRecordLog { queue: &str, position_opt: Option, payloads: T, - ) -> Result, AppendError> { + ) -> Result { let next_position = self.in_mem_queues.next_position(queue)?; if let Some(position) = position_opt { // we accept position in the future, and move forward as required. if position + 1 == next_position { - return Ok(None); + return Ok(AppendOutcome { + last_position: None, + wal_bytes_written: 0, + }); } else if position < next_position { return Err(AppendError::Past); } @@ -186,7 +197,10 @@ impl MultiRecordLog { if multi_record_spare_buffer.is_empty() { self.multi_record_spare_buffer = multi_record_spare_buffer; // empty transaction: don't persist it - return Ok(None); + return Ok(AppendOutcome { + last_position: None, + wal_bytes_written: 0, + }); } let records = MultiRecord::new_unchecked(&multi_record_spare_buffer); @@ -195,7 +209,7 @@ impl MultiRecordLog { queue, records, }; - self.record_log_writer.write_record(record)?; + let num_bytes_written = self.record_log_writer.write_record(record)?; self.persist_on_policy()?; let mem_queue = self.in_mem_queues.get_queue_mut(queue)?; @@ -208,26 +222,29 @@ impl MultiRecordLog { } self.multi_record_spare_buffer = multi_record_spare_buffer; - Ok(Some(max_position)) + Ok(AppendOutcome { + last_position: Some(max_position), + wal_bytes_written: num_bytes_written, + }) } - fn record_empty_queues_position(&mut self) -> io::Result<()> { - let mut has_empty_queues = false; + fn record_empty_queues_position(&mut self) -> io::Result { + let mut num_bytes_written: u64 = 0; + for (queue_id, queue) in self.in_mem_queues.empty_queues() { let next_position = queue.next_position(); let record = MultiPlexedRecord::RecordPosition { queue: queue_id, position: next_position, }; - self.record_log_writer.write_record(record)?; - has_empty_queues = true + num_bytes_written += self.record_log_writer.write_record(record)?; } - if has_empty_queues { + if num_bytes_written > 0 { // We need to fsync here! We are remove files from the FS // so we need to make sure our empty queue positions are properly persisted. self.persist(PersistAction::FlushAndFsync)?; } - Ok(()) + Ok(num_bytes_written) } /// Truncates the queue up to a given `position`, included. This method immediately @@ -235,32 +252,39 @@ impl MultiRecordLog { /// asynchronously when they become exclusively composed of deleted records. /// /// This method will always truncate the record log and release the associated memory. - /// It returns the number of records deleted. pub fn truncate( &mut self, queue: &str, truncate_range: RangeToInclusive, - ) -> Result { + ) -> Result { info!(range=?truncate_range, queue = queue, "truncate queue"); if !self.queue_exists(queue) { return Err(TruncateError::MissingQueue(queue.to_string())); } - self.record_log_writer - .write_record(MultiPlexedRecord::Truncate { - truncate_range, - queue, - })?; - let removed_count = self + let mut num_bytes_written = + self.record_log_writer + .write_record(MultiPlexedRecord::Truncate { + truncate_range, + queue, + })?; + let evicted_records = self .in_mem_queues .truncate(queue, truncate_range) .unwrap_or(0); - self.run_gc_if_necessary()?; + num_bytes_written += self.run_gc_if_necessary()?; self.persist_on_policy()?; - Ok(removed_count) + Ok(TruncateOutcome { + evicted_records, + wal_bytes_written: num_bytes_written, + }) } - fn run_gc_if_necessary(&mut self) -> io::Result<()> { + /// Returns the number of bytes the GC pass appended to the WAL — empty-queue position + /// records, if any. Returns 0 when there's no GC work to do. + fn run_gc_if_necessary(&mut self) -> io::Result { debug!("run_gc_if_necessary"); + let mut num_bytes_written = 0; + if self .record_log_writer .directory() @@ -273,7 +297,7 @@ impl MultiRecordLog { // But first we clone the current file number to make sure that the file that will // contain the truncate positions it self won't be GC'ed. let _file_number = self.record_log_writer.current_file().clone(); - self.record_empty_queues_position()?; + num_bytes_written += self.record_empty_queues_position()?; self.record_log_writer.directory().gc()?; } // only execute the following if we are above the debug level in tokio tracing @@ -285,7 +309,7 @@ impl MultiRecordLog { debug!(first_pos=?first_pos, last_pos=?last_pos, "queue positions after gc"); } } - Ok(()) + Ok(num_bytes_written) } pub fn range( diff --git a/src/proptests.rs b/src/proptests.rs index a33c4f1..9a7b509 100644 --- a/src/proptests.rs +++ b/src/proptests.rs @@ -76,12 +76,14 @@ impl PropTestEnv { .record_log .append_records(queue, Some(new_pos), std::iter::once(&b"BB"[..])) .unwrap() + .last_position .unwrap(); assert!(self .record_log .append_records(queue, Some(new_pos), std::iter::once(&b"BB"[..])) .unwrap() + .last_position .is_none()); assert_eq!(new_pos, res); @@ -93,7 +95,7 @@ impl PropTestEnv { let state = self.state.get_mut(queue).unwrap(); let new_pos = state.0.end + skip_one_pos as u64; - let res = self + let outcome = self .record_log .append_records( queue, @@ -103,7 +105,7 @@ impl PropTestEnv { .unwrap(); if count != 0 { - let res = res.unwrap(); + let res = outcome.last_position.unwrap(); assert_eq!(new_pos + count - 1, res); state.0.end = new_pos + count; state.1 += count; @@ -114,7 +116,11 @@ impl PropTestEnv { let state = self.state.get_mut(queue).unwrap(); if state.0.contains(&pos) { state.0.start = pos + 1; - state.1 -= self.record_log.truncate(queue, ..=pos).unwrap() as u64; + state.1 -= self + .record_log + .truncate(queue, ..=pos) + .unwrap() + .evicted_records as u64; } else if pos >= state.0.end { // advance the queue to the position. state.0 = (pos + 1)..(pos + 1); @@ -342,7 +348,8 @@ fn test_multi_record() { assert_eq!( multi_record_log .append_record("queue", None, &b"1"[..]) - .unwrap(), + .unwrap() + .last_position, Some(0) ); } @@ -351,7 +358,8 @@ fn test_multi_record() { assert_eq!( multi_record_log .append_record("queue", None, &b"22"[..]) - .unwrap(), + .unwrap() + .last_position, Some(1) ); } @@ -374,7 +382,8 @@ fn test_multi_record() { assert_eq!( multi_record_log .append_record("queue", None, &b"hello"[..]) - .unwrap(), + .unwrap() + .last_position, Some(2) ); } diff --git a/src/recordlog/writer.rs b/src/recordlog/writer.rs index 0a44d6e..928fadd 100644 --- a/src/recordlog/writer.rs +++ b/src/recordlog/writer.rs @@ -36,19 +36,23 @@ impl RecordWriter { } impl RecordWriter { - /// Writes a record. + /// Writes a record. Returns the total number of bytes pushed to the underlying writer + /// for this record, including frame headers and any block padding. /// - /// Even if this call returns `Ok(())`, at this point the data + /// Even if this call returns `Ok(_)`, at this point the data /// is likely to be not durably stored on disk. /// /// For instance, the data could be stale in a library level buffer, /// by a writer level buffer, or an application buffer, /// or could not be flushed to disk yet by the OS. - pub fn write_record<'a>(&mut self, record: impl Serializable<'a>) -> io::Result<()> { + pub fn write_record<'a>(&mut self, record: impl Serializable<'a>) -> io::Result { let mut is_first_frame = true; + let mut num_bytes_written: u64 = 0; + self.buffer.clear(); record.serialize(&mut self.buffer); let mut payload = &self.buffer[..]; + loop { let frame_payload_len = self .frame_writer @@ -58,13 +62,14 @@ impl RecordWriter { payload = &payload[frame_payload_len..]; let is_last_frame = payload.is_empty(); let frame_type = frame_type(is_first_frame, is_last_frame); - self.frame_writer.write_frame(frame_type, frame_payload)?; + num_bytes_written += self.frame_writer.write_frame(frame_type, frame_payload)? as u64; is_first_frame = false; + if is_last_frame { break; } } - Ok(()) + Ok(num_bytes_written) } /// Persist the data to disk, according to the persist_action. diff --git a/src/tests.rs b/src/tests.rs index f1f03a2..7306d17 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -174,7 +174,8 @@ fn test_multi_record_position_known_after_truncate() { assert_eq!( multi_record_log .append_record("queue", None, &b"1"[..]) - .unwrap(), + .unwrap() + .last_position, Some(0) ); } @@ -183,7 +184,8 @@ fn test_multi_record_position_known_after_truncate() { assert_eq!( multi_record_log .append_record("queue", None, &b"2"[..]) - .unwrap(), + .unwrap() + .last_position, Some(1) ); assert_eq!(&multi_record_log.list_file_numbers(), &[0]); @@ -198,7 +200,8 @@ fn test_multi_record_position_known_after_truncate() { assert_eq!( multi_record_log .append_record("queue", None, &b"hello"[..]) - .unwrap(), + .unwrap() + .last_position, Some(2) ); } @@ -217,7 +220,8 @@ fn test_multi_insert_truncate() { None, [b"1", b"2", b"3", b"4"].into_iter().map(|r| r.as_slice()) ) - .unwrap(), + .unwrap() + .last_position, Some(3) ); assert_eq!( @@ -275,20 +279,23 @@ fn test_truncate_range_correct_pos() { assert_eq!( multi_record_log .append_record("queue", None, &b"1"[..]) - .unwrap(), + .unwrap() + .last_position, Some(0) ); assert_eq!( multi_record_log .append_record("queue", None, &b"2"[..]) - .unwrap(), + .unwrap() + .last_position, Some(1) ); multi_record_log.truncate("queue", ..=1).unwrap(); assert_eq!( multi_record_log .append_record("queue", None, &b"3"[..]) - .unwrap(), + .unwrap() + .last_position, Some(2) ); assert_eq!(