diff --git a/src/core/stream.rs b/src/core/stream.rs index 02a6cfc79..bdf0bfa9a 100644 --- a/src/core/stream.rs +++ b/src/core/stream.rs @@ -298,10 +298,14 @@ pub fn run_streaming( Some(std::thread::spawn(move || { let mut writer = BufWriter::new(child_stdin); let stdin_handle = io::stdin(); - for line in BufReader::new(stdin_handle.lock()) - .lines() - .map_while(Result::ok) - { + let mut reader = BufReader::new(stdin_handle.lock()); + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) { + let line = { + let s = String::from_utf8_lossy(&buf); + s.trim_end_matches('\n').trim_end_matches('\r').to_owned() + }; + buf.clear(); if let Some(out) = filter.feed_line(&line) { if writeln!(writer, "{}", out).is_err() { break; @@ -340,7 +344,14 @@ pub fn run_streaming( let (tx, rx) = mpsc::channel(); let tx_out = tx.clone(); let stdout_thread = std::thread::spawn(move || { - for line in BufReader::new(stdout).lines().map_while(Result::ok) { + let mut reader = BufReader::new(stdout); + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) { + let line = { + let s = String::from_utf8_lossy(&buf); + s.trim_end_matches('\n').trim_end_matches('\r').to_owned() + }; + buf.clear(); if tx_out.send(StreamLine::Stdout(line)).is_err() { break; } @@ -348,7 +359,14 @@ pub fn run_streaming( }); let tx_err = tx; let stderr_thread = std::thread::spawn(move || { - for line in BufReader::new(stderr).lines().map_while(Result::ok) { + let mut reader = BufReader::new(stderr); + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) { + let line = { + let s = String::from_utf8_lossy(&buf); + s.trim_end_matches('\n').trim_end_matches('\r').to_owned() + }; + buf.clear(); if tx_err.send(StreamLine::Stderr(line)).is_err() { break; } @@ -417,7 +435,14 @@ pub fn run_streaming( let stderr_thread = std::thread::spawn(move || -> String { let mut raw_err = String::new(); let mut capped = false; - for line in BufReader::new(stderr).lines().map_while(Result::ok) { + let mut reader = BufReader::new(stderr); + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) { + let line = { + let s = String::from_utf8_lossy(&buf); + s.trim_end_matches('\n').trim_end_matches('\r').to_owned() + }; + buf.clear(); if raw_err.len() + line.len() < RAW_CAP { raw_err.push_str(&line); raw_err.push('\n'); @@ -436,7 +461,14 @@ pub fn run_streaming( FilterMode::Passthrough => unreachable!("handled by early-return above"), FilterMode::Streaming(_) => unreachable!("handled by is_streaming branch"), FilterMode::Buffered(filter_fn) => { - for line in BufReader::new(stdout).lines().map_while(Result::ok) { + let mut reader = BufReader::new(stdout); + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) { + let line = { + let s = String::from_utf8_lossy(&buf); + s.trim_end_matches('\n').trim_end_matches('\r').to_owned() + }; + buf.clear(); if raw_stdout.len() + line.len() < RAW_CAP { raw_stdout.push_str(&line); raw_stdout.push('\n'); @@ -461,7 +493,14 @@ pub fn run_streaming( } } FilterMode::CaptureOnly => { - for line in BufReader::new(stdout).lines().map_while(Result::ok) { + let mut reader = BufReader::new(stdout); + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) { + let line = { + let s = String::from_utf8_lossy(&buf); + s.trim_end_matches('\n').trim_end_matches('\r').to_owned() + }; + buf.clear(); if raw_stdout.len() + line.len() < RAW_CAP { raw_stdout.push_str(&line); raw_stdout.push('\n'); @@ -1105,6 +1144,32 @@ pub(crate) mod tests { assert!(result.contains("exit=42"), "got: {}", result); } + #[test] + fn test_utf8_lossy_read_until_preserves_all_lines() { + use std::io::Cursor; + // Fixture: valid line, then a non-UTF-8 byte, then valid trailing line. + // The old lines().map_while(Result::ok) pattern would drop "after bad" silently. + let data: Vec = b"valid line\n\xFF\xFE bad\nafter bad\n".to_vec(); + let mut reader = BufReader::new(Cursor::new(data)); + let mut lines_out: Vec = Vec::new(); + let mut buf = Vec::new(); + while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) { + let line = { + let s = String::from_utf8_lossy(&buf); + s.trim_end_matches('\n').trim_end_matches('\r').to_owned() + }; + buf.clear(); + lines_out.push(line); + } + assert_eq!(lines_out.len(), 3, "all 3 lines must be returned, got: {:?}", lines_out); + assert_eq!(lines_out[0], "valid line"); + assert!( + lines_out[1].contains('\u{FFFD}'), + "line 2 must contain replacement char, got: {:?}", lines_out[1] + ); + assert_eq!(lines_out[2], "after bad", "line after invalid UTF-8 must be preserved"); + } + #[test] fn test_line_filter_observe_only_called_for_kept_lines() { let handler = CountingLineHandler {