Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/vector-buffers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ rand.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
temp-dir = "0.1.16"
tokio = { workspace = true, features = ["test-util"] }
tokio-test.workspace = true
tracing-fluent-assertions = { version = "0.3" }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "registry", "std", "ansi"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ cc 99f23327ef7e759a9cb6424a3fa3495aaf4f1c7ef23145b5f5d72eb6cc5e0173 # shrinks to
cc 54be1ee096dc5169013a1cd8a114f23c5aca7a209a663d783c7f74b4c4ff4746 # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 153516, max_data_file_size: 14228, max_record_size: 14214, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { inner: Mutex { data: FilesystemInner { files: {} } } } }, actions = [WriteRecord(Record { id: 0, size: 14163, event_count: 1 })]
cc d812fdee8da4aae904579f08cadae1b585944da58040cad4f18544a87faf240e # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 53736, max_data_file_size: 46808, max_record_size: 28912, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { files: {} } }, actions = [WriteRecord(Record { id: 0, size: 23565, event_count: 1, aligned_len: 23632 }), WriteRecord(Record { id: 0, size: 18445, event_count: 1, aligned_len: 18512 }), WriteRecord(Record { id: 0, size: 11557, event_count: 1, aligned_len: 11616 })]
cc b08fb47ac81e9148d8dc6a4d3332e92c21751685be9da9bd0c5d962ad7436285 # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 69602, max_data_file_size: 2462, max_record_size: 2450, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { files: {} } }, actions = [WriteRecord(Record { id: 0, size: 2389, event_count: 1, archived_len: 2464 })]
cc 5099a6694efc56a1ba467216d9cec28af741ee63d394f26fec03c9a57f2b973b # shrinks to mut builder = DiskBufferConfigBuilder { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: Some(257344), max_data_file_size: Some(13000), max_record_size: Some(6198), write_buffer_size: Some(61440), flush_interval: Some(10s), filesystem: TestFilesystem { atomicity: Sector, files: {} } }, actions = [Writeback(DirEntry(DataFile(0))), AdvanceTime(10s), ReadRecord, WriteRecord(Record { id: 0, size: 0, event_count: 1, encoded_len: 12, archived_len: 64, .. }), FlushWrites, Crash]
24 changes: 18 additions & 6 deletions lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
Arc,
atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
},
time::Instant,
};

use bytecheck::CheckBytes;
Expand All @@ -15,7 +14,7 @@ use fslock::LockFile;
use futures::StreamExt;
use rkyv::{Archive, Serialize, with::Atomic};
use snafu::{ResultExt, Snafu};
use tokio::{fs, io::AsyncWriteExt, sync::Notify};
use tokio::{fs, io::AsyncWriteExt, sync::Notify, time::Instant};
use vector_common::finalizer::OrderedFinalizer;

use super::{
Expand All @@ -29,6 +28,9 @@ use crate::buffer_usage_data::BufferUsageHandle;

pub const LEDGER_LEN: usize = align16(mem::size_of::<ArchivedLedgerState>());

/// File name of the ledger within the buffer's data directory.
const LEDGER_FILE_NAME: &str = "buffer.db";

/// Error that occurred during calls to [`Ledger`].
#[derive(Debug, Snafu)]
pub enum LedgerLoadCreateError {
Expand Down Expand Up @@ -380,6 +382,12 @@ where
.join(format!("buffer-data-{file_id}.dat"))
}

/// Gets the path to the ledger file.
#[cfg(test)]
pub fn ledger_path(&self) -> PathBuf {
self.config.data_dir.join(LEDGER_FILE_NAME)
}

/// Waits for a signal from the reader that progress has been made.
///
/// This will only occur when a record is read, which may allow enough space (below the maximum
Expand Down Expand Up @@ -603,7 +611,7 @@ where
}

// Open the ledger file, which may involve creating it if it doesn't yet exist.
let ledger_path = config.data_dir.join("buffer.db");
let ledger_path = config.data_dir.join(LEDGER_FILE_NAME);
let mut ledger_handle = config
.filesystem
.open_file_writable(&ledger_path)
Expand Down Expand Up @@ -737,16 +745,20 @@ where
Ok(())
}

/// Returns the finalizer together with the join handle of the task draining it. Callers that
/// do not need the handle let it drop, leaving the task to run until the finalizer is dropped.
#[must_use]
pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
pub(super) fn spawn_finalizer(
self: Arc<Self>,
) -> (OrderedFinalizer<u64>, tokio::task::JoinHandle<()>) {
let (finalizer, mut stream) = OrderedFinalizer::new(None);
vector_common::spawn_in_current_span(async move {
let handle = vector_common::spawn_in_current_span(async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
self.notify_writer_waiters();
}
});
finalizer
(finalizer, handle)
}
}

Expand Down
16 changes: 12 additions & 4 deletions lib/vector-buffers/src/variants/disk_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,15 @@ where
pub(crate) async fn from_config_inner<FS>(
config: DiskBufferConfig<FS>,
usage_handle: BufferUsageHandle,
) -> Result<(BufferWriter<T, FS>, BufferReader<T, FS>, Arc<Ledger<FS>>), BufferError<T>>
) -> Result<
(
BufferWriter<T, FS>,
BufferReader<T, FS>,
Arc<Ledger<FS>>,
tokio::task::JoinHandle<()>,
),
BufferError<T>,
>
where
FS: Filesystem + fmt::Debug + Clone + 'static,
FS::File: Unpin,
Expand All @@ -259,7 +267,7 @@ where
.await
.context(WriterSeekFailedSnafu)?;

let finalizer = Arc::clone(&ledger).spawn_finalizer();
let (finalizer, finalizer_handle) = Arc::clone(&ledger).spawn_finalizer();

let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer);
reader
Expand All @@ -269,7 +277,7 @@ where

ledger.synchronize_buffer_usage();

Ok((writer, reader, ledger))
Ok((writer, reader, ledger, finalizer_handle))
}

/// Creates a new disk buffer from the given [`DiskBufferConfig`].
Expand All @@ -291,7 +299,7 @@ where
FS: Filesystem + fmt::Debug + Clone + 'static,
FS::File: Unpin,
{
let (writer, reader, _) = Self::from_config_inner(config, usage_handle).await?;
let (writer, reader, _, _) = Self::from_config_inner(config, usage_handle).await?;

Ok((writer, reader))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn ack_updates_ledger_correctly() {

// Create our ledger, and make sure it's empty.
let ledger = Arc::new(ledger);
let finalizer = Arc::clone(&ledger).spawn_finalizer();
let (finalizer, _) = Arc::clone(&ledger).spawn_finalizer();
assert_eq!(ledger.consume_pending_acks(), 0);

// Now make sure it updates pending acks.
Expand Down Expand Up @@ -66,7 +66,7 @@ async fn ack_wakes_reader() {
// Create our ledger, as well as a future for awaiting
// writer progress, and make sure it's not yet woken up.
let ledger = Arc::new(ledger);
let finalizer = Arc::clone(&ledger).spawn_finalizer();
let (finalizer, _) = Arc::clone(&ledger).spawn_finalizer();

let mut wait_for_writer = spawn(ledger.wait_for_writer());
assert_pending!(wait_for_writer.poll());
Expand Down
27 changes: 16 additions & 11 deletions lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ where
.build()
.expect("creating buffer should not fail");
let usage_handle = BufferUsageHandle::noop();
Buffer::from_config_inner(config, usage_handle)
let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle)
.await
.expect("should not fail to create buffer")
.expect("should not fail to create buffer");
(writer, reader, ledger)
}

/// Creates a disk v2 buffer with all default values, but returns a handle to the buffer usage tracker.
Expand All @@ -228,7 +229,7 @@ where
.build()
.expect("creating buffer should not fail");
let usage_handle = BufferUsageHandle::noop();
let (writer, reader, ledger) = Buffer::from_config_inner(config, usage_handle.clone())
let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle.clone())
.await
.expect("should not fail to create buffer");
(writer, reader, ledger, usage_handle)
Expand Down Expand Up @@ -278,9 +279,10 @@ where
.expect("creating buffer should not fail");
let usage_handle = BufferUsageHandle::noop();

Buffer::from_config_inner(config, usage_handle)
let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle)
.await
.expect("should not fail to create buffer")
.expect("should not fail to create buffer");
(writer, reader, ledger)
}

/// Creates a disk v2 buffer with the specified maximum record size.
Expand All @@ -302,9 +304,10 @@ where
.expect("creating buffer should not fail");
let usage_handle = BufferUsageHandle::noop();

Buffer::from_config_inner(config, usage_handle)
let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle)
.await
.expect("should not fail to create buffer")
.expect("should not fail to create buffer");
(writer, reader, ledger)
}

/// Creates a disk v2 buffer with the specified maximum data file size.
Expand All @@ -331,9 +334,10 @@ where
.expect("creating buffer should not fail");
let usage_handle = BufferUsageHandle::noop();

Buffer::from_config_inner(config, usage_handle)
let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle)
.await
.expect("should not fail to create buffer")
.expect("should not fail to create buffer");
(writer, reader, ledger)
}

/// Creates a disk v2 buffer with the specified write buffer size.
Expand All @@ -355,9 +359,10 @@ where
.expect("creating buffer should not fail");
let usage_handle = BufferUsageHandle::noop();

Buffer::from_config_inner(config, usage_handle)
let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle)
.await
.expect("should not fail to create buffer")
.expect("should not fail to create buffer");
(writer, reader, ledger)
}

pub(crate) fn get_corrected_max_record_size<T>(payload: &T) -> usize
Expand Down
Loading
Loading