Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions lib/vector-buffers/src/variants/disk_v2/tests/basic.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use std::{io::Cursor, time::Duration};
use std::{io::Cursor, sync::Arc, time::Duration};

use futures::{StreamExt, stream};
use tokio::{select, time::sleep};
use tokio_test::{assert_pending, task::spawn};
use tracing::Instrument;
use vector_common::finalization::Finalizable;

use super::{create_default_buffer_v2, read_next, read_next_some};
use super::{
create_default_buffer_v2, create_default_buffer_v2_with_usage, read_next, read_next_some,
};
use crate::{
EventCount, assert_buffer_is_empty, assert_buffer_records,
buffer_usage_data::BufferUsageHandle,
test::{MultiEventRecord, SizedRecord, acknowledge, install_tracing_helpers, with_temp_dir},
variants::disk_v2::{tests::create_default_buffer_v2_with_usage, writer::RecordWriter},
variants::disk_v2::{BufferWriter, DiskBufferConfigBuilder, Ledger, writer::RecordWriter},
};

#[tokio::test]
Expand Down Expand Up @@ -155,8 +158,26 @@ async fn initial_size_correct_with_multievents() {
let data_dir = dir.to_path_buf();

async move {
// Create a regular buffer, no customizations required.
let (mut writer, _, _) = create_default_buffer_v2(data_dir.clone()).await;
// Build a write-only buffer without a finalizer task. Using
// `from_config_inner` would spawn a background finalizer that holds
// an Arc<Ledger> (and the lock file), causing a racy
// LedgerLockAlreadyHeld when we reopen the buffer below.
let config = DiskBufferConfigBuilder::from_path(&data_dir)
.build()
.expect("creating buffer config should not fail");
let usage_handle = BufferUsageHandle::noop();
let ledger = Ledger::load_or_create(config, usage_handle)
.await
.expect("ledger should not fail to load/create");
let ledger = Arc::new(ledger);

let mut writer = BufferWriter::new(Arc::clone(&ledger));
writer
.validate_last_write()
.await
.expect("validate_last_write should not fail");

ledger.synchronize_buffer_usage();

let input_items = (512..768)
.cycle()
Expand Down Expand Up @@ -209,14 +230,14 @@ async fn initial_size_correct_with_multievents() {
writer.flush().await.expect("writer flush should not fail");
writer.close();

// Now drop our buffer and reopen it.
// Yield to allow the background finalizer task to observe the closed
// stream and release its Arc<Ledger> (and thus the lock file) before
// we attempt to reopen the buffer.
// Drop the first buffer. No background finalizer task exists, so the
// ledger lock is released immediately when the last Arc is dropped.
drop(writer);
tokio::task::yield_now().await;
drop(ledger);

// Reopen the buffer with a full reader to verify the persisted data.
let (writer, mut reader, ledger, usage) =
create_default_buffer_v2_with_usage::<_, MultiEventRecord>(data_dir).await;
create_default_buffer_v2_with_usage::<_, MultiEventRecord>(&data_dir).await;
drop(writer);

// Make sure our usage data agrees with our expected event count and byte size:
Expand Down