Skip to content

Commit d891f6d

Browse files
committed
fix(runner): use child process instead of pid/perf fifo for FIFO health check
1 parent 1833366 commit d891f6d

4 files changed

Lines changed: 62 additions & 80 deletions

File tree

src/executor/helpers/run_command_with_log_pipe.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::thread;
1111
///
1212
/// # Arguments
1313
/// - `cmd`: The command to run.
14-
/// - `cb`: A callback function that takes the process ID and returns a result.
14+
/// - `cb`: A callback that takes ownership of the spawned child process, is responsible for waiting on it, and returns its exit status.
1515
///
1616
/// # Returns
1717
///
@@ -22,8 +22,8 @@ pub async fn run_command_with_log_pipe_and_callback<F, Fut>(
2222
cb: F,
2323
) -> Result<ExitStatus>
2424
where
25-
F: FnOnce(u32) -> Fut,
26-
Fut: Future<Output = anyhow::Result<()>>,
25+
F: FnOnce(std::process::Child) -> Fut,
26+
Fut: Future<Output = anyhow::Result<ExitStatus>>,
2727
{
2828
fn log_tee(
2929
mut reader: impl Read,
@@ -93,11 +93,9 @@ where
9393
log_tee(stderr, std::io::stderr(), Some("[stderr]")).unwrap();
9494
});
9595

96-
cb(process.id()).await?;
97-
98-
process.wait().context("failed to wait for the process")
96+
cb(process).await
9997
}
10098

10199
pub async fn run_command_with_log_pipe(cmd: Command) -> Result<ExitStatus> {
102-
run_command_with_log_pipe_and_callback(cmd, async |_| Ok(())).await
100+
run_command_with_log_pipe_and_callback(cmd, |mut child| async move { Ok(child.wait()?) }).await
103101
}

src/executor/memory/executor.rs

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,16 @@ impl Executor for MemoryExecutor {
9898
debug!("cmd: {cmd:?}");
9999

100100
let runner_fifo = RunnerFifo::new()?;
101-
let on_process_started = async |pid| -> anyhow::Result<()> {
102-
let marker_result = Self::handle_fifo(runner_fifo, pid, ipc).await?;
101+
let on_process_started = |mut child: std::process::Child| async move {
102+
let (marker_result, exit_status) =
103+
Self::handle_fifo(runner_fifo, ipc, &mut child).await?;
103104

104105
// Directly write to the profile folder, to avoid having to define another field
105106
marker_result
106107
.save_to(execution_context.profile_folder.join("results"))
107108
.unwrap();
108109

109-
Ok(())
110+
Ok(exit_status)
110111
};
111112

112113
let status = run_command_with_log_pipe_and_callback(cmd, on_process_started).await?;
@@ -152,11 +153,9 @@ impl Executor for MemoryExecutor {
152153
impl MemoryExecutor {
153154
async fn handle_fifo(
154155
mut runner_fifo: RunnerFifo,
155-
pid: u32,
156156
ipc: MemtrackIpcServer,
157-
) -> anyhow::Result<ExecutionTimestamps> {
158-
debug!("handle_fifo called with PID {pid}");
159-
157+
child: &mut std::process::Child,
158+
) -> anyhow::Result<(ExecutionTimestamps, std::process::ExitStatus)> {
160159
// Accept the IPC connection from memtrack and get the sender it sends us
161160
// Use a timeout to prevent hanging if the process doesn't start properly
162161
// https://github.com/servo/ipc-channel/issues/261
@@ -170,15 +169,6 @@ impl MemoryExecutor {
170169
.context("Timeout waiting for IPC connection from memtrack process")??;
171170
let ipc_client = Rc::new(MemtrackIpcClient::from_accepted(memtrack_sender));
172171

173-
let ipc_client_health = Rc::clone(&ipc_client);
174-
let health_check = async move || {
175-
// Ping memtrack via IPC to check if it's still responding
176-
match ipc_client_health.ping() {
177-
Ok(()) => Ok(true),
178-
Err(_) => Ok(false),
179-
}
180-
};
181-
182172
let on_cmd = async move |cmd: &FifoCommand| {
183173
const INVALID_INTEGRATION_ERROR: &str = "This integration doesn't support memory profiling. Please update your integration to a version that supports memory profiling.";
184174

@@ -234,9 +224,9 @@ impl MemoryExecutor {
234224
Ok(None)
235225
};
236226

237-
let (marker_result, _) = runner_fifo
238-
.handle_fifo_messages(health_check, on_cmd)
239-
.await?;
240-
Ok(marker_result)
227+
let (marker_result, _, exit_status) =
228+
runner_fifo.handle_fifo_messages(child, on_cmd).await?;
229+
230+
Ok((marker_result, exit_status))
241231
}
242232
}

src/executor/shared/fifo.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,22 @@ impl RunnerFifo {
131131
Ok(())
132132
}
133133

134-
/// Handles all incoming FIFO messages until it's closed, or until the health check closure
135-
/// returns `false` or an error.
134+
/// Handles all incoming FIFO messages until the FIFO is closed, or until the child process exits.
136135
///
137136
/// The `handle_cmd` callback is invoked first for each command. If it returns `Some(response)`,
138137
/// that response is sent and the shared implementation is skipped. If it returns `None`,
139138
/// the command falls through to the shared implementation for standard handling.
139+
///
140+
/// Returns execution timestamps, benchmark data, and the exit status of the child process.
140141
pub async fn handle_fifo_messages(
141142
&mut self,
142-
mut health_check: impl AsyncFnMut() -> anyhow::Result<bool>,
143+
child: &mut std::process::Child,
143144
mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result<Option<FifoCommand>>,
144-
) -> anyhow::Result<(ExecutionTimestamps, FifoBenchmarkData)> {
145+
) -> anyhow::Result<(
146+
ExecutionTimestamps,
147+
FifoBenchmarkData,
148+
std::process::ExitStatus,
149+
)> {
145150
let mut bench_order_by_timestamp = Vec::<(u64, String)>::new();
146151
let mut bench_pids = HashSet::<pid_t>::new();
147152
let mut markers = Vec::<MarkerType>::new();
@@ -241,20 +246,24 @@ impl RunnerFifo {
241246
}
242247
}
243248

244-
let is_alive = health_check().await?;
245-
if !is_alive {
246-
debug!("Process terminated, stopping the command handler");
247-
break;
249+
// Check if the process has exited using try_wait (non-blocking)
250+
match child.try_wait() {
251+
Ok(None) => {} // Still running, continue loop
252+
Ok(Some(exit_status)) => {
253+
debug!(
254+
"Process terminated with status: {exit_status}, stopping the command handler"
255+
);
256+
let marker_result =
257+
ExecutionTimestamps::new(&bench_order_by_timestamp, &markers);
258+
let fifo_data = FifoBenchmarkData {
259+
integration,
260+
bench_pids,
261+
};
262+
return Ok((marker_result, fifo_data, exit_status));
263+
}
264+
Err(e) => return Err(anyhow::Error::from(e)),
248265
}
249266
}
250-
251-
let marker_result = ExecutionTimestamps::new(&bench_order_by_timestamp, &markers);
252-
let fifo_data = FifoBenchmarkData {
253-
integration,
254-
bench_pids,
255-
};
256-
257-
Ok((marker_result, fifo_data))
258267
}
259268
}
260269

src/executor/wall_time/perf/mod.rs

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@ use runner_shared::metadata::PerfMetadata;
3131
use runner_shared::unwind_data::UnwindData;
3232
use std::path::Path;
3333
use std::path::PathBuf;
34-
use std::sync::Arc;
35-
use std::time::Duration;
3634
use std::{cell::OnceCell, collections::HashMap, process::ExitStatus};
37-
use tokio::sync::Mutex;
3835

3936
mod jit_dump;
4037
mod memory_mappings;
@@ -205,14 +202,15 @@ impl PerfRunner {
205202
let cmd = wrap_with_sudo(wrapped_builder)?.build();
206203
debug!("cmd: {cmd:?}");
207204

208-
let on_process_started = async |_| -> anyhow::Result<()> {
205+
let on_process_started = |mut child: std::process::Child| async move {
209206
// If we output pipedata, we do not parse the perf map during teardown yet, so we need to parse memory
210207
// maps as we receive the `CurrentBenchmark` fifo commands.
211-
let data = Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata).await?;
208+
let (data, exit_status) =
209+
Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata, &mut child).await?;
212210
self.benchmark_data.set(data).unwrap_or_else(|_| {
213211
error!("Failed to set benchmark data in PerfRunner");
214212
});
215-
Ok(())
213+
Ok(exit_status)
216214
};
217215
run_command_with_log_pipe_and_callback(cmd, on_process_started).await
218216
}
@@ -248,37 +246,21 @@ impl PerfRunner {
248246

249247
async fn handle_fifo(
250248
mut runner_fifo: RunnerFifo,
251-
perf_fifo: PerfFifo,
249+
mut perf_fifo: PerfFifo,
252250
parse_memory_maps: bool,
253-
) -> anyhow::Result<BenchmarkData> {
251+
child: &mut std::process::Child,
252+
) -> anyhow::Result<(BenchmarkData, std::process::ExitStatus)> {
254253
let mut symbols_by_pid = HashMap::<pid_t, ProcessSymbols>::new();
255254
let mut unwind_data_by_pid = HashMap::<pid_t, Vec<UnwindData>>::new();
256255

257-
let perf_fifo = Arc::new(Mutex::new(perf_fifo));
258-
let mut perf_ping_timeout = 5;
259-
let health_check = async || {
260-
let perf_ping = tokio::time::timeout(Duration::from_secs(perf_ping_timeout), async {
261-
perf_fifo.lock().await.ping().await
262-
})
263-
.await;
264-
if let Ok(Err(_)) | Err(_) = perf_ping {
265-
debug!("Failed to ping perf FIFO, ending perf fifo loop");
266-
return Ok(false);
267-
}
268-
// Perf has started successfully, we can decrease the timeout for future pings
269-
perf_ping_timeout = 1;
270-
271-
Ok(true)
272-
};
273-
274256
let on_cmd = async |cmd: &FifoCommand| {
275257
#[allow(deprecated)]
276258
match cmd {
277259
FifoCommand::StartBenchmark => {
278-
perf_fifo.lock().await.start_events().await?;
260+
perf_fifo.start_events().await?;
279261
}
280262
FifoCommand::StopBenchmark => {
281-
perf_fifo.lock().await.stop_events().await?;
263+
perf_fifo.stop_events().await?;
282264
}
283265
FifoCommand::CurrentBenchmark { pid, .. } => {
284266
#[cfg(target_os = "linux")]
@@ -294,7 +276,7 @@ impl PerfRunner {
294276
}
295277
}
296278
FifoCommand::PingPerf => {
297-
if perf_fifo.lock().await.ping().await.is_err() {
279+
if perf_fifo.ping().await.is_err() {
298280
return Ok(Some(FifoCommand::Err));
299281
}
300282
return Ok(Some(FifoCommand::Ack));
@@ -310,15 +292,18 @@ impl PerfRunner {
310292
Ok(None)
311293
};
312294

313-
let (marker_result, fifo_data) = runner_fifo
314-
.handle_fifo_messages(health_check, on_cmd)
315-
.await?;
316-
Ok(BenchmarkData {
317-
fifo_data,
318-
marker_result,
319-
symbols_by_pid,
320-
unwind_data_by_pid,
321-
})
295+
let (marker_result, fifo_data, exit_status) =
296+
runner_fifo.handle_fifo_messages(child, on_cmd).await?;
297+
298+
Ok((
299+
BenchmarkData {
300+
fifo_data,
301+
marker_result,
302+
symbols_by_pid,
303+
unwind_data_by_pid,
304+
},
305+
exit_status,
306+
))
322307
}
323308

324309
fn get_perf_file_path<P: AsRef<Path>>(&self, profile_folder: P) -> PathBuf {

0 commit comments

Comments
 (0)