10 changes: 10 additions & 0 deletions changelog.d/disk_v2_reopen_recovers_missing_data_file.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Fixed a `disk` buffer (v2) crash loop where, after a crash or forced restart, Vector
could fail to reopen the buffer with `failed to seek to position where reader left off:
No such file or directory` and exit with a configuration error on every restart. The
reader now advances past a fully acknowledged data file that was already deleted instead
of failing the buffer build, so the buffer always reopens and continues delivering.

Disk buffer (v2) durability was also hardened: the directory holding the buffer is now
`fsync`ed after a data file is created. Previously only file contents were synced, so a
crash could lose a freshly created data file's directory entry and drop data that had
been reported as synced to disk.
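
As a rough illustration of the durability point above, the sketch below creates a file, syncs its contents, and then syncs the parent directory so the new directory entry is also persisted. It uses only the Rust standard library rather than Vector's `Filesystem` trait, and the names `create_durably` and `sync_parent_dir` are hypothetical, chosen for this example only.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: creates `name` inside `dir`, writes `contents`, then
/// syncs the file and its parent directory.
fn create_durably(dir: &Path, name: &str, contents: &[u8]) -> io::Result<()> {
    let path = dir.join(name);
    // `create_new` fails if the file already exists, so we know we created it.
    let mut file = OpenOptions::new().write(true).create_new(true).open(&path)?;
    file.write_all(contents)?;
    // Syncs the file's data and metadata, but not the directory entry naming it.
    file.sync_all()?;
    // Syncs the directory so the newly created entry is persisted as well.
    sync_parent_dir(dir)
}

/// On Unix a directory can be opened read-only and fsync'd.
#[cfg(unix)]
fn sync_parent_dir(dir: &Path) -> io::Result<()> {
    std::fs::File::open(dir)?.sync_all()
}

/// Elsewhere this sketch does nothing, mirroring the no-op described in the diff.
#[cfg(not(unix))]
fn sync_parent_dir(_dir: &Path) -> io::Result<()> {
    Ok(())
}
```

Calling `create_durably(dir, "data-0.dat", b"payload")` against an existing directory would write the file and return only after both syncs succeed.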
31 changes: 26 additions & 5 deletions lib/vector-buffers/src/variants/disk_v2/io.rs
@@ -85,6 +85,14 @@ pub trait Filesystem: Send + Sync {
/// If an I/O error occurred when attempting to delete the file, an error variant will be
/// returned describing the underlying error.
async fn delete_file(&self, path: &Path) -> io::Result<()>;

/// Durably persists the entries of the directory at `path`.
///
/// # Errors
///
/// If an I/O error occurred when attempting to synchronize the directory, an error variant will
/// be returned describing the underlying error.
async fn sync_directory(&self, path: &Path) -> io::Result<()>;
}

pub trait AsyncFile: AsyncRead + AsyncWrite + Send + Sync {
@@ -96,13 +104,10 @@ pub trait AsyncFile: AsyncRead + AsyncWrite + Send + Sync {
/// will be returned describing the underlying error.
async fn metadata(&self) -> io::Result<Metadata>;

/// Attempts to synchronize all OS-internal data, and metadata, to disk.
///
/// This function will attempt to ensure that all in-memory data reaches the filesystem before returning.
///
/// This can be used to handle errors that would otherwise only be caught when the File is closed. Dropping a file will ignore errors in synchronizing this in-memory data.
/// Synchronize all OS-internal data, and metadata, to disk.
///
/// # Errors
///
/// If an I/O error occurred when attempting to synchronize the file data and metadata to disk,
/// an error variant will be returned describing the underlying error.
async fn sync_all(&self) -> io::Result<()>;
@@ -164,6 +169,22 @@ impl Filesystem for ProductionFilesystem {
async fn delete_file(&self, path: &Path) -> io::Result<()> {
tokio::fs::remove_file(path).await
}

async fn sync_directory(&self, path: &Path) -> io::Result<()> {
// A directory can be opened read-only and fsync'd to persist its entries. This is the
// POSIX-blessed way to make a create or unlink durable. Windows has no directory fsync,
// and disk_v2's durability story is Unix-centric, so this is a no-op there.
#[cfg(unix)]
{
let dir = tokio::fs::File::open(path).await?;
dir.sync_all().await
}
#[cfg(not(unix))]
{
let _ = path;
Ok(())
}
}
}

/// Builds a set of `OpenOptions` for opening a file as readable/writable.
4 changes: 4 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/mod.rs
@@ -262,6 +262,10 @@ where
let finalizer = Arc::clone(&ledger).spawn_finalizer();

let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer);
reader
.reconcile_reader_position()
.await
.context(ReaderSeekFailedSnafu)?;
reader
.seek_to_next_record()
.await
66 changes: 65 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -800,6 +800,17 @@ where
);
self.ledger.wait_for_writer().await;
} else {
// The ledger names a data file that is gone while
// the writer has moved on past it. A data file is
// only unlinked after every record in it is acked,
// so a missing file below the writer was fully
// delivered. Advancing past it loses nothing.
warn!(
skipped_file_id = reader_file_id,
writer_file_id,
data_file_path = data_file_path.to_string_lossy().as_ref(),
"Reader resume data file is missing; it was fully acknowledged before deletion. Advancing past it."
);
self.ledger.increment_acked_reader_file_id();


P1: Repair ledger accounting before skipping missing files

When a crash happens after `delete_completed_data_file` unlinks a data file but before the ledger's mmap changes are durably flushed, the restored ledger can still include that file's bytes and last-read record state even though the file is gone. This branch only advances the reader file id, so the skipped file's size remains in `total_buffer_size` and its record ids may be treated as gaps. After the remaining files drain, the buffer can still appear non-empty or full, and the reader can wait for a next writer file instead of reaching an empty state. The recovery path needs to make the same accounting durable before the unlink, or explicitly repair the skipped file's ledger state here.
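
One way to picture the repair this comment asks for is the toy model below. It is a sketch under loud assumptions: `ToyLedger`, its per-file byte map, and `skip_missing_reader_file` are all hypothetical and do not reflect the real disk_v2 ledger layout. It only shows the idea of subtracting a skipped file's bytes in the same step that advances the reader file id.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified stand-in for ledger state. Not the real disk_v2 ledger.
struct ToyLedger {
    reader_file_id: u16,
    writer_file_id: u16,
    /// Bytes the ledger believes are still buffered.
    total_buffer_size: u64,
    /// Bytes attributed to each data file (an assumption of this sketch).
    bytes_by_file: BTreeMap<u16, u64>,
}

impl ToyLedger {
    /// Skips the reader's current (missing) file and removes its bytes from the
    /// running total. Returns `false` when the reader is already on the writer's file.
    fn skip_missing_reader_file(&mut self) -> bool {
        if self.reader_file_id == self.writer_file_id {
            return false;
        }
        let bytes = self.bytes_by_file.remove(&self.reader_file_id).unwrap_or(0);
        // Without this subtraction the total never returns to zero after draining.
        self.total_buffer_size = self.total_buffer_size.saturating_sub(bytes);
        self.reader_file_id = self.reader_file_id.wrapping_add(1);
        true
    }
}
```

With files 0 and 1 holding 128 and 64 bytes, skipping file 0 in this model leaves `total_buffer_size` at 64, so draining file 1 brings the total back to zero.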


P1: Do not assume missing reader files were acknowledged

This branch also matches buffers affected by the creation-side durability bug this patch calls out. Before this change (and still on non-Unix platforms, where `sync_directory` is a no-op), a crash after the ledger and file contents were flushed, but before the new data file's directory entry was durable, can leave `reader_file_id < writer_file_id` with the reader's data file missing even though none of its records were delivered. Incrementing the acked reader file id here silently skips those unread records and leaves the ledger's byte/record accounting inconsistent. The recovery path should distinguish the deleted-after-ack case, or repair it, rather than treating every missing file below the writer as fully acknowledged.


}
continue;
@@ -819,6 +830,53 @@ where
}
}

/// Reconciles the reader's resume position on reopen.
///
/// A crash between unlinking a fully-acked data file and the durable ledger flush can leave
/// the ledger naming a file that is already gone. Walk the reader file id forward to the
/// lowest file that still exists so `seek_to_next_record` opens a real file instead of failing
/// on a missing one. A missing file strictly below the writer was fully delivered, so skipping
/// it loses nothing. `total_buffer_size` is reseeded on reopen from files that exist, so an
/// absent file already contributes zero and is deliberately not adjusted here.
pub(super) async fn reconcile_reader_position(&mut self) -> Result<(), ReaderError<T>> {
let mut advanced = false;
loop {
let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();
// Equality guards, not a numeric `<`, so ring wraparound never skips the live writer file.
if reader_file_id == writer_file_id
|| reader_file_id == self.ledger.get_next_writer_file_id()
{
break;
}
let data_file_path = self.ledger.get_data_file_path(reader_file_id);
match self
.ledger
.filesystem()
.open_file_readable(&data_file_path)
.await
{
Ok(_) => break,
Err(e) if e.kind() == ErrorKind::NotFound => {
warn!(
skipped_file_id = reader_file_id,
writer_file_id,
data_file_path = data_file_path.to_string_lossy().as_ref(),
"Reader resume data file missing on reopen; fully acknowledged before deletion. Advancing past it."
);
self.ledger.increment_acked_reader_file_id();
advanced = true;
}
Err(source) => return Err(ReaderError::Io { source }),
}
}
if advanced {
self.ledger
.flush()
.map_err(|source| ReaderError::Io { source })?;
}
Ok(())
}

/// Seeks to where this reader previously left off.
///
/// In cases where Vector has restarted, but the reader hasn't yet finished a file, we would
@@ -864,8 +922,14 @@ where
//
// Once the reader/writer file IDs are identical, we fall back to the slow path.
while self.ledger.get_current_reader_file_id() != self.ledger.get_current_writer_file_id() {
let data_file_path = self.ledger.get_current_reader_data_file_path();
self.ensure_ready_for_read().await.context(IoSnafu)?;
// NOTE: We intentionally read the resume path after
// `ensure_ready_for_read` to avoid crash-looping the buffer. If the
// ledger is out of date (a hard crash can miss its sync after data
// files are unlinked), it may point at a missing data file, and
// `ensure_ready_for_read` advances past it. Skipping is harmless
// because, by construction, the file was previously read entirely.
let data_file_path = self.ledger.get_current_reader_data_file_path();
let data_file_mmap = self
.ledger
.filesystem()
Comment on lines 933 to 935


P1: Recheck reader position before mmapping writer file

When recovery skips a missing reader file here, ensure_ready_for_read() can advance reader_file_id until it equals writer_file_id and open the writer's current file. Because the outer while condition is not rechecked before this mmap, a crash after the writer rolled to a freshly created but still-empty next data file still reopens into open_mmap_readable() on that empty writer file, which fails instead of falling back to the slow path. Recheck the reader/writer IDs after ensure_ready_for_read() and break once they are equal.
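
The recheck suggested in this comment can be sketched with a toy model. Everything here is hypothetical: `ToyBuffer`, its set of existing file ids, and the simplification that a mapped file is always fully read are assumptions for illustration, not the real `seek_to_next_record` logic.

```rust
use std::collections::BTreeSet;

/// Hypothetical model: which data files exist, and where reader and writer point.
struct ToyBuffer {
    existing_files: BTreeSet<u16>,
    reader_file_id: u16,
    writer_file_id: u16,
}

impl ToyBuffer {
    /// Stand-in for `ensure_ready_for_read`: advances past missing files until the
    /// reader's file exists or the reader reaches the writer's file.
    fn ensure_ready_for_read(&mut self) {
        while self.reader_file_id != self.writer_file_id
            && !self.existing_files.contains(&self.reader_file_id)
        {
            self.reader_file_id = self.reader_file_id.wrapping_add(1);
        }
    }

    /// Fast-path seek. Returns the ids of the files this sketch would have mmapped.
    fn seek_fast_path(&mut self) -> Vec<u16> {
        let mut mapped = Vec::new();
        while self.reader_file_id != self.writer_file_id {
            self.ensure_ready_for_read();
            // Recheck: skipping may have moved the reader onto the writer's file,
            // which may still be empty and belongs to the slow path instead.
            if self.reader_file_id == self.writer_file_id {
                break;
            }
            mapped.push(self.reader_file_id);
            // Simplification: treat the mapped file as fully read and roll forward.
            self.reader_file_id = self.reader_file_id.wrapping_add(1);
        }
        mapped
    }
}
```

In this model, a buffer with file 0 missing, reader at 0, and writer at 1 maps nothing on the fast path and leaves the reader on the writer's file, which is the case the comment wants handed to the slow path.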


161 changes: 159 additions & 2 deletions lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs
@@ -1,11 +1,18 @@
use std::time::Duration;

use tokio::time::timeout;
use tokio::{fs, time::timeout};
use tracing::Instrument;

use crate::{
buffer_usage_data::BufferUsageHandle,
test::{SizedRecord, acknowledge, install_tracing_helpers, with_temp_dir},
variants::disk_v2::tests::{create_default_buffer_v2, set_file_length},
variants::disk_v2::{
Buffer, DiskBufferConfigBuilder,
tests::{
create_buffer_v2_with_max_data_file_size, create_default_buffer_v2,
get_minimum_data_file_size_for_record_payload, set_file_length,
},
},
};

#[tokio::test]
@@ -182,3 +189,153 @@ async fn reader_doesnt_block_when_ahead_of_last_record_in_current_data_file() {
let parent = trace_span!("reader_doesnt_block_when_ahead_of_last_record_in_current_data_file");
fut.instrument(parent.or_current()).await;
}

#[tokio::test]
async fn reopen_recovers_when_reader_resume_data_file_is_missing() {
// Regression test for SMPTNG-749: a crash that loses the reader's un-fsync'd
// file-id advance leaves a deleted data file the ledger still resumes from.
// Reopen must skip it and recover instead of crash-looping.
//
// `reader::delete_completed_data_file` is not atomic, performing the following operations:
//
// * unlink dat file
// * update ack'd reader file ID in ledger
// * sync ledger
//
// If the final sync is not made before the Vector process terminates (if
// it is SIGKILL'ed, for instance), then the dat file is gone but the ledger
// still points the reader at it, so on restart the reader tries to open a
// missing file and fails.
//
// This test stages one instance of that crash and asserts the reopen recovers.
// In the scenario below the reader is at data file 0 and the writer at 1. That is:
//
// 1. writer writes 2 records -- data-0, data-1 -- leaving the system
// in state reader=0, writer=1.
// 2. reader reads and acks data-0; delete runs unlink(data-0) -> set
// reader=1 -> CRASH
// 3. on restart reader=0 != writer=1, read(data-0) -> ENOENT ->
// ReaderSeekFailed
//
// The test stages the crash in step 2 without a real crash, copying
// buffer.db from step 1, running the real read+ack+delete and then
// restoring buffer.db, as if the disk buffer had been booted cold after a
// crash.
let _a = install_tracing_helpers();

let fut = with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

async move {
let record = SizedRecord::new(64);
// We size things so that one record write per data file means the
// writer will roll to the next dat file.
let max_data_file_size = get_minimum_data_file_size_for_record_payload(&record);

let (mut writer, mut reader, ledger) =
create_buffer_v2_with_max_data_file_size::<_, SizedRecord>(
data_dir.clone(),
max_data_file_size,
)
.await;

// Write two records, one per data file, so file 0 fills and the writer
// rolls ahead onto file 1.
for _ in 0..2 {
writer
.write_record(record.clone())
.await
.expect("write should not fail");
writer.flush().await.expect("flush should not fail");
}
writer.close();

// We can't actually SIGKILL this process, so snapshot buffer.db while the
// ledger still resumes from file 0; this copy is the durable state a crash
// leaves (see the comment header above).
let ledger_db = data_dir.join("buffer.db");
ledger.flush().expect("ledger flush should not fail");
let durable_snapshot = fs::read(&ledger_db).await.expect("snapshot buffer.db");

// Read and acknowledge file 0's record, then file 1's, which rolls the
// reader off file 0. The next read processes those acks and runs the real
// delete_completed_data_file, unlinking file 0 and draining the buffer.
acknowledge(
reader
.next()
.await
.expect("should not fail to read record")
.expect("file 0 record"),
)
.await;
acknowledge(
reader
.next()
.await
.expect("should not fail to read record")
.expect("file 1 record"),
)
.await;
assert!(
reader.next().await.expect("read should not fail").is_none(),
"the buffer drains after both records"
);

// Simulate the crash.
//
// We drop the reader, writer and ledger and then replace
// ledger_db with the snapshot we took earlier, simulating a
// shutdown that unlinked the reader's data file but never
// fsync'd the ledger's backing file.
drop(reader);
drop(writer);
drop(ledger);
fs::write(&ledger_db, &durable_snapshot)
.await
.expect("failed to restore buffer.db");

// Simulate the reboot: reopen the disk buffer. Before the fix, this
// resumed from the unlinked data-0 and failed to open.
let build_config = || {
DiskBufferConfigBuilder::from_path(data_dir.clone())
.max_data_file_size(max_data_file_size)
.max_record_size(usize::try_from(max_data_file_size).unwrap())
.build()
.expect("config build should not fail")
};
let first =
Buffer::<SizedRecord>::from_config_inner(build_config(), BufferUsageHandle::noop())
.await;
// Consume `first` so the internals are all consumed, locks are
// released and so forth.
let first_err = first.err();

// Simulate another reboot.
//
// We re-write the ledger to simulate what happens when disk buffer
// is on a durable store but not co-local to the machine. If the
// disk and compute are co-local then the mmap with the right
// indexes _may_ be present in OS page cache and _may_ restart
// properly on the next restart. This will not be true if everything
// is cold, which is what we simulate.
//
// Try removing this fs::write. Depending on how busy your system is
// you may find that this second attempt does not crash.
fs::write(&ledger_db, &durable_snapshot)
.await
.expect("failed to restore buffer.db");
let second =
Buffer::<SizedRecord>::from_config_inner(build_config(), BufferUsageHandle::noop())
.await;
let second_err = second.err();

assert!(
first_err.is_none() && second_err.is_none(),
"SMPTNG-749: every reboot must recover instead of crash-looping; \
first={first_err:?} second={second_err:?}",
);
}
});

let parent = trace_span!("reopen_recovers_when_reader_resume_data_file_is_missing_smptng_749");
fut.instrument(parent.or_current()).await;
}
Original file line number Diff line number Diff line change
@@ -352,4 +352,8 @@ impl Filesystem for TestFilesystem {
Err(io_err_not_found())
}
}

async fn sync_directory(&self, _path: &Path) -> io::Result<()> {
Ok(())
}
}
8 changes: 7 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/writer.rs
@@ -1157,8 +1157,14 @@ where
"Opened data file for writing."
);

// Make sure the file is flushed to disk, especially if we just created it.
// Make sure the file is flushed to disk as well as the
// directory it sits in, especially if we just created said
// file.
data_file.sync_all().await?;
self.ledger
.filesystem()
.sync_directory(&self.ledger.config().data_dir)
.await?;

self.writer = Some(RecordWriter::new(
data_file,