From ea0a10da3c94d57855964b201d0e5c2f7afee37b Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 25 Mar 2026 17:36:31 +0100 Subject: [PATCH 1/2] Collect stats when compressing Namely, bytes in + bytes out. Apart from metrics, we want to use this for throttling I/O when necessary. --- crates/commitlog/src/lib.rs | 12 ++++-- crates/commitlog/src/repo/fs.rs | 13 +++--- crates/commitlog/src/repo/mem.rs | 5 ++- crates/commitlog/src/repo/mod.rs | 6 ++- crates/commitlog/src/tests/partial.rs | 3 +- crates/durability/src/imp/local.rs | 6 ++- crates/fs-utils/src/compression.rs | 59 +++++++++++++++++++++++---- crates/snapshot/src/lib.rs | 15 ++++++- 8 files changed, 92 insertions(+), 27 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 3922f002a84..0ac1f43f57e 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -9,6 +9,8 @@ use log::trace; use repo::{fs::OnNewSegmentFn, Repo}; use spacetimedb_paths::server::CommitLogDir; +pub use spacetimedb_fs_utils::compression::CompressionStats; + pub mod commit; pub mod commitlog; mod index; @@ -330,16 +332,18 @@ impl Commitlog { } /// Compress the segments at the offsets provided, marking them as immutable. - pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> { + pub fn compress_segments(&self, offsets: &[u64]) -> io::Result { // even though `compress_segment` takes &self, we take an // exclusive lock to avoid any weirdness happening. #[allow(clippy::readonly_write_lock)] let inner = self.inner.write().unwrap(); assert!(!offsets.contains(&inner.head.min_tx_offset())); // TODO: parallelize, maybe - offsets - .iter() - .try_for_each(|&offset| inner.repo.compress_segment(offset)) + let mut stats = <_>::default(); + for offset in offsets { + stats += inner.repo.compress_segment(*offset)?; + } + Ok(stats) } /// Remove all data from the log and reopen it. diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 506cde5fb41..4efc26f1e81 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -4,13 +4,12 @@ use std::io; use std::sync::Arc; use log::{debug, warn}; -use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader}; +use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader, CompressionStats}; use spacetimedb_paths::server::{CommitLogDir, SegmentFile}; use tempfile::NamedTempFile; -use crate::segment::FileLike; - use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut}; +use crate::segment::FileLike; const SEGMENT_FILE_EXT: &str = ".stdb.log"; @@ -261,21 +260,21 @@ impl Repo for Fs { fs::remove_file(self.segment_path(offset)) } - fn compress_segment(&self, offset: u64) -> io::Result<()> { + fn compress_segment(&self, offset: u64) -> io::Result { let src = self.open_segment_reader(offset)?; // if it's already compressed, leave it be let CompressReader::None(mut src) = src.inner else { - return Ok(()); + return Ok(<_>::default()); }; let mut dst = NamedTempFile::new_in(&self.root)?; // bytes per frame. in the future, it might be worth looking into putting // every commit into its own frame, to make seeking more efficient. let max_frame_size = 0x1000; - compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?; + let stats = compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?; dst.persist(self.segment_path(offset))?; - Ok(()) + Ok(stats) } fn existing_offsets(&self) -> io::Result> { diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index d2b9bfb8d86..b2a58d26631 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -11,6 +11,7 @@ use crate::repo::{ mod segment; pub use segment::{ReadOnlySegment, Segment}; +use spacetimedb_fs_utils::compression::CompressionStats; pub const PAGE_SIZE: usize = 4096; @@ -103,8 +104,8 @@ impl Repo for Memory { Ok(()) } - fn compress_segment(&self, _offset: u64) -> io::Result<()> { - Ok(()) + fn compress_segment(&self, _offset: u64) -> io::Result { + Ok(<_>::default()) } fn existing_offsets(&self) -> io::Result> { diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 3d1968c00b2..6560078a97b 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -5,6 +5,8 @@ use std::{ use log::{debug, warn}; +pub use spacetimedb_fs_utils::compression::CompressionStats; + use crate::{ commit::Commit, error, @@ -121,7 +123,7 @@ pub trait Repo: Clone + fmt::Display { fn remove_segment(&self, offset: u64) -> io::Result<()>; /// Compress a segment in storage, marking it as immutable. - fn compress_segment(&self, offset: u64) -> io::Result<()>; + fn compress_segment(&self, offset: u64) -> io::Result; /// Traverse all segments in this repository and return list of their /// offsets, sorted in ascending order. @@ -164,7 +166,7 @@ impl Repo for &T { T::remove_segment(self, offset) } - fn compress_segment(&self, offset: u64) -> io::Result<()> { + fn compress_segment(&self, offset: u64) -> io::Result { T::compress_segment(self, offset) } diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index 69f54241874..43532e02978 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -8,6 +8,7 @@ use std::{ use log::{debug, info}; use pretty_assertions::assert_matches; +use spacetimedb_fs_utils::compression::CompressionStats; use crate::{ commitlog, payload, @@ -295,7 +296,7 @@ impl Repo for ShortMem { self.inner.remove_segment(offset) } - fn compress_segment(&self, offset: u64) -> io::Result<()> { + fn compress_segment(&self, offset: u64) -> io::Result { self.inner.compress_segment(offset) } diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index f7033a7251f..5221fcfdb50 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -12,7 +12,9 @@ use futures::{FutureExt as _, TryFutureExt as _}; use itertools::Itertools as _; use log::{info, trace, warn}; use scopeguard::ScopeGuard; -use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; +use spacetimedb_commitlog::{ + error, payload::Txdata, Commit, Commitlog, CompressionStats, Decoder, Encode, Transaction, +}; use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; use spacetimedb_paths::server::ReplicaDir; use thiserror::Error; @@ -181,7 +183,7 @@ impl Local { } /// Compress the segments at the offsets provided, marking them as immutable. - pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> { + pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result { self.clog.compress_segments(offsets) } diff --git a/crates/fs-utils/src/compression.rs b/crates/fs-utils/src/compression.rs index 8e72fcbe76d..2d71f55729a 100644 --- a/crates/fs-utils/src/compression.rs +++ b/crates/fs-utils/src/compression.rs @@ -1,6 +1,7 @@ use std::fs::File; use std::io; use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::ops::AddAssign; use tokio::io::AsyncSeek; use zstd_framed; use zstd_framed::{ZstdReader, ZstdWriter}; @@ -149,16 +150,60 @@ pub fn new_zstd_writer<'a, W: io::Write>(inner: W, max_frame_size: Option) .build() } +#[derive(Clone, Copy, Debug, Default)] +pub struct CompressionStats { + pub bytes_read: u64, + pub bytes_written: u64, +} + +impl AddAssign for CompressionStats { + fn add_assign(&mut self, rhs: Self) { + self.bytes_read += rhs.bytes_read; + self.bytes_written += rhs.bytes_written; + } +} + pub fn compress_with_zstd( mut src: R, - mut dst: W, + dst: W, max_frame_size: Option, -) -> io::Result<()> { - let mut writer = new_zstd_writer(&mut dst, max_frame_size)?; - io::copy(&mut src, &mut writer)?; - writer.shutdown()?; - drop(writer); - Ok(()) +) -> io::Result { + /// [io::Write] wrapper that counts how many bytes were written. + struct Writer { + bytes_written: u64, + inner: W, + } + + impl io::Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result { + let n = self.inner.write(buf)?; + self.bytes_written += n as u64; + Ok(n) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } + } + + // Wrap `dst` in [Writer], and use it as the sink for the zstd writer, + // such that we can determine how many (compressed) bytes came out at the end. + let mut dst = Writer { + bytes_written: 0, + inner: dst, + }; + let mut zstd_writer = new_zstd_writer(&mut dst, max_frame_size)?; + + let bytes_read = io::copy(&mut src, &mut zstd_writer)?; + zstd_writer.shutdown()?; + drop(zstd_writer); + + let stats = CompressionStats { + bytes_read, + bytes_written: dst.bytes_written, + }; + + Ok(stats) } pub use async_impls::AsyncCompressReader; diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 6606d22f9e4..4e45c049003 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -564,6 +564,8 @@ impl fmt::Debug for SnapshotSize { pub struct ObjectCompressionStats { /// Number of objects freshly compressed. pub compressed: usize, + /// Cumulative stats of the compressed objects. + pub compression_stats: spacetimedb_fs_utils::compression::CompressionStats, /// Number of objects hardlinked from a parent repository. pub hardlinked: usize, } @@ -579,8 +581,16 @@ impl ObjectCompressionStats { } impl AddAssign for ObjectCompressionStats { - fn add_assign(&mut self, Self { compressed, hardlinked }: Self) { + fn add_assign( + &mut self, + Self { + compressed, + compression_stats, + hardlinked, + }: Self, + ) { self.compressed += compressed; + self.compression_stats += compression_stats; self.hardlinked += hardlinked; } } @@ -1112,10 +1122,11 @@ impl SnapshotRepository { let dst = src.with_extension("_tmp"); let mut write = BufWriter::new(o_excl().open(&dst)?); // The default frame size compress better. - compress_with_zstd(read, &mut write, None)?; + let compression_stats = compress_with_zstd(read, &mut write, None)?; std::fs::rename(dst, src)?; if let Some(stats) = stats { stats.compressed += 1; + stats.compression_stats += compression_stats; } Ok(()) From 2ed3d8c4d3d93ff01ae1e686a3d81cea4c2ec281 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 30 Mar 2026 10:19:34 +0200 Subject: [PATCH 2/2] Abstract compression function, so we can instrument input and output --- crates/commitlog/src/repo/fs.rs | 10 ++++------ crates/commitlog/src/repo/mem.rs | 2 +- crates/commitlog/src/repo/mod.rs | 27 ++++++++++++++++++++++----- crates/commitlog/src/tests/partial.rs | 4 ++-- crates/fs-utils/src/compression.rs | 21 +++++++++++++++++++++ 5 files changed, 50 insertions(+), 14 deletions(-) diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 4efc26f1e81..bdb2b7aa674 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -4,11 +4,12 @@ use std::io; use std::sync::Arc; use log::{debug, warn}; -use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader, CompressionStats}; +use spacetimedb_fs_utils::compression::{CompressReader, CompressionStats}; use spacetimedb_paths::server::{CommitLogDir, SegmentFile}; use tempfile::NamedTempFile; use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut}; +use crate::repo::CompressOnce; use crate::segment::FileLike; const SEGMENT_FILE_EXT: &str = ".stdb.log"; @@ -260,7 +261,7 @@ impl Repo for Fs { fs::remove_file(self.segment_path(offset)) } - fn compress_segment(&self, offset: u64) -> io::Result { + fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result { let src = self.open_segment_reader(offset)?; // if it's already compressed, leave it be let CompressReader::None(mut src) = src.inner else { @@ -268,10 +269,7 @@ impl Repo for Fs { }; let mut dst = NamedTempFile::new_in(&self.root)?; - // bytes per frame. in the future, it might be worth looking into putting - // every commit into its own frame, to make seeking more efficient. - let max_frame_size = 0x1000; - let stats = compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?; + let stats = f.compress(&mut src, &mut dst)?; dst.persist(self.segment_path(offset))?; Ok(stats) diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index b2a58d26631..ce313c3d6dc 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -104,7 +104,7 @@ impl Repo for Memory { Ok(()) } - fn compress_segment(&self, _offset: u64) -> io::Result { + fn compress_segment_with(&self, _: u64, _: impl super::CompressOnce) -> io::Result { Ok(<_>::default()) } diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 6560078a97b..22e3f54df05 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -4,8 +4,8 @@ use std::{ }; use log::{debug, warn}; - -pub use spacetimedb_fs_utils::compression::CompressionStats; +use spacetimedb_fs_utils::compression::Zstd; +pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats}; use crate::{ commit::Commit, @@ -123,7 +123,13 @@ pub trait Repo: Clone + fmt::Display { fn remove_segment(&self, offset: u64) -> io::Result<()>; /// Compress a segment in storage, marking it as immutable. - fn compress_segment(&self, offset: u64) -> io::Result; + fn compress_segment(&self, offset: u64) -> io::Result { + self.compress_segment_with(offset, segment_compressor()) + } + + /// Compress a segment using a supplied [CompressOnce], marking it as + /// immutable. + fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result; /// Traverse all segments in this repository and return list of their /// offsets, sorted in ascending order. @@ -166,8 +172,8 @@ impl Repo for &T { T::remove_segment(self, offset) } - fn compress_segment(&self, offset: u64) -> io::Result { - T::compress_segment(self, offset) + fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result { + T::compress_segment_with(self, offset, f) } fn existing_offsets(&self) -> io::Result> { @@ -356,6 +362,17 @@ pub fn open_segment_reader( .map_err(|source| with_segment_context("reading segment header", repo, offset, source)) } +/// Obtain the canonical [CompressOnce] compressor for segments. +/// +/// The compressor will create seekable [Zstd] archives with a max frame size +/// of 4KiB. That is, seeking to an arbitrary byte offset (of the uncompressed +/// segment) within the archive will decompress 4KiB of data on average. +pub fn segment_compressor() -> Zstd { + Zstd { + max_frame_size: Some(0x1000), + } +} + fn segment_label(repo: &R, offset: u64) -> String { repo.segment_file_path(offset) .unwrap_or_else(|| format!("offset {offset}")) diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index 43532e02978..f07fe2fe7c1 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -296,8 +296,8 @@ impl Repo for ShortMem { self.inner.remove_segment(offset) } - fn compress_segment(&self, offset: u64) -> io::Result { - self.inner.compress_segment(offset) + fn compress_segment_with(&self, offset: u64, f: impl repo::CompressOnce) -> io::Result { + self.inner.compress_segment_with(offset, f) } fn existing_offsets(&self) -> io::Result> { diff --git a/crates/fs-utils/src/compression.rs b/crates/fs-utils/src/compression.rs index 2d71f55729a..c285548a894 100644 --- a/crates/fs-utils/src/compression.rs +++ b/crates/fs-utils/src/compression.rs @@ -150,6 +150,27 @@ pub fn new_zstd_writer<'a, W: io::Write>(inner: W, max_frame_size: Option) .build() } +/// Compress `src` to `dst` in one go. +/// +/// Like a `FnOnce` closure, but polymorphic over the arguments. +pub trait CompressOnce { + fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result; +} + +/// Implements [CompressOnce] for the zstd compression algorithm. +pub struct Zstd { + /// If `Some`, add a seek table with `max_frame_size` to the compressed output. + /// + /// See [zstd_framed::writer::ZstdWriterBuilder::with_seek_table]. + pub max_frame_size: Option, +} + +impl CompressOnce for Zstd { + fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result { + compress_with_zstd(src, dst, self.max_frame_size) + } +} + #[derive(Clone, Copy, Debug, Default)] pub struct CompressionStats { pub bytes_read: u64,