diff --git a/changelog.d/disk_v2_reopen_recovers_missing_data_file.fix.md b/changelog.d/disk_v2_reopen_recovers_missing_data_file.fix.md new file mode 100644 index 0000000000000..f743b2eb9aa25 --- /dev/null +++ b/changelog.d/disk_v2_reopen_recovers_missing_data_file.fix.md @@ -0,0 +1,10 @@ +Fixed a `disk` buffer (v2) crash loop where, after a crash or forced restart, Vector +could fail to reopen the buffer with `failed to seek to position where reader left off: +No such file or directory` and exit with a configuration error on every restart. The +reader now advances past a fully acknowledged data file that was already deleted instead +of failing the buffer build, so the buffer always reopens and continues delivering. + +Disk buffer (v2) durability was also hardened: the directory holding the buffer is now +`fsync`ed after a data file is created. Previously only file contents were synced, so a +crash could lose a freshly created data file's directory entry and drop data that had +been reported as synced to disk. diff --git a/lib/vector-buffers/src/variants/disk_v2/io.rs b/lib/vector-buffers/src/variants/disk_v2/io.rs index a63b46ba1bc7d..4654db5cc7ec3 100644 --- a/lib/vector-buffers/src/variants/disk_v2/io.rs +++ b/lib/vector-buffers/src/variants/disk_v2/io.rs @@ -85,6 +85,14 @@ pub trait Filesystem: Send + Sync { /// If an I/O error occurred when attempting to delete the file, an error variant will be /// returned describing the underlying error. async fn delete_file(&self, path: &Path) -> io::Result<()>; + + /// Durably persists the entries of the directory at `path`. + /// + /// # Errors + /// + /// If an I/O error occurred when attempting to synchronize the directory, an error variant will + /// be returned describing the underlying error. + async fn sync_directory(&self, path: &Path) -> io::Result<()>; } pub trait AsyncFile: AsyncRead + AsyncWrite + Send + Sync { @@ -96,13 +104,10 @@ pub trait AsyncFile: AsyncRead + AsyncWrite + Send + Sync { /// will be returned describing the underlying error. async fn metadata(&self) -> io::Result; - /// Attempts to synchronize all OS-internal data, and metadata, to disk. - /// - /// This function will attempt to ensure that all in-memory data reaches the filesystem before returning. - /// - /// This can be used to handle errors that would otherwise only be caught when the File is closed. Dropping a file will ignore errors in synchronizing this in-memory data. + /// Synchronize all OS-internal data, and metadata, to disk. /// /// # Errors + /// /// If an I/O error occurred when attempting to synchronize the file data and metadata to disk, /// an error variant will be returned describing the underlying error. async fn sync_all(&self) -> io::Result<()>; @@ -164,6 +169,22 @@ impl Filesystem for ProductionFilesystem { async fn delete_file(&self, path: &Path) -> io::Result<()> { tokio::fs::remove_file(path).await } + + async fn sync_directory(&self, path: &Path) -> io::Result<()> { + // A directory can be opened read-only and fsync'd to persist its entries. This is the + // POSIX-blessed way to make a create or unlink durable. Windows has no directory fsync, + // and disk_v2's durability story is Unix-centric, so this is a no-op there. + #[cfg(unix)] + { + let dir = tokio::fs::File::open(path).await?; + dir.sync_all().await + } + #[cfg(not(unix))] + { + let _ = path; + Ok(()) + } + } } /// Builds a set of `OpenOptions` for opening a file as readable/writable. diff --git a/lib/vector-buffers/src/variants/disk_v2/mod.rs b/lib/vector-buffers/src/variants/disk_v2/mod.rs index 6cb43421ae728..ebfb013930b8b 100644 --- a/lib/vector-buffers/src/variants/disk_v2/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/mod.rs @@ -262,6 +262,10 @@ where let finalizer = Arc::clone(&ledger).spawn_finalizer(); let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer); + reader + .reconcile_reader_position() + .await + .context(ReaderSeekFailedSnafu)?; reader .seek_to_next_record() .await diff --git a/lib/vector-buffers/src/variants/disk_v2/reader.rs b/lib/vector-buffers/src/variants/disk_v2/reader.rs index 99f5149026ecd..1c98df44417ab 100644 --- a/lib/vector-buffers/src/variants/disk_v2/reader.rs +++ b/lib/vector-buffers/src/variants/disk_v2/reader.rs @@ -800,6 +800,17 @@ where ); self.ledger.wait_for_writer().await; } else { + // The ledger names a data file that is gone while + // the writer has moved on past it. A data file is + // only unlinked after every record in it is acked, + // so a missing file below the writer was fully + // delivered. Advancing past it loses nothing. + warn!( + skipped_file_id = reader_file_id, + writer_file_id, + data_file_path = data_file_path.to_string_lossy().as_ref(), + "Reader resume data file is missing; it was fully acknowledged before deletion. Advancing past it." + ); self.ledger.increment_acked_reader_file_id(); } continue; @@ -819,6 +830,53 @@ where } } + /// Reconciles the reader's resume position on reopen. + /// + /// A crash between unlinking a fully-acked data file and the durable ledger flush can leave + /// the ledger naming a file that is already gone. Walk the reader file id forward to the + /// lowest file that still exists so `seek_to_next_record` opens a real file instead of failing + /// on a missing one. A missing file strictly below the writer was fully delivered, so skipping + /// it loses nothing. `total_buffer_size` is reseeded on reopen from files that exist, so an + /// absent file already contributes zero and is deliberately not adjusted here. + pub(super) async fn reconcile_reader_position(&mut self) -> Result<(), ReaderError> { + let mut advanced = false; + loop { + let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id(); + // Equality guards, not a numeric `<`, so ring wraparound never skips the live writer file. + if reader_file_id == writer_file_id + || reader_file_id == self.ledger.get_next_writer_file_id() + { + break; + } + let data_file_path = self.ledger.get_data_file_path(reader_file_id); + match self + .ledger + .filesystem() + .open_file_readable(&data_file_path) + .await + { + Ok(_) => break, + Err(e) if e.kind() == ErrorKind::NotFound => { + warn!( + skipped_file_id = reader_file_id, + writer_file_id, + data_file_path = data_file_path.to_string_lossy().as_ref(), + "Reader resume data file missing on reopen; fully acknowledged before deletion. Advancing past it." + ); + self.ledger.increment_acked_reader_file_id(); + advanced = true; + } + Err(source) => return Err(ReaderError::Io { source }), + } + } + if advanced { + self.ledger + .flush() + .map_err(|source| ReaderError::Io { source })?; + } + Ok(()) + } + /// Seeks to where this reader previously left off. /// /// In cases where Vector has restarted, but the reader hasn't yet finished a file, we would @@ -864,8 +922,14 @@ where // // Once the reader/writer file IDs are identical, we fall back to the slow path. while self.ledger.get_current_reader_file_id() != self.ledger.get_current_writer_file_id() { - let data_file_path = self.ledger.get_current_reader_data_file_path(); self.ensure_ready_for_read().await.context(IoSnafu)?; + // NOTE we intentionally read the resume path after + // `ensure_ready_for_read` to avoid crash-looping the buffer. If the + // ledger is out of date -- a hard-crash will cause its sync to be + // missed after dat files are unlinked -- we may be pointed to a + // missing dat file. Skipping is harmless as, by construction, the + // file was previously read entirely. + let data_file_path = self.ledger.get_current_reader_data_file_path(); let data_file_mmap = self .ledger .filesystem() diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs b/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs index d0a51f1f05b0e..18e1e628682bc 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs @@ -1,11 +1,18 @@ use std::time::Duration; -use tokio::time::timeout; +use tokio::{fs, time::timeout}; use tracing::Instrument; use crate::{ + buffer_usage_data::BufferUsageHandle, test::{SizedRecord, acknowledge, install_tracing_helpers, with_temp_dir}, - variants::disk_v2::tests::{create_default_buffer_v2, set_file_length}, + variants::disk_v2::{ + Buffer, DiskBufferConfigBuilder, + tests::{ + create_buffer_v2_with_max_data_file_size, create_default_buffer_v2, + get_minimum_data_file_size_for_record_payload, set_file_length, + }, + }, }; #[tokio::test] @@ -182,3 +189,153 @@ async fn reader_doesnt_block_when_ahead_of_last_record_in_current_data_file() { let parent = trace_span!("reader_doesnt_block_when_ahead_of_last_record_in_current_data_file"); fut.instrument(parent.or_current()).await; } + +#[tokio::test] +async fn reopen_recovers_when_reader_resume_data_file_is_missing() { + // Regression test for SMPTNG-749: a crash that loses the reader's un-fsync'd + // file-id advance leaves a deleted data file the ledger still resumes from. + // Reopen must skip it and recover instead of crash-looping. + // + // `reader::delete_completed_data_file` is not atomic, performing the following operations: + // + // * unlink dat file + // * update ack'd reader file ID in ledger + // * sync ledger + // + // If the final sync is not made before the Vector process terminates -- if + // it is SIGKILL'ed for instance -- then the dat file will be gone but the + // reader will be made to open a missing file and will crash. + // + // This test stages one instance of that crash and asserts the reopen recovers. + // In the scenario below the reader is at data file 0 and the writer at 1. That is: + // + // 1. writer writes 2 records -- data-0, data-1 -- leaving the system + // in state reader=0, writer=1. + // 2. reader reads and acks data-0; delete runs unlink(data-0) -> set + // reader=1 -> CRASH + // 3. on restart reader=0 != writer=1, read(data-0) -> ENOENT -> + // ReaderSeekFailed + // + // The test stages the crash in step 2 without a real crash, copying + // buffer.db from step 1, running the real read+ack+delete and then + // restoring buffer.db, as if the disk buffer had been booted cold after a + // crash. + let _a = install_tracing_helpers(); + + let fut = with_temp_dir(|dir| { + let data_dir = dir.to_path_buf(); + + async move { + let record = SizedRecord::new(64); + // We size things so that one record write per data file means the + // writer will roll to the next dat file. + let max_data_file_size = get_minimum_data_file_size_for_record_payload(&record); + + let (mut writer, mut reader, ledger) = + create_buffer_v2_with_max_data_file_size::<_, SizedRecord>( + data_dir.clone(), + max_data_file_size, + ) + .await; + + // Write two records, one per data file, so file 0 fills and the writer + // rolls ahead onto file 1. + for _ in 0..2 { + writer + .write_record(record.clone()) + .await + .expect("write should not fail"); + writer.flush().await.expect("flush should not fail"); + } + writer.close(); + + // We can't actually SIGKILL this process, so snapshot buffer.db while the + // ledger still resumes from file 0; this copy is the durable state a crash + // leaves (see the comment header above). + let ledger_db = data_dir.join("buffer.db"); + ledger.flush().expect("ledger flush should not fail"); + let durable_snapshot = fs::read(&ledger_db).await.expect("snapshot buffer.db"); + + // Read and acknowledge file 0's record, then file 1's, which rolls the + // reader off file 0. The next read processes those acks and runs the real + // delete_completed_data_file, unlinking file 0 and draining the buffer. + acknowledge( + reader + .next() + .await + .expect("should not fail to read record") + .expect("file 0 record"), + ) + .await; + acknowledge( + reader + .next() + .await + .expect("should not fail to read record") + .expect("file 1 record"), + ) + .await; + assert!( + reader.next().await.expect("read should not fail").is_none(), + "the buffer drains after both records" + ); + + // Simulate the crash. + // + // We drop down reader, writer and ledger and then replace the + // ledger_db with the snapshot we took earlier, simulating a ledger + // shutdown that managed to unlink the reader file but not fsync its + // private storage. + drop(reader); + drop(writer); + drop(ledger); + fs::write(&ledger_db, &durable_snapshot) + .await + .expect("failed to restore buffer.db"); + + // Simulate the reboot: reopen the disk buffer. It resumes from the unlinked + // data-0 and fails to open. + let build_config = || { + DiskBufferConfigBuilder::from_path(data_dir.clone()) + .max_data_file_size(max_data_file_size) + .max_record_size(usize::try_from(max_data_file_size).unwrap()) + .build() + .expect("config build should not fail") + }; + let first = + Buffer::::from_config_inner(build_config(), BufferUsageHandle::noop()) + .await; + // Consume `first` so the internals are all consumed, locks are + // released and so forth. + let first_err = first.err(); + + // Simulate another reboot. + // + // We re-write the ledger to simulate what happens when disk buffer + // is on a durable store but not co-local to the machine. If the + // disk and compute are co-local then the mmap with the right + // indexes _may_ be present in OS page cache and _may_ restart + // properly on the next restart. This will not be true if everything + // is cold, which is what we simulate. + // + // Try removing this fs::write. Depending on how busy your system is + // you may find that this second attempt does not crash. + fs::write(&ledger_db, &durable_snapshot) + .await + .expect("failed to restore buffer.db"); + let second = + Buffer::::from_config_inner(build_config(), BufferUsageHandle::noop()) + .await; + let second_err = second.err(); + + assert!( + first_err.is_none() && second_err.is_none(), + "SMPTNG-749: every reboot must recover (crash-loop); \ + first={first_err:?} second={second_err:?}", + ); + } + }); + + let parent = trace_span!("reopen_recovers_when_reader_resume_data_file_is_missing_smptng_749"); + fut.instrument(parent.or_current()).await; +} diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs index a88204d7f7a9c..4de5af46dd5e5 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs @@ -352,4 +352,8 @@ impl Filesystem for TestFilesystem { Err(io_err_not_found()) } } + + async fn sync_directory(&self, _path: &Path) -> io::Result<()> { + Ok(()) + } } diff --git a/lib/vector-buffers/src/variants/disk_v2/writer.rs b/lib/vector-buffers/src/variants/disk_v2/writer.rs index 65e57ed7af686..883c363f05f99 100644 --- a/lib/vector-buffers/src/variants/disk_v2/writer.rs +++ b/lib/vector-buffers/src/variants/disk_v2/writer.rs @@ -1157,8 +1157,14 @@ where "Opened data file for writing." ); - // Make sure the file is flushed to disk, especially if we just created it. + // Make sure the file is flushed to disk as well as the + // directory it sits in, especially if we just created said + // file. data_file.sync_all().await?; + self.ledger + .filesystem() + .sync_directory(&self.ledger.config().data_dir) + .await?; self.writer = Some(RecordWriter::new( data_file, diff --git a/tests/antithesis/scenarios/vector_e2e/.launch/docker-compose.yaml b/tests/antithesis/scenarios/vector_e2e/.launch/docker-compose.yaml new file mode 100644 index 0000000000000..edb049a926804 --- /dev/null +++ b/tests/antithesis/scenarios/vector_e2e/.launch/docker-compose.yaml @@ -0,0 +1,72 @@ +name: vector-e2e +services: + oracle: + build: + context: /home/ssm-user/src/vector + dockerfile: tests/antithesis/scenarios/vector_e2e/Dockerfile + target: workload + container_name: oracle + depends_on: + vector: + condition: service_healthy + required: true + environment: + NO_COLOR: "1" + ORACLE_URL: http://127.0.0.1:8686 + SCENARIO_NAME: vector_e2e + VECTOR_METRICS_URL: http://vector:9598/metrics + VECTOR_METRICS_URLS: http://vector:9598/metrics + VECTOR_SOURCE_URL: http://vector:8080/ + hostname: oracle + image: ve2e-oracle:3cce5aa6e-dirty + init: true + networks: + default: null + platform: linux/amd64 + vector: + build: + context: /home/ssm-user/src/vector + dockerfile: tests/antithesis/scenarios/vector_e2e/Dockerfile + target: vector + container_name: vector + entrypoint: + - /usr/bin/vector + - --config + - /etc/vector/vector.yaml + environment: + NO_COLOR: "1" + VECTOR_CONFIG: /etc/vector/vector.yaml + VECTOR_CONFIG_ALT: /etc/vector/vector.b.yaml + hostname: vector + healthcheck: + test: + - CMD + - curl + - -fsS + - http://localhost:9598/metrics + timeout: 3s + interval: 5s + retries: 30 + start_period: 10s + image: ve2e-vector:3cce5aa6e-dirty + init: true + networks: + default: null + platform: linux/amd64 +networks: + default: + name: vector-e2e_default +x-node-health: + interval: 5s + retries: 30 + start_period: 10s + test: + - CMD + - curl + - -fsS + - http://localhost:9598/metrics + timeout: 3s +x-vector-build: + context: ../../../.. + dockerfile: tests/antithesis/scenarios/vector_e2e/Dockerfile + target: vector diff --git a/tests/antithesis/scenarios/vector_e2e/Dockerfile b/tests/antithesis/scenarios/vector_e2e/Dockerfile index 350fc48a9e165..a224e9ed77cd3 100644 --- a/tests/antithesis/scenarios/vector_e2e/Dockerfile +++ b/tests/antithesis/scenarios/vector_e2e/Dockerfile @@ -76,13 +76,8 @@ FROM debian:stable-slim AS vector RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates \ && rm -rf /var/lib/apt/lists/* COPY --from=vector-build /usr/local/bin/vector /usr/bin/vector -# Bake the node config plus its benign alternate, which the reload fault swaps in -# to force a sink rebuild. +# Bake the node config; compose points --config at it. COPY tests/antithesis/scenarios/vector_e2e/vector.yaml /etc/vector/vector.yaml -COPY tests/antithesis/scenarios/vector_e2e/vector.b.yaml /etc/vector/vector.b.yaml -# The reload fault is an anytime_ test command that runs IN the node container. -# The node stays running because its entrypoint is Vector, not a test command. -COPY --chmod=755 tests/antithesis/scenarios/vector_e2e/anytime_reload.sh /opt/antithesis/test/v1/ve2e/anytime_reload RUN mkdir -p /symbols && ln -s /usr/bin/vector /symbols/vector ENV NO_COLOR=1 EXPOSE 8080 9598 diff --git a/tests/antithesis/scenarios/vector_e2e/README.md b/tests/antithesis/scenarios/vector_e2e/README.md index 4448e918b7e94..4aabdc33a01b3 100644 --- a/tests/antithesis/scenarios/vector_e2e/README.md +++ b/tests/antithesis/scenarios/vector_e2e/README.md @@ -26,9 +26,7 @@ One Vector node and one oracle container. - **vector** takes an `http_server` source (`:8080`) and delivers over `http` to the oracle through an in-memory buffer with `when_full: block` and e2e acks. It - also exposes Prometheus metrics (`:9598`) for the health gate, and runs the - reload fault: an `anytime_` command swaps `vector.yaml`/`vector.b.yaml` and sends - `SIGHUP`, forcing the sink to rebuild mid-run. + also exposes Prometheus metrics (`:9598`) for the health gate. - **oracle** (`:8686`) is one container that injects unique event ids at the node and runs the HTTP endpoint the node's sink delivers back to. diff --git a/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml b/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml index abbc3a136b36d..f18a794ad251a 100644 --- a/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml +++ b/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml @@ -21,12 +21,8 @@ services: build: *vector-build image: ve2e-vector:${ANTITHESIS_IMAGE_TAG:-dev} entrypoint: ["/usr/bin/vector", "--config", "/etc/vector/vector.yaml"] - # vector runs the reload fault: VECTOR_CONFIG_ALT lets anytime_reload swap - # configs and SIGHUP, forcing the sink to rebuild. No disk buffer, so no volume. environment: NO_COLOR: "1" - VECTOR_CONFIG: "/etc/vector/vector.yaml" - VECTOR_CONFIG_ALT: "/etc/vector/vector.b.yaml" healthcheck: *node-health oracle: