diff --git a/changelog.d/23346_logstash_decode_error_fatal.fix.md b/changelog.d/23346_logstash_decode_error_fatal.fix.md new file mode 100644 index 0000000000000..125991620dc4f --- /dev/null +++ b/changelog.d/23346_logstash_decode_error_fatal.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to close the connection on a malformed frame instead of attempting to continue. A failed JSON decode or decompression previously left the decoder desynchronized but still running, which could busy-loop and emit ACKs for bogus sequence numbers (surfacing as `invalid sequence number received` on the client). The source now treats any decode error as fatal and closes the connection — matching the upstream `logstash-input-beats` server — so the client reconnects and retransmits the unacknowledged window. + +authors: graphcareful diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index a8b6c6b389e28..5d562444b8a97 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -389,15 +389,12 @@ pub enum DecodeError { impl StreamDecodingError for DecodeError { fn can_continue(&self) -> bool { - use DecodeError::*; - - match self { - IO { .. } => false, - UnknownProtocolVersion { .. } => false, - UnknownFrameType { .. } => false, - JsonFrameFailedDecode { .. } => true, - DecompressionFailed { .. } => true, - } + // No decode error is recoverable on this stream. Lumberjack is a + // length-prefixed binary protocol with no resync marker, so once a + // frame fails to decode the stream position is no longer trustworthy: + // continuing would misframe subsequent bytes and emit ACKs for bogus + // sequence numbers. + false } } @@ -1017,6 +1014,76 @@ mod test { } } + // A malformed frame must be a fatal (non-continuable) decode error: the + // Lumberjack stream can't be resynced, so the connection is closed rather + // than continuing with a desynced decoder (which would emit bogus ACKs). + // This matches upstream logstash-input-beats, which closes the channel on + // any decode exception. + + #[test] + fn malformed_json_frame_is_a_fatal_decode_error() { + let mut decoder = LogstashDecoder::new(); + let mut src = BytesMut::new(); + src.put_u8(b'2'); + src.put_u8(b'J'); + src.put_u32(1); // sequence number + let bad = b"{ not valid json "; + src.put_u32(bad.len() as u32); // payload size + src.put(&bad[..]); + + let err = decoder.decode(&mut src).unwrap_err(); + assert!(matches!(err, DecodeError::JsonFrameFailedDecode { .. })); + assert!( + !err.can_continue(), + "a malformed JSON frame must be fatal so the connection closes", + ); + } + + #[test] + fn malformed_compressed_frame_is_a_fatal_decode_error() { + let mut decoder = LogstashDecoder::new(); + let mut src = BytesMut::new(); + src.put_u8(b'2'); + src.put_u8(b'C'); + let garbage = b"this is not a zlib stream"; + src.put_u32(garbage.len() as u32); // payload size + src.put(&garbage[..]); + + let err = decoder.decode(&mut src).unwrap_err(); + assert!(matches!(err, DecodeError::DecompressionFailed { .. })); + assert!(!err.can_continue()); + } + + #[tokio::test] + async fn malformed_frame_closes_connection_without_ack() { + let (address, _recv) = start_logstash(EventStatus::Delivered).await; + + let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); + + // A '2' 'J' frame whose payload is not valid JSON. + let mut req = BytesMut::new(); + req.put_u8(b'2'); + req.put_u8(b'J'); + req.put_u32(1); // sequence number + let bad = b"{ not valid json "; + req.put_u32(bad.len() as u32); // payload size + req.put(&bad[..]); + socket.write_all(&req).await.unwrap(); + + // The source must close the connection on the decode error and send no + // ACK; the client will reconnect and retransmit. + let mut output = BytesMut::new(); + let result = socket.read_buf(&mut output).await; + assert!( + matches!(result, Ok(0)) || result.is_err(), + "expected the connection to close; read returned {result:?} with {output:?}", + ); + assert!( + output.is_empty(), + "no ACK should be sent for a malformed frame, got {output:?}", + ); + } + #[tokio::test] async fn distinct_windows_do_not_share_an_ack_domain() { let mut req = BytesMut::new();