diff --git a/changelog.d/23346_logstash_ack_window_boundaries.fix.md b/changelog.d/23346_logstash_ack_window_boundaries.fix.md new file mode 100644 index 0000000000000..117056a71ea63 --- /dev/null +++ b/changelog.d/23346_logstash_ack_window_boundaries.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to delimit ACK windows by `WindowSize` frames rather than by counting events down to the advertised window size. The previous approach could lose a window boundary when a sender flushed a window with fewer events than advertised (legal, since `WindowSize` is a maximum) and a later window was coalesced into the same read, causing Vector to ACK a sequence number beyond the window the sender was waiting on. Beats/Filebeat reported this as `invalid sequence number received (seq=N, expected=M)`, leading to reconnects and duplicate retransmits. + +authors: graphcareful diff --git a/changelog.d/23346_logstash_decode_error_fatal.fix.md b/changelog.d/23346_logstash_decode_error_fatal.fix.md new file mode 100644 index 0000000000000..125991620dc4f --- /dev/null +++ b/changelog.d/23346_logstash_decode_error_fatal.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to close the connection on a malformed frame instead of attempting to continue. A failed JSON decode or decompression previously left the decoder desynchronized but still running, which could busy-loop and emit ACKs for bogus sequence numbers (surfacing as `invalid sequence number received` on the client). The source now treats any decode error as fatal and closes the connection — matching the upstream `logstash-input-beats` server — so the client reconnects and retransmits the unacknowledged window. + +authors: graphcareful diff --git a/proptest-regressions/sources/logstash.txt b/proptest-regressions/sources/logstash.txt new file mode 100644 index 0000000000000..6b67f83112eb8 --- /dev/null +++ b/proptest-regressions/sources/logstash.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. 
It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc b3e438be44a9b6d3ad27dd1c5cf54f143e698ff1f01b178661869dffb67c7710 # shrinks to windows = [GenWindow { declared: 2, actual: 1, compressed: false }, GenWindow { declared: 2, actual: 2, compressed: false }], batch_sizes = [3] diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index a8b6c6b389e28..8e5b53bec5aa2 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -3,7 +3,6 @@ use std::{ convert::TryFrom, io::{self, Read}, net::SocketAddr, - num::NonZeroUsize, time::Duration, }; @@ -265,18 +264,10 @@ impl TcpSource for LogstashSource { } struct LogstashAcker { - // Batched reads can contain multiple writer windows. Preserve a separate - // ACK point for each completed window so Filebeat never sees an ACK that - // advances past the current window it is waiting on. If the batch ends in - // the middle of a window, ACK the last received event in that final ACK - // domain so clients are not forced to wait for the advertised window size. - // Lumberjack defines WindowSize as a maximum unacked count, so a sender can - // legitimately advertise a fresh window after a previously ACKed partial - // tail. Within a single ReadyFrames batch, the only incomplete ACK domain - // we can represent independently is the final tail we have actually seen. - // We expect most batches to need only one ACK point, either for a single - // completed window or for one partial tail. Multiple ACKs are only needed - // when ReadyFrames coalesces multiple logical windows into one batch. + // One cumulative ACK per writer window in the batch, in wire order. Each + // ACK is a window's highest sequence; since sequences reset per window it + // never exceeds the window the sender awaits. 
A window split across reads + // gets a partial-tail ACK and completes in a later batch. acknowledgements: SmallVec<[(LogstashProtocolVersion, u32); 1]>, } @@ -285,9 +276,11 @@ impl LogstashAcker { let acknowledgements = frames .iter() .enumerate() - // ACK each completed writer window and the last frame in a partial batch if ReadyFrames - // flushes before the current window is complete. - .filter(|(index, frame)| frame.window_end || index + 1 == frames.len()) + // last frame of each window: the next frame opens a new one, or this + // is the end of the batch. + .filter(|(index, frame)| { + index + 1 == frames.len() || frames[index + 1].window_id != frame.window_id + }) .map(|(_, frame)| (frame.protocol, frame.sequence_number)) .collect(); @@ -300,6 +293,7 @@ impl TcpSourceAcker for LogstashAcker { fn build_ack(self, ack: TcpSourceAck) -> Option { match ack { TcpSourceAck::Ack if !self.acknowledgements.is_empty() => { + debug!(message = "Sending Lumberjack ACK(s).", acks = ?self.acknowledgements); let mut bytes: Vec = Vec::with_capacity(self.acknowledgements.len() * 6); for (protocol_version, sequence_number) in self.acknowledgements { bytes.push(protocol_version.into()); @@ -321,55 +315,66 @@ enum LogstashDecoderReadState { PendingFrames(VecDeque<(LogstashEventFrame, usize)>), } +/// Tracks the current Lumberjack writer window (ack domain) so the acker can +/// emit one ACK per window even when a read coalesces several. +/// +/// The boundary is the *presence* of a `WindowSize` frame, not its value: +/// `WindowSize` is a maximum, so counting events down to it desyncs when a +/// sender under-fills a window. Delimiting on the frame itself can't desync. +#[derive(Debug, Clone, Copy)] +struct WindowState { + /// Monotonic id of the current ack domain. + current: u64, + /// Whether a `WindowSize` frame governs incoming data frames. 
+ active: bool, +} + +impl WindowState { + const fn new() -> Self { + Self { + current: 0, + active: false, + } + } + + /// Opens a new ack domain in response to a `WindowSize` frame. + const fn open(&mut self) { + self.current += 1; + self.active = true; + } + + /// Id for the next data frame. Frames under a `WindowSize` share its id; + /// bare frames (no active window) each get a fresh id, so a non-windowed + /// sender's frames are ACKed individually. + const fn frame_id(&mut self) -> u64 { + if !self.active { + self.current += 1; + } + self.current + } +} + #[derive(Debug)] struct LogstashDecoder { state: LogstashDecoderReadState, - // Tracks how many events remain in the current writer window. This lets us - // preserve sender window boundaries even if ReadyFrames later batches - // multiple decoded windows together before ACKing. - window_events_remaining: Option, + window: WindowState, } impl LogstashDecoder { const fn new() -> Self { - Self::new_with_window_events_remaining(None) + Self::new_with_window(WindowState::new()) } - const fn new_with_window_events_remaining( - window_events_remaining: Option, - ) -> Self { + const fn new_with_window(window: WindowState) -> Self { Self { state: LogstashDecoderReadState::ReadProtocol, - window_events_remaining, + window, } } - /// Marks whether a decoded frame closes the current writer window. - /// - /// Filebeat expects ACKs to stay within the current window announced by the - /// most recent `WindowSize` frame. The generic TCP batching layer can merge - /// frames from multiple windows before we build an ACK, so we record the - /// per-frame window boundary here and let the acker emit one ACK frame per - /// completed window later. - /// - /// If a sender omits `WindowSize`, we keep the previous behavior and treat - /// each standalone frame as ACKable on its own. 
- const fn annotate_frame(&mut self, frame: &mut LogstashEventFrame) { - match self.window_events_remaining { - Some(remaining) if remaining.get() == 1 => { - frame.window_end = true; - self.window_events_remaining = None; - } - Some(remaining) => { - frame.window_end = false; - self.window_events_remaining = NonZeroUsize::new(remaining.get() - 1); // safe because we know remaining is greater than 1 - } - None => { - // Preserve existing behavior for inputs that send standalone data frames - // without an explicit WindowSize frame. - frame.window_end = true; - } - } + /// Stamps a decoded data frame with the id of the window it belongs to. + const fn stamp_frame(&mut self, frame: &mut LogstashEventFrame) { + frame.window_id = self.window.frame_id(); } } @@ -389,15 +394,12 @@ pub enum DecodeError { impl StreamDecodingError for DecodeError { fn can_continue(&self) -> bool { - use DecodeError::*; - - match self { - IO { .. } => false, - UnknownProtocolVersion { .. } => false, - UnknownFrameType { .. } => false, - JsonFrameFailedDecode { .. } => true, - DecompressionFailed { .. } => true, - } + // No decode error is recoverable on this stream. Lumberjack is a + // length-prefixed binary protocol with no resync marker, so once a + // frame fails to decode the stream position is no longer trustworthy: + // continuing would misframe subsequent bytes and emit ACKs for bogus + // sequence numbers. + false } } @@ -488,12 +490,13 @@ struct LogstashEventFrame { protocol: LogstashProtocolVersion, sequence_number: u32, fields: BTreeMap, - window_end: bool, + // Ack-domain (writer window) id, stamped by the decoder. See [`WindowState`]. 
+ window_id: u64, } struct DecodedCompressedFrames { frames: VecDeque<(LogstashEventFrame, usize)>, - window_events_remaining: Option, + window: WindowState, } // Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md @@ -537,24 +540,25 @@ impl Decoder for LogstashDecoder { use LogstashFrameType::*; - match LogstashFrameType::try_from(src.get_u8())? { - WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize), - Data => LogstashDecoderReadState::ReadFrame(protocol, Data), - Json => LogstashDecoderReadState::ReadFrame(protocol, Json), - Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed), - Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack), + let frame_type = src.get_u8(); + match LogstashFrameType::try_from(frame_type) { + Ok(WindowSize) => LogstashDecoderReadState::ReadFrame(protocol, WindowSize), + Ok(Data) => LogstashDecoderReadState::ReadFrame(protocol, Data), + Ok(Json) => LogstashDecoderReadState::ReadFrame(protocol, Json), + Ok(Compressed) => LogstashDecoderReadState::ReadFrame(protocol, Compressed), + Ok(Ack) => LogstashDecoderReadState::ReadFrame(protocol, Ack), + Err(error) => { + debug!( + message = "Unexpected frame type byte; stream may be misframed.", + frame_type_byte = frame_type, + buffer_head = ?&src[..src.len().min(16)], + ); + return Err(error); + } } } - // The window size indicates how many events the writer will send before waiting - // for acks. We preserve this boundary so the acker can emit one ACK per - // completed window, even if multiple windows are batched together later. - // Filebeat accepts cumulative ACKs, but not ACKs that advance past the - // current writer window it is waiting on. WindowSize is a maximum unacked - // count, not necessarily an exact count of immediately following frames, so a - // sender can legitimately advertise a new window after a previously ACKed - // partial tail. 
If a malformed sender does this before that earlier tail has - // actually been ACKed, we tolerate the reset here even though it can collapse - // the older incomplete domain into the new one. + // A `WindowSize` frame opens a new ack domain; its value is unused + // (we delimit by the frame, not by counting events down to its size). // // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => { @@ -562,8 +566,13 @@ impl Decoder for LogstashDecoder { return Ok(None); } - let window_size = src.get_u32() as usize; - self.window_events_remaining = NonZeroUsize::new(window_size); + let window_size = src.get_u32(); + self.window.open(); + debug!( + message = "Opened Lumberjack writer window.", + window_id = self.window.current, + advertised_size = window_size, + ); LogstashDecoderReadState::ReadProtocol } @@ -584,7 +593,7 @@ impl Decoder for LogstashDecoder { let Some((mut frame, byte_size)) = decode_data_frame(protocol, src) else { return Ok(None); }; - self.annotate_frame(&mut frame); + self.stamp_frame(&mut frame); LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } @@ -593,23 +602,19 @@ impl Decoder for LogstashDecoder { let Some((mut frame, byte_size)) = decode_json_frame(protocol, src)? else { return Ok(None); }; - self.annotate_frame(&mut frame); + self.stamp_frame(&mut frame); LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type // - // The compressed payload is still part of the same logical Lumberjack stream, so - // the nested decoder must inherit the current window state and return the updated - // state after expanding the payload. 
Re-annotating the emitted frames here would - // overwrite any WindowSize boundaries that were established inside the compressed - // payload and can also lose progress from a partially consumed outer window. + // Thread window state through the nested decoder so window ids stay + // monotonic across the compression boundary. LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => { - let Some(decoded) = decode_compressed_frame(src, self.window_events_remaining)? - else { + let Some(decoded) = decode_compressed_frame(src, self.window)? else { return Ok(None); }; - self.window_events_remaining = decoded.window_events_remaining; + self.window = decoded.window; LogstashDecoderReadState::PendingFrames(decoded.frames) } @@ -653,7 +658,7 @@ fn decode_data_frame( protocol, sequence_number, fields, - window_end: false, + window_id: 0, }, byte_size, )) @@ -712,7 +717,7 @@ fn decode_json_frame( protocol, sequence_number, fields, - window_end: false, + window_id: 0, }, byte_size, ))) @@ -720,7 +725,7 @@ fn decode_json_frame( fn decode_compressed_frame( src: &mut BytesMut, - window_events_remaining: Option, + window: WindowState, ) -> Result, DecodeError> { let mut rest = src.as_ref(); @@ -749,7 +754,7 @@ fn decode_compressed_frame( let mut buf = res?; - let mut decoder = LogstashDecoder::new_with_window_events_remaining(window_events_remaining); + let mut decoder = LogstashDecoder::new_with_window(window); let mut frames = VecDeque::new(); @@ -758,7 +763,7 @@ fn decode_compressed_frame( } Ok(Some(DecodedCompressedFrames { frames, - window_events_remaining: decoder.window_events_remaining, + window: decoder.window, })) } @@ -850,12 +855,7 @@ mod test { async fn test_protocol(status: EventStatus, sends_ack: bool) { let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (address, recv) = start_logstash(status).await; - spawn_collect_n( - send_req(address, &[("message", "Hello, world!")], sends_ack), - recv, - 1, - ) - .await + 
spawn_collect_n(send_req(address, "Hello, world!", sends_ack), recv, 1).await }) .await; @@ -873,22 +873,20 @@ mod test { assert!(log.get("timestamp").is_some()); } - fn push_req(req: &mut BytesMut, seq: u32, pairs: &[(&str, &str)]) { + fn push_req(req: &mut BytesMut, seq: u32, message: &str) { req.put_u8(b'2'); req.put_u8(b'D'); req.put_u32(seq); - req.put_u32(pairs.len() as u32); - for (key, value) in pairs { - req.put_u32(key.len() as u32); - req.put(key.as_bytes()); - req.put_u32(value.len() as u32); - req.put(value.as_bytes()); - } + req.put_u32(1u32); + req.put_u32("message".len() as u32); + req.put("message".as_bytes()); + req.put_u32(message.len() as u32); + req.put(message.as_bytes()); } - fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes { + fn encode_req(seq: u32, message: &str) -> Bytes { let mut req = BytesMut::new(); - push_req(&mut req, seq, pairs); + push_req(&mut req, seq, message); req.into() } @@ -1008,7 +1006,7 @@ mod test { #[test] fn v1_decoder_does_not_panic() { let seq = rng().random_range(1..u32::MAX); - let req = encode_req(seq, &[("message", "Hello, World!")]); + let req = encode_req(seq, "Hello, World!"); for i in 0..req.len() - 1 { assert!( decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i])) @@ -1017,14 +1015,84 @@ mod test { } } + // A malformed frame must be a fatal (non-continuable) decode error: the + // Lumberjack stream can't be resynced, so the connection is closed rather + // than continuing with a desynced decoder (which would emit bogus ACKs). + // This matches upstream logstash-input-beats, which closes the channel on + // any decode exception. 
+ + #[test] + fn malformed_json_frame_is_a_fatal_decode_error() { + let mut decoder = LogstashDecoder::new(); + let mut src = BytesMut::new(); + src.put_u8(b'2'); + src.put_u8(b'J'); + src.put_u32(1); // sequence number + let bad = b"{ not valid json "; + src.put_u32(bad.len() as u32); // payload size + src.put(&bad[..]); + + let err = decoder.decode(&mut src).unwrap_err(); + assert!(matches!(err, DecodeError::JsonFrameFailedDecode { .. })); + assert!( + !err.can_continue(), + "a malformed JSON frame must be fatal so the connection closes", + ); + } + + #[test] + fn malformed_compressed_frame_is_a_fatal_decode_error() { + let mut decoder = LogstashDecoder::new(); + let mut src = BytesMut::new(); + src.put_u8(b'2'); + src.put_u8(b'C'); + let garbage = b"this is not a zlib stream"; + src.put_u32(garbage.len() as u32); // payload size + src.put(&garbage[..]); + + let err = decoder.decode(&mut src).unwrap_err(); + assert!(matches!(err, DecodeError::DecompressionFailed { .. })); + assert!(!err.can_continue()); + } + + #[tokio::test] + async fn malformed_frame_closes_connection_without_ack() { + let (address, _recv) = start_logstash(EventStatus::Delivered).await; + + let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); + + // A '2' 'J' frame whose payload is not valid JSON. + let mut req = BytesMut::new(); + req.put_u8(b'2'); + req.put_u8(b'J'); + req.put_u32(1); // sequence number + let bad = b"{ not valid json "; + req.put_u32(bad.len() as u32); // payload size + req.put(&bad[..]); + socket.write_all(&req).await.unwrap(); + + // The source must close the connection on the decode error and send no + // ACK; the client will reconnect and retransmit. 
+ let mut output = BytesMut::new(); + let result = socket.read_buf(&mut output).await; + assert!( + matches!(result, Ok(0)) || result.is_err(), + "expected the connection to close; read returned {result:?} with {output:?}", + ); + assert!( + output.is_empty(), + "no ACK should be sent for a malformed frame, got {output:?}", + ); + } + #[tokio::test] async fn distinct_windows_do_not_share_an_ack_domain() { let mut req = BytesMut::new(); push_window_size(&mut req, 1); - push_req(&mut req, 1, &[("message", "first window")]); + push_req(&mut req, 1, "first window"); push_window_size(&mut req, 2); - push_req(&mut req, 1, &[("message", "second window first")]); - push_req(&mut req, 2, &[("message", "second window second")]); + push_req(&mut req, 1, "second window first"); + push_req(&mut req, 2, "second window second"); let decoded = decode_frames_and_assert_sequences(req, &[1, 1, 2]); assert_acknowledgements_for_ready_frames(decoded, &[1, 1, 2], &[1, 2]).await; @@ -1034,11 +1102,11 @@ mod test { async fn distinct_windows_with_monotonic_sequences_ack_the_first_window() { let mut req = BytesMut::new(); push_window_size(&mut req, 2); - push_req(&mut req, 1, &[("message", "first window first")]); - push_req(&mut req, 2, &[("message", "first window second")]); + push_req(&mut req, 1, "first window first"); + push_req(&mut req, 2, "first window second"); push_window_size(&mut req, 2); - push_req(&mut req, 3, &[("message", "second window first")]); - push_req(&mut req, 4, &[("message", "second window second")]); + push_req(&mut req, 3, "second window first"); + push_req(&mut req, 4, "second window second"); let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await; @@ -1048,7 +1116,7 @@ mod test { async fn incomplete_final_window_is_acked_to_the_last_received_event() { let mut req = BytesMut::new(); push_window_size(&mut req, 4); - push_req(&mut req, 1, &[("message", "only event in 
partial window")]); + push_req(&mut req, 1, "only event in partial window"); let decoded = decode_frames_and_assert_sequences(req, &[1]); assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; @@ -1058,8 +1126,8 @@ mod test { async fn compressed_frames_preserve_inner_window_boundaries() { let mut inner = BytesMut::new(); push_window_size(&mut inner, 2); - push_req(&mut inner, 1, &[("message", "compressed first")]); - push_req(&mut inner, 2, &[("message", "compressed second")]); + push_req(&mut inner, 1, "compressed first"); + push_req(&mut inner, 2, "compressed second"); let mut req = BytesMut::new(); push_compressed(&mut req, &inner); @@ -1072,10 +1140,10 @@ mod test { async fn single_window_split_across_ready_frames_keeps_progressive_acks() { let mut req = BytesMut::new(); push_window_size(&mut req, 4); - push_req(&mut req, 1, &[("message", "first")]); - push_req(&mut req, 2, &[("message", "second")]); - push_req(&mut req, 3, &[("message", "third")]); - push_req(&mut req, 4, &[("message", "fourth")]); + push_req(&mut req, 1, "first"); + push_req(&mut req, 2, "second"); + push_req(&mut req, 3, "third"); + push_req(&mut req, 4, "fourth"); let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); @@ -1100,28 +1168,217 @@ mod test { let mut first_batch = BytesMut::new(); push_window_size(&mut first_batch, 2); - push_req(&mut first_batch, 1, &[("message", "first partial tail")]); + push_req(&mut first_batch, 1, "first partial tail"); let decoded = decode_frames_with_decoder_and_assert_sequences(&mut decoder, first_batch, &[1]); assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; let mut second_batch = BytesMut::new(); push_window_size(&mut second_batch, 1); - push_req( - &mut second_batch, - 1, - &[("message", "fresh window after ack")], - ); + push_req(&mut second_batch, 1, "fresh window after ack"); let decoded = decode_frames_with_decoder_and_assert_sequences(&mut decoder, second_batch, &[1]); 
assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; } - async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) { + // Regression tests for the `seq=N, expected=M` error. A sender may legally + // under-fill a window (`WindowSize` is a maximum); when such a window is + // coalesced with a later one in a single read, the later window's sequence + // must not leak onto it. + + #[tokio::test] + async fn under_filled_window_must_not_leak_later_window_sequence() { + // Window 1 advertises up to 2 unacked events but only sends 1 (legal + // early flush), then window 2 advertises 10 and fills it. Filebeat is + // waiting on window 1 with `expected = 2`. The ACK stream must stay + // within each window: ACK 1 for window 1 (one event delivered), then + // ACK 10 for window 2. It must never emit seq=10 for window 1. + let mut req = BytesMut::new(); + push_window_size(&mut req, 2); + push_req(&mut req, 1, "under-filled window, only event"); + push_window_size(&mut req, 10); + for seq in 1..=10 { + push_req(&mut req, seq, "second window"); + } + + // Decoded sequence numbers: window 1 -> [1], window 2 -> [1..=10]. + let expected_sequences: Vec = std::iter::once(1).chain(1..=10).collect(); + let decoded = decode_frames_and_assert_sequences(req, &expected_sequences); + + // Window 1 is ACKed at its own (partial) sequence 1, then window 2 at + // 10. The size-2 window never receives seq=10, so Filebeat does not log + // "invalid sequence number received (seq=10, expected=2)". + assert_acknowledgements_for_ready_frames(decoded, &expected_sequences, &[1, 10]).await; + } + + #[tokio::test] + async fn under_filled_window_with_monotonic_sequences_must_not_leak() { + // Same defect, but with monotonic sequence numbers across windows to + // show it is not specific to per-window sequence resets. Window 1 + // (advertised 2) delivers one event (seq 1); window 2 (advertised 10) + // delivers seq 2..=11. Correct ACKs: [1, 11]. 
+ let mut req = BytesMut::new(); + push_window_size(&mut req, 2); + push_req(&mut req, 1, "under-filled window, only event"); + push_window_size(&mut req, 10); + for seq in 2..=11 { + push_req(&mut req, seq, "second window"); + } + + let expected_sequences: Vec = (1..=11).collect(); + let decoded = decode_frames_and_assert_sequences(req, &expected_sequences); + + assert_acknowledgements_for_ready_frames(decoded, &expected_sequences, &[1, 11]).await; + } + + #[tokio::test] + async fn ack_never_advances_past_the_window_filebeat_is_waiting_on() { + // States the invariant directly in the terms of the Filebeat error: + // the ACK that lands on the first (oldest) window must not exceed that + // window's advertised size. + let first_window_size = 2u32; + + let mut req = BytesMut::new(); + push_window_size(&mut req, first_window_size); + push_req(&mut req, 1, "under-filled window, only event"); + push_window_size(&mut req, 10); + for seq in 1..=10 { + push_req(&mut req, seq, "second window"); + } + + let decoded = decode_frames(req); + + let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>)); + let mut ready = ReadyFrames::with_capacity(stream, 16); + let (frames, _) = ready.next().await.unwrap().unwrap(); + + let ack = LogstashAcker::new(&frames) + .build_ack(TcpSourceAck::Ack) + .unwrap(); + let acknowledgements = decode_acknowledgements(ack); + + let first_ack = *acknowledgements + .first() + .expect("the oldest window must receive an ACK"); + assert!( + first_ack <= first_window_size, + "first ACK seq={first_ack} exceeds the first window's size {first_window_size}; \ + Filebeat reports this as `invalid sequence number received (seq={first_ack}, \ + expected={first_window_size})`", + ); + } + + // Property test: an ACK must never advance past the window the sender is + // waiting on (go-lumber rejects ackSeq > count). 
Generate writer windows, + // decode them, regroup the frames into arbitrary batches (modeling + // ReadyFrames coalescing), and replay the emitted ACKs FIFO against the + // window sizes. A violation is Filebeat's `invalid sequence number received` error. + + use proptest::prelude::*; + + /// A writer window: advertises `WindowSize = declared` but sends `actual` + /// frames (`actual <= declared`). `actual < declared` is a legal under-fill, + /// after which the sender expects ACKs no greater than `actual`. + #[derive(Debug, Clone)] + struct GenWindow { + declared: u32, + actual: u32, + compressed: bool, + } + + fn encode_windows(windows: &[GenWindow]) -> BytesMut { + let mut buf = BytesMut::new(); + for w in windows { + push_window_size(&mut buf, w.declared); + if w.compressed { + let mut inner = BytesMut::new(); + for seq in 1..=w.actual { + push_req(&mut inner, seq, "x"); + } + push_compressed(&mut buf, &inner); + } else { + for seq in 1..=w.actual { + push_req(&mut buf, seq, "x"); + } + } + } + buf + } + + /// Replays `acks` FIFO against the window sizes; returns the first ACK that + /// advances past its window, if any. + fn first_out_of_window_ack(window_sizes: &[u32], acks: &[u32]) -> Option<(u32, u32)> { + let mut head = 0usize; + for &ack in acks { + // 0 once we're past the last window: any ACK there is a violation. + let size = window_sizes.get(head).copied().unwrap_or(0); + if ack > size { + return Some((ack, size)); + } + if ack == size { + head += 1; + } + } + None + } + + proptest::proptest! { + #![proptest_config(proptest::prelude::ProptestConfig::with_cases(2048))] + + #[test] + fn acks_never_advance_past_a_window( + windows in proptest::collection::vec( + // declared >= actual >= 1; `extra` makes the window under-filled.
+ (1u32..=20, 0u32..=8, proptest::prelude::any::()) + .prop_map(|(actual, extra, compressed)| GenWindow { + declared: actual + extra, + actual, + compressed, + }), + 1..20, + ), + batch_sizes in proptest::collection::vec(1usize..=8, 1..40), + ) { + // each window's sender expects ACKs <= the frames it actually sent + let window_sizes: Vec = windows.iter().map(|w| w.actual).collect(); + + let decoded: Vec = decode_frames(encode_windows(&windows)) + .into_iter() + .map(|(frame, _)| frame) + .collect(); + + // regroup into arbitrary contiguous batches; one ACK per batch, in order + let mut acks = Vec::new(); + let mut offset = 0usize; + let mut sizes = batch_sizes.iter().copied().cycle(); + while offset < decoded.len() { + let take = sizes.next().unwrap().min(decoded.len() - offset); + let batch = &decoded[offset..offset + take]; + offset += take; + if let Some(ack_bytes) = + LogstashAcker::new(batch).build_ack(TcpSourceAck::Ack) + { + acks.extend(decode_acknowledgements(ack_bytes)); + } + } + + if let Some((ack, size)) = first_out_of_window_ack(&window_sizes, &acks) { + proptest::prop_assert!( + false, + "ACK seq {} advanced past window of size {} \ + (== `invalid sequence number received (seq={}, expected={})`)\n\ + windows={:?}\nbatch_sizes={:?}\nacks={:?}", + ack, size, ack, size, window_sizes, batch_sizes, acks, + ); + } + } + } + + async fn send_req(address: SocketAddr, message: &str, sends_ack: bool) { let seq = rng().random_range(1..u32::MAX); let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); - let req = encode_req(seq, pairs); + let req = encode_req(seq, message); socket.write_all(&req).await.unwrap(); let mut output = BytesMut::new();