Skip to content

Commit f68eabe

Browse files
committed
fix(runner): make FIFO cancel-safe
1 parent 6782062 commit f68eabe

1 file changed

Lines changed: 22 additions & 17 deletions

File tree

src/executor/shared/fifo.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
use crate::prelude::*;
22
use anyhow::Context;
3+
use futures::StreamExt;
34
use nix::{sys::time::TimeValLike, time::clock_gettime};
45
use runner_shared::artifacts::ExecutionTimestamps;
56
use runner_shared::fifo::{Command as FifoCommand, MarkerType};
67
use runner_shared::fifo::{RUNNER_ACK_FIFO, RUNNER_CTL_FIFO};
78
use std::cmp::Ordering;
89
use std::path::{Path, PathBuf};
910
use std::{collections::HashSet, time::Duration};
10-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
11+
use tokio::io::AsyncWriteExt;
1112
use tokio::net::unix::pid_t;
1213
use tokio::net::unix::pipe::OpenOptions as TokioPipeOpenOptions;
1314
use tokio::net::unix::pipe::Receiver as TokioPipeReader;
1415
use tokio::net::unix::pipe::Sender as TokioPipeSender;
16+
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
1517

1618
fn create_fifo<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<()> {
1719
// Remove the previous FIFO (if it exists)
@@ -71,7 +73,7 @@ pub struct FifoBenchmarkData {
7173

7274
pub struct RunnerFifo {
7375
ack_fifo: TokioPipeSender,
74-
ctl_fifo: TokioPipeReader,
76+
ctl_reader: FramedRead<TokioPipeReader, LengthDelimitedCodec>,
7577
}
7678

7779
fn get_pipe_open_options() -> TokioPipeOpenOptions {
@@ -94,24 +96,27 @@ impl RunnerFifo {
9496
let ack_fifo = get_pipe_open_options().open_sender(ack_path)?;
9597
let ctl_fifo = get_pipe_open_options().open_receiver(ctl_path)?;
9698

97-
Ok(Self { ctl_fifo, ack_fifo })
99+
let codec = LengthDelimitedCodec::builder()
100+
.length_field_length(4)
101+
.little_endian()
102+
.new_codec();
103+
let ctl_reader = FramedRead::new(ctl_fifo, codec);
104+
105+
Ok(Self {
106+
ack_fifo,
107+
ctl_reader,
108+
})
98109
}
99110

100111
pub async fn recv_cmd(&mut self) -> anyhow::Result<FifoCommand> {
101-
let mut len_buffer = [0u8; 4];
102-
self.ctl_fifo.read_exact(&mut len_buffer).await?;
103-
let message_len = u32::from_le_bytes(len_buffer) as usize;
104-
105-
let mut buffer = vec![0u8; message_len];
106-
loop {
107-
if self.ctl_fifo.read_exact(&mut buffer).await.is_ok() {
108-
break;
109-
}
110-
}
111-
112-
let decoded = bincode::deserialize(&buffer).with_context(|| {
113-
format!("Failed to deserialize FIFO command (len: {message_len}, data: {buffer:?})")
114-
})?;
112+
let bytes = self
113+
.ctl_reader
114+
.next()
115+
.await
116+
.ok_or_else(|| anyhow::anyhow!("FIFO stream closed"))??;
117+
118+
let decoded = bincode::deserialize(&bytes)
119+
.with_context(|| format!("Failed to deserialize FIFO command (data: {bytes:?})"))?;
115120
Ok(decoded)
116121
}
117122

0 commit comments

Comments
 (0)