diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs index ef7c7c848729f..c523390b45796 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs @@ -1,4 +1,4 @@ -use std::{io::Cursor, time::Duration}; +use std::{io::Cursor, sync::Arc, time::Duration}; use futures::{StreamExt, stream}; use tokio::{select, time::sleep}; @@ -6,11 +6,14 @@ use tokio_test::{assert_pending, task::spawn}; use tracing::Instrument; use vector_common::finalization::Finalizable; -use super::{create_default_buffer_v2, read_next, read_next_some}; +use super::{ + create_default_buffer_v2, create_default_buffer_v2_with_usage, read_next, read_next_some, +}; use crate::{ EventCount, assert_buffer_is_empty, assert_buffer_records, + buffer_usage_data::BufferUsageHandle, test::{MultiEventRecord, SizedRecord, acknowledge, install_tracing_helpers, with_temp_dir}, - variants::disk_v2::{tests::create_default_buffer_v2_with_usage, writer::RecordWriter}, + variants::disk_v2::{BufferWriter, DiskBufferConfigBuilder, Ledger, writer::RecordWriter}, }; #[tokio::test] @@ -155,8 +158,26 @@ async fn initial_size_correct_with_multievents() { let data_dir = dir.to_path_buf(); async move { - // Create a regular buffer, no customizations required. - let (mut writer, _, _) = create_default_buffer_v2(data_dir.clone()).await; + // Build a write-only buffer without a finalizer task. Using + // `from_config_inner` would spawn a background finalizer that holds + // an Arc (and the lock file), causing a racy + // LedgerLockAlreadyHeld when we reopen the buffer below. + let config = DiskBufferConfigBuilder::from_path(&data_dir) + .build() + .expect("creating buffer config should not fail"); + let usage_handle = BufferUsageHandle::noop(); + let ledger = Ledger::load_or_create(config, usage_handle) + .await + .expect("ledger should not fail to load/create"); + let ledger = Arc::new(ledger); + + let mut writer = BufferWriter::new(Arc::clone(&ledger)); + writer + .validate_last_write() + .await + .expect("validate_last_write should not fail"); + + ledger.synchronize_buffer_usage(); let input_items = (512..768) .cycle() @@ -209,14 +230,14 @@ async fn initial_size_correct_with_multievents() { writer.flush().await.expect("writer flush should not fail"); writer.close(); - // Now drop our buffer and reopen it. - // Yield to allow the background finalizer task to observe the closed - // stream and release its Arc (and thus the lock file) before - // we attempt to reopen the buffer. + // Drop the first buffer. No background finalizer task exists, so the + // ledger lock is released immediately when the last Arc is dropped. drop(writer); - tokio::task::yield_now().await; + drop(ledger); + + // Reopen the buffer with a full reader to verify the persisted data. let (writer, mut reader, ledger, usage) = - create_default_buffer_v2_with_usage::<_, MultiEventRecord>(data_dir).await; + create_default_buffer_v2_with_usage::<_, MultiEventRecord>(&data_dir).await; drop(writer); // Make sure our usage data agrees with our expected event count and byte size: