diff --git a/changelog.d/50215_logstash_partial_window_ack.fix.md b/changelog.d/50215_logstash_partial_window_ack.fix.md new file mode 100644 index 0000000000000..848ffd098a3e5 --- /dev/null +++ b/changelog.d/50215_logstash_partial_window_ack.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to preserve ACK domains when a fresh Lumberjack `WindowSize` arrives before a previous partial writer window has been acknowledged. This prevents ACKs for later windows from advancing past the current Filebeat window and triggering `invalid sequence number received` retransmits. + +authors: emilychendd diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index a8b6c6b389e28..dcb49bd88ea3f 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -264,6 +264,8 @@ impl TcpSource for LogstashSource { } } +type LogstashAcknowledgement = (LogstashProtocolVersion, u32); + struct LogstashAcker { // Batched reads can contain multiple writer windows. Preserve a separate // ACK point for each completed window so Filebeat never sees an ACK that @@ -277,19 +279,32 @@ struct LogstashAcker { // We expect most batches to need only one ACK point, either for a single // completed window or for one partial tail. Multiple ACKs are only needed // when ReadyFrames coalesces multiple logical windows into one batch. - acknowledgements: SmallVec<[(LogstashProtocolVersion, u32); 1]>, + acknowledgements: SmallVec<[LogstashAcknowledgement; 1]>, } impl LogstashAcker { fn new(frames: &[LogstashEventFrame]) -> Self { - let acknowledgements = frames - .iter() - .enumerate() + let mut acknowledgements = SmallVec::new(); + let mut last_acknowledgement = None; + + for (index, frame) in frames.iter().enumerate() { + // A carried ACK boundary belongs to the event immediately before a fresh + // WindowSize. Emit it only when that predecessor is in this ReadyFrames batch; + // otherwise the prior batch tail was already ACKable. + if let Some(acknowledgement) = frame.preceding_acknowledgement + && last_acknowledgement == Some(acknowledgement) + { + acknowledgements.push(acknowledgement); + } + // ACK each completed writer window and the last frame in a partial batch if ReadyFrames // flushes before the current window is complete. - .filter(|(index, frame)| frame.window_end || index + 1 == frames.len()) - .map(|(_, frame)| (frame.protocol, frame.sequence_number)) - .collect(); + if frame.window_end || index + 1 == frames.len() { + acknowledgements.push(frame.acknowledgement()); + } + + last_acknowledgement = Some(frame.acknowledgement()); + } Self { acknowledgements } } @@ -328,6 +343,8 @@ struct LogstashDecoder { // preserve sender window boundaries even if ReadyFrames later batches // multiple decoded windows together before ACKing. window_events_remaining: Option, + last_pending_window_acknowledgement: Option, + pending_acknowledgement: Option, } impl LogstashDecoder { @@ -337,10 +354,20 @@ impl LogstashDecoder { const fn new_with_window_events_remaining( window_events_remaining: Option, + ) -> Self { + Self::new_with_state(window_events_remaining, None, None) + } + + const fn new_with_state( + window_events_remaining: Option, + last_pending_window_acknowledgement: Option, + pending_acknowledgement: Option, ) -> Self { Self { state: LogstashDecoderReadState::ReadProtocol, window_events_remaining, + last_pending_window_acknowledgement, + pending_acknowledgement, } } @@ -355,19 +382,24 @@ impl LogstashDecoder { /// If a sender omits `WindowSize`, we keep the previous behavior and treat /// each standalone frame as ACKable on its own. const fn annotate_frame(&mut self, frame: &mut LogstashEventFrame) { + frame.preceding_acknowledgement = self.pending_acknowledgement.take(); + match self.window_events_remaining { Some(remaining) if remaining.get() == 1 => { frame.window_end = true; self.window_events_remaining = None; + self.last_pending_window_acknowledgement = None; } Some(remaining) => { frame.window_end = false; self.window_events_remaining = NonZeroUsize::new(remaining.get() - 1); // safe because we know remaining is greater than 1 + self.last_pending_window_acknowledgement = Some(frame.acknowledgement()); } None => { // Preserve existing behavior for inputs that send standalone data frames // without an explicit WindowSize frame. frame.window_end = true; + self.last_pending_window_acknowledgement = None; } } } @@ -407,7 +439,7 @@ impl From for DecodeError { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] enum LogstashProtocolVersion { V1, // 1 V2, // 2 @@ -489,11 +521,20 @@ struct LogstashEventFrame { sequence_number: u32, fields: BTreeMap, window_end: bool, + preceding_acknowledgement: Option, +} + +impl LogstashEventFrame { + const fn acknowledgement(&self) -> LogstashAcknowledgement { + (self.protocol, self.sequence_number) + } } struct DecodedCompressedFrames { frames: VecDeque<(LogstashEventFrame, usize)>, window_events_remaining: Option, + last_pending_window_acknowledgement: Option, + pending_acknowledgement: Option, } // Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md @@ -552,9 +593,9 @@ impl Decoder for LogstashDecoder { // current writer window it is waiting on. WindowSize is a maximum unacked // count, not necessarily an exact count of immediately following frames, so a // sender can legitimately advertise a new window after a previously ACKed - // partial tail. If a malformed sender does this before that earlier tail has - // actually been ACKed, we tolerate the reset here even though it can collapse - // the older incomplete domain into the new one. + // partial tail. If the previous partial tail is still in the same ReadyFrames + // batch, carry its ACK boundary forward so the acker can emit it before the + // fresh window's ACK. // // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => { @@ -562,6 +603,13 @@ impl Decoder for LogstashDecoder { return Ok(None); } + if self.window_events_remaining.is_some() + && let Some(acknowledgement) = + self.last_pending_window_acknowledgement.take() + { + self.pending_acknowledgement = Some(acknowledgement); + } + let window_size = src.get_u32() as usize; self.window_events_remaining = NonZeroUsize::new(window_size); @@ -605,11 +653,19 @@ impl Decoder for LogstashDecoder { // overwrite any WindowSize boundaries that were established inside the compressed // payload and can also lose progress from a partially consumed outer window. LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => { - let Some(decoded) = decode_compressed_frame(src, self.window_events_remaining)? + let Some(decoded) = decode_compressed_frame( + src, + self.window_events_remaining, + self.last_pending_window_acknowledgement, + self.pending_acknowledgement, + )? else { return Ok(None); }; self.window_events_remaining = decoded.window_events_remaining; + self.last_pending_window_acknowledgement = + decoded.last_pending_window_acknowledgement; + self.pending_acknowledgement = decoded.pending_acknowledgement; LogstashDecoderReadState::PendingFrames(decoded.frames) } @@ -654,6 +710,7 @@ fn decode_data_frame( sequence_number, fields, window_end: false, + preceding_acknowledgement: None, }, byte_size, )) @@ -713,6 +770,7 @@ fn decode_json_frame( sequence_number, fields, window_end: false, + preceding_acknowledgement: None, }, byte_size, ))) @@ -721,6 +779,8 @@ fn decode_json_frame( fn decode_compressed_frame( src: &mut BytesMut, window_events_remaining: Option, + last_pending_window_acknowledgement: Option, + pending_acknowledgement: Option, ) -> Result, DecodeError> { let mut rest = src.as_ref(); @@ -749,7 +809,11 @@ fn decode_compressed_frame( let mut buf = res?; - let mut decoder = LogstashDecoder::new_with_window_events_remaining(window_events_remaining); + let mut decoder = LogstashDecoder::new_with_state( + window_events_remaining, + last_pending_window_acknowledgement, + pending_acknowledgement, + ); let mut frames = VecDeque::new(); @@ -759,6 +823,8 @@ fn decode_compressed_frame( Ok(Some(DecodedCompressedFrames { frames, window_events_remaining: decoder.window_events_remaining, + last_pending_window_acknowledgement: decoder.last_pending_window_acknowledgement, + pending_acknowledgement: decoder.pending_acknowledgement, })) } @@ -1044,6 +1110,78 @@ mod test { assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await; } + #[tokio::test] + async fn fresh_window_before_acked_partial_tail_preserves_ack_domain() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "first partial window first")]); + push_req(&mut req, 2, &[("message", "first partial window second")]); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "second window first")]); + push_req(&mut req, 2, &[("message", "second window second")]); + push_req(&mut req, 3, &[("message", "second window third")]); + push_req(&mut req, 4, &[("message", "second window fourth")]); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 1, 2, 3, 4]); + assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 1, 2, 3, 4], &[2, 4]).await; + } + + #[tokio::test] + async fn compressed_fresh_window_before_acked_partial_tail_preserves_ack_domain() { + let mut first_partial_window = BytesMut::new(); + push_req( + &mut first_partial_window, + 1, + &[("message", "first partial window first")], + ); + push_req( + &mut first_partial_window, + 2, + &[("message", "first partial window second")], + ); + + let mut second_window = BytesMut::new(); + push_req(&mut second_window, 1, &[("message", "second window first")]); + push_req( + &mut second_window, + 2, + &[("message", "second window second")], + ); + push_req(&mut second_window, 3, &[("message", "second window third")]); + push_req( + &mut second_window, + 4, + &[("message", "second window fourth")], + ); + + let mut req = BytesMut::new(); + push_window_size(&mut req, 4); + push_compressed(&mut req, &first_partial_window); + push_window_size(&mut req, 4); + push_compressed(&mut req, &second_window); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 1, 2, 3, 4]); + assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 1, 2, 3, 4], &[2, 4]).await; + } + + #[tokio::test] + async fn repeated_fresh_windows_before_acked_partial_tails_preserve_ack_domains() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "first partial window first")]); + push_req(&mut req, 2, &[("message", "first partial window second")]); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "second partial window")]); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "third window first")]); + push_req(&mut req, 2, &[("message", "third window second")]); + push_req(&mut req, 3, &[("message", "third window third")]); + push_req(&mut req, 4, &[("message", "third window fourth")]); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 1, 1, 2, 3, 4]); + assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 1, 1, 2, 3, 4], &[2, 1, 4]).await; + } + #[tokio::test] async fn incomplete_final_window_is_acked_to_the_last_received_event() { let mut req = BytesMut::new();