Skip to content

Commit e524405

Browse files
committed
feat(memtrack): add zstd compression support
1 parent 5498552 commit e524405

5 files changed

Lines changed: 127 additions & 29 deletions

File tree

Cargo.lock

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/memtrack/src/main.rs

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ use clap::Parser;
33
use ipc_channel::ipc::{self};
44
use log::{debug, info};
55
use memtrack::{MemtrackIpcMessage, Tracker, handle_ipc_message};
6-
use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent};
7-
use std::path::PathBuf;
6+
use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent, MemtrackWriter};
7+
use std::path::{Path, PathBuf};
88
use std::process::Command;
99
use std::sync::atomic::{AtomicBool, Ordering};
10+
use std::sync::mpsc::channel;
1011
use std::sync::{Arc, Mutex};
1112
use std::thread;
1213

@@ -51,10 +52,8 @@ fn main() -> Result<()> {
5152
} => {
5253
debug!("Starting memtrack for command: {command}");
5354

54-
let (root_pid, events, status) =
55-
track_command(&command, ipc_server).context("Failed to track command")?;
56-
let result = MemtrackArtifact { events };
57-
result.save_with_pid_to(&out_dir, root_pid as libc::pid_t)?;
55+
let status =
56+
track_command(&command, ipc_server, &out_dir).context("Failed to track command")?;
5857

5958
std::process::exit(status.code().unwrap_or(1));
6059
}
@@ -64,7 +63,8 @@ fn main() -> Result<()> {
6463
fn track_command(
6564
cmd_string: &str,
6665
ipc_server_name: Option<String>,
67-
) -> anyhow::Result<(u32, Vec<MemtrackEvent>, std::process::ExitStatus)> {
66+
out_dir: &Path,
67+
) -> anyhow::Result<std::process::ExitStatus> {
6868
let tracker = Tracker::new()?;
6969

7070
let tracker_arc = Arc::new(Mutex::new(tracker));
@@ -95,37 +95,67 @@ fn track_command(
9595
let event_rx = { tracker_arc.lock().unwrap().track(root_pid)? };
9696
info!("Spawned child with pid {root_pid}");
9797

98-
// Spawn event processing thread
99-
let process_events = Arc::new(AtomicBool::new(true));
100-
let process_events_clone = process_events.clone();
101-
let processing_thread = thread::spawn(move || {
102-
let mut events = Vec::new();
98+
// Create the artifact, then drain and write the events to disk
99+
let file_name = MemtrackArtifact::file_name(Some(root_pid));
100+
let out_file = std::fs::File::create(out_dir.join(file_name))?;
101+
102+
let (write_tx, write_rx) = channel::<MemtrackEvent>();
103+
104+
// Stage A: Fast drain thread - This is required so that we immediately clear the ring buffer
105+
// because it only has a limited size.
106+
static DRAIN_EVENTS: AtomicBool = AtomicBool::new(true);
107+
let write_tx_clone = write_tx.clone();
108+
let drain_thread = thread::spawn(move || {
103109
loop {
104-
if !process_events_clone.load(Ordering::Relaxed) {
110+
if !DRAIN_EVENTS.load(Ordering::Relaxed) {
105111
break;
106112
}
107-
108113
let Ok(event) = event_rx.try_recv() else {
109114
continue;
110115
};
116+
let _ = write_tx_clone.send(event.into());
117+
}
118+
});
119+
120+
// Stage B: Writer thread - Immediately writes the events to disk
121+
let writer_thread = thread::spawn(move || -> anyhow::Result<()> {
122+
let mut writer = MemtrackWriter::new(out_file)?;
111123

112-
events.push(event.into());
124+
while let Ok(first) = write_rx.recv() {
125+
writer.write_event(&first)?;
126+
127+
// Drain any backlog in a tight loop (batching)
128+
while let Ok(ev) = write_rx.try_recv() {
129+
writer.write_event(&ev)?;
130+
}
113131
}
114-
events
132+
writer.finish()?;
133+
134+
Ok(())
115135
});
116136

117137
// Wait for the command to complete
118138
let status = child.wait().context("Failed to wait for command")?;
119139
info!("Command exited with status: {status}");
120140

121-
info!("Waiting for the event processing thread to finish");
122-
process_events.store(false, Ordering::Relaxed);
123-
let events = processing_thread
141+
// Close the write channel to signal threads to finish
142+
DRAIN_EVENTS.store(false, Ordering::Relaxed);
143+
drop(write_tx);
144+
145+
// Wait for drain thread to finish
146+
info!("Waiting for the drain thread to finish");
147+
drain_thread
148+
.join()
149+
.map_err(|_| anyhow::anyhow!("Failed to join drain thread"))?;
150+
151+
// Wait for writer thread to finish and propagate errors
152+
info!("Waiting for the writer thread to finish");
153+
writer_thread
124154
.join()
125-
.map_err(|_| anyhow::anyhow!("Failed to join event thread"))?;
155+
.map_err(|_| anyhow::anyhow!("Failed to join writer thread"))??;
126156

127157
// IPC thread will exit when channel closes
128158
drop(ipc_handle);
129159

130-
Ok((root_pid as u32, events, status))
160+
Ok(status)
131161
}

crates/runner-shared/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ log = { workspace = true }
1313
rmp = "0.8.14"
1414
rmp-serde = "1.3.0"
1515
libc = { workspace = true }
16+
zstd = "0.13"

crates/runner-shared/src/artifacts/memtrack.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,22 @@ pub struct MemtrackArtifact {
88
}
99
impl super::ArtifactExt for MemtrackArtifact {
1010
fn encode_to_writer<W: Write>(&self, writer: W) -> anyhow::Result<()> {
11-
// This is required for `decode_streamed`: We can't stream the deserialization of
12-
// the whole artifact, so we have to encode them one by one.
13-
let mut serializer = rmp_serde::Serializer::new(writer);
11+
let mut writer = MemtrackWriter::new(writer)?;
1412
for event in &self.events {
15-
event.serialize(&mut serializer)?;
13+
writer.write_event(event)?;
1614
}
15+
writer.finish()?;
1716
Ok(())
1817
}
1918
}
2019

2120
impl MemtrackArtifact {
22-
pub fn decode_streamed<R: std::io::Read>(reader: R) -> anyhow::Result<MemtrackEventStream<R>> {
21+
pub fn decode_streamed<R: std::io::Read>(
22+
reader: R,
23+
) -> anyhow::Result<MemtrackEventStream<zstd::Decoder<'static, std::io::BufReader<R>>>> {
24+
let decoder = zstd::Decoder::new(reader)?;
2325
Ok(MemtrackEventStream {
24-
deserializer: rmp_serde::Deserializer::new(reader),
26+
deserializer: rmp_serde::Deserializer::new(decoder),
2527
})
2628
}
2729
}
@@ -61,6 +63,33 @@ impl<R: Read> Iterator for MemtrackEventStream<R> {
6163
}
6264
}
6365

66+
/// Streaming writer for memtrack events with compression
///
/// Each event is serialized with `rmp_serde` and the serialized bytes are fed
/// into a zstd encoder that wraps the caller-supplied writer.
/// [`MemtrackWriter::finish`] must be called once the last event has been
/// written; it consumes the writer and finalizes the encoder.
67+
pub struct MemtrackWriter<W: Write> {
68+
// MessagePack serializer whose sink is the zstd encoder around `W`.
serializer: rmp_serde::Serializer<zstd::Encoder<'static, W>>,
69+
}
70+
71+
impl<W: Write> MemtrackWriter<W> {
72+
/// Create a writer that compresses into `writer` using zstd level 1.
///
/// # Errors
///
/// Returns the error from `zstd::Encoder::new` if the encoder cannot be created.
pub fn new(writer: W) -> anyhow::Result<Self> {
73+
// Level 1 is hard-coded here.
// NOTE(review): presumably chosen to favour speed over ratio — confirm.
let encoder = zstd::Encoder::new(writer, 1)?;
74+
Ok(Self {
75+
serializer: rmp_serde::Serializer::new(encoder),
76+
})
77+
}
78+
79+
/// Write a single event to the stream
///
/// # Errors
///
/// Returns an error if serializing `event` into the compressed stream fails.
80+
pub fn write_event(&mut self, event: &MemtrackEvent) -> anyhow::Result<()> {
81+
event.serialize(&mut self.serializer)?;
82+
Ok(())
83+
}
84+
85+
/// Finish writing and flush the compression stream
///
/// Takes `self` by value, so no further events can be written afterwards.
///
/// # Errors
///
/// Returns the error from the zstd encoder's `finish` call.
86+
pub fn finish(self) -> anyhow::Result<()> {
87+
// Recover the zstd encoder from the serializer so it can be finalized.
let encoder = self.serializer.into_inner();
88+
// NOTE(review): per the zstd crate docs the encoder must be finished for the
// frame to be complete; the returned inner writer is discarded here.
encoder.finish()?;
89+
Ok(())
90+
}
91+
}
92+
6493
#[cfg(test)]
6594
mod tests {
6695
use crate::artifacts::ArtifactExt;
@@ -92,6 +121,7 @@ mod tests {
92121
};
93122
let mut buf = Vec::new();
94123
artifact.encode_to_writer(&mut buf)?;
124+
std::fs::write("test.bin", &buf)?;
95125

96126
let stream = MemtrackArtifact::decode_streamed(Cursor::new(buf))?;
97127
let collected: Vec<_> = stream.collect();

crates/runner-shared/src/artifacts/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ where
1717
std::any::type_name::<Self>().rsplit("::").next().unwrap()
1818
}
1919

20+
/// Build the artifact's file name from `Self::name()`.
///
/// Returns `"{pid}.{name}.msgpack"` when `pid` is `Some`, and
/// `"{name}.msgpack"` when it is `None`.
fn file_name(pid: Option<pid_t>) -> String {
21+
if let Some(pid) = pid {
22+
// Per-process artifact: the pid is the leading component of the name.
format!("{pid}.{}.msgpack", Self::name())
23+
} else {
24+
format!("{}.msgpack", Self::name())
25+
}
26+
}
27+
2028
fn encode_to_writer<W: std::io::Write>(&self, mut writer: W) -> anyhow::Result<()> {
2129
let encoded = rmp_serde::to_vec_named(self)?;
2230
writer.write_all(&encoded)?;
@@ -37,14 +45,14 @@ where
3745
}
3846

3947
fn save_to<P: AsRef<std::path::Path>>(&self, folder: P) -> anyhow::Result<()> {
40-
self.save_file_to(folder, &format!("{}.msgpack", Self::name()))
48+
self.save_file_to(folder, &Self::file_name(None))
4149
}
4250

4351
fn save_with_pid_to<P: AsRef<std::path::Path>>(
4452
&self,
4553
folder: P,
4654
pid: pid_t,
4755
) -> anyhow::Result<()> {
48-
self.save_file_to(folder, &format!("{pid}.{}.msgpack", Self::name()))
56+
self.save_file_to(folder, &Self::file_name(Some(pid)))
4957
}
5058
}

0 commit comments

Comments
 (0)