Skip to content

Commit 9aaf842

Browse files
committed
feat: add memory executor
1 parent 4c2fbb2 commit 9aaf842

8 files changed

Lines changed: 189 additions & 0 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ memmap2 = "0.9.5"
5757
nix = { version = "0.29.0", features = ["fs", "time", "user"] }
5858
futures = "0.3.31"
5959
runner-shared = { path = "crates/runner-shared" }
60+
heaptrack = { path = "crates/heaptrack" }
61+
ipc-channel = "0.18"
6062
shellexpand = { version = "3.1.1", features = ["tilde"] }
6163
addr2line = "0.25"
6264
gimli = "0.32"

src/run/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ pub enum RunnerMode {
148148
Instrumentation,
149149
Simulation,
150150
Walltime,
151+
Memory,
151152
}
152153

153154
#[derive(ValueEnum, Clone, Debug, PartialEq)]

src/run/runner/helpers/env.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub fn get_base_injected_env(
1515
#[allow(deprecated)]
1616
RunnerMode::Instrumentation | RunnerMode::Simulation => "instrumentation",
1717
RunnerMode::Walltime => "walltime",
18+
RunnerMode::Memory => "memory",
1819
};
1920
HashMap::from([
2021
("PYTHONHASHSEED", "0".into()),

src/run/runner/interfaces.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub struct RunData {
1010
pub enum ExecutorName {
1111
Valgrind,
1212
WallTime,
13+
Memory,
1314
}
1415

1516
#[allow(clippy::to_string_trait_impl)]
@@ -18,6 +19,7 @@ impl ToString for ExecutorName {
1819
match self {
1920
ExecutorName::Valgrind => "valgrind".to_string(),
2021
ExecutorName::WallTime => "walltime".to_string(),
22+
ExecutorName::Memory => "memory".to_string(),
2123
}
2224
}
2325
}

src/run/runner/memory/executor.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
use crate::prelude::*;
2+
use crate::run::instruments::mongo_tracer::MongoTracer;
3+
use crate::run::runner::executor::Executor;
4+
use crate::run::runner::helpers::command::CommandBuilder;
5+
use crate::run::runner::helpers::get_bench_command::get_bench_command;
6+
use crate::run::runner::helpers::run_command_with_log_pipe::run_command_with_log_pipe_and_callback;
7+
use crate::run::runner::helpers::run_with_sudo::wrap_with_sudo;
8+
use crate::run::runner::shared::fifo::RunnerFifo;
9+
use crate::run::runner::{ExecutorName, RunData};
10+
use crate::run::{check_system::SystemInfo, config::Config};
11+
use async_trait::async_trait;
12+
use heaptrack::HeaptrackIpcClient;
13+
use heaptrack::HeaptrackIpcServer;
14+
use ipc_channel::ipc;
15+
use runner_shared::fifo::Command as FifoCommand;
16+
use runner_shared::fifo::IntegrationMode;
17+
use std::path::Path;
18+
use std::process::Command;
19+
use std::rc::Rc;
20+
21+
pub struct MemoryExecutor;
22+
23+
impl MemoryExecutor {
24+
fn build_heaptrack_command(
25+
config: &Config,
26+
run_data: &RunData,
27+
) -> Result<(HeaptrackIpcServer, CommandBuilder)> {
28+
// FIXME: We only support native languages for now
29+
30+
// Find heaptrack binary - check env variable or use default command name
31+
let heaptrack_path = std::env::var("CODSPEED_HEAPTRACK_BINARY")
32+
.unwrap_or_else(|_| "codspeed-heaptrack".to_string());
33+
34+
// Always use env to preserve LD_LIBRARY_PATH and other environment variables
35+
let mut cmd_builder = CommandBuilder::new("env");
36+
37+
// Preserve LD_LIBRARY_PATH from the current environment if it exists
38+
if let Ok(ld_library_path) = std::env::var("LD_LIBRARY_PATH") {
39+
cmd_builder.arg(format!("LD_LIBRARY_PATH={ld_library_path}"));
40+
}
41+
42+
cmd_builder.arg(&heaptrack_path);
43+
cmd_builder.arg("track");
44+
cmd_builder.arg(get_bench_command(config)?);
45+
cmd_builder.arg("--output");
46+
cmd_builder.arg(run_data.profile_folder.join("results"));
47+
48+
// Setup heaptrack IPC server
49+
let (ipc_server, server_name) = ipc::IpcOneShotServer::new()?;
50+
cmd_builder.arg("--ipc-server");
51+
cmd_builder.arg(server_name);
52+
53+
Ok((ipc_server, cmd_builder))
54+
}
55+
}
56+
57+
#[async_trait(?Send)]
58+
impl Executor for MemoryExecutor {
59+
fn name(&self) -> ExecutorName {
60+
ExecutorName::Memory
61+
}
62+
63+
async fn setup(
64+
&self,
65+
_system_info: &SystemInfo,
66+
_setup_cache_dir: Option<&Path>,
67+
) -> Result<()> {
68+
// Validate that the codspeed-heaptrack command is available
69+
let heaptrack_path = std::env::var("CODSPEED_HEAPTRACK_BINARY")
70+
.unwrap_or_else(|_| "codspeed-heaptrack".to_string());
71+
72+
info!("Validating heaptrack binary at path: {}", heaptrack_path);
73+
let output = Command::new(&heaptrack_path).arg("--version").output()?;
74+
if !output.status.success() {
75+
bail!("codspeed-heaptrack command is not available or failed to execute");
76+
}
77+
78+
Ok(())
79+
}
80+
81+
async fn run(
82+
&self,
83+
config: &Config,
84+
_system_info: &SystemInfo,
85+
run_data: &RunData,
86+
_mongo_tracer: &Option<MongoTracer>,
87+
) -> Result<()> {
88+
let (ipc, cmd_builder) = Self::build_heaptrack_command(config, run_data)?;
89+
let cmd = wrap_with_sudo(cmd_builder)?.build();
90+
debug!("cmd: {cmd:?}");
91+
92+
let runner_fifo = RunnerFifo::new()?;
93+
let on_process_started = async |pid| -> anyhow::Result<()> {
94+
Self::handle_fifo(runner_fifo, pid, ipc).await?;
95+
Ok(())
96+
};
97+
98+
let status = run_command_with_log_pipe_and_callback(cmd, on_process_started).await?;
99+
debug!("cmd exit status: {:?}", status);
100+
101+
if !status.success() {
102+
bail!("failed to execute memory tracker process: {status}");
103+
}
104+
105+
Ok(())
106+
}
107+
108+
async fn teardown(
109+
&self,
110+
_config: &Config,
111+
_system_info: &SystemInfo,
112+
_run_data: &RunData,
113+
) -> Result<()> {
114+
Ok(())
115+
}
116+
}
117+
118+
impl MemoryExecutor {
119+
async fn handle_fifo(
120+
mut runner_fifo: RunnerFifo,
121+
pid: u32,
122+
ipc: HeaptrackIpcServer,
123+
) -> anyhow::Result<()> {
124+
debug!("handle_fifo called with PID {pid}");
125+
126+
// Accept the IPC connection from heaptrack and get the sender it sends us
127+
let (_, heaptrack_sender) = ipc.accept()?;
128+
let ipc_client = Rc::new(HeaptrackIpcClient::from_accepted(heaptrack_sender));
129+
130+
let ipc_client_health = Rc::clone(&ipc_client);
131+
let health_check = async move || {
132+
// Ping heaptrack via IPC to check if it's still responding
133+
match ipc_client_health.ping() {
134+
Ok(()) => Ok(true),
135+
Err(_) => Ok(false),
136+
}
137+
};
138+
139+
let on_cmd = async move |cmd: &FifoCommand| {
140+
match cmd {
141+
FifoCommand::StartBenchmark => {
142+
debug!("Enabling heaptrack via IPC");
143+
if let Err(e) = ipc_client.enable() {
144+
error!("Failed to enable heaptrack: {e}");
145+
return Ok(FifoCommand::Err);
146+
}
147+
}
148+
FifoCommand::StopBenchmark => {
149+
debug!("Disabling heaptrack via IPC");
150+
if let Err(e) = ipc_client.disable() {
151+
// There's a chance that heaptrack has already exited here, so just log as debug
152+
debug!("Failed to disable heaptrack: {e}");
153+
return Ok(FifoCommand::Err);
154+
}
155+
}
156+
FifoCommand::GetIntegrationMode => {
157+
return Ok(FifoCommand::IntegrationModeResponse(
158+
IntegrationMode::Analysis,
159+
));
160+
}
161+
_ => {
162+
warn!("Unhandled FIFO command: {cmd:?}");
163+
return Ok(FifoCommand::Err);
164+
}
165+
}
166+
167+
Ok(FifoCommand::Ack)
168+
};
169+
170+
let _data = runner_fifo
171+
.handle_fifo_messages(health_check, on_cmd)
172+
.await?;
173+
// TODO: We have to write the data somewhere too
174+
Ok(())
175+
}
176+
}

src/run/runner/memory/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod executor;

src/run/runner/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use super::{RunnerMode, config::Config};
77
mod executor;
88
mod helpers;
99
mod interfaces;
10+
mod memory;
1011
mod shared;
1112
#[cfg(test)]
1213
mod tests;
@@ -16,6 +17,7 @@ mod wall_time;
1617
use executor::Executor;
1718
use helpers::profile_folder::create_profile_folder;
1819
pub use interfaces::{ExecutorName, RunData};
20+
use memory::executor::MemoryExecutor;
1921
use valgrind::executor::ValgrindExecutor;
2022
use wall_time::executor::WallTimeExecutor;
2123

@@ -26,6 +28,7 @@ impl Display for RunnerMode {
2628
RunnerMode::Instrumentation => write!(f, "instrumentation"),
2729
RunnerMode::Simulation => write!(f, "simulation"),
2830
RunnerMode::Walltime => write!(f, "walltime"),
31+
RunnerMode::Memory => write!(f, "memory"),
2932
}
3033
}
3134
}
@@ -37,13 +40,15 @@ pub fn get_executor_from_mode(mode: &RunnerMode) -> Box<dyn Executor> {
3740
#[allow(deprecated)]
3841
RunnerMode::Instrumentation | RunnerMode::Simulation => Box::new(ValgrindExecutor),
3942
RunnerMode::Walltime => Box::new(WallTimeExecutor::new()),
43+
RunnerMode::Memory => Box::new(MemoryExecutor),
4044
}
4145
}
4246

4347
pub fn get_all_executors() -> Vec<Box<dyn Executor>> {
4448
vec![
4549
Box::new(ValgrindExecutor),
4650
Box::new(WallTimeExecutor::new()),
51+
Box::new(MemoryExecutor),
4752
]
4853
}
4954

src/run/uploader/upload.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ async fn create_profile_archive(
5858
) -> Result<ProfileArchive> {
5959
let time_start = std::time::Instant::now();
6060
let profile_archive = match executor_name {
61+
ExecutorName::Memory => todo!(),
6162
ExecutorName::Valgrind => {
6263
debug!("Creating compressed tar archive for Valgrind");
6364
let enc = GzipEncoder::new(Vec::new());

0 commit comments

Comments
 (0)