Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/23346_logstash_decode_error_fatal.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed the `logstash` source to close the connection on a malformed frame instead of attempting to continue. A failed JSON decode or decompression previously left the decoder desynchronized but still running, which could busy-loop and emit ACKs for bogus sequence numbers (surfacing as `invalid sequence number received` on the client). The source now treats any decode error as fatal and closes the connection — matching the upstream `logstash-input-beats` server — so the client reconnects and retransmits the unacknowledged window.

authors: graphcareful
85 changes: 76 additions & 9 deletions src/sources/logstash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,12 @@ pub enum DecodeError {

impl StreamDecodingError for DecodeError {
fn can_continue(&self) -> bool {
use DecodeError::*;

match self {
IO { .. } => false,
UnknownProtocolVersion { .. } => false,
UnknownFrameType { .. } => false,
JsonFrameFailedDecode { .. } => true,
DecompressionFailed { .. } => true,
}
// No decode error is recoverable on this stream. Lumberjack is a
// length-prefixed binary protocol with no resync marker, so once a
// frame fails to decode the stream position is no longer trustworthy:
// continuing would misframe subsequent bytes and emit ACKs for bogus
// sequence numbers.
false
}
}

Expand Down Expand Up @@ -1017,6 +1014,76 @@ mod test {
}
}

// A malformed frame must be a fatal (non-continuable) decode error: the
// Lumberjack stream can't be resynced, so the connection is closed rather
// than continuing with a desynced decoder (which would emit bogus ACKs).
// This matches upstream logstash-input-beats, which closes the channel on
// any decode exception.

#[test]
fn malformed_json_frame_is_a_fatal_decode_error() {
let mut decoder = LogstashDecoder::new();
let mut src = BytesMut::new();
src.put_u8(b'2');
src.put_u8(b'J');
src.put_u32(1); // sequence number
let bad = b"{ not valid json ";
src.put_u32(bad.len() as u32); // payload size
src.put(&bad[..]);

let err = decoder.decode(&mut src).unwrap_err();
assert!(matches!(err, DecodeError::JsonFrameFailedDecode { .. }));
assert!(
!err.can_continue(),
"a malformed JSON frame must be fatal so the connection closes",
);
}

#[test]
fn malformed_compressed_frame_is_a_fatal_decode_error() {
let mut decoder = LogstashDecoder::new();
let mut src = BytesMut::new();
src.put_u8(b'2');
src.put_u8(b'C');
let garbage = b"this is not a zlib stream";
src.put_u32(garbage.len() as u32); // payload size
src.put(&garbage[..]);

let err = decoder.decode(&mut src).unwrap_err();
assert!(matches!(err, DecodeError::DecompressionFailed { .. }));
assert!(!err.can_continue());
}

#[tokio::test]
async fn malformed_frame_closes_connection_without_ack() {
let (address, _recv) = start_logstash(EventStatus::Delivered).await;

let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();

// A '2' 'J' frame whose payload is not valid JSON.
let mut req = BytesMut::new();
req.put_u8(b'2');
req.put_u8(b'J');
req.put_u32(1); // sequence number
let bad = b"{ not valid json ";
req.put_u32(bad.len() as u32); // payload size
req.put(&bad[..]);
socket.write_all(&req).await.unwrap();

// The source must close the connection on the decode error and send no
// ACK; the client will reconnect and retransmit.
let mut output = BytesMut::new();
let result = socket.read_buf(&mut output).await;
assert!(
matches!(result, Ok(0)) || result.is_err(),
"expected the connection to close; read returned {result:?} with {output:?}",
);
assert!(
output.is_empty(),
"no ACK should be sent for a malformed frame, got {output:?}",
);
}

#[tokio::test]
async fn distinct_windows_do_not_share_an_ack_domain() {
let mut req = BytesMut::new();
Expand Down
Loading