From d45e45e3c70c14de2bbd958d5afe452aafe6930e Mon Sep 17 00:00:00 2001 From: Rob Blafford Date: Wed, 17 Jun 2026 14:18:51 +0000 Subject: [PATCH 1/5] fix(logstash source): delimit ACK windows by WindowSize frames The ACK generator inferred writer-window boundaries by counting events down to the advertised WindowSize. WindowSize is a maximum unacked count, not an exact frame count, so an early-flushed or under-filled window never had its boundary marked; when coalesced with a later window in one read, that window was dropped from the ACKs and the later window's higher sequence leaked onto it. Filebeat reported this as "invalid sequence number received (seq=N, expected=M)", causing reconnects and duplicate retransmits. Delimit ACK domains by the WindowSize frame itself instead: each W frame opens a new domain (window_id), data frames are stamped with it, and the acker emits one cumulative ACK per domain. Every ACK value is then drawn from within the domain it acks, so it can never exceed the window the sender is waiting on. Replaces the window_events_remaining counter (and its overwrite-on-new-window hazard) with a monotonic per-decoder window id threaded through the compressed-frame path. Adds reproduction tests for under-filled windows with both reset and monotonic sequence numbering. --- ...3346_logstash_ack_window_boundaries.fix.md | 3 + src/sources/logstash.rs | 252 ++++++++++++------ 2 files changed, 175 insertions(+), 80 deletions(-) create mode 100644 changelog.d/23346_logstash_ack_window_boundaries.fix.md diff --git a/changelog.d/23346_logstash_ack_window_boundaries.fix.md b/changelog.d/23346_logstash_ack_window_boundaries.fix.md new file mode 100644 index 0000000000000..117056a71ea63 --- /dev/null +++ b/changelog.d/23346_logstash_ack_window_boundaries.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to delimit ACK windows by `WindowSize` frames rather than by counting events down to the advertised window size. 
The previous approach could lose a window boundary when a sender flushed a window with fewer events than advertised (legal, since `WindowSize` is a maximum) and a later window was coalesced into the same read, causing Vector to ACK a sequence number beyond the window the sender was waiting on. Beats/Filebeat reported this as `invalid sequence number received (seq=N, expected=M)`, leading to reconnects and duplicate retransmits. + +authors: graphcareful diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index a8b6c6b389e28..3c5a1d3e4091e 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -3,7 +3,6 @@ use std::{ convert::TryFrom, io::{self, Read}, net::SocketAddr, - num::NonZeroUsize, time::Duration, }; @@ -265,18 +264,10 @@ impl TcpSource for LogstashSource { } struct LogstashAcker { - // Batched reads can contain multiple writer windows. Preserve a separate - // ACK point for each completed window so Filebeat never sees an ACK that - // advances past the current window it is waiting on. If the batch ends in - // the middle of a window, ACK the last received event in that final ACK - // domain so clients are not forced to wait for the advertised window size. - // Lumberjack defines WindowSize as a maximum unacked count, so a sender can - // legitimately advertise a fresh window after a previously ACKed partial - // tail. Within a single ReadyFrames batch, the only incomplete ACK domain - // we can represent independently is the final tail we have actually seen. - // We expect most batches to need only one ACK point, either for a single - // completed window or for one partial tail. Multiple ACKs are only needed - // when ReadyFrames coalesces multiple logical windows into one batch. + // One cumulative ACK per writer window in the batch, in wire order. Each + // ACK is a window's highest sequence; since sequences reset per window it + // never exceeds the window the sender awaits. 
A window split across reads + // gets a partial-tail ACK and completes in a later batch. acknowledgements: SmallVec<[(LogstashProtocolVersion, u32); 1]>, } @@ -285,9 +276,11 @@ impl LogstashAcker { let acknowledgements = frames .iter() .enumerate() - // ACK each completed writer window and the last frame in a partial batch if ReadyFrames - // flushes before the current window is complete. - .filter(|(index, frame)| frame.window_end || index + 1 == frames.len()) + // last frame of each window: the next frame opens a new one, or this + // is the end of the batch. + .filter(|(index, frame)| { + index + 1 == frames.len() || frames[index + 1].window_id != frame.window_id + }) .map(|(_, frame)| (frame.protocol, frame.sequence_number)) .collect(); @@ -321,55 +314,66 @@ enum LogstashDecoderReadState { PendingFrames(VecDeque<(LogstashEventFrame, usize)>), } +/// Tracks the current Lumberjack writer window (ack domain) so the acker can +/// emit one ACK per window even when a read coalesces several. +/// +/// The boundary is the *presence* of a `WindowSize` frame, not its value: +/// `WindowSize` is a maximum, so counting events down to it desyncs when a +/// sender under-fills a window. Delimiting on the frame itself can't desync. +#[derive(Debug, Clone, Copy)] +struct WindowState { + /// Monotonic id of the current ack domain. + current: u64, + /// Whether a `WindowSize` frame governs incoming data frames. + active: bool, +} + +impl WindowState { + const fn new() -> Self { + Self { + current: 0, + active: false, + } + } + + /// Opens a new ack domain in response to a `WindowSize` frame. + const fn open(&mut self) { + self.current += 1; + self.active = true; + } + + /// Id for the next data frame. Frames under a `WindowSize` share its id; + /// bare frames (no active window) each get a fresh id, so a non-windowed + /// sender's frames are ACKed individually. 
+ const fn frame_id(&mut self) -> u64 { + if !self.active { + self.current += 1; + } + self.current + } +} + #[derive(Debug)] struct LogstashDecoder { state: LogstashDecoderReadState, - // Tracks how many events remain in the current writer window. This lets us - // preserve sender window boundaries even if ReadyFrames later batches - // multiple decoded windows together before ACKing. - window_events_remaining: Option, + window: WindowState, } impl LogstashDecoder { const fn new() -> Self { - Self::new_with_window_events_remaining(None) + Self::new_with_window(WindowState::new()) } - const fn new_with_window_events_remaining( - window_events_remaining: Option, - ) -> Self { + const fn new_with_window(window: WindowState) -> Self { Self { state: LogstashDecoderReadState::ReadProtocol, - window_events_remaining, + window, } } - /// Marks whether a decoded frame closes the current writer window. - /// - /// Filebeat expects ACKs to stay within the current window announced by the - /// most recent `WindowSize` frame. The generic TCP batching layer can merge - /// frames from multiple windows before we build an ACK, so we record the - /// per-frame window boundary here and let the acker emit one ACK frame per - /// completed window later. - /// - /// If a sender omits `WindowSize`, we keep the previous behavior and treat - /// each standalone frame as ACKable on its own. - const fn annotate_frame(&mut self, frame: &mut LogstashEventFrame) { - match self.window_events_remaining { - Some(remaining) if remaining.get() == 1 => { - frame.window_end = true; - self.window_events_remaining = None; - } - Some(remaining) => { - frame.window_end = false; - self.window_events_remaining = NonZeroUsize::new(remaining.get() - 1); // safe because we know remaining is greater than 1 - } - None => { - // Preserve existing behavior for inputs that send standalone data frames - // without an explicit WindowSize frame. 
- frame.window_end = true; - } - } + /// Stamps a decoded data frame with the id of the window it belongs to. + const fn stamp_frame(&mut self, frame: &mut LogstashEventFrame) { + frame.window_id = self.window.frame_id(); } } @@ -488,12 +492,13 @@ struct LogstashEventFrame { protocol: LogstashProtocolVersion, sequence_number: u32, fields: BTreeMap, - window_end: bool, + // Ack-domain (writer window) id, stamped by the decoder. See [`WindowState`]. + window_id: u64, } struct DecodedCompressedFrames { frames: VecDeque<(LogstashEventFrame, usize)>, - window_events_remaining: Option, + window: WindowState, } // Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md @@ -545,16 +550,8 @@ impl Decoder for LogstashDecoder { Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack), } } - // The window size indicates how many events the writer will send before waiting - // for acks. We preserve this boundary so the acker can emit one ACK per - // completed window, even if multiple windows are batched together later. - // Filebeat accepts cumulative ACKs, but not ACKs that advance past the - // current writer window it is waiting on. WindowSize is a maximum unacked - // count, not necessarily an exact count of immediately following frames, so a - // sender can legitimately advertise a new window after a previously ACKed - // partial tail. If a malformed sender does this before that earlier tail has - // actually been ACKed, we tolerate the reset here even though it can collapse - // the older incomplete domain into the new one. + // A `WindowSize` frame opens a new ack domain; its value is unused + // (we delimit by the frame, not by counting events down to its size). 
// // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => { @@ -562,8 +559,8 @@ impl Decoder for LogstashDecoder { return Ok(None); } - let window_size = src.get_u32() as usize; - self.window_events_remaining = NonZeroUsize::new(window_size); + let _window_size = src.get_u32(); + self.window.open(); LogstashDecoderReadState::ReadProtocol } @@ -584,7 +581,7 @@ impl Decoder for LogstashDecoder { let Some((mut frame, byte_size)) = decode_data_frame(protocol, src) else { return Ok(None); }; - self.annotate_frame(&mut frame); + self.stamp_frame(&mut frame); LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } @@ -593,23 +590,19 @@ impl Decoder for LogstashDecoder { let Some((mut frame, byte_size)) = decode_json_frame(protocol, src)? else { return Ok(None); }; - self.annotate_frame(&mut frame); + self.stamp_frame(&mut frame); LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type // - // The compressed payload is still part of the same logical Lumberjack stream, so - // the nested decoder must inherit the current window state and return the updated - // state after expanding the payload. Re-annotating the emitted frames here would - // overwrite any WindowSize boundaries that were established inside the compressed - // payload and can also lose progress from a partially consumed outer window. + // Thread window state through the nested decoder so window ids stay + // monotonic across the compression boundary. LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => { - let Some(decoded) = decode_compressed_frame(src, self.window_events_remaining)? - else { + let Some(decoded) = decode_compressed_frame(src, self.window)? 
else { return Ok(None); }; - self.window_events_remaining = decoded.window_events_remaining; + self.window = decoded.window; LogstashDecoderReadState::PendingFrames(decoded.frames) } @@ -653,7 +646,7 @@ fn decode_data_frame( protocol, sequence_number, fields, - window_end: false, + window_id: 0, }, byte_size, )) @@ -712,7 +705,7 @@ fn decode_json_frame( protocol, sequence_number, fields, - window_end: false, + window_id: 0, }, byte_size, ))) @@ -720,7 +713,7 @@ fn decode_json_frame( fn decode_compressed_frame( src: &mut BytesMut, - window_events_remaining: Option, + window: WindowState, ) -> Result, DecodeError> { let mut rest = src.as_ref(); @@ -749,7 +742,7 @@ fn decode_compressed_frame( let mut buf = res?; - let mut decoder = LogstashDecoder::new_with_window_events_remaining(window_events_remaining); + let mut decoder = LogstashDecoder::new_with_window(window); let mut frames = VecDeque::new(); @@ -758,7 +751,7 @@ fn decode_compressed_frame( } Ok(Some(DecodedCompressedFrames { frames, - window_events_remaining: decoder.window_events_remaining, + window: decoder.window, })) } @@ -1117,6 +1110,105 @@ mod test { assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; } + // Regression tests for the `seq=N, expected=M` error. A sender may legally + // under-fill a window (`WindowSize` is a maximum); when such a window is + // coalesced with a later one in a single read, the later window's sequence + // must not leak onto it. + + #[tokio::test] + async fn under_filled_window_must_not_leak_later_window_sequence() { + // Window 1 advertises up to 2 unacked events but only sends 1 (legal + // early flush), then window 2 advertises 10 and fills it. Filebeat is + // waiting on window 1 with `expected = 2`. The ACK stream must stay + // within each window: ACK 1 for window 1 (one event delivered), then + // ACK 10 for window 2. It must never emit seq=10 for window 1. 
+ let mut req = BytesMut::new(); + push_window_size(&mut req, 2); + push_req( + &mut req, + 1, + &[("message", "under-filled window, only event")], + ); + push_window_size(&mut req, 10); + for seq in 1..=10 { + push_req(&mut req, seq, &[("message", "second window")]); + } + + // Decoded sequence numbers: window 1 -> [1], window 2 -> [1..=10]. + let expected_sequences: Vec = std::iter::once(1).chain(1..=10).collect(); + let decoded = decode_frames_and_assert_sequences(req, &expected_sequences); + + // Window 1 is ACKed at its own (partial) sequence 1, then window 2 at + // 10. The size-2 window never receives seq=10, so Filebeat does not log + // "invalid sequence number received (seq=10, expected=2)". + assert_acknowledgements_for_ready_frames(decoded, &expected_sequences, &[1, 10]).await; + } + + #[tokio::test] + async fn under_filled_window_with_monotonic_sequences_must_not_leak() { + // Same defect, but with monotonic sequence numbers across windows to + // show it is not specific to per-window sequence resets. Window 1 + // (advertised 2) delivers one event (seq 1); window 2 (advertised 10) + // delivers seq 2..=11. Correct ACKs: [1, 11]. + let mut req = BytesMut::new(); + push_window_size(&mut req, 2); + push_req( + &mut req, + 1, + &[("message", "under-filled window, only event")], + ); + push_window_size(&mut req, 10); + for seq in 2..=11 { + push_req(&mut req, seq, &[("message", "second window")]); + } + + let expected_sequences: Vec = (1..=11).collect(); + let decoded = decode_frames_and_assert_sequences(req, &expected_sequences); + + assert_acknowledgements_for_ready_frames(decoded, &expected_sequences, &[1, 11]).await; + } + + #[tokio::test] + async fn ack_never_advances_past_the_window_filebeat_is_waiting_on() { + // States the invariant directly in the terms of the Filebeat error: + // the ACK that lands on the first (oldest) window must not exceed that + // window's advertised size. 
+ let first_window_size = 2u32; + + let mut req = BytesMut::new(); + push_window_size(&mut req, first_window_size); + push_req( + &mut req, + 1, + &[("message", "under-filled window, only event")], + ); + push_window_size(&mut req, 10); + for seq in 1..=10 { + push_req(&mut req, seq, &[("message", "second window")]); + } + + let decoded = decode_frames(req); + + let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>)); + let mut ready = ReadyFrames::with_capacity(stream, 16); + let (frames, _) = ready.next().await.unwrap().unwrap(); + + let ack = LogstashAcker::new(&frames) + .build_ack(TcpSourceAck::Ack) + .unwrap(); + let acknowledgements = decode_acknowledgements(ack); + + let first_ack = *acknowledgements + .first() + .expect("the oldest window must receive an ACK"); + assert!( + first_ack <= first_window_size, + "first ACK seq={first_ack} exceeds the first window's size {first_window_size}; \ + Filebeat reports this as `invalid sequence number received (seq={first_ack}, \ + expected={first_window_size})`", + ); + } + async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) { let seq = rng().random_range(1..u32::MAX); let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); From f02c37b7655cc7589d3aa7267689e16d038fde02 Mon Sep 17 00:00:00 2001 From: Rob Blafford Date: Wed, 17 Jun 2026 18:48:38 +0000 Subject: [PATCH 2/5] test(logstash source): proptest fuzzer for the ACK window invariant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Generates random Lumberjack window streams — including spec-legal under-filled windows (WindowSize advertised larger than the frames actually sent) — groups the decoded frames into arbitrary contiguous batches (modeling ReadyFrames coalescing under any byte-arrival pattern), builds one ACK per batch, and replays the emitted sequences FIFO against the declared windows. 
The invariant: an ACK landing on a window must never exceed the frames that window actually carried — exactly go-lumber's `AwaitACK` check (ackSeq > count => "invalid sequence number received (seq=N, expected=M)"). Against the previous counting acker this fails and shrinks to the minimal case (one 1-frame window followed by a 2-frame window, coalesced into one read). With the window-id acker it passes. Serves as a permanent regression guard; the discovered seed is checked in under proptest-regressions/. --- proptest-regressions/sources/logstash.txt | 7 ++ src/sources/logstash.rs | 106 ++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 proptest-regressions/sources/logstash.txt diff --git a/proptest-regressions/sources/logstash.txt b/proptest-regressions/sources/logstash.txt new file mode 100644 index 0000000000000..6b67f83112eb8 --- /dev/null +++ b/proptest-regressions/sources/logstash.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc b3e438be44a9b6d3ad27dd1c5cf54f143e698ff1f01b178661869dffb67c7710 # shrinks to windows = [GenWindow { declared: 2, actual: 1, compressed: false }, GenWindow { declared: 2, actual: 2, compressed: false }], batch_sizes = [3] diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 3c5a1d3e4091e..251c06bd5c4a2 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -1209,6 +1209,112 @@ mod test { ); } + // Property test: an ACK must never advance past the window the sender is + // waiting on (go-lumber rejects ackSeq > count). Generate writer windows, + // decode them, regroup the frames into arbitrary batches (modeling + // ReadyFrames coalescing), and replay the emitted ACKs FIFO against the + // window sizes. 
A violation is the customer's `invalid sequence number`. + + use proptest::prelude::*; + + /// A writer window: advertises `WindowSize = declared` but sends `actual` + /// frames (`actual <= declared`). `actual < declared` is a legal under-fill, + /// after which the sender expects ACKs no greater than `actual`. + #[derive(Debug, Clone)] + struct GenWindow { + declared: u32, + actual: u32, + compressed: bool, + } + + fn encode_windows(windows: &[GenWindow]) -> BytesMut { + let mut buf = BytesMut::new(); + for w in windows { + push_window_size(&mut buf, w.declared); + if w.compressed { + let mut inner = BytesMut::new(); + for seq in 1..=w.actual { + push_req(&mut inner, seq, &[("message", "x")]); + } + push_compressed(&mut buf, &inner); + } else { + for seq in 1..=w.actual { + push_req(&mut buf, seq, &[("message", "x")]); + } + } + } + buf + } + + /// Replays `acks` FIFO against the window sizes; returns the first ACK that + /// advances past its window, if any. + fn first_out_of_window_ack(window_sizes: &[u32], acks: &[u32]) -> Option<(u32, u32)> { + let mut head = 0usize; + for &ack in acks { + // 0 once we're past the last window: any ACK there is a violation. + let size = window_sizes.get(head).copied().unwrap_or(0); + if ack > size { + return Some((ack, size)); + } + if ack == size { + head += 1; + } + } + None + } + + proptest::proptest! { + #![proptest_config(proptest::prelude::ProptestConfig::with_cases(2048))] + + #[test] + fn acks_never_advance_past_a_window( + windows in proptest::collection::vec( + // declared >= actual >= 1; `extra` makes the window under-filled. 
+ (1u32..=20, 0u32..=8, proptest::prelude::any::()) + .prop_map(|(actual, extra, compressed)| GenWindow { + declared: actual + extra, + actual, + compressed, + }), + 1..20, + ), + batch_sizes in proptest::collection::vec(1usize..=8, 1..40), + ) { + // each window's sender expects ACKs <= the frames it actually sent + let window_sizes: Vec = windows.iter().map(|w| w.actual).collect(); + + let decoded: Vec = decode_frames(encode_windows(&windows)) + .into_iter() + .map(|(frame, _)| frame) + .collect(); + + // regroup into arbitrary contiguous batches; one ACK per batch, in order + let mut acks = Vec::new(); + let mut offset = 0usize; + let mut sizes = batch_sizes.iter().copied().cycle(); + while offset < decoded.len() { + let take = sizes.next().unwrap().min(decoded.len() - offset); + let batch = &decoded[offset..offset + take]; + offset += take; + if let Some(ack_bytes) = + LogstashAcker::new(batch).build_ack(TcpSourceAck::Ack) + { + acks.extend(decode_acknowledgements(ack_bytes)); + } + } + + if let Some((ack, size)) = first_out_of_window_ack(&window_sizes, &acks) { + proptest::prop_assert!( + false, + "ACK seq {} advanced past window of size {} \ + (== `invalid sequence number received (seq={}, expected={})`)\n\ + windows={:?}\nbatch_sizes={:?}\nacks={:?}", + ack, size, ack, size, window_sizes, batch_sizes, acks, + ); + } + } + } + async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) { let seq = rng().random_range(1..u32::MAX); let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); From 9c7497dafb7eaa1561895f8bef0c58baae2a8d86 Mon Sep 17 00:00:00 2001 From: Rob Blafford Date: Thu, 18 Jun 2026 15:05:54 +0000 Subject: [PATCH 3/5] fix(logstash source): close the connection on a malformed frame MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A failed JSON decode or decompression was marked continuable (`can_continue() == true`), but the Lumberjack stream is length-prefixed binary 
with no resync marker. Continuing left the decoder desynchronized yet running: the uncompressed-JSON path never consumes the bad bytes (a CPU busy-loop) and the compressed path advances but keeps stale state, misframing subsequent bytes and emitting ACKs for bogus sequence numbers — surfacing as `invalid sequence number received` on the client. Treat every decode error as fatal (`can_continue() == false`) so the connection closes and the client reconnects (fresh decoder) and retransmits the unacknowledged window. This matches the upstream `logstash-input-beats` server, which closes the channel on any decode exception, and is at-least-once and safe. Adds decoder-level tests that malformed JSON and bad compression are fatal, and a socket-level test that a malformed frame closes the connection without sending an ACK. Also adds a changelog fragment for this fix. --- .../23346_logstash_decode_error_fatal.fix.md | 3 + src/sources/logstash.rs | 85 +++++++++++++++++-- 2 files changed, 79 insertions(+), 9 deletions(-) create mode 100644 changelog.d/23346_logstash_decode_error_fatal.fix.md diff --git a/changelog.d/23346_logstash_decode_error_fatal.fix.md b/changelog.d/23346_logstash_decode_error_fatal.fix.md new file mode 100644 index 0000000000000..125991620dc4f --- /dev/null +++ b/changelog.d/23346_logstash_decode_error_fatal.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to close the connection on a malformed frame instead of attempting to continue. A failed JSON decode or decompression previously left the decoder desynchronized but still running, which could busy-loop and emit ACKs for bogus sequence numbers (surfacing as `invalid sequence number received` on the client). The source now treats any decode error as fatal and closes the connection — matching the upstream `logstash-input-beats` server — so the client reconnects and retransmits the unacknowledged window. 
+ +authors: graphcareful diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 251c06bd5c4a2..80c026ef39916 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -393,15 +393,12 @@ pub enum DecodeError { impl StreamDecodingError for DecodeError { fn can_continue(&self) -> bool { - use DecodeError::*; - - match self { - IO { .. } => false, - UnknownProtocolVersion { .. } => false, - UnknownFrameType { .. } => false, - JsonFrameFailedDecode { .. } => true, - DecompressionFailed { .. } => true, - } + // No decode error is recoverable on this stream. Lumberjack is a + // length-prefixed binary protocol with no resync marker, so once a + // frame fails to decode the stream position is no longer trustworthy: + // continuing would misframe subsequent bytes and emit ACKs for bogus + // sequence numbers. + false } } @@ -1010,6 +1007,76 @@ mod test { } } + // A malformed frame must be a fatal (non-continuable) decode error: the + // Lumberjack stream can't be resynced, so the connection is closed rather + // than continuing with a desynced decoder (which would emit bogus ACKs). + // This matches upstream logstash-input-beats, which closes the channel on + // any decode exception. + + #[test] + fn malformed_json_frame_is_a_fatal_decode_error() { + let mut decoder = LogstashDecoder::new(); + let mut src = BytesMut::new(); + src.put_u8(b'2'); + src.put_u8(b'J'); + src.put_u32(1); // sequence number + let bad = b"{ not valid json "; + src.put_u32(bad.len() as u32); // payload size + src.put(&bad[..]); + + let err = decoder.decode(&mut src).unwrap_err(); + assert!(matches!(err, DecodeError::JsonFrameFailedDecode { .. 
})); + assert!( + !err.can_continue(), + "a malformed JSON frame must be fatal so the connection closes", + ); + } + + #[test] + fn malformed_compressed_frame_is_a_fatal_decode_error() { + let mut decoder = LogstashDecoder::new(); + let mut src = BytesMut::new(); + src.put_u8(b'2'); + src.put_u8(b'C'); + let garbage = b"this is not a zlib stream"; + src.put_u32(garbage.len() as u32); // payload size + src.put(&garbage[..]); + + let err = decoder.decode(&mut src).unwrap_err(); + assert!(matches!(err, DecodeError::DecompressionFailed { .. })); + assert!(!err.can_continue()); + } + + #[tokio::test] + async fn malformed_frame_closes_connection_without_ack() { + let (address, _recv) = start_logstash(EventStatus::Delivered).await; + + let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); + + // A '2' 'J' frame whose payload is not valid JSON. + let mut req = BytesMut::new(); + req.put_u8(b'2'); + req.put_u8(b'J'); + req.put_u32(1); // sequence number + let bad = b"{ not valid json "; + req.put_u32(bad.len() as u32); // payload size + req.put(&bad[..]); + socket.write_all(&req).await.unwrap(); + + // The source must close the connection on the decode error and send no + // ACK; the client will reconnect and retransmit. 
+ let mut output = BytesMut::new(); + let result = socket.read_buf(&mut output).await; + assert!( + matches!(result, Ok(0)) || result.is_err(), + "expected the connection to close; read returned {result:?} with {output:?}", + ); + assert!( + output.is_empty(), + "no ACK should be sent for a malformed frame, got {output:?}", + ); + } + #[tokio::test] async fn distinct_windows_do_not_share_an_ack_domain() { let mut req = BytesMut::new(); From 114350a0ebc24c5691bec992cd18f44d1f2b86c3 Mon Sep 17 00:00:00 2001 From: Rob Blafford Date: Thu, 18 Jun 2026 21:03:44 +0000 Subject: [PATCH 4/5] chore(logstash source): add debug logging for ACK/window diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds debug-level logs to help diagnose client-side `invalid sequence number received` reports when the cause is upstream of Vector (e.g. a proxy reframing the stream). All inherit `peer_addr` from the connection span. - ACK emission: the sequence(s) ACKed per window, in wire order — the direct counterpart to the client's `(seq=N, expected=M)`. - Window open: the `window_id` and advertised `WindowSize` — shows the window structure the decoder reconstructed. - Misframe: on an unexpected frame-type byte, the offending byte plus up to 16 of the following bytes, to reveal what the stream actually contains (e.g. an injected/concatenated payload). 
--- src/sources/logstash.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 80c026ef39916..0db9355113a0c 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -293,6 +293,7 @@ impl TcpSourceAcker for LogstashAcker { fn build_ack(self, ack: TcpSourceAck) -> Option { match ack { TcpSourceAck::Ack if !self.acknowledgements.is_empty() => { + debug!(message = "Sending Lumberjack ACK(s).", acks = ?self.acknowledgements); let mut bytes: Vec = Vec::with_capacity(self.acknowledgements.len() * 6); for (protocol_version, sequence_number) in self.acknowledgements { bytes.push(protocol_version.into()); @@ -539,12 +540,21 @@ impl Decoder for LogstashDecoder { use LogstashFrameType::*; - match LogstashFrameType::try_from(src.get_u8())? { - WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize), - Data => LogstashDecoderReadState::ReadFrame(protocol, Data), - Json => LogstashDecoderReadState::ReadFrame(protocol, Json), - Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed), - Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack), + let frame_type = src.get_u8(); + match LogstashFrameType::try_from(frame_type) { + Ok(WindowSize) => LogstashDecoderReadState::ReadFrame(protocol, WindowSize), + Ok(Data) => LogstashDecoderReadState::ReadFrame(protocol, Data), + Ok(Json) => LogstashDecoderReadState::ReadFrame(protocol, Json), + Ok(Compressed) => LogstashDecoderReadState::ReadFrame(protocol, Compressed), + Ok(Ack) => LogstashDecoderReadState::ReadFrame(protocol, Ack), + Err(error) => { + debug!( + message = "Unexpected frame type byte; stream may be misframed.", + frame_type_byte = frame_type, + buffer_head = ?&src[..src.len().min(16)], + ); + return Err(error); + } } } // A `WindowSize` frame opens a new ack domain; its value is unused @@ -556,8 +566,13 @@ impl Decoder for LogstashDecoder { return Ok(None); } - let _window_size 
= src.get_u32(); + let window_size = src.get_u32(); self.window.open(); + debug!( + message = "Opened Lumberjack writer window.", + window_id = self.window.current, + advertised_size = window_size, + ); LogstashDecoderReadState::ReadProtocol } From b867ea9a1fa70ff3c7a587ea052b637aeb2fd194 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 19 Jun 2026 22:12:01 -0600 Subject: [PATCH 5/5] Refactor test push_req --- src/sources/logstash.rs | 95 ++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 59 deletions(-) diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 0db9355113a0c..8e5b53bec5aa2 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -855,12 +855,7 @@ mod test { async fn test_protocol(status: EventStatus, sends_ack: bool) { let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (address, recv) = start_logstash(status).await; - spawn_collect_n( - send_req(address, &[("message", "Hello, world!")], sends_ack), - recv, - 1, - ) - .await + spawn_collect_n(send_req(address, "Hello, world!", sends_ack), recv, 1).await }) .await; @@ -878,22 +873,20 @@ mod test { assert!(log.get("timestamp").is_some()); } - fn push_req(req: &mut BytesMut, seq: u32, pairs: &[(&str, &str)]) { + fn push_req(req: &mut BytesMut, seq: u32, message: &str) { req.put_u8(b'2'); req.put_u8(b'D'); req.put_u32(seq); - req.put_u32(pairs.len() as u32); - for (key, value) in pairs { - req.put_u32(key.len() as u32); - req.put(key.as_bytes()); - req.put_u32(value.len() as u32); - req.put(value.as_bytes()); - } + req.put_u32(1u32); + req.put_u32("message".len() as u32); + req.put("message".as_bytes()); + req.put_u32(message.len() as u32); + req.put(message.as_bytes()); } - fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes { + fn encode_req(seq: u32, message: &str) -> Bytes { let mut req = BytesMut::new(); - push_req(&mut req, seq, pairs); + push_req(&mut req, seq, message); req.into() } @@ -1013,7 +1006,7 @@ mod 
test { #[test] fn v1_decoder_does_not_panic() { let seq = rng().random_range(1..u32::MAX); - let req = encode_req(seq, &[("message", "Hello, World!")]); + let req = encode_req(seq, "Hello, World!"); for i in 0..req.len() - 1 { assert!( decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i])) @@ -1096,10 +1089,10 @@ mod test { async fn distinct_windows_do_not_share_an_ack_domain() { let mut req = BytesMut::new(); push_window_size(&mut req, 1); - push_req(&mut req, 1, &[("message", "first window")]); + push_req(&mut req, 1, "first window"); push_window_size(&mut req, 2); - push_req(&mut req, 1, &[("message", "second window first")]); - push_req(&mut req, 2, &[("message", "second window second")]); + push_req(&mut req, 1, "second window first"); + push_req(&mut req, 2, "second window second"); let decoded = decode_frames_and_assert_sequences(req, &[1, 1, 2]); assert_acknowledgements_for_ready_frames(decoded, &[1, 1, 2], &[1, 2]).await; @@ -1109,11 +1102,11 @@ mod test { async fn distinct_windows_with_monotonic_sequences_ack_the_first_window() { let mut req = BytesMut::new(); push_window_size(&mut req, 2); - push_req(&mut req, 1, &[("message", "first window first")]); - push_req(&mut req, 2, &[("message", "first window second")]); + push_req(&mut req, 1, "first window first"); + push_req(&mut req, 2, "first window second"); push_window_size(&mut req, 2); - push_req(&mut req, 3, &[("message", "second window first")]); - push_req(&mut req, 4, &[("message", "second window second")]); + push_req(&mut req, 3, "second window first"); + push_req(&mut req, 4, "second window second"); let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await; @@ -1123,7 +1116,7 @@ mod test { async fn incomplete_final_window_is_acked_to_the_last_received_event() { let mut req = BytesMut::new(); push_window_size(&mut req, 4); - push_req(&mut req, 1, &[("message", "only event in 
partial window")]); + push_req(&mut req, 1, "only event in partial window"); let decoded = decode_frames_and_assert_sequences(req, &[1]); assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; @@ -1133,8 +1126,8 @@ mod test { async fn compressed_frames_preserve_inner_window_boundaries() { let mut inner = BytesMut::new(); push_window_size(&mut inner, 2); - push_req(&mut inner, 1, &[("message", "compressed first")]); - push_req(&mut inner, 2, &[("message", "compressed second")]); + push_req(&mut inner, 1, "compressed first"); + push_req(&mut inner, 2, "compressed second"); let mut req = BytesMut::new(); push_compressed(&mut req, &inner); @@ -1147,10 +1140,10 @@ mod test { async fn single_window_split_across_ready_frames_keeps_progressive_acks() { let mut req = BytesMut::new(); push_window_size(&mut req, 4); - push_req(&mut req, 1, &[("message", "first")]); - push_req(&mut req, 2, &[("message", "second")]); - push_req(&mut req, 3, &[("message", "third")]); - push_req(&mut req, 4, &[("message", "fourth")]); + push_req(&mut req, 1, "first"); + push_req(&mut req, 2, "second"); + push_req(&mut req, 3, "third"); + push_req(&mut req, 4, "fourth"); let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); @@ -1175,18 +1168,14 @@ mod test { let mut first_batch = BytesMut::new(); push_window_size(&mut first_batch, 2); - push_req(&mut first_batch, 1, &[("message", "first partial tail")]); + push_req(&mut first_batch, 1, "first partial tail"); let decoded = decode_frames_with_decoder_and_assert_sequences(&mut decoder, first_batch, &[1]); assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; let mut second_batch = BytesMut::new(); push_window_size(&mut second_batch, 1); - push_req( - &mut second_batch, - 1, - &[("message", "fresh window after ack")], - ); + push_req(&mut second_batch, 1, "fresh window after ack"); let decoded = decode_frames_with_decoder_and_assert_sequences(&mut decoder, second_batch, &[1]); 
assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; @@ -1206,14 +1195,10 @@ mod test { // ACK 10 for window 2. It must never emit seq=10 for window 1. let mut req = BytesMut::new(); push_window_size(&mut req, 2); - push_req( - &mut req, - 1, - &[("message", "under-filled window, only event")], - ); + push_req(&mut req, 1, "under-filled window, only event"); push_window_size(&mut req, 10); for seq in 1..=10 { - push_req(&mut req, seq, &[("message", "second window")]); + push_req(&mut req, seq, "second window"); } // Decoded sequence numbers: window 1 -> [1], window 2 -> [1..=10]. @@ -1234,14 +1219,10 @@ mod test { // delivers seq 2..=11. Correct ACKs: [1, 11]. let mut req = BytesMut::new(); push_window_size(&mut req, 2); - push_req( - &mut req, - 1, - &[("message", "under-filled window, only event")], - ); + push_req(&mut req, 1, "under-filled window, only event"); push_window_size(&mut req, 10); for seq in 2..=11 { - push_req(&mut req, seq, &[("message", "second window")]); + push_req(&mut req, seq, "second window"); } let expected_sequences: Vec = (1..=11).collect(); @@ -1259,14 +1240,10 @@ mod test { let mut req = BytesMut::new(); push_window_size(&mut req, first_window_size); - push_req( - &mut req, - 1, - &[("message", "under-filled window, only event")], - ); + push_req(&mut req, 1, "under-filled window, only event"); push_window_size(&mut req, 10); for seq in 1..=10 { - push_req(&mut req, seq, &[("message", "second window")]); + push_req(&mut req, seq, "second window"); } let decoded = decode_frames(req); @@ -1316,12 +1293,12 @@ mod test { if w.compressed { let mut inner = BytesMut::new(); for seq in 1..=w.actual { - push_req(&mut inner, seq, &[("message", "x")]); + push_req(&mut inner, seq, "x"); } push_compressed(&mut buf, &inner); } else { for seq in 1..=w.actual { - push_req(&mut buf, seq, &[("message", "x")]); + push_req(&mut buf, seq, "x"); } } } @@ -1397,11 +1374,11 @@ mod test { } } - async fn send_req(address: SocketAddr, pairs: 
&[(&str, &str)], sends_ack: bool) { + async fn send_req(address: SocketAddr, message: &str, sends_ack: bool) { let seq = rng().random_range(1..u32::MAX); let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); - let req = encode_req(seq, pairs); + let req = encode_req(seq, message); socket.write_all(&req).await.unwrap(); let mut output = BytesMut::new();