Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions crates/commitlog/benches/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ struct Params {
txs_per_commit: NonZeroU16,
total_appends: u64,
fsync_every: u64,
flush_on_commit: Option<u64>,
}

impl Params {
Expand All @@ -23,6 +24,7 @@ impl Params {
txs_per_commit: NonZeroU16::new(1).unwrap(),
total_appends: 1_000,
fsync_every: 32,
flush_on_commit: None,
}
}
}
Expand All @@ -31,8 +33,8 @@ impl fmt::Display for Params {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"n={} tx/commit={} fsync={}",
self.total_appends, self.txs_per_commit, self.fsync_every
"n={} tx/commit={} fsync={} flush-on-commit={:?}",
self.total_appends, self.txs_per_commit, self.fsync_every, self.flush_on_commit,
)
}
}
Expand All @@ -52,12 +54,14 @@ fn bench_append(c: &mut Criterion, label: &str, params: Params) {
txs_per_commit,
total_appends,
fsync_every,
flush_on_commit,
}| {
let tmp = tempdir_in(".").unwrap();
let clog = Commitlog::open(
CommitLogDir::from_path_unchecked(tmp.path()),
Options {
max_records_in_commit: *txs_per_commit,
flush_on_commit: flush_on_commit.is_none(),
..<_>::default()
},
None,
Expand All @@ -75,6 +79,9 @@ fn bench_append(c: &mut Criterion, label: &str, params: Params) {
retry = Some(txdata);
}
}
if flush_on_commit.is_some_and(|every| i % every == 0) {
clog.flush_to_disk().unwrap();
}
if i % fsync_every == 0 {
clog.flush_and_sync().unwrap();
}
Expand Down Expand Up @@ -120,11 +127,26 @@ fn mixed_payloads_with_batching(c: &mut Criterion) {
bench_append(c, "mixed payloads with batching", params);
}

/// Benchmark appends of mixed-size payloads with manual flushing.
///
/// Setting `flush_on_commit` to `Some(16)` makes `bench_append` open the
/// commitlog with its automatic flush-on-commit disabled and call
/// `flush_to_disk` whenever the loop counter is a multiple of 16.
fn mixed_payloads_with_manual_flush(c: &mut Criterion) {
    // Same spread of payload sizes, from 64 bytes up to roughly 8 KiB.
    let payloads = [
        Payload::new([b'a'; 64]),
        Payload::new([b'b'; 512]),
        Payload::new([b'c'; 1024]),
        Payload::new([b'd'; 4096]),
        Payload::new([b'e'; 8102]),
    ];

    // Start from the stock parameters and override only the flush interval.
    let mut params = Params::with_payloads(payloads);
    params.flush_on_commit = Some(16);

    bench_append(c, "mixed payloads with manual flush", params);
}

criterion_group!(
benches,
baseline,
large_payload,
mixed_payloads,
mixed_payloads_with_batching
mixed_payloads_with_batching,
mixed_payloads_with_manual_flush,
);
criterion_main!(benches);
5 changes: 5 additions & 0 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl<R: Repo, T> Generic<R, T> {
// results in a huge segment.
let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
let writer = if should_rotate {
self.head.flush()?;
self.sync();
self.start_new_segment()?
} else {
Expand All @@ -178,6 +179,10 @@ impl<R: Repo, T> Generic<R, T> {
ret
}

/// Flush the writer of the current head segment.
///
/// Delegates to `self.head.flush()` and nothing else; in particular it does
/// not call [`Self::sync`].
///
/// # Errors
///
/// Returns whatever I/O error the head segment writer reports.
// NOTE(review): presumably this only pushes buffered bytes to the underlying
// storage handle without forcing them to stable storage — confirm against the
// segment writer implementation.
pub fn flush(&mut self) -> io::Result<()> {
self.head.flush()
}

/// Force the currently active segment to be flushed to storage.
///
/// Using a filesystem backend, this means to call `fsync(2)`.
Expand Down
17 changes: 17 additions & 0 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ pub struct Options {
/// Has no effect if the `fallocate` feature is not enabled.
#[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))]
pub preallocate_segments: bool,

#[cfg_attr(feature = "serde", serde(default = "Options::default_flush_on_commit"))]
pub flush_on_commit: bool,
}

impl Default for Options {
Expand All @@ -110,6 +113,7 @@ impl Options {
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
pub const DEFAULT_FLUSH_ON_COMMIT: bool = true;

pub const DEFAULT: Self = Self {
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
Expand All @@ -118,6 +122,7 @@ impl Options {
offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
preallocate_segments: Self::default_preallocate_segments(),
flush_on_commit: Self::default_flush_on_commit(),
};

pub const fn default_log_format_version() -> u8 {
Expand All @@ -144,6 +149,10 @@ impl Options {
Self::DEFAULT_PREALLOCATE_SEGMENTS
}

/// Default value for [`Options::flush_on_commit`].
///
/// Returns [`Self::DEFAULT_FLUSH_ON_COMMIT`]. Used both when building
/// [`Self::DEFAULT`] and as the `serde` default for the field.
pub const fn default_flush_on_commit() -> bool {
Self::DEFAULT_FLUSH_ON_COMMIT
}

/// Compute the length in bytes of an offset index based on the settings in
/// `self`.
pub fn offset_index_len(&self) -> u64 {
Expand Down Expand Up @@ -267,6 +276,14 @@ impl<T> Commitlog<T> {
Ok(inner.max_committed_offset())
}

/// Commit pending records on the inner log, then flush its head segment
/// writer.
///
/// Takes the write lock on `inner`, calls `commit()` and, if that succeeds,
/// `flush()`. This gives callers an explicit flush point, e.g. when
/// [`Options::flush_on_commit`] is disabled.
///
/// # Errors
///
/// Returns the first I/O error raised by either the commit or the flush.
///
/// # Panics
///
/// Panics if the write lock on `inner` cannot be acquired (the `unwrap`),
/// which is presumably the poisoned-lock case.
// NOTE(review): despite the name, nothing here calls `sync`; data is likely
// only handed to the OS, not forced to stable storage — confirm, and consider
// `flush_and_sync` when durability is required.
pub fn flush_to_disk(&self) -> io::Result<()> {
    let mut guard = self.inner.write().unwrap();
    guard.commit()?;
    guard.flush()
}

/// Write all outstanding transaction records to disk and flush OS buffers.
///
/// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
Expand Down
2 changes: 2 additions & 0 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ pub fn create_segment_writer<R: Repo>(
bytes_written: Header::LEN as u64,

max_records_in_commit: opts.max_records_in_commit,
flush_on_commit: opts.flush_on_commit,

offset_index_head: create_offset_index_writer(repo, offset, opts),
})
Expand Down Expand Up @@ -294,6 +295,7 @@ pub fn resume_segment_writer<R: Repo>(
bytes_written: size_in_bytes,

max_records_in_commit: opts.max_records_in_commit,
flush_on_commit: opts.flush_on_commit,

offset_index_head: create_offset_index_writer(repo, offset, opts),
}))
Expand Down
11 changes: 10 additions & 1 deletion crates/commitlog/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub struct Writer<W: io::Write> {
pub(crate) bytes_written: u64,

pub(crate) max_records_in_commit: NonZeroU16,
pub(crate) flush_on_commit: bool,

pub(crate) offset_index_head: Option<OffsetIndexWriter>,
}
Expand Down Expand Up @@ -138,7 +139,9 @@ impl<W: io::Write> Writer<W> {
return Ok(None);
}
let checksum = self.commit.write(&mut self.inner)?;
self.inner.flush()?;
if self.flush_on_commit {
self.inner.flush()?;
}

let commit_len = self.commit.encoded_len() as u64;
self.offset_index_head.as_mut().map(|index| {
Expand Down Expand Up @@ -166,6 +169,10 @@ impl<W: io::Write> Writer<W> {
}))
}

/// Flush the underlying writer `inner`.
///
/// Committing only flushes `inner` itself when `flush_on_commit` is set;
/// this method lets callers trigger the flush explicitly otherwise.
///
/// # Errors
///
/// Returns the I/O error reported by `inner.flush()`, if any.
pub fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}

/// Get the current epoch.
pub fn epoch(&self) -> u64 {
self.commit.epoch
Expand Down Expand Up @@ -943,6 +950,7 @@ mod tests {
bytes_written: 0,

max_records_in_commit,
flush_on_commit: true,

offset_index_head: None,
};
Expand Down Expand Up @@ -973,6 +981,7 @@ mod tests {
bytes_written: 0,

max_records_in_commit: NonZeroU16::MAX,
flush_on_commit: true,
offset_index_head: None,
};

Expand Down
Loading