diff --git a/changelog.d/source_chunk_size_config.feature.md b/changelog.d/source_chunk_size_config.feature.md new file mode 100644 index 0000000000000..2e4fd8a9e8921 --- /dev/null +++ b/changelog.d/source_chunk_size_config.feature.md @@ -0,0 +1,2 @@ +Add `--chunk-size` / `VECTOR_CHUNK_SIZE` to configure the source sender batch size and source output buffer base capacity (defaults to 1000 events). +authors: sakateka diff --git a/lib/vector-core/src/source_sender/builder.rs b/lib/vector-core/src/source_sender/builder.rs index ae09c9ebd6bb4..675ce817bcb58 100644 --- a/lib/vector-core/src/source_sender/builder.rs +++ b/lib/vector-core/src/source_sender/builder.rs @@ -5,8 +5,8 @@ use vector_common::histogram; use vector_common::internal_event::DEFAULT_OUTPUT; use super::{ - CHUNK_SIZE, LAG_TIME_NAME, Output, OutputMetrics, PostProcessor, SEND_BATCH_LATENCY_NAME, - SEND_LATENCY_NAME, SourceSender, SourceSenderItem, + LAG_TIME_NAME, Output, OutputMetrics, PostProcessor, SEND_BATCH_LATENCY_NAME, + SEND_LATENCY_NAME, SourceSender, SourceSenderItem, chunk_size, }; use crate::config::{ComponentKey, OutputId, SourceOutput}; @@ -23,7 +23,7 @@ pub struct Builder { impl Default for Builder { fn default() -> Self { Self { - buf_size: CHUNK_SIZE, + buf_size: chunk_size(), default_output: None, named_outputs: Default::default(), output_metrics: OutputMetrics::new( diff --git a/lib/vector-core/src/source_sender/mod.rs b/lib/vector-core/src/source_sender/mod.rs index d3c28330254b6..162a3e7fb1a39 100644 --- a/lib/vector-core/src/source_sender/mod.rs +++ b/lib/vector-core/src/source_sender/mod.rs @@ -17,7 +17,30 @@ pub use errors::SendError; use output::{Output, OutputMetrics}; pub use sender::{SourceSender, SourceSenderItem}; -pub const CHUNK_SIZE: usize = 1000; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Default number of events batched per source send, and the base used for source output buffer +/// sizing. Used when the chunk size has not been configured at startup. 
+pub const DEFAULT_CHUNK_SIZE: usize = 1000; + +static CHUNK_SIZE: AtomicUsize = AtomicUsize::new(0); + +/// Returns the configured source sender chunk size, or [`DEFAULT_CHUNK_SIZE`] if unset. +#[must_use] +pub fn chunk_size() -> usize { + match CHUNK_SIZE.load(Ordering::Relaxed) { + 0 => DEFAULT_CHUNK_SIZE, + size => size, + } +} + +/// Sets the process-wide source sender chunk size. Must be called at most once, before the +/// topology is built. Panics if a size was already set; a first call with 0 is a no-op. +pub fn set_chunk_size(size: usize) { + CHUNK_SIZE + .compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed) + .unwrap_or_else(|_| panic!("double chunk_size initialization")); +} #[cfg(any(test, feature = "test"))] const TEST_BUFFER_SIZE: usize = 100; diff --git a/lib/vector-core/src/source_sender/output.rs b/lib/vector-core/src/source_sender/output.rs index 6569fd5ca0074..abbeef202360c 100644 --- a/lib/vector-core/src/source_sender/output.rs +++ b/lib/vector-core/src/source_sender/output.rs @@ -24,7 +24,7 @@ use vector_common::{ }; use vrl::value::Value; -use super::{CHUNK_SIZE, PostProcessor, SendError, SourceSenderItem}; +use super::{PostProcessor, SendError, SourceSenderItem, chunk_size}; use crate::{ EstimatedJsonEncodedSizeOf, config::{OutputId, log_schema}, @@ -281,7 +281,7 @@ impl Output { S: Stream + Unpin, E: Into + ByteSizeOf, { - let mut stream = events.ready_chunks(CHUNK_SIZE); + let mut stream = events.ready_chunks(chunk_size()); while let Some(events) = stream.next().await { self.send_batch(events).await?; } @@ -305,7 +305,7 @@ impl Output { let mut unsent_event_count = UnsentEventCount::new(events.len()); let send_batch_start = Instant::now(); - for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { + for events in array::events_into_arrays(events, Some(chunk_size())) { self.send_inner(events, &mut unsent_event_count, reference) .await .inspect_err(|error| { diff --git a/lib/vector-core/src/source_sender/tests.rs 
b/lib/vector-core/src/source_sender/tests.rs index dd0b1da9233cb..ee9404db17e89 100644 --- a/lib/vector-core/src/source_sender/tests.rs +++ b/lib/vector-core/src/source_sender/tests.rs @@ -147,7 +147,7 @@ async fn emits_component_discarded_events_total_for_send_batch() { let (mut sender, _recv) = SourceSender::new_test_sender_with_options(1, None); let expected_drop = 100; - let events: Vec = (0..(CHUNK_SIZE + expected_drop)) + let events: Vec = (0..(chunk_size() + expected_drop)) .map(|_| { Event::Metric(Metric::new( "name", @@ -157,7 +157,7 @@ async fn emits_component_discarded_events_total_for_send_batch() { }) .collect(); - // `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion. + // `chunk_size()` events will be sent into the buffer, but the future is wrapped in the 100ms timeout below and will not be polled to completion. let res = timeout( std::time::Duration::from_millis(100), sender.send_batch(events), diff --git a/src/app.rs b/src/app.rs index bc797cc72284f..6235362532f1c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -230,7 +230,7 @@ impl Application { ); } - let runtime = build_runtime(opts.root.threads, "vector-worker")?; + let runtime = build_runtime(opts.root.threads, opts.root.chunk_size, "vector-worker")?; // Signal handler for OS and provider messages. 
let mut signals = SignalPair::new(&runtime); @@ -542,7 +542,11 @@ fn get_log_levels(default: &str) -> String { .unwrap_or_else(|_| default.into()) } -pub fn build_runtime(threads: Option, thread_name: &str) -> Result { +pub fn build_runtime( + threads: Option, + chunk_size: Option, + thread_name: &str, +) -> Result { let mut rt_builder = runtime::Builder::new_multi_thread(); rt_builder.max_blocking_threads(20_000); rt_builder.enable_all().thread_name(thread_name); @@ -557,7 +561,16 @@ pub fn build_runtime(threads: Option, thread_name: &str) -> Result, + /// Number of events batched per source send and used as the base for source output buffer sizing + /// (source output buffer capacity is this value multiplied by the number of worker threads) + #[arg(long, env = "VECTOR_CHUNK_SIZE")] + pub chunk_size: Option, + /// Enable more detailed internal logging. Repeat to increase level. Overridden by `--quiet`. #[arg(short, long, action = ArgAction::Count)] pub verbose: u8, diff --git a/src/topology/builder.rs b/src/topology/builder.rs index d325d1fc070d3..c4d65c54475ab 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -31,7 +31,7 @@ use vector_lib::{ internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered}, latency::LatencyRecorder, schema::Definition, - source_sender::{CHUNK_SIZE, SourceSenderItem}, + source_sender::{SourceSenderItem, chunk_size}, transform::update_runtime_schema_definition, }; use vector_lib::{gauge, internal_event::GaugeName}; @@ -72,9 +72,8 @@ static ENRICHMENT_TABLES_LOAD_LOCK: LazyLock> = LazyLock::new(Asy static METRICS_STORAGE: LazyLock = LazyLock::new(MetricsStorage::default); pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock = - LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE); + LazyLock::new(|| TRANSFORM_CONCURRENCY_LIMIT.saturating_mul(chunk_size())); -const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap(); pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = 
NonZeroUsize::new(100).unwrap(); static TRANSFORM_CONCURRENCY_LIMIT: LazyLock = LazyLock::new(|| { @@ -1299,8 +1298,10 @@ impl Runner { .into_stream() .filter(move |events| ready(filter_events_type(events, self.input_type))); + let ready_array_capacity = + NonZeroUsize::new(chunk_size().saturating_mul(4)).expect("chunk size is non-zero"); let mut input_rx = - super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY); + super::ready_arrays::ReadyArrays::with_capacity(input_rx, ready_array_capacity); let mut in_flight = FuturesOrdered::new(); let mut shutting_down = false; diff --git a/website/cue/reference/cli.cue b/website/cue/reference/cli.cue index ac787886c295e..bd55259d8b4ec 100644 --- a/website/cue/reference/cli.cue +++ b/website/cue/reference/cli.cue @@ -192,6 +192,12 @@ cli: { type: "integer" env_var: "VECTOR_THREADS" } + "chunk-size": { + description: env_vars.VECTOR_CHUNK_SIZE.description + default: env_vars.VECTOR_CHUNK_SIZE.type.uint.default + type: "integer" + env_var: "VECTOR_CHUNK_SIZE" + } "internal-log-rate-limit": { _short: "i" description: env_vars.VECTOR_INTERNAL_LOG_RATE_LIMIT.description @@ -659,6 +665,15 @@ cli: { unit: null } } + VECTOR_CHUNK_SIZE: { + description: """ + The number of events batched per source send and used as the base for source output buffer sizing. + """ + type: uint: { + default: 1000 + unit: "events" + } + } VECTOR_WATCH_CONFIG: { description: "Watch for changes in the configuration file and reload accordingly" type: bool: default: false