Skip to content

Commit a9e737f

Browse files
committed
feat: add memory executor
1 parent b8936b7 commit a9e737f

12 files changed

Lines changed: 829 additions & 494 deletions

File tree

Cargo.lock

Lines changed: 579 additions & 475 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ memmap2 = "0.9.5"
5757
nix = { version = "0.29.0", features = ["fs", "time", "user"] }
5858
futures = "0.3.31"
5959
runner-shared = { path = "crates/runner-shared" }
60+
memtrack = { path = "crates/memtrack", default-features = false }
61+
ipc-channel = "0.18"
6062
shellexpand = { version = "3.1.1", features = ["tilde"] }
6163
addr2line = "0.25"
6264
gimli = "0.32"

src/run/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ pub enum RunnerMode {
148148
Instrumentation,
149149
Simulation,
150150
Walltime,
151+
Memory,
151152
}
152153

153154
#[derive(ValueEnum, Clone, Debug, PartialEq)]

src/run/runner/helpers/env.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub fn get_base_injected_env(
1515
#[allow(deprecated)]
1616
RunnerMode::Instrumentation | RunnerMode::Simulation => "instrumentation",
1717
RunnerMode::Walltime => "walltime",
18+
RunnerMode::Memory => "memory",
1819
};
1920
HashMap::from([
2021
("PYTHONHASHSEED", "0".into()),

src/run/runner/interfaces.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub struct RunData {
1010
pub enum ExecutorName {
1111
Valgrind,
1212
WallTime,
13+
Memory,
1314
}
1415

1516
#[allow(clippy::to_string_trait_impl)]
@@ -18,6 +19,7 @@ impl ToString for ExecutorName {
1819
match self {
1920
ExecutorName::Valgrind => "valgrind".to_string(),
2021
ExecutorName::WallTime => "walltime".to_string(),
22+
ExecutorName::Memory => "memory".to_string(),
2123
}
2224
}
2325
}

src/run/runner/memory/executor.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use crate::prelude::*;
2+
use crate::run::instruments::mongo_tracer::MongoTracer;
3+
use crate::run::runner::executor::Executor;
4+
use crate::run::runner::helpers::command::CommandBuilder;
5+
use crate::run::runner::helpers::get_bench_command::get_bench_command;
6+
use crate::run::runner::helpers::run_command_with_log_pipe::run_command_with_log_pipe_and_callback;
7+
use crate::run::runner::helpers::run_with_sudo::wrap_with_sudo;
8+
use crate::run::runner::shared::fifo::RunnerFifo;
9+
use crate::run::runner::{ExecutorName, RunData};
10+
use crate::run::{check_system::SystemInfo, config::Config};
11+
use async_trait::async_trait;
12+
use ipc_channel::ipc;
13+
use memtrack::MemtrackIpcClient;
14+
use memtrack::MemtrackIpcServer;
15+
use runner_shared::benchmark_results::{ArtifactExt, ExecutionTimestamps};
16+
use runner_shared::fifo::Command as FifoCommand;
17+
use runner_shared::fifo::IntegrationMode;
18+
use std::path::Path;
19+
use std::process::Command;
20+
use std::rc::Rc;
21+
22+
pub struct MemoryExecutor;
23+
24+
impl MemoryExecutor {
25+
fn build_memtrack_command(
26+
config: &Config,
27+
run_data: &RunData,
28+
) -> Result<(MemtrackIpcServer, CommandBuilder)> {
29+
// FIXME: We only support native languages for now
30+
31+
// Find memtrack binary - check env variable or use default command name
32+
let memtrack_path = std::env::var("CODSPEED_MEMTRACK_BINARY")
33+
.unwrap_or_else(|_| "codspeed-memtrack".to_string());
34+
35+
let mut cmd_builder = CommandBuilder::new(memtrack_path);
36+
cmd_builder.arg("track");
37+
cmd_builder.arg(get_bench_command(config)?);
38+
cmd_builder.arg("--output");
39+
cmd_builder.arg(run_data.profile_folder.join("results"));
40+
41+
// Setup memtrack IPC server
42+
let (ipc_server, server_name) = ipc::IpcOneShotServer::new()?;
43+
cmd_builder.arg("--ipc-server");
44+
cmd_builder.arg(server_name);
45+
46+
Ok((ipc_server, cmd_builder))
47+
}
48+
}
49+
50+
#[async_trait(?Send)]
51+
impl Executor for MemoryExecutor {
52+
fn name(&self) -> ExecutorName {
53+
ExecutorName::Memory
54+
}
55+
56+
async fn setup(
57+
&self,
58+
_system_info: &SystemInfo,
59+
_setup_cache_dir: Option<&Path>,
60+
) -> Result<()> {
61+
// Validate that the codspeed-memtrack command is available
62+
let memtrack_path = std::env::var("CODSPEED_MEMTRACK_BINARY")
63+
.unwrap_or_else(|_| "codspeed-memtrack".to_string());
64+
65+
info!("Validating memtrack binary at path: {}", memtrack_path);
66+
let output = Command::new(&memtrack_path).arg("--version").output()?;
67+
if !output.status.success() {
68+
bail!("codspeed-memtrack command is not available or failed to execute");
69+
}
70+
71+
Ok(())
72+
}
73+
74+
async fn run(
75+
&self,
76+
config: &Config,
77+
_system_info: &SystemInfo,
78+
run_data: &RunData,
79+
_mongo_tracer: &Option<MongoTracer>,
80+
) -> Result<()> {
81+
// Create the results/ directory inside the profile folder to avoid having memtrack create it with wrong permissions
82+
std::fs::create_dir_all(run_data.profile_folder.join("results"))?;
83+
84+
let (ipc, cmd_builder) = Self::build_memtrack_command(config, run_data)?;
85+
let cmd = wrap_with_sudo(cmd_builder)?.build();
86+
debug!("cmd: {cmd:?}");
87+
88+
let runner_fifo = RunnerFifo::new()?;
89+
let on_process_started = async |pid| -> anyhow::Result<()> {
90+
let marker_result = Self::handle_fifo(runner_fifo, pid, ipc).await?;
91+
92+
// Directly write to the profile folder, to avoid having to define another field
93+
marker_result
94+
.save_to(run_data.profile_folder.join("results"))
95+
.unwrap();
96+
97+
Ok(())
98+
};
99+
100+
let status = run_command_with_log_pipe_and_callback(cmd, on_process_started).await?;
101+
debug!("cmd exit status: {:?}", status);
102+
103+
if !status.success() {
104+
bail!("failed to execute memory tracker process: {status}");
105+
}
106+
107+
Ok(())
108+
}
109+
110+
async fn teardown(
111+
&self,
112+
_config: &Config,
113+
_system_info: &SystemInfo,
114+
_run_data: &RunData,
115+
) -> Result<()> {
116+
Ok(())
117+
}
118+
}
119+
120+
impl MemoryExecutor {
121+
async fn handle_fifo(
122+
mut runner_fifo: RunnerFifo,
123+
pid: u32,
124+
ipc: MemtrackIpcServer,
125+
) -> anyhow::Result<ExecutionTimestamps> {
126+
debug!("handle_fifo called with PID {pid}");
127+
128+
// Accept the IPC connection from memtrack and get the sender it sends us
129+
let (_, memtrack_sender) = ipc.accept()?;
130+
let ipc_client = Rc::new(MemtrackIpcClient::from_accepted(memtrack_sender));
131+
132+
let ipc_client_health = Rc::clone(&ipc_client);
133+
let health_check = async move || {
134+
// Ping memtrack via IPC to check if it's still responding
135+
match ipc_client_health.ping() {
136+
Ok(()) => Ok(true),
137+
Err(_) => Ok(false),
138+
}
139+
};
140+
141+
let on_cmd = async move |cmd: &FifoCommand| {
142+
match cmd {
143+
FifoCommand::StartBenchmark => {
144+
debug!("Enabling memtrack via IPC");
145+
if let Err(e) = ipc_client.enable() {
146+
error!("Failed to enable memtrack: {e}");
147+
return Ok(FifoCommand::Err);
148+
}
149+
}
150+
FifoCommand::StopBenchmark => {
151+
debug!("Disabling memtrack via IPC");
152+
if let Err(e) = ipc_client.disable() {
153+
// There's a chance that memtrack has already exited here, so just log as debug
154+
debug!("Failed to disable memtrack: {e}");
155+
return Ok(FifoCommand::Err);
156+
}
157+
}
158+
FifoCommand::GetIntegrationMode => {
159+
return Ok(FifoCommand::IntegrationModeResponse(
160+
IntegrationMode::Analysis,
161+
));
162+
}
163+
_ => {
164+
warn!("Unhandled FIFO command: {cmd:?}");
165+
return Ok(FifoCommand::Err);
166+
}
167+
}
168+
169+
Ok(FifoCommand::Ack)
170+
};
171+
172+
let (marker_result, _) = runner_fifo
173+
.handle_fifo_messages(health_check, on_cmd)
174+
.await?;
175+
Ok(marker_result)
176+
}
177+
}

src/run/runner/memory/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod executor;

src/run/runner/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use super::{RunnerMode, config::Config};
77
mod executor;
88
mod helpers;
99
mod interfaces;
10+
mod memory;
1011
mod shared;
1112
#[cfg(test)]
1213
mod tests;
@@ -16,6 +17,7 @@ mod wall_time;
1617
use executor::Executor;
1718
use helpers::profile_folder::create_profile_folder;
1819
pub use interfaces::{ExecutorName, RunData};
20+
use memory::executor::MemoryExecutor;
1921
use valgrind::executor::ValgrindExecutor;
2022
use wall_time::executor::WallTimeExecutor;
2123

@@ -26,6 +28,7 @@ impl Display for RunnerMode {
2628
RunnerMode::Instrumentation => write!(f, "instrumentation"),
2729
RunnerMode::Simulation => write!(f, "simulation"),
2830
RunnerMode::Walltime => write!(f, "walltime"),
31+
RunnerMode::Memory => write!(f, "memory"),
2932
}
3033
}
3134
}
@@ -37,13 +40,15 @@ pub fn get_executor_from_mode(mode: &RunnerMode) -> Box<dyn Executor> {
3740
#[allow(deprecated)]
3841
RunnerMode::Instrumentation | RunnerMode::Simulation => Box::new(ValgrindExecutor),
3942
RunnerMode::Walltime => Box::new(WallTimeExecutor::new()),
43+
RunnerMode::Memory => Box::new(MemoryExecutor),
4044
}
4145
}
4246

4347
pub fn get_all_executors() -> Vec<Box<dyn Executor>> {
4448
vec![
4549
Box::new(ValgrindExecutor),
4650
Box::new(WallTimeExecutor::new()),
51+
Box::new(MemoryExecutor),
4752
]
4853
}
4954

src/run/runner/shared/fifo.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::prelude::*;
22
use anyhow::Context;
33
use nix::{sys::time::TimeValLike, time::clock_gettime};
4-
use runner_shared::benchmark_results::MarkerResult;
4+
use runner_shared::benchmark_results::ExecutionTimestamps;
55
use runner_shared::fifo::{Command as FifoCommand, MarkerType};
66
use runner_shared::fifo::{RUNNER_ACK_FIFO, RUNNER_CTL_FIFO};
77
use std::cmp::Ordering;
@@ -127,7 +127,7 @@ impl RunnerFifo {
127127
&mut self,
128128
mut health_check: impl AsyncFnMut() -> anyhow::Result<bool>,
129129
mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result<FifoCommand>,
130-
) -> anyhow::Result<(MarkerResult, FifoBenchmarkData)> {
130+
) -> anyhow::Result<(ExecutionTimestamps, FifoBenchmarkData)> {
131131
let mut bench_order_by_timestamp = Vec::<(u64, String)>::new();
132132
let mut bench_pids = HashSet::<pid_t>::new();
133133
let mut markers = Vec::<MarkerType>::new();
@@ -214,7 +214,7 @@ impl RunnerFifo {
214214
self.send_cmd(handle_cmd(&cmd).await?).await?;
215215
}
216216

217-
let marker_result = MarkerResult::new(&bench_order_by_timestamp, &markers);
217+
let marker_result = ExecutionTimestamps::new(&bench_order_by_timestamp, &markers);
218218
let fifo_data = FifoBenchmarkData {
219219
integration,
220220
bench_pids,

src/run/runner/tests.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::run::check_system::SystemInfo;
22
use crate::run::config::Config;
33
use crate::run::runner::executor::Executor;
44
use crate::run::runner::interfaces::RunData;
5+
use crate::run::runner::memory::executor::MemoryExecutor;
56
use crate::run::runner::valgrind::executor::ValgrindExecutor;
67
use crate::run::{RunnerMode, runner::wall_time::executor::WallTimeExecutor};
78
use rstest_reuse::{self, *};
@@ -299,3 +300,54 @@ fi
299300
assert!(result.is_err(), "Command should fail");
300301
}
301302
}
303+
304+
mod memory {
305+
use super::*;
306+
307+
async fn get_memory_executor() -> MemoryExecutor {
308+
let executor = MemoryExecutor;
309+
let system_info = SystemInfo::new().unwrap();
310+
executor.setup(&system_info, None).await.unwrap();
311+
executor
312+
}
313+
314+
#[cfg(test)]
315+
fn memory_config(command: &str) -> Config {
316+
Config {
317+
mode: RunnerMode::Memory,
318+
command: command.to_string(),
319+
..Config::test()
320+
}
321+
}
322+
323+
#[apply(test_cases)]
324+
#[tokio::test]
325+
async fn test_memory_executor(#[case] cmd: &str) {
326+
let (system_info, run_data, _temp_dir) = create_test_setup().await;
327+
let executor = get_memory_executor().await;
328+
329+
let config = memory_config(cmd);
330+
executor
331+
.run(&config, &system_info, &run_data, &None)
332+
.await
333+
.unwrap();
334+
}
335+
336+
#[apply(env_test_cases)]
337+
#[tokio::test]
338+
async fn test_memory_executor_with_env(#[case] env_case: (&str, &str)) {
339+
let (system_info, run_data, _temp_dir) = create_test_setup().await;
340+
let executor = get_memory_executor().await;
341+
342+
let (env_var, env_value) = env_case;
343+
temp_env::async_with_vars(&[(env_var, Some(env_value))], async {
344+
let cmd = env_var_validation_script(env_var, env_value);
345+
let config = memory_config(&cmd);
346+
executor
347+
.run(&config, &system_info, &run_data, &None)
348+
.await
349+
.unwrap();
350+
})
351+
.await;
352+
}
353+
}

0 commit comments

Comments
 (0)