Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use log::trace;
use repo::{fs::OnNewSegmentFn, Repo};
use spacetimedb_paths::server::CommitLogDir;

pub use spacetimedb_fs_utils::compression::CompressionStats;

pub mod commit;
pub mod commitlog;
mod index;
Expand Down Expand Up @@ -330,16 +332,18 @@ impl<T> Commitlog<T> {
}

/// Compress the segments at the offsets provided, marking them as immutable.
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<CompressionStats> {
// even though `compress_segment` takes &self, we take an
// exclusive lock to avoid any weirdness happening.
#[allow(clippy::readonly_write_lock)]
let inner = self.inner.write().unwrap();
assert!(!offsets.contains(&inner.head.min_tx_offset()));
// TODO: parallelize, maybe
offsets
.iter()
.try_for_each(|&offset| inner.repo.compress_segment(offset))
let mut stats = <_>::default();
for offset in offsets {
stats += inner.repo.compress_segment(*offset)?;
}
Ok(stats)
}

/// Remove all data from the log and reopen it.
Expand Down
17 changes: 7 additions & 10 deletions crates/commitlog/src/repo/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::io;
use std::sync::Arc;

use log::{debug, warn};
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
use spacetimedb_fs_utils::compression::{CompressReader, CompressionStats};
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
use tempfile::NamedTempFile;

use crate::segment::FileLike;

use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
use crate::repo::CompressOnce;
use crate::segment::FileLike;

const SEGMENT_FILE_EXT: &str = ".stdb.log";

Expand Down Expand Up @@ -261,21 +261,18 @@ impl Repo for Fs {
fs::remove_file(self.segment_path(offset))
}

fn compress_segment(&self, offset: u64) -> io::Result<()> {
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
let src = self.open_segment_reader(offset)?;
// if it's already compressed, leave it be
let CompressReader::None(mut src) = src.inner else {
return Ok(());
return Ok(<_>::default());
};

let mut dst = NamedTempFile::new_in(&self.root)?;
// bytes per frame. in the future, it might be worth looking into putting
// every commit into its own frame, to make seeking more efficient.
let max_frame_size = 0x1000;
compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
let stats = f.compress(&mut src, &mut dst)?;
dst.persist(self.segment_path(offset))?;

Ok(())
Ok(stats)
}

fn existing_offsets(&self) -> io::Result<Vec<u64>> {
Expand Down
5 changes: 3 additions & 2 deletions crates/commitlog/src/repo/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::repo::{

mod segment;
pub use segment::{ReadOnlySegment, Segment};
use spacetimedb_fs_utils::compression::CompressionStats;

pub const PAGE_SIZE: usize = 4096;

Expand Down Expand Up @@ -103,8 +104,8 @@ impl Repo for Memory {
Ok(())
}

fn compress_segment(&self, _offset: u64) -> io::Result<()> {
Ok(())
fn compress_segment_with(&self, _: u64, _: impl super::CompressOnce) -> io::Result<CompressionStats> {
Ok(<_>::default())
}

fn existing_offsets(&self) -> io::Result<Vec<u64>> {
Expand Down
25 changes: 22 additions & 3 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
};

use log::{debug, warn};
use spacetimedb_fs_utils::compression::Zstd;
pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats};

use crate::{
commit::Commit,
Expand Down Expand Up @@ -121,7 +123,13 @@ pub trait Repo: Clone + fmt::Display {
fn remove_segment(&self, offset: u64) -> io::Result<()>;

/// Compress a segment in storage, marking it as immutable.
fn compress_segment(&self, offset: u64) -> io::Result<()>;
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
self.compress_segment_with(offset, segment_compressor())
}

/// Compress a segment using a supplied [CompressOnce], marking it as
/// immutable.
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats>;

/// Traverse all segments in this repository and return list of their
/// offsets, sorted in ascending order.
Expand Down Expand Up @@ -164,8 +172,8 @@ impl<T: Repo> Repo for &T {
T::remove_segment(self, offset)
}

fn compress_segment(&self, offset: u64) -> io::Result<()> {
T::compress_segment(self, offset)
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
T::compress_segment_with(self, offset, f)
}

fn existing_offsets(&self) -> io::Result<Vec<u64>> {
Expand Down Expand Up @@ -354,6 +362,17 @@ pub fn open_segment_reader<R: Repo>(
.map_err(|source| with_segment_context("reading segment header", repo, offset, source))
}

/// Obtain the canonical [CompressOnce] compressor for segments.
///
/// The compressor will create seekable [Zstd] archives with a max frame size
/// of 4KiB. That is, seeking to an arbitrary byte offset (of the uncompressed
/// segment) within the archive will decompress 4KiB of data on average.
pub fn segment_compressor() -> Zstd {
Zstd {
max_frame_size: Some(0x1000),
}
}

fn segment_label<R: Repo>(repo: &R, offset: u64) -> String {
repo.segment_file_path(offset)
.unwrap_or_else(|| format!("offset {offset}"))
Expand Down
5 changes: 3 additions & 2 deletions crates/commitlog/src/tests/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use log::{debug, info};
use pretty_assertions::assert_matches;
use spacetimedb_fs_utils::compression::CompressionStats;

use crate::{
commitlog, payload,
Expand Down Expand Up @@ -295,8 +296,8 @@ impl Repo for ShortMem {
self.inner.remove_segment(offset)
}

fn compress_segment(&self, offset: u64) -> io::Result<()> {
self.inner.compress_segment(offset)
fn compress_segment_with(&self, offset: u64, f: impl repo::CompressOnce) -> io::Result<CompressionStats> {
self.inner.compress_segment_with(offset, f)
}

fn existing_offsets(&self) -> io::Result<Vec<u64>> {
Expand Down
6 changes: 4 additions & 2 deletions crates/durability/src/imp/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use futures::{FutureExt as _, TryFutureExt as _};
use itertools::Itertools as _;
use log::{info, trace, warn};
use scopeguard::ScopeGuard;
use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
use spacetimedb_commitlog::{
error, payload::Txdata, Commit, Commitlog, CompressionStats, Decoder, Encode, Transaction,
};
use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile};
use spacetimedb_paths::server::ReplicaDir;
use thiserror::Error;
Expand Down Expand Up @@ -183,7 +185,7 @@ impl<T: Send + Sync + 'static> Local<T> {
}

/// Compress the segments at the offsets provided, marking them as immutable.
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<CompressionStats> {
self.clog.compress_segments(offsets)
}

Expand Down
80 changes: 73 additions & 7 deletions crates/fs-utils/src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fs::File;
use std::io;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::ops::AddAssign;
use tokio::io::AsyncSeek;
use zstd_framed;
use zstd_framed::{ZstdReader, ZstdWriter};
Expand Down Expand Up @@ -149,16 +150,81 @@ pub fn new_zstd_writer<'a, W: io::Write>(inner: W, max_frame_size: Option<u32>)
.build()
}

/// Compress `src` to `dst` in one go.
///
/// Like a `FnOnce` closure, but polymorphic over the arguments.
pub trait CompressOnce {
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats>;
}

/// Implements [CompressOnce] for the zstd compression algorithm.
pub struct Zstd {
/// If `Some`, add a seek table with `max_frame_size` to the compressed output.
///
/// See [zstd_framed::writer::ZstdWriterBuilder::with_seek_table].
pub max_frame_size: Option<u32>,
}

impl CompressOnce for Zstd {
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats> {
compress_with_zstd(src, dst, self.max_frame_size)
}
}

#[derive(Clone, Copy, Debug, Default)]
pub struct CompressionStats {
pub bytes_read: u64,
pub bytes_written: u64,
}

impl AddAssign for CompressionStats {
fn add_assign(&mut self, rhs: Self) {
self.bytes_read += rhs.bytes_read;
self.bytes_written += rhs.bytes_written;
}
}

pub fn compress_with_zstd<W: io::Write, R: io::Read>(
mut src: R,
mut dst: W,
dst: W,
max_frame_size: Option<u32>,
) -> io::Result<()> {
let mut writer = new_zstd_writer(&mut dst, max_frame_size)?;
io::copy(&mut src, &mut writer)?;
writer.shutdown()?;
drop(writer);
Ok(())
) -> io::Result<CompressionStats> {
/// [io::Write] wrapper that counts how many bytes were written.
struct Writer<W> {
bytes_written: u64,
inner: W,
}

impl<W: io::Write> io::Write for Writer<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let n = self.inner.write(buf)?;
self.bytes_written += n as u64;
Ok(n)
}

fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}

// Wrap `dst` in [Writer], and use it as the sink for the zstd writer,
// such that we can determine how many (compressed) bytes came out at the end.
let mut dst = Writer {
bytes_written: 0,
inner: dst,
};
let mut zstd_writer = new_zstd_writer(&mut dst, max_frame_size)?;

let bytes_read = io::copy(&mut src, &mut zstd_writer)?;
zstd_writer.shutdown()?;
drop(zstd_writer);

let stats = CompressionStats {
bytes_read,
bytes_written: dst.bytes_written,
};

Ok(stats)
}

pub use async_impls::AsyncCompressReader;
Expand Down
15 changes: 13 additions & 2 deletions crates/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ impl fmt::Debug for SnapshotSize {
pub struct ObjectCompressionStats {
/// Number of objects freshly compressed.
pub compressed: usize,
/// Cumulative stats of the compressed objects.
pub compression_stats: spacetimedb_fs_utils::compression::CompressionStats,
/// Number of objects hardlinked from a parent repository.
pub hardlinked: usize,
}
Expand All @@ -579,8 +581,16 @@ impl ObjectCompressionStats {
}

impl AddAssign for ObjectCompressionStats {
fn add_assign(&mut self, Self { compressed, hardlinked }: Self) {
fn add_assign(
&mut self,
Self {
compressed,
compression_stats,
hardlinked,
}: Self,
) {
self.compressed += compressed;
self.compression_stats += compression_stats;
self.hardlinked += hardlinked;
}
}
Expand Down Expand Up @@ -1112,10 +1122,11 @@ impl SnapshotRepository {
let dst = src.with_extension("_tmp");
let mut write = BufWriter::new(o_excl().open(&dst)?);
// The default frame size compresses better.
compress_with_zstd(read, &mut write, None)?;
let compression_stats = compress_with_zstd(read, &mut write, None)?;
std::fs::rename(dst, src)?;
if let Some(stats) = stats {
stats.compressed += 1;
stats.compression_stats += compression_stats;
}

Ok(())
Expand Down
Loading