diff --git a/crates/commitlog/benches/write.rs b/crates/commitlog/benches/write.rs index 601767be620..d05e84dea99 100644 --- a/crates/commitlog/benches/write.rs +++ b/crates/commitlog/benches/write.rs @@ -14,6 +14,7 @@ struct Params { txs_per_commit: NonZeroU16, total_appends: u64, fsync_every: u64, + flush_on_commit: Option, } impl Params { @@ -23,6 +24,7 @@ impl Params { txs_per_commit: NonZeroU16::new(1).unwrap(), total_appends: 1_000, fsync_every: 32, + flush_on_commit: None, } } } @@ -31,8 +33,8 @@ impl fmt::Display for Params { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "n={} tx/commit={} fsync={}", - self.total_appends, self.txs_per_commit, self.fsync_every + "n={} tx/commit={} fsync={} flush-on-commit={:?}", + self.total_appends, self.txs_per_commit, self.fsync_every, self.flush_on_commit, ) } } @@ -52,12 +54,14 @@ fn bench_append(c: &mut Criterion, label: &str, params: Params) { txs_per_commit, total_appends, fsync_every, + flush_on_commit, }| { let tmp = tempdir_in(".").unwrap(); let clog = Commitlog::open( CommitLogDir::from_path_unchecked(tmp.path()), Options { max_records_in_commit: *txs_per_commit, + flush_on_commit: flush_on_commit.is_none(), ..<_>::default() }, None, @@ -75,6 +79,9 @@ fn bench_append(c: &mut Criterion, label: &str, params: Params) { retry = Some(txdata); } } + if flush_on_commit.is_some_and(|every| i % every == 0) { + clog.flush_to_disk().unwrap(); + } if i % fsync_every == 0 { clog.flush_and_sync().unwrap(); } @@ -120,11 +127,26 @@ fn mixed_payloads_with_batching(c: &mut Criterion) { bench_append(c, "mixed payloads with batching", params); } +fn mixed_payloads_with_manual_flush(c: &mut Criterion) { + let params = Params { + flush_on_commit: Some(16), + ..Params::with_payloads([ + Payload::new([b'a'; 64]), + Payload::new([b'b'; 512]), + Payload::new([b'c'; 1024]), + Payload::new([b'd'; 4096]), + Payload::new([b'e'; 8102]), + ]) + }; + bench_append(c, "mixed payloads with manual flush", params); +} + criterion_group!( benches, baseline, large_payload, mixed_payloads, - mixed_payloads_with_batching + mixed_payloads_with_batching, + mixed_payloads_with_manual_flush, ); criterion_main!(benches); diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03c590e4950..620115a51ad 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -161,6 +161,7 @@ impl Generic { // results in a huge segment. let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size; let writer = if should_rotate { + self.head.flush()?; self.sync(); self.start_new_segment()? } else { @@ -178,6 +179,10 @@ impl Generic { ret } + pub fn flush(&mut self) -> io::Result<()> { + self.head.flush() + } + /// Force the currently active segment to be flushed to storage. /// /// Using a filesystem backend, this means to call `fsync(2)`. diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index a3fb912d827..44c6af3c0ea 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -96,6 +96,9 @@ pub struct Options { /// Has no effect if the `fallocate` feature is not enabled. #[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))] pub preallocate_segments: bool, + + #[cfg_attr(feature = "serde", serde(default = "Options::default_flush_on_commit"))] + pub flush_on_commit: bool, } impl Default for Options { @@ -110,6 +113,7 @@ impl Options { pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; + pub const DEFAULT_FLUSH_ON_COMMIT: bool = true; pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, @@ -118,6 +122,7 @@ impl Options { offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), preallocate_segments: Self::default_preallocate_segments(), + flush_on_commit: Self::default_flush_on_commit(), }; pub const fn default_log_format_version() -> u8 { @@ -144,6 +149,10 @@ impl Options { Self::DEFAULT_PREALLOCATE_SEGMENTS } + pub const fn default_flush_on_commit() -> bool { + Self::DEFAULT_FLUSH_ON_COMMIT + } + /// Compute the length in bytes of an offset index based on the settings in /// `self`. pub fn offset_index_len(&self) -> u64 { @@ -267,6 +276,14 @@ impl Commitlog { Ok(inner.max_committed_offset()) } + pub fn flush_to_disk(&self) -> io::Result<()> { + let mut inner = self.inner.write().unwrap(); + inner.commit()?; + inner.flush()?; + + Ok(()) + } + /// Write all outstanding transaction records to disk and flush OS buffers. /// /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 5be633bf5c7..c002421f4a3 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -220,6 +220,7 @@ pub fn create_segment_writer( bytes_written: Header::LEN as u64, max_records_in_commit: opts.max_records_in_commit, + flush_on_commit: opts.flush_on_commit, offset_index_head: create_offset_index_writer(repo, offset, opts), }) @@ -294,6 +295,7 @@ pub fn resume_segment_writer( bytes_written: size_in_bytes, max_records_in_commit: opts.max_records_in_commit, + flush_on_commit: opts.flush_on_commit, offset_index_head: create_offset_index_writer(repo, offset, opts), })) diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7e8c054467b..71dee829180 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -101,6 +101,7 @@ pub struct Writer { pub(crate) bytes_written: u64, pub(crate) max_records_in_commit: NonZeroU16, + pub(crate) flush_on_commit: bool, pub(crate) offset_index_head: Option, } @@ -138,7 +139,9 @@ impl Writer { return Ok(None); } let checksum = self.commit.write(&mut self.inner)?; - self.inner.flush()?; + if self.flush_on_commit { + self.inner.flush()?; + } let commit_len = self.commit.encoded_len() as u64; self.offset_index_head.as_mut().map(|index| { @@ -166,6 +169,10 @@ impl Writer { })) } + pub fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } + /// Get the current epoch. pub fn epoch(&self) -> u64 { self.commit.epoch @@ -943,6 +950,7 @@ mod tests { bytes_written: 0, max_records_in_commit, + flush_on_commit: true, offset_index_head: None, }; @@ -973,6 +981,7 @@ mod tests { bytes_written: 0, max_records_in_commit: NonZeroU16::MAX, + flush_on_commit: true, offset_index_head: None, };