Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 74 additions & 9 deletions src/core/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,14 @@ pub fn run_streaming(
Some(std::thread::spawn(move || {
let mut writer = BufWriter::new(child_stdin);
let stdin_handle = io::stdin();
for line in BufReader::new(stdin_handle.lock())
.lines()
.map_while(Result::ok)
{
let mut reader = BufReader::new(stdin_handle.lock());
let mut buf = Vec::new();
while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) {
let line = {
let s = String::from_utf8_lossy(&buf);
s.trim_end_matches('\n').trim_end_matches('\r').to_owned()
};
buf.clear();
if let Some(out) = filter.feed_line(&line) {
if writeln!(writer, "{}", out).is_err() {
break;
Expand Down Expand Up @@ -340,15 +344,29 @@ pub fn run_streaming(
let (tx, rx) = mpsc::channel();
let tx_out = tx.clone();
let stdout_thread = std::thread::spawn(move || {
for line in BufReader::new(stdout).lines().map_while(Result::ok) {
let mut reader = BufReader::new(stdout);
let mut buf = Vec::new();
while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) {
let line = {
let s = String::from_utf8_lossy(&buf);
s.trim_end_matches('\n').trim_end_matches('\r').to_owned()
};
buf.clear();
if tx_out.send(StreamLine::Stdout(line)).is_err() {
break;
}
}
});
let tx_err = tx;
let stderr_thread = std::thread::spawn(move || {
for line in BufReader::new(stderr).lines().map_while(Result::ok) {
let mut reader = BufReader::new(stderr);
let mut buf = Vec::new();
while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) {
let line = {
let s = String::from_utf8_lossy(&buf);
s.trim_end_matches('\n').trim_end_matches('\r').to_owned()
};
buf.clear();
if tx_err.send(StreamLine::Stderr(line)).is_err() {
break;
}
Expand Down Expand Up @@ -417,7 +435,14 @@ pub fn run_streaming(
let stderr_thread = std::thread::spawn(move || -> String {
let mut raw_err = String::new();
let mut capped = false;
for line in BufReader::new(stderr).lines().map_while(Result::ok) {
let mut reader = BufReader::new(stderr);
let mut buf = Vec::new();
while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) {
let line = {
let s = String::from_utf8_lossy(&buf);
s.trim_end_matches('\n').trim_end_matches('\r').to_owned()
};
buf.clear();
if raw_err.len() + line.len() < RAW_CAP {
raw_err.push_str(&line);
raw_err.push('\n');
Expand All @@ -436,7 +461,14 @@ pub fn run_streaming(
FilterMode::Passthrough => unreachable!("handled by early-return above"),
FilterMode::Streaming(_) => unreachable!("handled by is_streaming branch"),
FilterMode::Buffered(filter_fn) => {
for line in BufReader::new(stdout).lines().map_while(Result::ok) {
let mut reader = BufReader::new(stdout);
let mut buf = Vec::new();
while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) {
let line = {
let s = String::from_utf8_lossy(&buf);
s.trim_end_matches('\n').trim_end_matches('\r').to_owned()
};
buf.clear();
if raw_stdout.len() + line.len() < RAW_CAP {
raw_stdout.push_str(&line);
raw_stdout.push('\n');
Expand All @@ -461,7 +493,14 @@ pub fn run_streaming(
}
}
FilterMode::CaptureOnly => {
for line in BufReader::new(stdout).lines().map_while(Result::ok) {
let mut reader = BufReader::new(stdout);
let mut buf = Vec::new();
while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) {
let line = {
let s = String::from_utf8_lossy(&buf);
s.trim_end_matches('\n').trim_end_matches('\r').to_owned()
};
buf.clear();
if raw_stdout.len() + line.len() < RAW_CAP {
raw_stdout.push_str(&line);
raw_stdout.push('\n');
Expand Down Expand Up @@ -1105,6 +1144,32 @@ pub(crate) mod tests {
assert!(result.contains("exit=42"), "got: {}", result);
}

#[test]
fn test_utf8_lossy_read_until_preserves_all_lines() {
use std::io::Cursor;
// Fixture: valid line, then a non-UTF-8 byte, then valid trailing line.
// The old lines().map_while(Result::ok) pattern would drop "after bad" silently.
let data: Vec<u8> = b"valid line\n\xFF\xFE bad\nafter bad\n".to_vec();
let mut reader = BufReader::new(Cursor::new(data));
let mut lines_out: Vec<String> = Vec::new();
let mut buf = Vec::new();
while reader.read_until(b'\n', &mut buf).map(|n| n > 0).unwrap_or(false) {
let line = {
let s = String::from_utf8_lossy(&buf);
s.trim_end_matches('\n').trim_end_matches('\r').to_owned()
};
buf.clear();
lines_out.push(line);
}
assert_eq!(lines_out.len(), 3, "all 3 lines must be returned, got: {:?}", lines_out);
assert_eq!(lines_out[0], "valid line");
assert!(
lines_out[1].contains('\u{FFFD}'),
"line 2 must contain replacement char, got: {:?}", lines_out[1]
);
assert_eq!(lines_out[2], "after bad", "line after invalid UTF-8 must be preserved");
}

#[test]
fn test_line_filter_observe_only_called_for_kept_lines() {
let handler = CountingLineHandler {
Expand Down