diff --git a/Cargo.lock b/Cargo.lock index fb525def3b832..847040a8e12dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,6 +231,7 @@ dependencies = [ "antithesis_sdk", "axum 0.6.20", "clap", + "hex", "reqwest 0.11.26", "serde_json", "tokio", diff --git a/lib/vector-buffers/Cargo.toml b/lib/vector-buffers/Cargo.toml index c8d57a8e50862..d493e65e66664 100644 --- a/lib/vector-buffers/Cargo.toml +++ b/lib/vector-buffers/Cargo.toml @@ -54,6 +54,7 @@ rand.workspace = true serde_json.workspace = true serde_yaml.workspace = true temp-dir = "0.1.16" +tokio = { workspace = true, features = ["test-util"] } tokio-test.workspace = true tracing-fluent-assertions = { version = "0.3" } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "registry", "std", "ansi"] } diff --git a/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt b/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt index 381d1695a05fb..8fda5dbbf20a6 100644 --- a/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt +++ b/lib/vector-buffers/proptest-regressions/variants/disk_v2/tests/model/mod.txt @@ -13,3 +13,4 @@ cc 99f23327ef7e759a9cb6424a3fa3495aaf4f1c7ef23145b5f5d72eb6cc5e0173 # shrinks to cc 54be1ee096dc5169013a1cd8a114f23c5aca7a209a663d783c7f74b4c4ff4746 # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 153516, max_data_file_size: 14228, max_record_size: 14214, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { inner: Mutex { data: FilesystemInner { files: {} } } } }, actions = [WriteRecord(Record { id: 0, size: 14163, event_count: 1 })] cc d812fdee8da4aae904579f08cadae1b585944da58040cad4f18544a87faf240e # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 53736, max_data_file_size: 46808, max_record_size: 28912, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { 
files: {} } }, actions = [WriteRecord(Record { id: 0, size: 23565, event_count: 1, aligned_len: 23632 }), WriteRecord(Record { id: 0, size: 18445, event_count: 1, aligned_len: 18512 }), WriteRecord(Record { id: 0, size: 11557, event_count: 1, aligned_len: 11616 })] cc b08fb47ac81e9148d8dc6a4d3332e92c21751685be9da9bd0c5d962ad7436285 # shrinks to mut config = DiskBufferConfig { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: 69602, max_data_file_size: 2462, max_record_size: 2450, write_buffer_size: 61440, flush_interval: 10s, filesystem: TestFilesystem { files: {} } }, actions = [WriteRecord(Record { id: 0, size: 2389, event_count: 1, archived_len: 2464 })] +cc 5099a6694efc56a1ba467216d9cec28af741ee63d394f26fec03c9a57f2b973b # shrinks to mut builder = DiskBufferConfigBuilder { data_dir: "/tmp/vector-disk-v2-model", max_buffer_size: Some(257344), max_data_file_size: Some(13000), max_record_size: Some(6198), write_buffer_size: Some(61440), flush_interval: Some(10s), filesystem: TestFilesystem { atomicity: Sector, files: {} } }, actions = [Writeback(DirEntry(DataFile(0))), AdvanceTime(10s), ReadRecord, WriteRecord(Record { id: 0, size: 0, event_count: 1, encoded_len: 12, archived_len: 64, .. 
}), FlushWrites, Crash] diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs index 0e32b8266d977..82fecbac0cd03 100644 --- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs +++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs @@ -5,7 +5,6 @@ use std::{ Arc, atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering}, }, - time::Instant, }; use bytecheck::CheckBytes; @@ -15,7 +14,7 @@ use fslock::LockFile; use futures::StreamExt; use rkyv::{Archive, Serialize, with::Atomic}; use snafu::{ResultExt, Snafu}; -use tokio::{fs, io::AsyncWriteExt, sync::Notify}; +use tokio::{fs, io::AsyncWriteExt, sync::Notify, time::Instant}; use vector_common::finalizer::OrderedFinalizer; use super::{ @@ -29,6 +28,9 @@ use crate::buffer_usage_data::BufferUsageHandle; pub const LEDGER_LEN: usize = align16(mem::size_of::()); +/// File name of the ledger within the buffer's data directory. +const LEDGER_FILE_NAME: &str = "buffer.db"; + /// Error that occurred during calls to [`Ledger`]. #[derive(Debug, Snafu)] pub enum LedgerLoadCreateError { @@ -380,6 +382,12 @@ where .join(format!("buffer-data-{file_id}.dat")) } + /// Gets the path to the ledger file. + #[cfg(test)] + pub fn ledger_path(&self) -> PathBuf { + self.config.data_dir.join(LEDGER_FILE_NAME) + } + /// Waits for a signal from the reader that progress has been made. /// /// This will only occur when a record is read, which may allow enough space (below the maximum @@ -603,7 +611,7 @@ where } // Open the ledger file, which may involve creating it if it doesn't yet exist. - let ledger_path = config.data_dir.join("buffer.db"); + let ledger_path = config.data_dir.join(LEDGER_FILE_NAME); let mut ledger_handle = config .filesystem .open_file_writable(&ledger_path) @@ -737,16 +745,20 @@ where Ok(()) } + /// Returns the finalizer together with the join handle of the task draining it. 
Callers that + /// do not need the handle let it drop, leaving the task to run until the finalizer is dropped. #[must_use] - pub(super) fn spawn_finalizer(self: Arc) -> OrderedFinalizer { + pub(super) fn spawn_finalizer( + self: Arc, + ) -> (OrderedFinalizer, tokio::task::JoinHandle<()>) { let (finalizer, mut stream) = OrderedFinalizer::new(None); - vector_common::spawn_in_current_span(async move { + let handle = vector_common::spawn_in_current_span(async move { while let Some((_status, amount)) = stream.next().await { self.increment_pending_acks(amount); self.notify_writer_waiters(); } }); - finalizer + (finalizer, handle) } } diff --git a/lib/vector-buffers/src/variants/disk_v2/mod.rs b/lib/vector-buffers/src/variants/disk_v2/mod.rs index 6cb43421ae728..c2a40b2610cb3 100644 --- a/lib/vector-buffers/src/variants/disk_v2/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/mod.rs @@ -243,7 +243,15 @@ where pub(crate) async fn from_config_inner( config: DiskBufferConfig, usage_handle: BufferUsageHandle, - ) -> Result<(BufferWriter, BufferReader, Arc>), BufferError> + ) -> Result< + ( + BufferWriter, + BufferReader, + Arc>, + tokio::task::JoinHandle<()>, + ), + BufferError, + > where FS: Filesystem + fmt::Debug + Clone + 'static, FS::File: Unpin, @@ -259,7 +267,7 @@ where .await .context(WriterSeekFailedSnafu)?; - let finalizer = Arc::clone(&ledger).spawn_finalizer(); + let (finalizer, finalizer_handle) = Arc::clone(&ledger).spawn_finalizer(); let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer); reader @@ -269,7 +277,7 @@ where ledger.synchronize_buffer_usage(); - Ok((writer, reader, ledger)) + Ok((writer, reader, ledger, finalizer_handle)) } /// Creates a new disk buffer from the given [`DiskBufferConfig`]. 
@@ -291,7 +299,7 @@ where FS: Filesystem + fmt::Debug + Clone + 'static, FS::File: Unpin, { - let (writer, reader, _) = Self::from_config_inner(config, usage_handle).await?; + let (writer, reader, _, _) = Self::from_config_inner(config, usage_handle).await?; Ok((writer, reader)) } diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs b/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs index a63ed70635a40..cb40138859df3 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs @@ -34,7 +34,7 @@ async fn ack_updates_ledger_correctly() { // Create our ledger, and make sure it's empty. let ledger = Arc::new(ledger); - let finalizer = Arc::clone(&ledger).spawn_finalizer(); + let (finalizer, _) = Arc::clone(&ledger).spawn_finalizer(); assert_eq!(ledger.consume_pending_acks(), 0); // Now make sure it updates pending acks. @@ -66,7 +66,7 @@ async fn ack_wakes_reader() { // Create our ledger, as well as a future for awaiting // writer progress, and make sure it's not yet woken up. 
let ledger = Arc::new(ledger); - let finalizer = Arc::clone(&ledger).spawn_finalizer(); + let (finalizer, _) = Arc::clone(&ledger).spawn_finalizer(); let mut wait_for_writer = spawn(ledger.wait_for_writer()); assert_pending!(wait_for_writer.poll()); diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs b/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs index fa8ac8b11ad23..5ce56f0968b09 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs @@ -206,9 +206,10 @@ where .build() .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with all default values, but returns a handle to the buffer usage tracker. @@ -228,7 +229,7 @@ where .build() .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - let (writer, reader, ledger) = Buffer::from_config_inner(config, usage_handle.clone()) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle.clone()) .await .expect("should not fail to create buffer"); (writer, reader, ledger, usage_handle) @@ -278,9 +279,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with the specified maximum record size. 
@@ -302,9 +304,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with the specified maximum data file size. @@ -331,9 +334,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } /// Creates a disk v2 buffer with the specified write buffer size. @@ -355,9 +359,10 @@ where .expect("creating buffer should not fail"); let usage_handle = BufferUsageHandle::noop(); - Buffer::from_config_inner(config, usage_handle) + let (writer, reader, ledger, _) = Buffer::from_config_inner(config, usage_handle) .await - .expect("should not fail to create buffer") + .expect("should not fail to create buffer"); + (writer, reader, ledger) } pub(crate) fn get_corrected_max_record_size(payload: &T) -> usize diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs index c68bbc7755ba9..e7c9eca7040f3 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/action.rs @@ -1,9 +1,13 @@ +use std::fmt; + use proptest::{ arbitrary::any, collection::{SizeRange, vec as arb_vec}, prop_compose, prop_oneof, - strategy::{Just, Strategy}, + strategy::{Just, NewTree, Strategy, ValueTree}, + test_runner::TestRunner, }; +use tokio::time::Duration; use super::record::Record; @@ -19,6 +23,39 @@ pub enum Action { 
FlushWrites, ReadRecord, AcknowledgeRead, + AdvanceTime(Duration), + Writeback(Writeback), + Crash, +} + +/// A stream the operating system flushed to disk on its own, ahead of any explicit flush. +/// This allows us to model OS non-determinism in durability. +#[derive(Clone, Debug)] +pub enum Writeback { + /// The ledger's memory-mapped pages reach disk, the same durable effect as an `msync`. + LedgerContents, + /// A data file's dirty tail reaches disk, with the given degree of completeness. + DataContents { file: u16, tail: TailPersistence }, + /// A single file's directory entry, its creation or deletion, reaches disk. + DirEntry(DirTarget), +} + +/// The file whose directory entry reaches disk. +#[derive(Clone, Debug)] +pub enum DirTarget { + Ledger, + DataFile(u16), +} + +/// How completely a data file's dirty tail reached disk, modelling append atomicity. +#[derive(Clone, Copy, Debug)] +pub enum TailPersistence { + /// Every dirty unit landed intact. + AllDirty, + /// A trailing unit did not land; the file is truncated at a unit boundary. + TornAtBoundary, + /// The trailing unit's size landed but its contents are garbage. + GarbageBoundary, } prop_compose! { @@ -27,18 +64,62 @@ prop_compose! { } } -fn arb_action() -> impl Strategy { +fn arb_action() -> impl Strategy> { + RemovableAction::new(arb_concrete_action()) +} + +fn arb_concrete_action() -> impl Strategy { // Overall, we want reads and writes to be equal, with slightly fewer acks, and slightly fewer // still flushes of writes. prop_oneof![ + // Generate nonzero durations only so time advances shrink to 1ms. Zero advances are valid + // no-ops, but they add noise to generated repros and are stripped if they appear. 
+ 2 => (1u64..=20_000) + .prop_map(|millis| Action::AdvanceTime(Duration::from_millis(millis))), 3 => Just(Action::FlushWrites), 5 => Just(Action::ReadRecord), 4 => Just(Action::AcknowledgeRead), - 5 => any::<(u32, u16, u8, u8)>().prop_map(|(id, base_size, size_offset, event_count)| { + 3 => arb_writeback().prop_map(Action::Writeback), + 1 => Just(Action::Crash), + // The model is checking durability of successful writes; empty records are immediate + // validation errors and make those failures shrink into unrelated error-path calls. + 5 => arb_write_record(1u32..=6), + ] +} + +fn arb_write_record(event_count: S) -> impl Strategy +where + S: Strategy, +{ + (any::<(u32, u16, u8)>(), event_count).prop_map( + |((id, base_size, size_offset), event_count)| { let size = u32::from(base_size) + u32::from(size_offset); - let event_count = event_count % 7; - Action::WriteRecord(Record::new(id, size, u32::from(event_count))) - }), + Action::WriteRecord(Record::new(id, size, event_count)) + }, + ) +} + +fn arb_tail_persistence() -> impl Strategy { + prop_oneof![ + Just(TailPersistence::AllDirty), + Just(TailPersistence::TornAtBoundary), + Just(TailPersistence::GarbageBoundary), + ] +} + +fn arb_dir_target() -> impl Strategy { + prop_oneof![ + Just(DirTarget::Ledger), + (0u16..5).prop_map(DirTarget::DataFile) + ] +} + +fn arb_writeback() -> impl Strategy { + prop_oneof![ + Just(Writeback::LedgerContents), + (0u16..5, arb_tail_persistence()) + .prop_map(|(file, tail)| Writeback::DataContents { file, tail }), + arb_dir_target().prop_map(Writeback::DirEntry), ] } @@ -46,7 +127,7 @@ pub fn arb_actions(len_range: R) -> impl Strategy> where R: Into, { - arb_vec(arb_action(), len_range).prop_map(sanitize_raw_actions) + ActionList::new(arb_vec(arb_action(), len_range)) } /// Sanitizes raw actions generated by proptest into a valid sequence. 
@@ -79,7 +160,236 @@ pub fn sanitize_raw_actions(actions: Vec) -> Vec { unacked_events -= 1; Action::AcknowledgeRead }), + Action::AdvanceTime(duration) => Some(Action::AdvanceTime(duration)), Action::FlushWrites => Some(Action::FlushWrites), + // The OS flushing a stream is always a valid thing to happen. + Action::Writeback(w) => Some(Action::Writeback(w)), + // A crash can happen at any point and invalidates outstanding read/ack accounting, + // since the reader restarts from its last durable position after reopen. + Action::Crash => { + unread_event_count += unacked_events; + unacked_events = 0; + Some(Action::Crash) + } }) .collect::>() } + +#[derive(Clone)] +struct RemovableAction { + inner: S, +} + +impl RemovableAction { + const fn new(inner: S) -> Self { + Self { inner } + } +} + +impl fmt::Debug for RemovableAction +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RemovableAction") + .field("inner", &self.inner) + .finish() + } +} + +impl Strategy for RemovableAction +where + S: Strategy, +{ + type Tree = RemovableActionTree; + type Value = Option; + + fn new_tree(&self, runner: &mut TestRunner) -> NewTree { + self.inner.new_tree(runner).map(RemovableActionTree::new) + } +} + +#[derive(Clone, Debug)] +struct RemovableActionTree { + inner: T, + removed: bool, + removal_rejected: bool, +} + +impl RemovableActionTree { + const fn new(inner: T) -> Self { + Self { + inner, + removed: false, + removal_rejected: false, + } + } +} + +impl ValueTree for RemovableActionTree +where + T: ValueTree, +{ + type Value = Option; + + fn current(&self) -> Self::Value { + (!self.removed).then(|| self.inner.current()) + } + + fn simplify(&mut self) -> bool { + // The built-in Option shrinker only tries None after the inner value is done shrinking. + // Retrying removal after each accepted inner shrink keeps no-op time advances out of repros. 
+ if !self.removed && !self.removal_rejected { + self.removed = true; + return true; + } + + if !self.removed && self.inner.simplify() { + self.removal_rejected = false; + return true; + } + + false + } + + fn complicate(&mut self) -> bool { + if self.removed { + self.removed = false; + self.removal_rejected = true; + return true; + } + + if self.inner.complicate() { + self.removal_rejected = false; + return true; + } + + false + } +} + +#[derive(Clone)] +struct ActionList { + inner: S, +} + +impl ActionList { + const fn new(inner: S) -> Self { + Self { inner } + } +} + +impl fmt::Debug for ActionList +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ActionList") + .field("inner", &self.inner) + .finish() + } +} + +impl Strategy for ActionList +where + S: Strategy>>, +{ + type Tree = ActionListTree; + type Value = Vec; + + fn new_tree(&self, runner: &mut TestRunner) -> NewTree { + self.inner.new_tree(runner).map(ActionListTree::new) + } +} + +#[derive(Clone, Debug)] +struct ActionListTree { + inner: T, + strip_zero_advances: bool, + zero_strip_rejected: bool, + last_simplification: Option, +} + +impl ActionListTree { + const fn new(inner: T) -> Self { + Self { + inner, + strip_zero_advances: false, + zero_strip_rejected: false, + last_simplification: None, + } + } +} + +#[derive(Clone, Copy, Debug)] +enum ActionListSimplification { + StripZeroAdvances, + Inner, +} + +impl ValueTree for ActionListTree +where + T: ValueTree>>, +{ + type Value = Vec; + + fn current(&self) -> Self::Value { + raw_actions_to_actions(self.inner.current(), self.strip_zero_advances) + } + + fn simplify(&mut self) -> bool { + // If a zero-time advance appears during shrinking or future generator changes, remove it + // as a no-op before the vector shrinker spends budget elsewhere. 
+ if !self.strip_zero_advances + && !self.zero_strip_rejected + && has_zero_advance(self.inner.current()) + { + self.strip_zero_advances = true; + self.last_simplification = Some(ActionListSimplification::StripZeroAdvances); + return true; + } + + if self.inner.simplify() { + self.zero_strip_rejected = false; + self.last_simplification = Some(ActionListSimplification::Inner); + return true; + } + + self.last_simplification = None; + false + } + + fn complicate(&mut self) -> bool { + match self.last_simplification.take() { + Some(ActionListSimplification::StripZeroAdvances) => { + self.strip_zero_advances = false; + self.zero_strip_rejected = true; + true + } + Some(ActionListSimplification::Inner) if self.inner.complicate() => { + self.zero_strip_rejected = false; + true + } + Some(ActionListSimplification::Inner) | None => false, + } + } +} + +fn raw_actions_to_actions( + raw_actions: Vec>, + strip_zero_advances: bool, +) -> Vec { + let actions = raw_actions.into_iter().flatten().filter(|action| { + !(strip_zero_advances + && matches!(action, Action::AdvanceTime(duration) if *duration == Duration::ZERO)) + }); + + sanitize_raw_actions(actions.collect::>()) +} + +fn has_zero_advance(raw_actions: Vec>) -> bool { + raw_actions + .into_iter() + .flatten() + .any(|action| matches!(action, Action::AdvanceTime(duration) if duration == Duration::ZERO)) +} diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs index abbe785b9caef..facc600b3bede 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/common.rs @@ -1,10 +1,14 @@ use std::time::Duration; -use proptest::{arbitrary::any, strategy::Strategy}; +use proptest::strategy::{Just, Strategy}; -use super::{filesystem::TestFilesystem, record::Record}; +use super::{ + filesystem::{TestFilesystem, arb_fs_atomicity}, + record::Record, +}; use 
crate::variants::disk_v2::{ - BufferReader, BufferWriter, DiskBufferConfig, DiskBufferConfigBuilder, ReaderError, WriterError, + BufferReader, BufferWriter, DiskBufferConfigBuilder, ReaderError, WriterError, + common::MINIMUM_MAX_RECORD_SIZE, ledger::LEDGER_LEN, }; pub type TestReader = BufferReader; @@ -17,6 +21,10 @@ pub type WriterResult = Result>; // buffer overall, which exercises the "write this record directly to the wrapped writer" logic that // exists in `tokio::io::BufWriter` itself. pub const TEST_WRITE_BUFFER_SIZE: usize = 60 * 1024; +const MODEL_MAX_RAW_BUFFER_SIZE: u64 = 4_194_240; +const MODEL_MAX_DATA_FILE_SIZE: u64 = 131_070; +const MODEL_MAX_RECORD_SIZE: usize = 65_535; +const MIN_VALID_MAX_RECORD_SIZE: usize = MINIMUM_MAX_RECORD_SIZE + 1; /// Result of applying an action to the model. /// @@ -32,57 +40,73 @@ pub enum Progress { Blocked, } -pub fn arb_buffer_config() -> impl Strategy> { - any::<(u16, u16, u16)>() - .prop_map(|(n1, n2, n3)| { - let max_buffer_size = u64::from(n1) * 64; - let max_data_file_size = u64::from(n2) * 2; - let max_record_size = n3.into(); - - let mut path = std::env::temp_dir(); - path.push("vector-disk-v2-model"); +pub fn arb_buffer_config() -> impl Strategy> { + // Generate dependent limits directly as minimum + slack, rather than generating invalid + // triples and rejecting them, so each max_* field has a clean path toward its lower bound. 
+ ( + MIN_VALID_MAX_RECORD_SIZE..=MODEL_MAX_RECORD_SIZE, + 1u64..=60, + arb_fs_atomicity(), + ) + .prop_flat_map(|(max_record_size, flush_interval_secs, atomicity)| { + let min_data_file_size = + u64::try_from(max_record_size).expect("model max record size must fit in u64"); + let max_data_file_size_slack = MODEL_MAX_DATA_FILE_SIZE - min_data_file_size; - DiskBufferConfigBuilder::from_path(path) - .max_buffer_size(max_buffer_size) - .max_data_file_size(max_data_file_size) - .max_record_size(max_record_size) - .write_buffer_size(TEST_WRITE_BUFFER_SIZE) - // This really only affects how often we flush the ledger, because we always `flush` - // after writes to ensure our buffered writes make it to the data files for the - // readers to make progress, and we're not testing anything about whether or not the - // ledger makes it to disk durably. - .flush_interval(Duration::from_secs(10)) - .filesystem(TestFilesystem::default()) + ( + Just((max_record_size, flush_interval_secs, atomicity)), + 0u64..=max_data_file_size_slack, + ) }) - .prop_filter_map( - "maximum size limits were too high, or zero", - validate_buffer_config, + .prop_flat_map( + |((max_record_size, flush_interval_secs, atomicity), max_data_file_size_slack)| { + let min_data_file_size = + u64::try_from(max_record_size).expect("model max record size must fit in u64"); + let max_data_file_size = min_data_file_size + max_data_file_size_slack; + let min_buffer_size = minimum_raw_buffer_size(max_data_file_size); + let max_buffer_size_slack = MODEL_MAX_RAW_BUFFER_SIZE - min_buffer_size; + + ( + Just(( + max_record_size, + max_data_file_size, + flush_interval_secs, + atomicity, + )), + 0u64..=max_buffer_size_slack, + ) + }, + ) + .prop_map( + |( + (max_record_size, max_data_file_size, flush_interval_secs, atomicity), + max_buffer_size_slack, + )| { + let max_buffer_size = + minimum_raw_buffer_size(max_data_file_size) + max_buffer_size_slack; + let mut path = std::env::temp_dir(); + 
path.push("vector-disk-v2-model"); + + DiskBufferConfigBuilder::from_path(path) + .max_buffer_size(max_buffer_size) + .max_data_file_size(max_data_file_size) + .max_record_size(max_record_size) + .write_buffer_size(TEST_WRITE_BUFFER_SIZE) + // This really only affects how often we flush the ledger, because we always `flush` + // after writes to ensure our buffered writes make it to the data files for the + // readers to make progress, and we're not testing anything about whether or not the + // ledger makes it to disk durably. + .flush_interval(Duration::from_secs(flush_interval_secs)) + .filesystem(TestFilesystem::with_atomicity(atomicity)) + }, ) } -/// Validates the given buffer config builder and generates a resulting configuration. -/// -/// If the builder has been configured incorrectly (i.e. zero values), or if the configuration is -/// valid but has values that are not appropriate for being used under test (i.e. values are way too -/// large and would balloon the run-time of the test) then `None` is returned. -/// -/// Otherwise, `Some(DiskBufferConfig)` is returned. -pub fn validate_buffer_config( - builder: DiskBufferConfigBuilder, -) -> Option> { - builder - .build() - // If building the configuration failed, just return `None`. - .ok() - .filter(|config| { - // Limit our buffer config to the following: - // - max buffer size of 64MB - // - max data file size of 2MB - // - max record size of 1MB - // - // Otherwise, the model just runs uselessly slow. 
- config.max_buffer_size <= 64_000_000 - && config.max_data_file_size <= 2_000_000 - && config.max_record_size <= 1_000_000 - }) +fn minimum_raw_buffer_size(max_data_file_size: u64) -> u64 { + let ledger_len = u64::try_from(LEDGER_LEN).expect("ledger length must fit in u64"); + + max_data_file_size + .checked_mul(2) + .and_then(|doubled| doubled.checked_add(ledger_len)) + .expect("model max data file size must leave room for the minimum buffer size") } diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs index a88204d7f7a9c..fa5f44d5af516 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/filesystem.rs @@ -1,6 +1,6 @@ use std::{ cmp, - collections::HashMap, + collections::{HashMap, HashSet}, fmt, io, path::{Path, PathBuf}, pin::Pin, @@ -10,11 +10,53 @@ use std::{ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use proptest::{ + prop_oneof, + strategy::{Just, Strategy}, +}; + +use super::action::TailPersistence; use crate::variants::disk_v2::{ Filesystem, io::{AsyncFile, Metadata, ReadableMemoryMap, WritableMemoryMap}, }; +/// Byte written into a garbage boundary unit. Arbitrary value chosen to +/// distinguish from bytes already written by the model. +const GARBAGE_BYTE: u8 = 0xFF; + +/// The largest write a filesystem persists atomically, setting the granularity at which a torn +/// writeback tears. +#[derive(Clone, Copy, Debug, Default)] +pub enum FsAtomicity { + /// Writes are atomic per 512-byte sector. A torn write lands on a 512-byte + /// boundary. + #[default] + Sector, + /// Writes are atomic per 4096-byte block. A torn write lands on a 4096-byte + /// boundary. 
+ Block, +} + +impl FsAtomicity { + fn tear_granularity(self) -> usize { + match self { + FsAtomicity::Sector => 512, + FsAtomicity::Block => 4096, + } + } +} + +pub fn arb_fs_atomicity() -> impl Strategy { + prop_oneof![Just(FsAtomicity::Sector), Just(FsAtomicity::Block)] +} + +/// The largest `unit` boundary strictly below `len` — i.e. the offset that drops the trailing +/// (possibly partial) unit. +fn unit_floor_below(len: usize, unit: usize) -> usize { + (len.saturating_sub(1) / unit) * unit +} + fn io_err_already_exists() -> io::Error { io::Error::new(io::ErrorKind::AlreadyExists, "file already exists") } @@ -28,7 +70,10 @@ fn io_err_permission_denied() -> io::Error { } struct FileInner { + // The live contents, as seen by the running reader and writer. buf: Option>, + // The contents known to have reached disk via a barrier. + durable: Vec, } impl FileInner { @@ -40,12 +85,17 @@ impl FileInner { let previous = self.buf.replace(buf); assert!(previous.is_none()); } + + fn persist(&mut self) { + self.durable = self.buf.as_ref().expect("file buf consumed").clone(); + } } impl Default for FileInner { fn default() -> Self { Self { buf: Some(Vec::new()), + durable: Vec::new(), } } } @@ -79,6 +129,19 @@ impl TestFile { } } + // Rebuilds a file from its durable snapshot after a crash. The live contents equal the + // durable contents, since that is all that reached disk. 
+ fn from_durable(durable: Vec) -> Self { + Self { + inner: Arc::new(Mutex::new(FileInner { + buf: Some(durable.clone()), + durable, + })), + is_writable: false, + read_pos: 0, + } + } + fn set_readable(&mut self) { self.is_writable = false; } @@ -141,6 +204,8 @@ impl ReadableMemoryMap for TestMmap {} impl WritableMemoryMap for TestMmap { fn flush(&self) -> io::Result<()> { + let buf = self.buf.as_ref().expect("mmap buf consumed"); + self.inner.lock().expect("poisoned").durable = buf.clone(); Ok(()) } } @@ -217,6 +282,7 @@ impl AsyncFile for TestFile { } async fn sync_all(&self) -> io::Result<()> { + self.inner.lock().expect("poisoned").persist(); Ok(()) } } @@ -224,7 +290,10 @@ impl AsyncFile for TestFile { // Inner state of the test filesystem. #[derive(Debug, Default)] struct FilesystemInner { + atomicity: FsAtomicity, files: HashMap, + // Paths whose directory entry has reached disk and so survive a crash. + durable_names: HashSet, } impl FilesystemInner { @@ -272,6 +341,51 @@ impl FilesystemInner { fn delete_file(&mut self, path: &Path) -> bool { self.files.remove(path).is_some() } + + // Models the OS flushing a data file's dirty tail to disk ahead of an + // explicit `sync_all`. 
+ fn writeback_data(&mut self, path: &Path, tail: TailPersistence) { + let Some(file) = self.files.get(path) else { + return; + }; + let unit = self.atomicity.tear_granularity(); + let mut inner = file.inner.lock().expect("poisoned"); + let live = inner.buf.as_ref().expect("data file buf consumed").clone(); + let durable_len = inner.durable.len(); + if live.len() <= durable_len { + return; + } + let boundary = unit_floor_below(live.len(), unit).max(durable_len); + inner.durable = match tail { + TailPersistence::AllDirty => live, + TailPersistence::TornAtBoundary => live[..boundary].to_vec(), + TailPersistence::GarbageBoundary => { + let mut durable = live[..boundary].to_vec(); + durable.resize(live.len(), GARBAGE_BYTE); + durable + } + }; + } + + // Models the OS flushing one file's directory entry to disk. + fn writeback_dir_entry(&mut self, path: &Path) { + if self.files.contains_key(path) { + self.durable_names.insert(path.to_owned()); + } else { + self.durable_names.remove(path); + } + } + + // Discards everything that has not been forced durable by a barrier. + fn crash(&mut self) { + let live = std::mem::take(&mut self.files); + for (path, file) in live { + if self.durable_names.contains(&path) { + let durable = file.inner.lock().expect("poisoned").durable.clone(); + self.files.insert(path, TestFile::from_durable(durable)); + } + } + } } /// A `Filesystem` that tracks files in memory and allows introspection from the outside. @@ -279,10 +393,47 @@ pub struct TestFilesystem { inner: Arc>, } +impl TestFilesystem { + /// A filesystem with the given append atomicity. + pub fn with_atomicity(atomicity: FsAtomicity) -> Self { + Self { + inner: Arc::new(Mutex::new(FilesystemInner { + atomicity, + ..Default::default() + })), + } + } + + /// Models a hard crash, discarding everything not forced durable by a + /// barrier. 
+ pub fn crash(&self) { + self.inner.lock().expect("poisoned").crash(); + } + + /// Models the OS flushing a data file's dirty tail ahead of an explicit + /// `sync_all`. + pub fn writeback_data(&self, path: &Path, tail: TailPersistence) { + self.inner + .lock() + .expect("poisoned") + .writeback_data(path, tail); + } + + /// Models the OS flushing one file's directory entry ahead of a directory + /// fsync. + pub fn writeback_dir_entry(&self, path: &Path) { + self.inner + .lock() + .expect("poisoned") + .writeback_dir_entry(path); + } +} + impl fmt::Debug for TestFilesystem { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let inner = self.inner.lock().expect("poisoned"); f.debug_struct("TestFilesystem") + .field("atomicity", &inner.atomicity) .field("files", &inner.files) .finish() } @@ -290,7 +441,9 @@ impl fmt::Debug for TestFilesystem { impl Clone for TestFilesystem { fn clone(&self) -> Self { - Self::default() + Self { + inner: Arc::clone(&self.inner), + } } } diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs index ebab8bd1452e8..80a75779263c0 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/mod.rs @@ -26,7 +26,7 @@ use crate::{ mod action; use self::{ - action::{Action, arb_actions}, + action::{Action, DirTarget, Writeback, arb_actions}, record::EncodeError, }; @@ -807,27 +807,20 @@ impl BufferModel { proptest! 
{ #[test] - fn model_check(mut config in arb_buffer_config(), actions in arb_actions(0..64)) { + fn model_check(mut builder in arb_buffer_config(), actions in arb_actions(0..64)) { let rt = Builder::new_current_thread() .enable_all() + .start_paused(true) .build() .expect("should not fail to build runtime"); let _a = install_tracing_helpers(); - info!( - actions = actions.len(), - max_buffer_size = config.max_buffer_size, - max_data_file_size = config.max_data_file_size, - max_record_size = config.max_record_size, - "Starting model.", - ); - // We generate a new temporary directory and overwrite the data directory in the buffer - // configuration. This allows us to use a utility that will generate a random directory each - // time -- parallel runs of this test can't clobber each other anymore -- but also ensure - // that the directory is cleaned up when the test run is over. + // We generate a new temporary directory and overwrite the data directory in the builder. + // This gives each run a unique directory -- parallel runs can't clobber each other -- that + // is cleaned up when the run ends. let buf_dir = TempDir::with_prefix("vector-buffers-disk-v2-model").expect("creating temp dir should never fail"); - config.data_dir = buf_dir.path().to_path_buf(); + builder.data_dir = buf_dir.path().to_path_buf(); rt.block_on(async move { // This model tries to encapsulate all of the behavior of the disk buffer v2 @@ -836,7 +829,7 @@ proptest! { // // At the very top, we have our input actions, which are mapped one-to-one with the // possible actions that can influence the disk buffer: reading records, writing - // records, flushing writes, and acknowledging reads. + // records, flushing writes, acknowledging reads, and advancing time. // // After that, we have the model itself, which essentially a barebones re-implementation // of the disk buffer itself without any asynchrony, rich error handling, etc. We scope @@ -868,10 +861,21 @@ proptest! 
{ // should be tried again before pulling a new action from the remaining actions in the // input sequence, and so on. Effectively, we can deterministically drive asynchronous // actions that are coupled to one another, in a lockstep fashion, with the model. + // Each buffer open rebuilds the config from the builder, the same path a restarting + // process takes. The model borrows this first one before it is handed to the open. + let config = builder.clone().build().expect("validated builder should build"); + info!( + actions = actions.len(), + max_buffer_size = config.max_buffer_size, + max_data_file_size = config.max_data_file_size, + max_record_size = config.max_record_size, + "Starting model.", + ); let mut model = BufferModel::from_config(&config); + let fs = builder.filesystem.clone(); let usage_handle = BufferUsageHandle::noop(); - let (writer, reader, ledger) = + let (writer, reader, mut ledger, mut finalizer_handle) = Buffer::::from_config_inner(config, usage_handle) .await .expect("should not fail to build buffer"); @@ -905,9 +909,66 @@ proptest! { // run against the model. If it's an action that may be asynchronous/blocked on // progress of another component, we try it later on, which lets us deduplicate some code. if let Some(action) = sequencer.trigger_next_runnable_action() { - if let Action::AcknowledgeRead = action { + match action { // Acknowledgements are based on atomics, so they never wait asynchronously. - model.acknowledge_read(); + Action::AcknowledgeRead => model.acknowledge_read(), + Action::AdvanceTime(duration) if duration > tokio::time::Duration::ZERO => { + // `advance(0)` still yields internally, which makes zero-time advances + // affect scheduling and prevents them from shrinking out as no-ops. + tokio::time::advance(duration).await; + tokio::task::yield_now().await; + } + // The OS flushes a stream to disk ahead of any explicit flush. 
+ Action::Writeback(writeback) => match writeback { + Writeback::LedgerContents => { + ledger.flush().expect("ledger flush should not fail in the model"); + } + Writeback::DataContents { file, tail } => { + fs.writeback_data(&ledger.get_data_file_path(file), tail); + } + Writeback::DirEntry(target) => { + let path = match target { + DirTarget::Ledger => ledger.ledger_path(), + DirTarget::DataFile(file) => ledger.get_data_file_path(file), + }; + fs.writeback_dir_entry(&path); + } + }, + Action::Crash => { + fs.crash(); + // Tear the SUT down before reopening. + let remaining = sequencer.into_remaining_actions(); + drop(ledger); + finalizer_handle.await.expect("finalizer task panicked"); + // Reopen the buffer the way a restarting process + // would: rebuild the config from the builder and + // open a fresh buffer from it, then swap in the + // new ledger, finalizer handle, and sequencer. + let config = builder.clone().build().expect("validated builder should build"); + let (writer, reader, new_ledger, new_handle) = + match Buffer::::from_config_inner( + config, + BufferUsageHandle::noop(), + ) + .await + { + Ok(parts) => parts, + Err(e) => { + prop_assert!( + false, + "buffer failed to reopen after crash: {:?}", + e + ); + unreachable!() + } + }; + + ledger = new_ledger; + finalizer_handle = new_handle; + sequencer = ActionSequencer::new(remaining, reader, writer); + closed_writers = false; + } + _ => {} } } else { let mut made_progress = false; diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs b/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs index 0e77c78b66f4e..6b8cd5c7882fc 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/model/sequencer.rs @@ -235,6 +235,13 @@ impl ActionSequencer { Action::WriteRecord(_) | Action::FlushWrites => allow_write, Action::ReadRecord => allow_read, Action::AcknowledgeRead => !self.unacked_events.is_empty(), + Action::AdvanceTime(_) => true, + // 
A writeback and a crash both act on the filesystem outside the + // reader/writer. They run only when neither has an operation in + // flight, keeping the model in lockstep. + Action::Writeback(_) | Action::Crash => { + allow_read && (allow_write || self.write_state.is_closed()) + } }) } @@ -253,9 +260,11 @@ impl ActionSequencer { /// Likewise, we can't execute another read if there's an in-flight read. Acknowledgements /// always happen out-of-band, though, and so are always eligible. pub fn trigger_next_runnable_action(&mut self) -> Option { - let pos = self.get_next_runnable_action(); + loop { + let action = self + .get_next_runnable_action() + .map(|i| self.actions.remove(i))?; - if let Some(action) = pos.map(|i| self.actions.remove(i)) { match action { Action::WriteRecord(record) => { assert!( @@ -264,7 +273,7 @@ impl ActionSequencer { ); self.write_state.transition_to_write(record.clone()); - Some(Action::WriteRecord(record)) + return Some(Action::WriteRecord(record)); } a @ Action::FlushWrites => { assert!( @@ -273,7 +282,7 @@ impl ActionSequencer { ); self.write_state.transition_to_flush(); - Some(a) + return Some(a); } a @ Action::ReadRecord => { assert!( @@ -282,18 +291,32 @@ impl ActionSequencer { ); self.read_state.transition_to_read(); - Some(a) + return Some(a); } Action::AcknowledgeRead => { drop(self.unacked_events.pop_front().expect("FIXME")); - Some(Action::AcknowledgeRead) + return Some(Action::AcknowledgeRead); + } + Action::AdvanceTime(duration) if duration == tokio::time::Duration::ZERO => { + // Zero-time advances are valid no-ops. Returning them would still create a + // scheduler step in the outer harness, making them sticky during shrinking. } + Action::AdvanceTime(duration) => return Some(Action::AdvanceTime(duration)), + // Applied to the filesystem only: reader and writer are idle, + // so there is no in-flight state to transition to. 
+ a @ (Action::Writeback(_) | Action::Crash) => return Some(a), } - } else { - None } } + /// Consumes the sequencer, returning the actions it has not yet triggered. + /// + /// Used to hand the remaining work to a freshly reopened sequencer after a crash, dropping + /// the old reader and writer so the buffer lock is released. + pub fn into_remaining_actions(self) -> Vec { + self.actions + } + /// Gets the result of pending write action, if one is in-flight. /// /// If a write action (either a record write or a flush) is in-flight, we attempt to poll it to diff --git a/tests/antithesis/harness/Cargo.toml b/tests/antithesis/harness/Cargo.toml index 3120ca061b94c..732b6b84145d0 100644 --- a/tests/antithesis/harness/Cargo.toml +++ b/tests/antithesis/harness/Cargo.toml @@ -12,6 +12,7 @@ publish = false antithesis_sdk = { workspace = true, features = ["full"] } axum = { workspace = true, features = ["http1", "tokio"] } clap = { workspace = true, features = ["derive"] } +hex = { version = "0.4.3" } reqwest = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["rt", "macros", "net", "time"] } diff --git a/tests/antithesis/harness/src/bin/eventually_conservation.rs b/tests/antithesis/harness/src/bin/eventually_conservation.rs index 947d6d9372d2a..2d11295c12bd1 100644 --- a/tests/antithesis/harness/src/bin/eventually_conservation.rs +++ b/tests/antithesis/harness/src/bin/eventually_conservation.rs @@ -10,7 +10,7 @@ #[cfg(target_os = "linux")] extern crate antithesis_instrumentation; -use antithesis_harness::payload_field; +use antithesis_harness::{claim, post_event}; use antithesis_sdk::{ antithesis_init, assert_always, assert_always_less_than_or_equal_to, assert_sometimes_greater_than, assert_unreachable, @@ -98,26 +98,6 @@ async fn all_healthy(client: &reqwest::Client, metrics_urls: &[String]) -> bool true } -async fn claim(client: &reqwest::Client, oracle_url: &str) -> Option { - let resp = client - 
.post(format!("{oracle_url}/claim")) - .timeout(time::Duration::from_secs(10)) - .send() - .await - .ok()?; - resp.text().await.ok()?.trim().parse().ok() -} - -async fn post_probe(client: &reqwest::Client, source_url: &str, id: u64) -> bool { - // Same deterministic payload as the producer, so the probe's delivery passes - // the oracle's content check instead of counting as corruption. - let event = json!([{ "id": id, "data": payload_field(id) }]); - matches!( - client.post(source_url).timeout(time::Duration::from_secs(10)).json(&event).send().await, - Ok(resp) if resp.status().is_success() - ) -} - #[tokio::main(flavor = "current_thread")] async fn main() { antithesis_init(); @@ -226,13 +206,20 @@ async fn main() { // post retry until one sticks, since a node can briefly refuse a write while it is // still recovering. A wedged node never delivers it and fails here. Runs // unconditionally. - let deadline = time::Instant::now() + time::Duration::from_secs(45); + // + // The recovery gate above only proves the metrics endpoint answers. That is a + // separate listener from the source's data path, so the source and sink can still + // be unready while metrics already serve, and a just-restarted node needs time to + // bring them up. The round-trip is therefore the real readiness signal and gets the + // same budget as recovery rather than a tight window that expires before the data + // path is serving. 
+ let deadline = time::Instant::now() + time::Duration::from_secs(180); let mut probe = None; let mut progressed = false; while !progressed && time::Instant::now() < deadline { if probe.is_none() { if let Some(id) = claim(&client, &oracle_url).await { - if post_probe(&client, &source_url, id).await { + if post_event(&client, &source_url, id, time::Duration::from_secs(10)).await { probe = Some(id); } } diff --git a/tests/antithesis/harness/src/bin/parallel_driver_produce.rs b/tests/antithesis/harness/src/bin/parallel_driver_produce.rs index 91b04494a0ca9..01eb518f61a42 100644 --- a/tests/antithesis/harness/src/bin/parallel_driver_produce.rs +++ b/tests/antithesis/harness/src/bin/parallel_driver_produce.rs @@ -9,7 +9,7 @@ #[cfg(target_os = "linux")] extern crate antithesis_instrumentation; -use antithesis_harness::payload_field; +use antithesis_harness::{claim, post_event, report_acked}; use antithesis_sdk::{antithesis_init, assert_reachable, assert_unreachable}; use clap::Parser; use serde_json::json; @@ -25,48 +25,6 @@ struct Args { oracle_url: String, } -/// POST one event to the source. Ok(2xx) means the pipeline took end-to-end -/// responsibility for the event (with e2e acks enabled). -async fn post_event( - client: &reqwest::Client, - source_url: &str, - id: u64, - timeout: time::Duration, -) -> bool { - // The payload is a deterministic function of the id, so every retry re-sends - // the exact same record and the oracle can recompute the expected bytes. - let event = json!([{ "id": id, "data": payload_field(id) }]); - matches!( - client.post(source_url).timeout(timeout).json(&event).send().await, - Ok(resp) if resp.status().is_success() - ) -} - -/// Claim one fresh id from the oracle. 
-async fn claim(client: &reqwest::Client, oracle_url: &str) -> Option { - let resp = client - .post(format!("{oracle_url}/claim")) - .timeout(time::Duration::from_secs(10)) - .send() - .await - .ok()?; - resp.text().await.ok()?.trim().parse().ok() -} - -/// Tell the oracle the pipeline acked this id, so it must come back. Returns -/// whether the oracle recorded the obligation. -async fn report_acked(client: &reqwest::Client, oracle_url: &str, id: u64) -> bool { - matches!( - client - .post(format!("{oracle_url}/acked")) - .timeout(time::Duration::from_secs(10)) - .body(id.to_string()) - .send() - .await, - Ok(resp) if resp.status().is_success() - ) -} - #[tokio::main(flavor = "current_thread")] async fn main() { antithesis_init(); diff --git a/tests/antithesis/harness/src/lib.rs b/tests/antithesis/harness/src/lib.rs index 9784cc367d892..6f176d99f55f7 100644 --- a/tests/antithesis/harness/src/lib.rs +++ b/tests/antithesis/harness/src/lib.rs @@ -2,6 +2,9 @@ //! `scenarios/vector_to_vector_e2e_disk`) owns its own test-command bins. When two //! scenarios need the same HTTP or oracle helpers, factor them into modules here. +use std::time::Duration; + +use serde_json::json; use vector_buffers::WRITE_BUFFER_SIZE_V2; /// Payload lengths in bytes, one per id class. Sized around the disk_v2 write @@ -50,30 +53,56 @@ pub fn payload_for(id: u64) -> Vec { /// without escaping concerns, and a corruption of the bytes shows up as a hex /// mismatch. pub fn payload_field(id: u64) -> String { - let bytes = payload_for(id); - let mut s = String::with_capacity(bytes.len() * 2); - for b in bytes { - s.push(char::from_digit((b >> 4) as u32, 16).unwrap()); - s.push(char::from_digit((b & 0x0f) as u32, 16).unwrap()); - } - s + hex::encode(payload_for(id)) } /// Decode the hex produced by [`payload_field`] back to bytes. Returns `None` on /// any non-hex or odd-length input so the oracle can tell a mangled field from a /// content mismatch. 
pub fn decode_payload_field(field: &str) -> Option> { - if !field.len().is_multiple_of(2) { - return None; - } - let mut out = Vec::with_capacity(field.len() / 2); - let mut bytes = field.bytes(); - while let (Some(hi), Some(lo)) = (bytes.next(), bytes.next()) { - let hi = (hi as char).to_digit(16)?; - let lo = (lo as char).to_digit(16)?; - out.push(((hi << 4) | lo) as u8); - } - Some(out) + hex::decode(field).ok() +} + +/// Claim one fresh id from the oracle. `None` if the oracle is unreachable. +pub async fn claim(client: &reqwest::Client, oracle_url: &str) -> Option { + let resp = client + .post(format!("{oracle_url}/claim")) + .timeout(Duration::from_secs(10)) + .send() + .await + .ok()?; + resp.text().await.ok()?.trim().parse().ok() +} + +/// POST one event to the source. `true` on a 2xx, meaning the pipeline took +/// end-to-end responsibility for the event. The payload is a deterministic +/// function of the id, so every retry re-sends the exact same record and the +/// oracle can recompute the expected bytes. +pub async fn post_event( + client: &reqwest::Client, + source_url: &str, + id: u64, + timeout: Duration, +) -> bool { + let event = json!([{ "id": id, "data": payload_field(id) }]); + matches!( + client.post(source_url).timeout(timeout).json(&event).send().await, + Ok(resp) if resp.status().is_success() + ) +} + +/// Tell the oracle the pipeline acked this id, so it must come back. `true` if +/// the oracle recorded the obligation. 
+pub async fn report_acked(client: &reqwest::Client, oracle_url: &str, id: u64) -> bool { + matches!( + client + .post(format!("{oracle_url}/acked")) + .timeout(Duration::from_secs(10)) + .body(id.to_string()) + .send() + .await, + Ok(resp) if resp.status().is_success() + ) } #[cfg(test)] diff --git a/tests/antithesis/scenarios/vector_e2e/Dockerfile b/tests/antithesis/scenarios/vector_e2e/Dockerfile index 350fc48a9e165..7f2d0e21b170f 100644 --- a/tests/antithesis/scenarios/vector_e2e/Dockerfile +++ b/tests/antithesis/scenarios/vector_e2e/Dockerfile @@ -76,13 +76,8 @@ FROM debian:stable-slim AS vector RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates \ && rm -rf /var/lib/apt/lists/* COPY --from=vector-build /usr/local/bin/vector /usr/bin/vector -# Bake the node config plus its benign alternate, which the reload fault swaps in -# to force a sink rebuild. +# Bake the node config; compose selects it via --config. COPY tests/antithesis/scenarios/vector_e2e/vector.yaml /etc/vector/vector.yaml -COPY tests/antithesis/scenarios/vector_e2e/vector.b.yaml /etc/vector/vector.b.yaml -# The reload fault is an anytime_ test command that runs IN the node container. -# The node stays running because its entrypoint is Vector, not a test command. -COPY --chmod=755 tests/antithesis/scenarios/vector_e2e/anytime_reload.sh /opt/antithesis/test/v1/ve2e/anytime_reload RUN mkdir -p /symbols && ln -s /usr/bin/vector /symbols/vector ENV NO_COLOR=1 EXPOSE 8080 9598 diff --git a/tests/antithesis/scenarios/vector_e2e/README.md b/tests/antithesis/scenarios/vector_e2e/README.md index 4448e918b7e94..4aabdc33a01b3 100644 --- a/tests/antithesis/scenarios/vector_e2e/README.md +++ b/tests/antithesis/scenarios/vector_e2e/README.md @@ -26,9 +26,7 @@ One Vector node and one oracle container. - **vector** takes an `http_server` source (`:8080`) and delivers over `http` to the oracle through an in-memory buffer with `when_full: block` and e2e acks. 
It - also exposes Prometheus metrics (`:9598`) for the health gate, and runs the - reload fault: an `anytime_` command swaps `vector.yaml`/`vector.b.yaml` and sends - `SIGHUP`, forcing the sink to rebuild mid-run. + also exposes Prometheus metrics (`:9598`) for the health gate. - **oracle** (`:8686`) is one container that injects unique event ids at the node and runs the HTTP endpoint the node's sink delivers back to. diff --git a/tests/antithesis/scenarios/vector_e2e/anytime_reload.sh b/tests/antithesis/scenarios/vector_e2e/anytime_reload.sh deleted file mode 100755 index 71416aaf9324b..0000000000000 --- a/tests/antithesis/scenarios/vector_e2e/anytime_reload.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail -[ -n "${VECTOR_CONFIG_ALT:-}" ] || exit 0 -cfg="${VECTOR_CONFIG:?}" -alt="${VECTOR_CONFIG_ALT:?}" - -# Vector only ever reads $cfg, so reload alternates $cfg between two immutable -# sources rather than swapping two live files. The alternate $alt is never -# written, and the baseline (the original $cfg) is snapshotted once, so the only -# mutable file is $cfg and the only writes to it are a single rename of a fully -# written temp. The node-termination fault can therefore interrupt this script at -# any point and leave $cfg as one complete config or the other, never half-written -# and never collapsed so both sources hold the same content. Alternation always -# resumes on the next invocation. -base="$cfg.orig" -if [ ! -f "$base" ]; then - cp "$cfg" "$base.tmp" - mv "$base.tmp" "$base" -fi - -# Pick whichever source is not currently live. cksum reads from stdin so its -# output is the checksum alone, with no filename to differ on. -if [ "$(cksum <"$cfg")" = "$(cksum <"$alt")" ]; then - next="$base" -else - next="$alt" -fi -cp "$next" "$cfg.tmp" -mv "$cfg.tmp" "$cfg" - -# Vector is PID 1 in the node container. SIGHUP triggers reload-from-disk. 
-kill -HUP 1 -sleep 5 diff --git a/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml b/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml index abbc3a136b36d..f18a794ad251a 100644 --- a/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml +++ b/tests/antithesis/scenarios/vector_e2e/docker-compose.yaml @@ -21,12 +21,8 @@ services: build: *vector-build image: ve2e-vector:${ANTITHESIS_IMAGE_TAG:-dev} entrypoint: ["/usr/bin/vector", "--config", "/etc/vector/vector.yaml"] - # vector runs the reload fault: VECTOR_CONFIG_ALT lets anytime_reload swap - # configs and SIGHUP, forcing the sink to rebuild. No disk buffer, so no volume. environment: NO_COLOR: "1" - VECTOR_CONFIG: "/etc/vector/vector.yaml" - VECTOR_CONFIG_ALT: "/etc/vector/vector.b.yaml" healthcheck: *node-health oracle: diff --git a/tests/antithesis/scenarios/vector_e2e/vector.b.yaml b/tests/antithesis/scenarios/vector_e2e/vector.b.yaml deleted file mode 100644 index d426c7413699a..0000000000000 --- a/tests/antithesis/scenarios/vector_e2e/vector.b.yaml +++ /dev/null @@ -1,36 +0,0 @@ -sources: - in: - type: http_server - address: 0.0.0.0:8080 - decoding: - codec: json - acknowledgements: - enabled: true - - metrics: - type: internal_metrics - scrape_interval_secs: 1 - -sinks: - out: - type: http - inputs: [in] - uri: http://oracle:8686/ingest - method: post - encoding: - codec: json - # Benign alternate the reload fault swaps in. It differs from vector.yaml only - # by an explicit request timeout, enough to make the reload rebuild the sink. - request: - timeout_secs: 45 - buffer: - type: memory - max_events: 500 - when_full: block - acknowledgements: - enabled: true - - prom: - type: prometheus_exporter - inputs: [metrics] - address: 0.0.0.0:9598