-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathexecutor.rs
More file actions
185 lines (160 loc) · 6.82 KB
/
executor.rs
File metadata and controls
185 lines (160 loc) · 6.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
use crate::executor::ExecutorName;
use crate::executor::helpers::command::CommandBuilder;
use crate::executor::helpers::env::get_base_injected_env;
use crate::executor::helpers::get_bench_command::get_bench_command;
use crate::executor::helpers::run_command_with_log_pipe::run_command_with_log_pipe_and_callback;
use crate::executor::helpers::run_with_env::wrap_with_env;
use crate::executor::helpers::run_with_sudo::wrap_with_sudo;
use crate::executor::shared::fifo::RunnerFifo;
use crate::executor::{ExecutionContext, Executor};
use crate::instruments::mongo_tracer::MongoTracer;
use crate::prelude::*;
use crate::run::check_system::SystemInfo;
use crate::runner_mode::RunnerMode;
use async_trait::async_trait;
use ipc_channel::ipc;
use memtrack::MemtrackIpcClient;
use memtrack::MemtrackIpcServer;
use runner_shared::artifacts::{ArtifactExt, ExecutionTimestamps};
use runner_shared::fifo::Command as FifoCommand;
use runner_shared::fifo::IntegrationMode;
use std::path::Path;
use std::process::Command;
use std::rc::Rc;
use tempfile::NamedTempFile;
/// Executor that runs the benchmark command through the memtrack binary.
///
/// It reports itself as [`ExecutorName::Memory`] and injects the base environment for
/// [`RunnerMode::Memory`] when building the command.
pub struct MemoryExecutor;
impl MemoryExecutor {
    /// Assembles the `track` invocation of the memtrack binary for the configured benchmark.
    ///
    /// The binary is read from the `CODSPEED_MEMTRACK_BINARY` environment variable and falls
    /// back to `codspeed-memtrack` when that variable cannot be read.
    ///
    /// On success, returns:
    /// - the one-shot IPC server whose name is handed to memtrack via `--ipc-server`,
    /// - the command builder after it went through `wrap_with_env`,
    /// - the temporary file returned by `wrap_with_env` (presumably it backs the forwarded
    ///   environment and has to outlive the command; confirm against `wrap_with_env`).
    fn build_memtrack_command(
        execution_context: &ExecutionContext,
    ) -> Result<(MemtrackIpcServer, CommandBuilder, NamedTempFile)> {
        // FIXME: We only support native languages for now

        // Resolve the memtrack binary: explicit override first, default command name otherwise.
        let binary = match std::env::var("CODSPEED_MEMTRACK_BINARY") {
            Ok(path) => path,
            Err(_) => "codspeed-memtrack".to_string(),
        };

        // The IPC server has to exist before the command is built, since its name is an argument.
        let (server, ipc_name) = ipc::IpcOneShotServer::new()?;

        // memtrack track --output <profile>/results --ipc-server <name> <bench command>
        let mut memtrack = CommandBuilder::new(binary);
        memtrack.arg("track");

        let output_dir = execution_context.profile_folder.join("results");
        memtrack.arg("--output");
        memtrack.arg(output_dir);

        memtrack.arg("--ipc-server");
        memtrack.arg(ipc_name);

        let bench_command = get_bench_command(&execution_context.config)?;
        memtrack.arg(bench_command);

        // Forward the injected environment through the env wrapper.
        let injected_env =
            get_base_injected_env(RunnerMode::Memory, &execution_context.profile_folder);
        let (wrapped, env_file) = wrap_with_env(memtrack, &injected_env)?;

        Ok((server, wrapped, env_file))
    }
}
#[async_trait(?Send)]
impl Executor for MemoryExecutor {
    /// Identifies this executor as [`ExecutorName::Memory`].
    fn name(&self) -> ExecutorName {
        ExecutorName::Memory
    }

    /// Checks that the memtrack binary can be executed by running it with `--version`.
    ///
    /// The binary is read from the `CODSPEED_MEMTRACK_BINARY` environment variable and falls
    /// back to `codspeed-memtrack` when that variable cannot be read.
    ///
    /// # Errors
    ///
    /// Fails when the binary cannot be spawned (for example because it is not installed) or
    /// when it exits with a non-success status.
    async fn setup(
        &self,
        _system_info: &SystemInfo,
        _setup_cache_dir: Option<&Path>,
    ) -> Result<()> {
        // Validate that the codspeed-memtrack command is available
        let memtrack_path = std::env::var("CODSPEED_MEMTRACK_BINARY")
            .unwrap_or_else(|_| "codspeed-memtrack".to_string());

        info!("Validating memtrack binary at path: {memtrack_path}");
        // A missing binary shows up as a spawn error rather than a failed exit status, so
        // report it with the path that was tried instead of a bare I/O error.
        let output = match Command::new(&memtrack_path).arg("--version").output() {
            Ok(output) => output,
            Err(e) => bail!(
                "codspeed-memtrack command is not available or failed to execute ({memtrack_path}): {e}"
            ),
        };

        if !output.status.success() {
            bail!(
                "codspeed-memtrack command is not available or failed to execute\nstdout: {}\nstderr: {}",
                String::from_utf8_lossy(&output.stdout),
                String::from_utf8_lossy(&output.stderr)
            );
        }

        Ok(())
    }

    /// Runs the benchmark command under memtrack and stores the execution timestamps.
    ///
    /// The memtrack command is wrapped with sudo, started with a log pipe, and driven through
    /// the runner FIFO once the process has started. The timestamps returned by the FIFO loop
    /// are saved into the `results` directory of the profile folder.
    ///
    /// # Errors
    ///
    /// Fails when the results directory cannot be created, the command cannot be built or
    /// run, the FIFO handling fails, the timestamps cannot be saved, or the memtrack process
    /// exits with a non-success status.
    async fn run(
        &self,
        execution_context: &ExecutionContext,
        _mongo_tracer: &Option<MongoTracer>,
    ) -> Result<()> {
        // Create the results/ directory inside the profile folder to avoid having memtrack create it with wrong permissions
        std::fs::create_dir_all(execution_context.profile_folder.join("results"))?;

        // `_env_file` is kept alive until the end of this function, i.e. for the whole run.
        let (ipc, cmd_builder, _env_file) = Self::build_memtrack_command(execution_context)?;
        let cmd = wrap_with_sudo(cmd_builder)?.build();
        debug!("cmd: {cmd:?}");

        let runner_fifo = RunnerFifo::new()?;
        let on_process_started = async |pid| -> anyhow::Result<()> {
            let marker_result = Self::handle_fifo(runner_fifo, pid, ipc).await?;

            // Directly write to the profile folder, to avoid having to define another field.
            // A failure to save is propagated as an error instead of panicking the runner.
            marker_result.save_to(execution_context.profile_folder.join("results"))?;

            Ok(())
        };

        let status = run_command_with_log_pipe_and_callback(cmd, on_process_started).await?;
        debug!("cmd exit status: {status:?}");

        if !status.success() {
            bail!("failed to execute memory tracker process: {status}");
        }

        Ok(())
    }

    /// No cleanup is performed for this executor.
    async fn teardown(&self, _execution_context: &ExecutionContext) -> Result<()> {
        Ok(())
    }
}
impl MemoryExecutor {
async fn handle_fifo(
mut runner_fifo: RunnerFifo,
pid: u32,
ipc: MemtrackIpcServer,
) -> anyhow::Result<ExecutionTimestamps> {
debug!("handle_fifo called with PID {pid}");
// Accept the IPC connection from memtrack and get the sender it sends us
let (_, memtrack_sender) = ipc.accept()?;
let ipc_client = Rc::new(MemtrackIpcClient::from_accepted(memtrack_sender));
let ipc_client_health = Rc::clone(&ipc_client);
let health_check = async move || {
// Ping memtrack via IPC to check if it's still responding
match ipc_client_health.ping() {
Ok(()) => Ok(true),
Err(_) => Ok(false),
}
};
let on_cmd = async move |cmd: &FifoCommand| {
match cmd {
FifoCommand::CurrentBenchmark { pid, uri } => {
debug!("Current benchmark: {pid}, {uri}");
}
FifoCommand::StartBenchmark => {
debug!("Enabling memtrack via IPC");
if let Err(e) = ipc_client.enable() {
error!("Failed to enable memtrack: {e}");
return Ok(FifoCommand::Err);
}
}
FifoCommand::StopBenchmark => {
debug!("Disabling memtrack via IPC");
if let Err(e) = ipc_client.disable() {
// There's a chance that memtrack has already exited here, so just log as debug
debug!("Failed to disable memtrack: {e}");
return Ok(FifoCommand::Err);
}
}
FifoCommand::GetIntegrationMode => {
return Ok(FifoCommand::IntegrationModeResponse(
IntegrationMode::Analysis,
));
}
_ => {
warn!("Unhandled FIFO command: {cmd:?}");
return Ok(FifoCommand::Err);
}
}
Ok(FifoCommand::Ack)
};
let (marker_result, _) = runner_fifo
.handle_fifo_messages(health_check, on_cmd)
.await?;
Ok(marker_result)
}
}