From 1307508fa1b3080dfcbc51f52d5b4faa250e6d05 Mon Sep 17 00:00:00 2001 From: Alejandro Martinez Ruiz Date: Tue, 30 Dec 2025 23:07:57 +0100 Subject: [PATCH] fix(test): handle NDJSON chunking in log_stream tests The log endpoint streams NDJSON (newline-delimited JSON) generated by `tracing-subscriber`. However, the log stream tests assume each HTTP body chunk contains a complete JSON object, which isn't necessarily the case. HTTP streaming doesn't guarantee chunk boundaries align with JSON boundaries because TCP can split data anywhere. This causes occasional test failures when chunks split mid-JSON-object in my local machine. Fix by buffering incoming chunks and extracting complete lines (by newline delimiter) before parsing as JSON. We could arguably set the content type header for this endpoint to `application/x-ndjson` to be fully compliant with the NDJSON specs, but that may be risking a breaking change for little value. Signed-off-by: Alejandro Martinez Ruiz --- .../src/tests/telemetry/log_stream.rs | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/linkerd/app/integration/src/tests/telemetry/log_stream.rs b/linkerd/app/integration/src/tests/telemetry/log_stream.rs index f0d6e88ee6..94ba7404f9 100644 --- a/linkerd/app/integration/src/tests/telemetry/log_stream.rs +++ b/linkerd/app/integration/src/tests/telemetry/log_stream.rs @@ -213,6 +213,7 @@ where let mut result = Vec::new(); let logs = &mut result; let fut = async move { + let mut buffer = Vec::new(); while let Some(res) = body.frame().await { let chunk = match res { Ok(frame) => { @@ -227,13 +228,37 @@ where break; } }; - let deserialized = serde_json::from_slice(&chunk[..]); + + buffer.extend_from_slice(&chunk[..]); + + // Process complete lines since the format is newline-delimited JSON (NDJSON) + while let Some(newline_pos) = buffer.iter().position(|&b| b == b'\n') { + let line = buffer.drain(..=newline_pos).collect::<Vec<_>>(); + // Skip empty lines + if 
line.iter().all(|&b| b.is_ascii_whitespace()) { + continue; + } + let deserialized = serde_json::from_slice(&line[..]); + tracing::info!(?deserialized); + match deserialized { + Ok(json) => logs.push(json), + Err(error) => panic!( + "parsing logs as JSON failed\n error: {error}\n line: {:?}", + String::from_utf8_lossy(&line[..]) + ), + } + } + } + + // Handle remaining data in buffer + if !buffer.is_empty() && !buffer.iter().all(|&b| b.is_ascii_whitespace()) { + let deserialized = serde_json::from_slice(&buffer[..]); tracing::info!(?deserialized); match deserialized { Ok(json) => logs.push(json), Err(error) => panic!( - "parsing logs as JSON failed\n error: {error}\n chunk: {:?}", - String::from_utf8_lossy(&chunk[..]) + "parsing logs as JSON failed (incomplete final line)\nerror: {error}\nbuffer: {:?}", + String::from_utf8_lossy(&buffer[..]) ), } }