From 610b1ef1a888b9a49474db2fd5b706916faed759 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Mon, 22 Jun 2026 21:00:29 +0000 Subject: [PATCH 1/2] Change liveness check for eventually_conservation The previous version of eventually_conservation relied on polling /metrics to infer whether or not the data topology in the scenarios was active. This inference is a race: /metrics might report while the source/sink are still coming online, so in a small number of cases scenarios failed on this race. The approach taken here is to bump the "is it back online" check to 180s and poll directly on the topology, not on /metrics. I've taken the opportunity to slim vector_e2e a little further also, removing vestigial restart config update material. This rare failure will be fixed in the disk buffer scenario as well. --- .../src/bin/eventually_conservation.rs | 9 ++++- .../scenarios/vector_e2e/Dockerfile | 7 +--- .../antithesis/scenarios/vector_e2e/README.md | 4 +-- .../scenarios/vector_e2e/anytime_reload.sh | 34 ------------------ .../scenarios/vector_e2e/docker-compose.yaml | 4 --- .../scenarios/vector_e2e/vector.b.yaml | 36 ------------------- 6 files changed, 10 insertions(+), 84 deletions(-) delete mode 100755 tests/antithesis/scenarios/vector_e2e/anytime_reload.sh delete mode 100644 tests/antithesis/scenarios/vector_e2e/vector.b.yaml diff --git a/tests/antithesis/harness/src/bin/eventually_conservation.rs b/tests/antithesis/harness/src/bin/eventually_conservation.rs index 947d6d9372d2a..452a1ef898458 100644 --- a/tests/antithesis/harness/src/bin/eventually_conservation.rs +++ b/tests/antithesis/harness/src/bin/eventually_conservation.rs @@ -226,7 +226,14 @@ async fn main() { // post retry until one sticks, since a node can briefly refuse a write while it is // still recovering. A wedged node never delivers it and fails here. Runs // unconditionally. 
- let deadline = time::Instant::now() + time::Duration::from_secs(45); + // + // The recovery gate above only proves the metrics endpoint answers. That is a + // separate listener from the source's data path, so the source and sink can still + // be unready while metrics already serve, and a just-restarted node needs time to + // bring them up. The round-trip is therefore the real readiness signal and gets the + // same budget as recovery rather than a tight window that expires before the data + // path is serving. + let deadline = time::Instant::now() + time::Duration::from_secs(180); let mut probe = None; let mut progressed = false; while !progressed && time::Instant::now() < deadline { diff --git a/tests/antithesis/scenarios/vector_e2e/Dockerfile b/tests/antithesis/scenarios/vector_e2e/Dockerfile index 350fc48a9e165..7f2d0e21b170f 100644 --- a/tests/antithesis/scenarios/vector_e2e/Dockerfile +++ b/tests/antithesis/scenarios/vector_e2e/Dockerfile @@ -76,13 +76,8 @@ FROM debian:stable-slim AS vector RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates \ && rm -rf /var/lib/apt/lists/* COPY --from=vector-build /usr/local/bin/vector /usr/bin/vector -# Bake the node config plus its benign alternate, which the reload fault swaps in -# to force a sink rebuild. +# Bake the node config; compose selects it via --config. COPY tests/antithesis/scenarios/vector_e2e/vector.yaml /etc/vector/vector.yaml -COPY tests/antithesis/scenarios/vector_e2e/vector.b.yaml /etc/vector/vector.b.yaml -# The reload fault is an anytime_ test command that runs IN the node container. -# The node stays running because its entrypoint is Vector, not a test command. 
-COPY --chmod=755 tests/antithesis/scenarios/vector_e2e/anytime_reload.sh /opt/antithesis/test/v1/ve2e/anytime_reload RUN mkdir -p /symbols && ln -s /usr/bin/vector /symbols/vector ENV NO_COLOR=1 EXPOSE 8080 9598 diff --git a/tests/antithesis/scenarios/vector_e2e/README.md b/tests/antithesis/scenarios/vector_e2e/README.md index 4448e918b7e94..4aabdc33a01b3 100644 --- a/tests/antithesis/scenarios/vector_e2e/README.md +++ b/tests/antithesis/scenarios/vector_e2e/README.md @@ -26,9 +26,7 @@ One Vector node and one oracle container. - **vector** takes an `http_server` source (`:8080`) and delivers over `http` to the oracle through an in-memory buffer with `when_full: block` and e2e acks. It - also exposes Prometheus metrics (`:9598`) for the health gate, and runs the - reload fault: an `anytime_` command swaps `vector.yaml`/`vector.b.yaml` and sends - `SIGHUP`, forcing the sink to rebuild mid-run. + also exposes Prometheus metrics (`:9598`) for the health gate. - **oracle** (`:8686`) is one container that injects unique event ids at the node and runs the HTTP endpoint the node's sink delivers back to. diff --git a/tests/antithesis/scenarios/vector_e2e/anytime_reload.sh b/tests/antithesis/scenarios/vector_e2e/anytime_reload.sh deleted file mode 100755 index 71416aaf9324b..0000000000000 --- a/tests/antithesis/scenarios/vector_e2e/anytime_reload.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail -[ -n "${VECTOR_CONFIG_ALT:-}" ] || exit 0 -cfg="${VECTOR_CONFIG:?}" -alt="${VECTOR_CONFIG_ALT:?}" - -# Vector only ever reads $cfg, so reload alternates $cfg between two immutable -# sources rather than swapping two live files. The alternate $alt is never -# written, and the baseline (the original $cfg) is snapshotted once, so the only -# mutable file is $cfg and the only writes to it are a single rename of a fully -# written temp. 
The node-termination fault can therefore interrupt this script at -# any point and leave $cfg as one complete config or the other, never half-written -# and never collapsed so both sources hold the same content. Alternation always -# resumes on the next invocation. -base="$cfg.orig" -if [ ! -f "$base" ]; then - cp "$cfg" "$base.tmp" - mv "$base.tmp" "$base" -fi - -# Pick whichever source is not currently live. cksum reads from stdin so its -# output is the checksum alone, with no filename to differ on. -if [ "$(cksum <"$cfg")" = "$(cksum <"$alt")" ]; then - next="$base" -else - next="$alt" -fi -cp "$next" "$cfg.tmp" -mv "$cfg.tmp" "$cfg" - -# Vector is PID 1 in the node container. SIGHUP triggers reload-from-disk. -kill -HUP 1 -sleep 5 diff --git a/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml b/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml index abbc3a136b36d..f18a794ad251a 100644 --- a/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml +++ b/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml @@ -21,12 +21,8 @@ services: build: *vector-build image: ve2e-vector:${ANTITHESIS_IMAGE_TAG:-dev} entrypoint: ["/usr/bin/vector", "--config", "/etc/vector/vector.yaml"] - # vector runs the reload fault: VECTOR_CONFIG_ALT lets anytime_reload swap - # configs and SIGHUP, forcing the sink to rebuild. No disk buffer, so no volume. 
environment: NO_COLOR: "1" - VECTOR_CONFIG: "/etc/vector/vector.yaml" - VECTOR_CONFIG_ALT: "/etc/vector/vector.b.yaml" healthcheck: *node-health oracle: diff --git a/tests/antithesis/scenarios/vector_e2e/vector.b.yaml b/tests/antithesis/scenarios/vector_e2e/vector.b.yaml deleted file mode 100644 index d426c7413699a..0000000000000 --- a/tests/antithesis/scenarios/vector_e2e/vector.b.yaml +++ /dev/null @@ -1,36 +0,0 @@ -sources: - in: - type: http_server - address: 0.0.0.0:8080 - decoding: - codec: json - acknowledgements: - enabled: true - - metrics: - type: internal_metrics - scrape_interval_secs: 1 - -sinks: - out: - type: http - inputs: [in] - uri: http://oracle:8686/ingest - method: post - encoding: - codec: json - # Benign alternate the reload fault swaps in. It differs from vector.yaml only - # by an explicit request timeout, enough to make the reload rebuild the sink. - request: - timeout_secs: 45 - buffer: - type: memory - max_events: 500 - when_full: block - acknowledgements: - enabled: true - - prom: - type: prometheus_exporter - inputs: [metrics] - address: 0.0.0.0:9598 From cff5d2eda2aef5c52cba0ade0d5b3d0adac99267 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Mon, 15 Jun 2026 22:19:04 +0000 Subject: [PATCH 2/2] Expand vector-buffers property model to include crashes, writebacks This commit expands on the property model for disk buffers v2, taking inspiration from the paper [All File Systems Are Not Created Equal: On the Complexity of Crafting Crash-Consistent Applications](https://www.usenix.org/conference/osdi14/technical-sessions/presentation/pillai). I have expanded `Action` to have two new variants, `Writeback` and `Crash`. Crash is the simpler, it causes the PBT to simulate a restart of the buffer. The filesystem is updated to have a notion of 'durable' writes, those writes which are not durable are dropped after a crash action and restart. 
Writeback allows us to simulate OS non-determinism around when mmaps are flushed to disk, directories are synced, etc., without explicit syscalls to force that. Similar to the ALICE paper, the PBT Filesystem is updated to have a notion of atomicity in writes, either block-sized or sector-sized. The model remains ideal, that is, the model is built such that all writes are durable once a writer ack is sent backward. This means the test now fails because the SUT does not behave this way, which is now easily demonstrated. A minimal failure sequence: ``` [ WriteRecord(Record { id: 0, size: 0, event_count: 1, encoded_len: 12, archived_len: 64, .. }), Crash, FlushWrites, ] ``` Currently disk buffers v2 are not crash safe. This was flagged by our antithesis test introduced in PR #25562 and is reproduced here also, although these faults are hidden behind the cheaper-to-reproduce failure above. I believe we are missing a few things in the SUT: * synchronization of directory changes, that is, file names do not become durable until synced * _either_ bulk commits on flush interval with ack handlers not releasing until that commit _or_ flush interval is dropped entirely with acks == sync, which is very slow. 
--- Cargo.lock | 1 + lib/vector-buffers/Cargo.toml | 1 + .../variants/disk_v2/tests/model/mod.txt | 1 + .../src/variants/disk_v2/ledger.rs | 24 +- .../src/variants/disk_v2/mod.rs | 16 +- .../disk_v2/tests/acknowledgements.rs | 4 +- .../src/variants/disk_v2/tests/mod.rs | 27 +- .../variants/disk_v2/tests/model/action.rs | 324 +++++++++++++++++- .../variants/disk_v2/tests/model/common.rs | 126 ++++--- .../disk_v2/tests/model/filesystem.rs | 157 ++++++++- .../src/variants/disk_v2/tests/model/mod.rs | 97 +++++- .../variants/disk_v2/tests/model/sequencer.rs | 39 ++- tests/antithesis/harness/Cargo.toml | 1 + .../src/bin/eventually_conservation.rs | 24 +- .../src/bin/parallel_driver_produce.rs | 44 +-- tests/antithesis/harness/src/lib.rs | 65 +++- 16 files changed, 759 insertions(+), 192 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb525def3b832..847040a8e12dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,6 +231,7 @@ dependencies = [ "antithesis_sdk", "axum 0.6.20", "clap", + "hex", "reqwest 0.11.26", "serde_json", "tokio", diff --git a/lib/vector-buffers/Cargo.toml b/lib/vector-buffers/Cargo.toml index c8d57a8e50862..d493e65e66664 100644 --- a/lib/vector-buffers/Cargo.toml +++ b/lib/vector-buffers/Cargo.toml @@ -54,6 +54,7 @@ rand.workspace = true serde_json.workspace = true serde_yaml.workspace = true temp-dir = "0.1.16" +tokio = { workspace = true, features = ["test-util"] } tokio-test.workspace = true tracing-fluent-assertions = { version = "0.3" } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "registry", "std", "ansi"] } diff --git a/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt b/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt index 381d1695a05fb..8fda5dbbf20a6 100644 --- a/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt +++ b/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt @@ -13,3 +13,4 @@ cc 
99f23327ef7e759a9cb6424a3fa3495aaf4f1c7ef23145b5f5d72eb6cc5e0173 # shrinks to cc 54be1ee096dc5169013a1cd8a114f23c5aca7a209a663d783c7f74b4c4ff4746 # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 153516, max_data_file_size: 14228, max_record_size: 14214, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { inner: Mutex { data: FilesystemInner { files: {} } } } }, actions = [WriteRecord(Record { id: 0, size: 14163, event_count: 1 })] cc d812fdee8da4aae904579f08cadae1b585944da58040cad4f18544a87faf240e # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 53736, max_data_file_size: 46808, max_record_size: 28912, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { files: {} } }, actions = [WriteRecord(Record { id: 0, size: 23565, event_count: 1, aligned_len: 23632 }), WriteRecord(Record { id: 0, size: 18445, event_count: 1, aligned_len: 18512 }), WriteRecord(Record { id: 0, size: 11557, event_count: 1, aligned_len: 11616 })] cc b08fb47ac81e9148d8dc6a4d3332e92c21751685be9da9bd0c5d962ad7436285 # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 69602, max_data_file_size: 2462, max_record_size: 2450, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { files: {} } }, actions = [WriteRecord(Record { id: 0, size: 2389, event_count: 1, archived_len: 2464 })] +cc 5099a6694efc56a1ba467216d9cec28af741ee63d394f26fec03c9a57f2b973b # shrinks to mut builder = DiskBufferConfigBuilder { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: Some(257344), max_data_file_size: Some(13000), max_record_size: Some(6198), write_buffer_size: Some(61440), flush_interval: Some(10s), filesystem: TestFilesystem { atomicity: Sector, files: {} } }, actions = [Writeback(DirEntry(DataFile(0))), AdvanceTime(10s), ReadRecord, WriteRecord(Record { id: 0, size: 0, event_count: 1, 
encoded_len: 12, archived_len: 64, .. }), FlushWrites, Crash] diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs index 0e32b8266d977..82fecbac0cd03 100644 --- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs +++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs @@ -5,7 +5,6 @@ use std::{ Arc, atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering}, }, - time::Instant, }; use bytecheck::CheckBytes; @@ -15,7 +14,7 @@ use fslock::LockFile; use futures::StreamExt; use rkyv::{Archive, Serialize, with::Atomic}; use snafu::{ResultExt, Snafu}; -use tokio::{fs, io::AsyncWriteExt, sync::Notify}; +use tokio::{fs, io::AsyncWriteExt, sync::Notify, time::Instant}; use vector_common::finalizer::OrderedFinalizer; use super::{ @@ -29,6 +28,9 @@ use crate::buffer_usage_data::BufferUsageHandle; pub const LEDGER_LEN: usize = align16(mem::size_of::()); +/// File name of the ledger within the buffer's data directory. +const LEDGER_FILE_NAME: &str = "buffer.db"; + /// Error that occurred during calls to [`Ledger`]. #[derive(Debug, Snafu)] pub enum LedgerLoadCreateError { @@ -380,6 +382,12 @@ where .join(format!("buffer-data-{file_id}.dat")) } + /// Gets the path to the ledger file. + #[cfg(test)] + pub fn ledger_path(&self) -> PathBuf { + self.config.data_dir.join(LEDGER_FILE_NAME) + } + /// Waits for a signal from the reader that progress has been made. /// /// This will only occur when a record is read, which may allow enough space (below the maximum @@ -603,7 +611,7 @@ where } // Open the ledger file, which may involve creating it if it doesn't yet exist. - let ledger_path = config.data_dir.join("buffer.db"); + let ledger_path = config.data_dir.join(LEDGER_FILE_NAME); let mut ledger_handle = config .filesystem .open_file_writable(&ledger_path) @@ -737,16 +745,20 @@ where Ok(()) } + /// Returns the finalizer together with the join handle of the task draining it. 
Callers that + /// do not need the handle let it drop, leaving the task to run until the finalizer is dropped. #[must_use] - pub(super) fn spawn_finalizer(self: Arc) -> OrderedFinalizer { + pub(super) fn spawn_finalizer( + self: Arc, + ) -> (OrderedFinalizer, tokio::task::JoinHandle<()>) { let (finalizer, mut stream) = OrderedFinalizer::new(None); - vector_common::spawn_in_current_span(async move { + let handle = vector_common::spawn_in_current_span(async move { while let Some((_status, amount)) = stream.next().await { self.increment_pending_acks(amount); self.notify_writer_waiters(); } }); - finalizer + (finalizer, handle) } } diff --git a/lib/vector-buffers/src/variants/disk_v2/mod.rs b/lib/vector-buffers/src/variants/disk_v2/mod.rs index 6cb43421ae728..c2a40b2610cb3 100644 --- a/lib/vector-buffers/src/variants/disk_v2/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/mod.rs @@ -243,7 +243,15 @@ where pub(crate) async fn from_config_inner( config: DiskBufferConfig, usage_handle: BufferUsageHandle, - ) -> Result<(BufferWriter, BufferReader, Arc>), BufferError> + ) -> Result< + ( + BufferWriter, + BufferReader, + Arc>, + tokio::task::JoinHandle<()>, + ), + BufferError, + > where FS: Filesystem + fmt::Debug + Clone + 'static, FS::File: Unpin, @@ -259,7 +267,7 @@ where .await .context(WriterSeekFailedSnafu)?; - let finalizer = Arc::clone(&ledger).spawn_finalizer(); + let (finalizer, finalizer_handle) = Arc::clone(&ledger).spawn_finalizer(); let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer); reader @@ -269,7 +277,7 @@ where ledger.synchronize_buffer_usage(); - Ok((writer, reader, ledger)) + Ok((writer, reader, ledger, finalizer_handle)) } /// Creates a new disk buffer from the given [`DiskBufferConfig`]. 
@@ -291,7 +299,7 @@ where FS: Filesystem + fmt::Debug + Clone + 'static, FS::File: Unpin, { - let (writer, reader, _) = Self::from_config_inner(config, usage_handle).await?; + let (writer, reader, _, _) = Self::from_config_inner(config, usage_handle).await?; Ok((writer, reader)) } diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs b/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs index a63ed70635a40..cb40138859df3 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs @@ -34,7 +34,7 @@ async fn ack_updates_ledger_correctly() { // Create our ledger, and make sure it's empty. let ledger = Arc::new(ledger); - let finalizer = Arc::clone(&ledger).spawn_finalizer(); + let (finalizer, _) = Arc::clone(&ledger).spawn_finalizer(); assert_eq!(ledger.consume_pending_acks(), 0); // Now make sure it updates pending acks. @@ -66,7 +66,7 @@ async fn ack_wakes_reader() { // Create our ledger, as well as a future for awaiting // writer progress, and make sure it's not yet woken up. 
let ledger = Arc::new(ledger); - let finalizer = Arc::clone(&ledger).spawn_finalizer(); + let (finalizer, _) = Arc::clone(&ledger).spawn_finalizer(); let mut wait_for_writer = spawn(ledger.wait_for_writer()); assert_pending!(wait_for_writer.poll()); diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs b/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs index fa8ac8b11ad23..5ce56f0968b09 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs @@ -206,9 +206,10 @@ where .build() .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with all default values, but returns a handle to the buffer usage tracker. @@ -228,7 +229,7 @@ where .build() .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - let (writer, reader, ledger) = Buffer::from_config_inner(config, usage_handle.clone()) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle.clone()) .await .expect("should not fail to create buffer"); (writer, reader, ledger, usage_handle) @@ -278,9 +279,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with the specified maximum record size. 
@@ -302,9 +304,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with the specified maximum data file size. @@ -331,9 +334,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with the specified write buffer size. @@ -355,9 +359,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } pub(crate) fn get_corrected_max_record_size(payload: &T) -> usize diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs index c68bbc7755ba9..e7c9eca7040f3 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs @@ -1,9 +1,13 @@ +use std::fmt; + use proptest::{ arbitrary::any, collection::{SizeRange, vec as arb_vec}, prop_compose, prop_oneof, - strategy::{Just, Strategy}, + strategy::{Just, NewTree, Strategy, ValueTree}, + test_runner::TestRunner, }; +use tokio::time::Duration; use super::record::Record; @@ -19,6 +23,39 @@ pub enum Action { 
FlushWrites, ReadRecord, AcknowledgeRead, + AdvanceTime(Duration), + Writeback(Writeback), + Crash, +} + +/// A stream the operating system flushed to disk on its own, ahead of any explicit flush. +/// This allows us to model OS non-determinism in durability. +#[derive(Clone, Debug)] +pub enum Writeback { + /// The ledger's memory-mapped pages reach disk, the same durable effect as an `msync`. + LedgerContents, + /// A data file's dirty tail reaches disk, with the given degree of completeness. + DataContents { file: u16, tail: TailPersistence }, + /// A single file's directory entry, its creation or deletion, reaches disk. + DirEntry(DirTarget), +} + +/// The file whose directory entry reaches disk. +#[derive(Clone, Debug)] +pub enum DirTarget { + Ledger, + DataFile(u16), +} + +/// How completely a data file's dirty tail reached disk, modelling append atomicity. +#[derive(Clone, Copy, Debug)] +pub enum TailPersistence { + /// Every dirty unit landed intact. + AllDirty, + /// A trailing unit did not land; the file is truncated at a unit boundary. + TornAtBoundary, + /// The trailing unit's size landed but its contents are garbage. + GarbageBoundary, } prop_compose! { @@ -27,18 +64,62 @@ prop_compose! { } } -fn arb_action() -> impl Strategy { +fn arb_action() -> impl Strategy> { + RemovableAction::new(arb_concrete_action()) +} + +fn arb_concrete_action() -> impl Strategy { // Overall, we want reads and writes to be equal, with slightly fewer acks, and slightly fewer // still flushes of writes. prop_oneof![ + // Generate nonzero durations only so time advances shrink to 1ms. Zero advances are valid + // no-ops, but they add noise to generated repros and are stripped if they appear. 
+ 2 => (1u64..=20_000) + .prop_map(|millis| Action::AdvanceTime(Duration::from_millis(millis))), 3 => Just(Action::FlushWrites), 5 => Just(Action::ReadRecord), 4 => Just(Action::AcknowledgeRead), - 5 => any::<(u32, u16, u8, u8)>().prop_map(|(id, base_size, size_offset, event_count)| { + 3 => arb_writeback().prop_map(Action::Writeback), + 1 => Just(Action::Crash), + // The model is checking durability of successful writes; empty records are immediate + // validation errors and make those failures shrink into unrelated error-path calls. + 5 => arb_write_record(1u32..=6), + ] +} + +fn arb_write_record(event_count: S) -> impl Strategy +where + S: Strategy, +{ + (any::<(u32, u16, u8)>(), event_count).prop_map( + |((id, base_size, size_offset), event_count)| { let size = u32::from(base_size) + u32::from(size_offset); - let event_count = event_count % 7; - Action::WriteRecord(Record::new(id, size, u32::from(event_count))) - }), + Action::WriteRecord(Record::new(id, size, event_count)) + }, + ) +} + +fn arb_tail_persistence() -> impl Strategy { + prop_oneof![ + Just(TailPersistence::AllDirty), + Just(TailPersistence::TornAtBoundary), + Just(TailPersistence::GarbageBoundary), + ] +} + +fn arb_dir_target() -> impl Strategy { + prop_oneof![ + Just(DirTarget::Ledger), + (0u16..5).prop_map(DirTarget::DataFile) + ] +} + +fn arb_writeback() -> impl Strategy { + prop_oneof![ + Just(Writeback::LedgerContents), + (0u16..5, arb_tail_persistence()) + .prop_map(|(file, tail)| Writeback::DataContents { file, tail }), + arb_dir_target().prop_map(Writeback::DirEntry), ] } @@ -46,7 +127,7 @@ pub fn arb_actions(len_range: R) -> impl Strategy> where R: Into, { - arb_vec(arb_action(), len_range).prop_map(sanitize_raw_actions) + ActionList::new(arb_vec(arb_action(), len_range)) } /// Sanitizes raw actions generated by proptest into a valid sequence. 
@@ -79,7 +160,236 @@ pub fn sanitize_raw_actions(actions: Vec) -> Vec { unacked_events -= 1; Action::AcknowledgeRead }), + Action::AdvanceTime(duration) => Some(Action::AdvanceTime(duration)), Action::FlushWrites => Some(Action::FlushWrites), + // The OS flushing a stream is always a valid thing to happen. + Action::Writeback(w) => Some(Action::Writeback(w)), + // A crash can happen at any point and invalidates outstanding read/ack accounting, + // since the reader restarts from its last durable position after reopen. + Action::Crash => { + unread_event_count += unacked_events; + unacked_events = 0; + Some(Action::Crash) + } }) .collect::>() } + +#[derive(Clone)] +struct RemovableAction { + inner: S, +} + +impl RemovableAction { + const fn new(inner: S) -> Self { + Self { inner } + } +} + +impl fmt::Debug for RemovableAction +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RemovableAction") + .field("inner", &self.inner) + .finish() + } +} + +impl Strategy for RemovableAction +where + S: Strategy, +{ + type Tree = RemovableActionTree; + type Value = Option; + + fn new_tree(&self, runner: &mut TestRunner) -> NewTree { + self.inner.new_tree(runner).map(RemovableActionTree::new) + } +} + +#[derive(Clone, Debug)] +struct RemovableActionTree { + inner: T, + removed: bool, + removal_rejected: bool, +} + +impl RemovableActionTree { + const fn new(inner: T) -> Self { + Self { + inner, + removed: false, + removal_rejected: false, + } + } +} + +impl ValueTree for RemovableActionTree +where + T: ValueTree, +{ + type Value = Option; + + fn current(&self) -> Self::Value { + (!self.removed).then(|| self.inner.current()) + } + + fn simplify(&mut self) -> bool { + // The built-in Option shrinker only tries None after the inner value is done shrinking. + // Retrying removal after each accepted inner shrink keeps no-op time advances out of repros. 
+ if !self.removed && !self.removal_rejected { + self.removed = true; + return true; + } + + if !self.removed && self.inner.simplify() { + self.removal_rejected = false; + return true; + } + + false + } + + fn complicate(&mut self) -> bool { + if self.removed { + self.removed = false; + self.removal_rejected = true; + return true; + } + + if self.inner.complicate() { + self.removal_rejected = false; + return true; + } + + false + } +} + +#[derive(Clone)] +struct ActionList { + inner: S, +} + +impl ActionList { + const fn new(inner: S) -> Self { + Self { inner } + } +} + +impl fmt::Debug for ActionList +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ActionList") + .field("inner", &self.inner) + .finish() + } +} + +impl Strategy for ActionList +where + S: Strategy>>, +{ + type Tree = ActionListTree; + type Value = Vec; + + fn new_tree(&self, runner: &mut TestRunner) -> NewTree { + self.inner.new_tree(runner).map(ActionListTree::new) + } +} + +#[derive(Clone, Debug)] +struct ActionListTree { + inner: T, + strip_zero_advances: bool, + zero_strip_rejected: bool, + last_simplification: Option, +} + +impl ActionListTree { + const fn new(inner: T) -> Self { + Self { + inner, + strip_zero_advances: false, + zero_strip_rejected: false, + last_simplification: None, + } + } +} + +#[derive(Clone, Copy, Debug)] +enum ActionListSimplification { + StripZeroAdvances, + Inner, +} + +impl ValueTree for ActionListTree +where + T: ValueTree>>, +{ + type Value = Vec; + + fn current(&self) -> Self::Value { + raw_actions_to_actions(self.inner.current(), self.strip_zero_advances) + } + + fn simplify(&mut self) -> bool { + // If a zero-time advance appears during shrinking or future generator changes, remove it + // as a no-op before the vector shrinker spends budget elsewhere. 
+ if !self.strip_zero_advances + && !self.zero_strip_rejected + && has_zero_advance(self.inner.current()) + { + self.strip_zero_advances = true; + self.last_simplification = Some(ActionListSimplification::StripZeroAdvances); + return true; + } + + if self.inner.simplify() { + self.zero_strip_rejected = false; + self.last_simplification = Some(ActionListSimplification::Inner); + return true; + } + + self.last_simplification = None; + false + } + + fn complicate(&mut self) -> bool { + match self.last_simplification.take() { + Some(ActionListSimplification::StripZeroAdvances) => { + self.strip_zero_advances = false; + self.zero_strip_rejected = true; + true + } + Some(ActionListSimplification::Inner) if self.inner.complicate() => { + self.zero_strip_rejected = false; + true + } + Some(ActionListSimplification::Inner) | None => false, + } + } +} + +fn raw_actions_to_actions( + raw_actions: Vec>, + strip_zero_advances: bool, +) -> Vec { + let actions = raw_actions.into_iter().flatten().filter(|action| { + !(strip_zero_advances + && matches!(action, Action::AdvanceTime(duration) if *duration == Duration::ZERO)) + }); + + sanitize_raw_actions(actions.collect::>()) +} + +fn has_zero_advance(raw_actions: Vec>) -> bool { + raw_actions + .into_iter() + .flatten() + .any(|action| matches!(action, Action::AdvanceTime(duration) if duration == Duration::ZERO)) +} diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs index abbe785b9caef..facc600b3bede 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs @@ -1,10 +1,14 @@ use std::time::Duration; -use proptest::{arbitrary::any, strategy::Strategy}; +use proptest::strategy::{Just, Strategy}; -use super::{filesystem::TestFilesystem, record::Record}; +use super::{ + filesystem::{TestFilesystem, arb_fs_atomicity}, + record::Record, +}; use 
crate::variants::disk_v2::{ - BufferReader, BufferWriter, DiskBufferConfig, DiskBufferConfigBuilder, ReaderError, WriterError, + BufferReader, BufferWriter, DiskBufferConfigBuilder, ReaderError, WriterError, + common::MINIMUM_MAX_RECORD_SIZE, ledger::LEDGER_LEN, }; pub type TestReader = BufferReader; @@ -17,6 +21,10 @@ pub type WriterResult = Result>; // buffer overall, which exercises the "write this record directly to the wrapped writer" logic that // exists in `tokio::io::BufWriter` itself. pub const TEST_WRITE_BUFFER_SIZE: usize = 60 * 1024; +const MODEL_MAX_RAW_BUFFER_SIZE: u64 = 4_194_240; +const MODEL_MAX_DATA_FILE_SIZE: u64 = 131_070; +const MODEL_MAX_RECORD_SIZE: usize = 65_535; +const MIN_VALID_MAX_RECORD_SIZE: usize = MINIMUM_MAX_RECORD_SIZE + 1; /// Result of applying an action to the model. /// @@ -32,57 +40,73 @@ pub enum Progress { Blocked, } -pub fn arb_buffer_config() -> impl Strategy> { - any::<(u16, u16, u16)>() - .prop_map(|(n1, n2, n3)| { - let max_buffer_size = u64::from(n1) * 64; - let max_data_file_size = u64::from(n2) * 2; - let max_record_size = n3.into(); - - let mut path = std::env::temp_dir(); - path.push("vector-disk-v2-model"); +pub fn arb_buffer_config() -> impl Strategy> { + // Generate dependent limits directly as minimum + slack, rather than generating invalid + // triples and rejecting them, so each max_* field has a clean path toward its lower bound. 
+ ( + MIN_VALID_MAX_RECORD_SIZE..=MODEL_MAX_RECORD_SIZE, + 1u64..=60, + arb_fs_atomicity(), + ) + .prop_flat_map(|(max_record_size, flush_interval_secs, atomicity)| { + let min_data_file_size = + u64::try_from(max_record_size).expect("model max record size must fit in u64"); + let max_data_file_size_slack = MODEL_MAX_DATA_FILE_SIZE - min_data_file_size; - DiskBufferConfigBuilder::from_path(path) - .max_buffer_size(max_buffer_size) - .max_data_file_size(max_data_file_size) - .max_record_size(max_record_size) - .write_buffer_size(TEST_WRITE_BUFFER_SIZE) - // This really only affects how often we flush the ledger, because we always `flush` - // after writes to ensure our buffered writes make it to the data files for the - // readers to make progress, and we're not testing anything about whether or not the - // ledger makes it to disk durably. - .flush_interval(Duration::from_secs(10)) - .filesystem(TestFilesystem::default()) + ( + Just((max_record_size, flush_interval_secs, atomicity)), + 0u64..=max_data_file_size_slack, + ) }) - .prop_filter_map( - "maximum size limits were too high, or zero", - validate_buffer_config, + .prop_flat_map( + |((max_record_size, flush_interval_secs, atomicity), max_data_file_size_slack)| { + let min_data_file_size = + u64::try_from(max_record_size).expect("model max record size must fit in u64"); + let max_data_file_size = min_data_file_size + max_data_file_size_slack; + let min_buffer_size = minimum_raw_buffer_size(max_data_file_size); + let max_buffer_size_slack = MODEL_MAX_RAW_BUFFER_SIZE - min_buffer_size; + + ( + Just(( + max_record_size, + max_data_file_size, + flush_interval_secs, + atomicity, + )), + 0u64..=max_buffer_size_slack, + ) + }, + ) + .prop_map( + |( + (max_record_size, max_data_file_size, flush_interval_secs, atomicity), + max_buffer_size_slack, + )| { + let max_buffer_size = + minimum_raw_buffer_size(max_data_file_size) + max_buffer_size_slack; + let mut path = std::env::temp_dir(); + 
path.push("vector-disk-v2-model"); + + DiskBufferConfigBuilder::from_path(path) + .max_buffer_size(max_buffer_size) + .max_data_file_size(max_data_file_size) + .max_record_size(max_record_size) + .write_buffer_size(TEST_WRITE_BUFFER_SIZE) + // This really only affects how often we flush the ledger, because we always `flush` + // after writes to ensure our buffered writes make it to the data files for the + // readers to make progress, and we're not testing anything about whether or not the + // ledger makes it to disk durably. + .flush_interval(Duration::from_secs(flush_interval_secs)) + .filesystem(TestFilesystem::with_atomicity(atomicity)) + }, ) } -/// Validates the given buffer config builder and generates a resulting configuration. -/// -/// If the builder has been configured incorrectly (i.e. zero values), or if the configuration is -/// valid but has values that are not appropriate for being used under test (i.e. values are way too -/// large and would balloon the run-time of the test) then `None` is returned. -/// -/// Otherwise, `Some(DiskBufferConfig)` is returned. -pub fn validate_buffer_config( - builder: DiskBufferConfigBuilder, -) -> Option> { - builder - .build() - // If building the configuration failed, just return `None`. - .ok() - .filter(|config| { - // Limit our buffer config to the following: - // - max buffer size of 64MB - // - max data file size of 2MB - // - max record size of 1MB - // - // Otherwise, the model just runs uselessly slow. 
- config.max_buffer_size <= 64_000_000 - && config.max_data_file_size <= 2_000_000 - && config.max_record_size <= 1_000_000 - }) +fn minimum_raw_buffer_size(max_data_file_size: u64) -> u64 { + let ledger_len = u64::try_from(LEDGER_LEN).expect("ledger length must fit in u64"); + + max_data_file_size + .checked_mul(2) + .and_then(|doubled| doubled.checked_add(ledger_len)) + .expect("model max data file size must leave room for the minimum buffer size") } diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs index a88204d7f7a9c..fa5f44d5af516 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs @@ -1,6 +1,6 @@ use std::{ cmp, - collections::HashMap, + collections::{HashMap, HashSet}, fmt, io, path::{Path, PathBuf}, pin::Pin, @@ -10,11 +10,53 @@ use std::{ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use proptest::{ + prop_oneof, + strategy::{Just, Strategy}, +}; + +use super::action::TailPersistence; use crate::variants::disk_v2::{ Filesystem, io::{AsyncFile, Metadata, ReadableMemoryMap, WritableMemoryMap}, }; +/// Byte written into a garbage boundary unit. Arbitrary value chosen to +/// distinguish from bytes already written by the model. +const GARBAGE_BYTE: u8 = 0xFF; + +/// The largest write a filesystem persists atomically, setting the granularity at which a torn +/// writeback tears. +#[derive(Clone, Copy, Debug, Default)] +pub enum FsAtomicity { + /// Writes are atomic per 512-byte sector. Torn writes lands on a 512-byte + /// boundary. + #[default] + Sector, + /// Writes are atomic per 4096-byte block. Torn write lands on a 4096-byte + /// boundary. 
+ Block, +} + +impl FsAtomicity { + fn tear_granularity(self) -> usize { + match self { + FsAtomicity::Sector => 512, + FsAtomicity::Block => 4096, + } + } +} + +pub fn arb_fs_atomicity() -> impl Strategy { + prop_oneof![Just(FsAtomicity::Sector), Just(FsAtomicity::Block)] +} + +/// The largest `unit` boundary strictly below `len` — i.e. the offset that drops the trailing +/// (possibly partial) unit. +fn unit_floor_below(len: usize, unit: usize) -> usize { + (len.saturating_sub(1) / unit) * unit +} + fn io_err_already_exists() -> io::Error { io::Error::new(io::ErrorKind::AlreadyExists, "file already exists") } @@ -28,7 +70,10 @@ fn io_err_permission_denied() -> io::Error { } struct FileInner { + // The live contents, as seen by the running reader and writer. buf: Option>, + // The contents known to have reached disk via a barrier. + durable: Vec, } impl FileInner { @@ -40,12 +85,17 @@ impl FileInner { let previous = self.buf.replace(buf); assert!(previous.is_none()); } + + fn persist(&mut self) { + self.durable = self.buf.as_ref().expect("file buf consumed").clone(); + } } impl Default for FileInner { fn default() -> Self { Self { buf: Some(Vec::new()), + durable: Vec::new(), } } } @@ -79,6 +129,19 @@ impl TestFile { } } + // Rebuilds a file from its durable snapshot after a crash. The live contents equal the + // durable contents, since that is all that reached disk. 
+ fn from_durable(durable: Vec) -> Self { + Self { + inner: Arc::new(Mutex::new(FileInner { + buf: Some(durable.clone()), + durable, + })), + is_writable: false, + read_pos: 0, + } + } + fn set_readable(&mut self) { self.is_writable = false; } @@ -141,6 +204,8 @@ impl ReadableMemoryMap for TestMmap {} impl WritableMemoryMap for TestMmap { fn flush(&self) -> io::Result<()> { + let buf = self.buf.as_ref().expect("mmap buf consumed"); + self.inner.lock().expect("poisoned").durable = buf.clone(); Ok(()) } } @@ -217,6 +282,7 @@ impl AsyncFile for TestFile { } async fn sync_all(&self) -> io::Result<()> { + self.inner.lock().expect("poisoned").persist(); Ok(()) } } @@ -224,7 +290,10 @@ impl AsyncFile for TestFile { // Inner state of the test filesystem. #[derive(Debug, Default)] struct FilesystemInner { + atomicity: FsAtomicity, files: HashMap, + // Paths whose directory entry has reached disk and so survive a crash. + durable_names: HashSet, } impl FilesystemInner { @@ -272,6 +341,51 @@ impl FilesystemInner { fn delete_file(&mut self, path: &Path) -> bool { self.files.remove(path).is_some() } + + // Models the OS flushing a data file's dirty tail to disk ahead of an + // explicit `sync_all`. 
+ fn writeback_data(&mut self, path: &Path, tail: TailPersistence) { + let Some(file) = self.files.get(path) else { + return; + }; + let unit = self.atomicity.tear_granularity(); + let mut inner = file.inner.lock().expect("poisoned"); + let live = inner.buf.as_ref().expect("data file buf consumed").clone(); + let durable_len = inner.durable.len(); + if live.len() <= durable_len { + return; + } + let boundary = unit_floor_below(live.len(), unit).max(durable_len); + inner.durable = match tail { + TailPersistence::AllDirty => live, + TailPersistence::TornAtBoundary => live[..boundary].to_vec(), + TailPersistence::GarbageBoundary => { + let mut durable = live[..boundary].to_vec(); + durable.resize(live.len(), GARBAGE_BYTE); + durable + } + }; + } + + // Models the OS flushing one file's directory entry to disk. + fn writeback_dir_entry(&mut self, path: &Path) { + if self.files.contains_key(path) { + self.durable_names.insert(path.to_owned()); + } else { + self.durable_names.remove(path); + } + } + + // Discards everything that has not been forced durable by a barrier. + fn crash(&mut self) { + let live = std::mem::take(&mut self.files); + for (path, file) in live { + if self.durable_names.contains(&path) { + let durable = file.inner.lock().expect("poisoned").durable.clone(); + self.files.insert(path, TestFile::from_durable(durable)); + } + } + } } /// A `Filesystem` that tracks files in memory and allows introspection from the outside. @@ -279,10 +393,47 @@ pub struct TestFilesystem { inner: Arc>, } +impl TestFilesystem { + /// A filesystem with the given append atomicity. + pub fn with_atomicity(atomicity: FsAtomicity) -> Self { + Self { + inner: Arc::new(Mutex::new(FilesystemInner { + atomicity, + ..Default::default() + })), + } + } + + /// Models a hard crash, Discarding everything not forced durable by a + /// barrier. 
+ pub fn crash(&self) { + self.inner.lock().expect("poisoned").crash(); + } + + /// Models the OS flushing a data file's dirty tail ahead of an explicit + /// `sync_all`. + pub fn writeback_data(&self, path: &Path, tail: TailPersistence) { + self.inner + .lock() + .expect("poisoned") + .writeback_data(path, tail); + } + + /// Models the OS flushing one file's directory entry ahead of a directory + /// fsync. + pub fn writeback_dir_entry(&self, path: &Path) { + self.inner + .lock() + .expect("poisoned") + .writeback_dir_entry(path); + } +} + impl fmt::Debug for TestFilesystem { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let inner = self.inner.lock().expect("poisoned"); f.debug_struct("TestFilesystem") + .field("atomicity", &inner.atomicity) .field("files", &inner.files) .finish() } @@ -290,7 +441,9 @@ impl fmt::Debug for TestFilesystem { impl Clone for TestFilesystem { fn clone(&self) -> Self { - Self::default() + Self { + inner: Arc::clone(&self.inner), + } } } diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs index ebab8bd1452e8..80a75779263c0 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs @@ -26,7 +26,7 @@ use crate::{ mod action; use self::{ - action::{Action, arb_actions}, + action::{Action, DirTarget, Writeback, arb_actions}, record::EncodeError, }; @@ -807,27 +807,20 @@ impl BufferModel { proptest! 
{ #[test] - fn model_check(mut config in arb_buffer_config(), actions in arb_actions(0..64)) { + fn model_check(mut builder in arb_buffer_config(), actions in arb_actions(0..64)) { let rt = Builder::new_current_thread() .enable_all() + .start_paused(true) .build() .expect("should not fail to build runtime"); let _a = install_tracing_helpers(); - info!( - actions = actions.len(), - max_buffer_size = config.max_buffer_size, - max_data_file_size = config.max_data_file_size, - max_record_size = config.max_record_size, - "Starting model.", - ); - // We generate a new temporary directory and overwrite the data directory in the buffer - // configuration. This allows us to use a utility that will generate a random directory each - // time -- parallel runs of this test can't clobber each other anymore -- but also ensure - // that the directory is cleaned up when the test run is over. + // We generate a new temporary directory and overwrite the data directory in the builder. + // This gives each run a unique directory -- parallel runs can't clobber each other -- that + // is cleaned up when the run ends. let buf_dir = TempDir::with_prefix("vector-buffers-disk-v2-model").expect("creating temp dir should never fail"); - config.data_dir = buf_dir.path().to_path_buf(); + builder.data_dir = buf_dir.path().to_path_buf(); rt.block_on(async move { // This model tries to encapsulate all of the behavior of the disk buffer v2 @@ -836,7 +829,7 @@ proptest! { // // At the very top, we have our input actions, which are mapped one-to-one with the // possible actions that can influence the disk buffer: reading records, writing - // records, flushing writes, and acknowledging reads. + // records, flushing writes, acknowledging reads, and advancing time. // // After that, we have the model itself, which essentially a barebones re-implementation // of the disk buffer itself without any asynchrony, rich error handling, etc. We scope @@ -868,10 +861,21 @@ proptest! 
{ // should be tried again before pulling a new action from the remaining actions in the // input sequence, and so on. Effectively, we can deterministically drive asynchronous // actions that are coupled to one another, in a lockstep fashion, with the model. + // Each buffer open rebuilds the config from the builder, the same path a restarting + // process takes. The model borrows this first one before it is handed to the open. + let config = builder.clone().build().expect("validated builder should build"); + info!( + actions = actions.len(), + max_buffer_size = config.max_buffer_size, + max_data_file_size = config.max_data_file_size, + max_record_size = config.max_record_size, + "Starting model.", + ); let mut model = BufferModel::from_config(&config); + let fs = builder.filesystem.clone(); let usage_handle = BufferUsageHandle::noop(); - let (writer, reader, ledger) = + let (writer, reader, mut ledger, mut finalizer_handle) = Buffer::::from_config_inner(config, usage_handle) .await .expect("should not fail to build buffer"); @@ -905,9 +909,66 @@ proptest! { // run against the model. If it's an action that may be asynchronous/blocked on // progress of another component, we try it later on, which lets us deduplicate some code. if let Some(action) = sequencer.trigger_next_runnable_action() { - if let Action::AcknowledgeRead = action { + match action { // Acknowledgements are based on atomics, so they never wait asynchronously. - model.acknowledge_read(); + Action::AcknowledgeRead => model.acknowledge_read(), + Action::AdvanceTime(duration) if duration > tokio::time::Duration::ZERO => { + // `advance(0)` still yields internally, which makes zero-time advances + // affect scheduling and prevents them from shrinking out as no-ops. + tokio::time::advance(duration).await; + tokio::task::yield_now().await; + } + // The OS flushes a stream to disk ahead of any explicit flush. 
+ Action::Writeback(writeback) => match writeback { + Writeback::LedgerContents => { + ledger.flush().expect("ledger flush should not fail in the model"); + } + Writeback::DataContents { file, tail } => { + fs.writeback_data(&ledger.get_data_file_path(file), tail); + } + Writeback::DirEntry(target) => { + let path = match target { + DirTarget::Ledger => ledger.ledger_path(), + DirTarget::DataFile(file) => ledger.get_data_file_path(file), + }; + fs.writeback_dir_entry(&path); + } + }, + Action::Crash => { + fs.crash(); + // Tear the SUT down before reopening. + let remaining = sequencer.into_remaining_actions(); + drop(ledger); + finalizer_handle.await.expect("finalizer task panicked"); + // Reopen the buffer the way a restarting process + // would. Rebuild the config from the builder, + // reopening the buffer the way a restarting process + // would. + let config = builder.clone().build().expect("validated builder should build"); + let (writer, reader, new_ledger, new_handle) = + match Buffer::::from_config_inner( + config, + BufferUsageHandle::noop(), + ) + .await + { + Ok(parts) => parts, + Err(e) => { + prop_assert!( + false, + "buffer failed to reopen after crash: {:?}", + e + ); + unreachable!() + } + }; + + ledger = new_ledger; + finalizer_handle = new_handle; + sequencer = ActionSequencer::new(remaining, reader, writer); + closed_writers = false; + } + _ => {} } } else { let mut made_progress = false; diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs index 0e77c78b66f4e..6b8cd5c7882fc 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs @@ -235,6 +235,13 @@ impl ActionSequencer { Action::WriteRecord(_) | Action::FlushWrites => allow_write, Action::ReadRecord => allow_read, Action::AcknowledgeRead => !self.unacked_events.is_empty(), + Action::AdvanceTime(_) => true, + // 
A writeback and a crash both act on the filesystem outside the + // reader/writer. They run only when neither has an operation in + // flight, keeping the model in lockstep. + Action::Writeback(_) | Action::Crash => { + allow_read && (allow_write || self.write_state.is_closed()) + } }) } @@ -253,9 +260,11 @@ impl ActionSequencer { /// Likewise, we can't execute another read if there's an in-flight read. Acknowledgements /// always happen out-of-band, though, and so are always eligible. pub fn trigger_next_runnable_action(&mut self) -> Option { - let pos = self.get_next_runnable_action(); + loop { + let action = self + .get_next_runnable_action() + .map(|i| self.actions.remove(i))?; - if let Some(action) = pos.map(|i| self.actions.remove(i)) { match action { Action::WriteRecord(record) => { assert!( @@ -264,7 +273,7 @@ impl ActionSequencer { ); self.write_state.transition_to_write(record.clone()); - Some(Action::WriteRecord(record)) + return Some(Action::WriteRecord(record)); } a @ Action::FlushWrites => { assert!( @@ -273,7 +282,7 @@ impl ActionSequencer { ); self.write_state.transition_to_flush(); - Some(a) + return Some(a); } a @ Action::ReadRecord => { assert!( @@ -282,18 +291,32 @@ impl ActionSequencer { ); self.read_state.transition_to_read(); - Some(a) + return Some(a); } Action::AcknowledgeRead => { drop(self.unacked_events.pop_front().expect("FIXME")); - Some(Action::AcknowledgeRead) + return Some(Action::AcknowledgeRead); + } + Action::AdvanceTime(duration) if duration == tokio::time::Duration::ZERO => { + // Zero-time advances are valid no-ops. Returning them would still create a + // scheduler step in the outer harness, making them sticky during shrinking. } + Action::AdvanceTime(duration) => return Some(Action::AdvanceTime(duration)), + // Applied to the filesystem only, reader and writer are idle, + // no in-flight state to transition to. 
+ a @ (Action::Writeback(_) | Action::Crash) => return Some(a), } - } else { - None } } + /// Consumes the sequencer, returning the actions it has not yet triggered. + /// + /// Used to hand the remaining work to a freshly reopened sequencer after a crash, dropping + /// the old reader and writer so the buffer lock is released. + pub fn into_remaining_actions(self) -> Vec { + self.actions + } + /// Gets the result of pending write action, if one is in-flight. /// /// If a write action (either a record write or a flush) is in-flight, we attempt to poll it to diff --git a/tests/antithesis/harness/Cargo.toml b/tests/antithesis/harness/Cargo.toml index 3120ca061b94c..732b6b84145d0 100644 --- a/tests/antithesis/harness/Cargo.toml +++ b/tests/antithesis/harness/Cargo.toml @@ -12,6 +12,7 @@ publish = false antithesis_sdk = { workspace = true, features = ["full"] } axum = { workspace = true, features = ["http1", "tokio"] } clap = { workspace = true, features = ["derive"] } +hex = { version = "0.4.3" } reqwest = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["rt", "macros", "net", "time"] } diff --git a/tests/antithesis/harness/src/bin/eventually_conservation.rs b/tests/antithesis/harness/src/bin/eventually_conservation.rs index 452a1ef898458..2d11295c12bd1 100644 --- a/tests/antithesis/harness/src/bin/eventually_conservation.rs +++ b/tests/antithesis/harness/src/bin/eventually_conservation.rs @@ -10,7 +10,7 @@ #[cfg(target_os = "linux")] extern crate antithesis_instrumentation; -use antithesis_harness::payload_field; +use antithesis_harness::{claim, post_event}; use antithesis_sdk::{ antithesis_init, assert_always, assert_always_less_than_or_equal_to, assert_sometimes_greater_than, assert_unreachable, @@ -98,26 +98,6 @@ async fn all_healthy(client: &reqwest::Client, metrics_urls: &[String]) -> bool true } -async fn claim(client: &reqwest::Client, oracle_url: &str) -> Option { - let resp = client - 
.post(format!("{oracle_url}/claim")) - .timeout(time::Duration::from_secs(10)) - .send() - .await - .ok()?; - resp.text().await.ok()?.trim().parse().ok() -} - -async fn post_probe(client: &reqwest::Client, source_url: &str, id: u64) -> bool { - // Same deterministic payload as the producer, so the probe's delivery passes - // the oracle's content check instead of counting as corruption. - let event = json!([{ "id": id, "data": payload_field(id) }]); - matches!( - client.post(source_url).timeout(time::Duration::from_secs(10)).json(&event).send().await, - Ok(resp) if resp.status().is_success() - ) -} - #[tokio::main(flavor = "current_thread")] async fn main() { antithesis_init(); @@ -239,7 +219,7 @@ async fn main() { while !progressed && time::Instant::now() < deadline { if probe.is_none() { if let Some(id) = claim(&client, &oracle_url).await { - if post_probe(&client, &source_url, id).await { + if post_event(&client, &source_url, id, time::Duration::from_secs(10)).await { probe = Some(id); } } diff --git a/tests/antithesis/harness/src/bin/parallel_driver_produce.rs b/tests/antithesis/harness/src/bin/parallel_driver_produce.rs index 91b04494a0ca9..01eb518f61a42 100644 --- a/tests/antithesis/harness/src/bin/parallel_driver_produce.rs +++ b/tests/antithesis/harness/src/bin/parallel_driver_produce.rs @@ -9,7 +9,7 @@ #[cfg(target_os = "linux")] extern crate antithesis_instrumentation; -use antithesis_harness::payload_field; +use antithesis_harness::{claim, post_event, report_acked}; use antithesis_sdk::{antithesis_init, assert_reachable, assert_unreachable}; use clap::Parser; use serde_json::json; @@ -25,48 +25,6 @@ struct Args { oracle_url: String, } -/// POST one event to the source. Ok(2xx) means the pipeline took end-to-end -/// responsibility for the event (with e2e acks enabled). 
-async fn post_event( - client: &reqwest::Client, - source_url: &str, - id: u64, - timeout: time::Duration, -) -> bool { - // The payload is a deterministic function of the id, so every retry re-sends - // the exact same record and the oracle can recompute the expected bytes. - let event = json!([{ "id": id, "data": payload_field(id) }]); - matches!( - client.post(source_url).timeout(timeout).json(&event).send().await, - Ok(resp) if resp.status().is_success() - ) -} - -/// Claim one fresh id from the oracle. -async fn claim(client: &reqwest::Client, oracle_url: &str) -> Option { - let resp = client - .post(format!("{oracle_url}/claim")) - .timeout(time::Duration::from_secs(10)) - .send() - .await - .ok()?; - resp.text().await.ok()?.trim().parse().ok() -} - -/// Tell the oracle the pipeline acked this id, so it must come back. Returns -/// whether the oracle recorded the obligation. -async fn report_acked(client: &reqwest::Client, oracle_url: &str, id: u64) -> bool { - matches!( - client - .post(format!("{oracle_url}/acked")) - .timeout(time::Duration::from_secs(10)) - .body(id.to_string()) - .send() - .await, - Ok(resp) if resp.status().is_success() - ) -} - #[tokio::main(flavor = "current_thread")] async fn main() { antithesis_init(); diff --git a/tests/antithesis/harness/src/lib.rs b/tests/antithesis/harness/src/lib.rs index 9784cc367d892..6f176d99f55f7 100644 --- a/tests/antithesis/harness/src/lib.rs +++ b/tests/antithesis/harness/src/lib.rs @@ -2,6 +2,9 @@ //! `scenarios/vector_to_vector_e2e_disk`) owns its own test-command bins. When two //! scenarios need the same HTTP or oracle helpers, factor them into modules here. +use std::time::Duration; + +use serde_json::json; use vector_buffers::WRITE_BUFFER_SIZE_V2; /// Payload lengths in bytes, one per id class. Sized around the disk_v2 write @@ -50,30 +53,56 @@ pub fn payload_for(id: u64) -> Vec { /// without escaping concerns, and a corruption of the bytes shows up as a hex /// mismatch. 
pub fn payload_field(id: u64) -> String { - let bytes = payload_for(id); - let mut s = String::with_capacity(bytes.len() * 2); - for b in bytes { - s.push(char::from_digit((b >> 4) as u32, 16).unwrap()); - s.push(char::from_digit((b & 0x0f) as u32, 16).unwrap()); - } - s + hex::encode(payload_for(id)) } /// Decode the hex produced by [`payload_field`] back to bytes. Returns `None` on /// any non-hex or odd-length input so the oracle can tell a mangled field from a /// content mismatch. pub fn decode_payload_field(field: &str) -> Option> { - if !field.len().is_multiple_of(2) { - return None; - } - let mut out = Vec::with_capacity(field.len() / 2); - let mut bytes = field.bytes(); - while let (Some(hi), Some(lo)) = (bytes.next(), bytes.next()) { - let hi = (hi as char).to_digit(16)?; - let lo = (lo as char).to_digit(16)?; - out.push(((hi << 4) | lo) as u8); - } - Some(out) + hex::decode(field).ok() +} + +/// Claim one fresh id from the oracle. `None` if the oracle is unreachable. +pub async fn claim(client: &reqwest::Client, oracle_url: &str) -> Option { + let resp = client + .post(format!("{oracle_url}/claim")) + .timeout(Duration::from_secs(10)) + .send() + .await + .ok()?; + resp.text().await.ok()?.trim().parse().ok() +} + +/// POST one event to the source. `true` on a 2xx, meaning the pipeline took +/// end-to-end responsibility for the event. The payload is a deterministic +/// function of the id, so every retry re-sends the exact same record and the +/// oracle can recompute the expected bytes. +pub async fn post_event( + client: &reqwest::Client, + source_url: &str, + id: u64, + timeout: Duration, +) -> bool { + let event = json!([{ "id": id, "data": payload_field(id) }]); + matches!( + client.post(source_url).timeout(timeout).json(&event).send().await, + Ok(resp) if resp.status().is_success() + ) +} + +/// Tell the oracle the pipeline acked this id, so it must come back. `true` if +/// the oracle recorded the obligation. 
+pub async fn report_acked(client: &reqwest::Client, oracle_url: &str, id: u64) -> bool { + matches!( + client + .post(format!("{oracle_url}/acked")) + .timeout(Duration::from_secs(10)) + .body(id.to_string()) + .send() + .await, + Ok(resp) if resp.status().is_success() + ) } #[cfg(test)]