Skip to content

Commit 12452f0

Browse files
committed
feat(memtrack): serialize events serially to allow streamed decoding
1 parent 78e2e13 commit 12452f0

File tree

2 files changed

+82
-5
lines changed

2 files changed

+82
-5
lines changed

crates/runner-shared/src/artifacts/memtrack.rs

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,32 @@
11
use libc::pid_t;
22
use serde::{Deserialize, Serialize};
3+
use std::io::{Read, Write};
34

45
#[derive(Debug, Clone, Serialize, Deserialize)]
56
pub struct MemtrackArtifact {
67
pub events: Vec<MemtrackEvent>,
78
}
8-
impl super::ArtifactExt for MemtrackArtifact {}
9+
impl super::ArtifactExt for MemtrackArtifact {
10+
fn encode_to_writer<W: Write>(&self, writer: W) -> anyhow::Result<()> {
11+
// This is required for `decode_streamed`: We can't stream the deserialization of
12+
// the whole artifact, so we have to encode them one by one.
13+
let mut serializer = rmp_serde::Serializer::new(writer);
14+
for event in &self.events {
15+
event.serialize(&mut serializer)?;
16+
}
17+
Ok(())
18+
}
19+
}
20+
21+
impl MemtrackArtifact {
22+
pub fn decode_streamed<R: std::io::Read>(reader: R) -> anyhow::Result<MemtrackEventStream<R>> {
23+
Ok(MemtrackEventStream {
24+
deserializer: rmp_serde::Deserializer::new(reader),
25+
})
26+
}
27+
}
928

10-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
29+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
1130
pub struct MemtrackEvent {
1231
pub pid: pid_t,
1332
pub tid: pid_t,
@@ -17,7 +36,7 @@ pub struct MemtrackEvent {
1736
pub kind: MemtrackEventKind,
1837
}
1938

20-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
39+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
2140
#[serde(tag = "type")]
2241
pub enum MemtrackEventKind {
2342
Malloc { size: u64 },
@@ -29,3 +48,55 @@ pub enum MemtrackEventKind {
2948
Munmap { size: u64 },
3049
Brk { size: u64 },
3150
}
51+
52+
pub struct MemtrackEventStream<R: Read> {
53+
deserializer: rmp_serde::Deserializer<rmp_serde::decode::ReadReader<R>>,
54+
}
55+
56+
impl<R: Read> Iterator for MemtrackEventStream<R> {
57+
type Item = MemtrackEvent;
58+
59+
fn next(&mut self) -> Option<Self::Item> {
60+
MemtrackEvent::deserialize(&mut self.deserializer).ok()
61+
}
62+
}
63+
64+
#[cfg(test)]
65+
mod tests {
66+
use crate::artifacts::ArtifactExt;
67+
68+
use super::*;
69+
use std::io::Cursor;
70+
71+
#[test]
72+
fn test_decode_streamed() -> anyhow::Result<()> {
73+
let events = vec![
74+
MemtrackEvent {
75+
pid: 1,
76+
tid: 11,
77+
timestamp: 100,
78+
addr: 0x10,
79+
kind: MemtrackEventKind::Malloc { size: 64 },
80+
},
81+
MemtrackEvent {
82+
pid: 1,
83+
tid: 12,
84+
timestamp: 200,
85+
addr: 0x20,
86+
kind: MemtrackEventKind::Free,
87+
},
88+
];
89+
90+
let artifact = MemtrackArtifact {
91+
events: events.clone(),
92+
};
93+
let mut buf = Vec::new();
94+
artifact.encode_to_writer(&mut buf)?;
95+
96+
let stream = MemtrackArtifact::decode_streamed(Cursor::new(buf))?;
97+
let collected: Vec<_> = stream.collect();
98+
assert_eq!(collected, events);
99+
100+
Ok(())
101+
}
102+
}

crates/runner-shared/src/artifacts/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@ where
1717
std::any::type_name::<Self>().rsplit("::").next().unwrap()
1818
}
1919

20+
fn encode_to_writer<W: std::io::Write>(&self, mut writer: W) -> anyhow::Result<()> {
21+
let encoded = rmp_serde::to_vec_named(self)?;
22+
writer.write_all(&encoded)?;
23+
Ok(())
24+
}
25+
2026
fn save_file_to<P: AsRef<std::path::Path>>(
2127
&self,
2228
folder: P,
2329
filename: &str,
2430
) -> anyhow::Result<()> {
2531
std::fs::create_dir_all(folder.as_ref())?;
26-
let data = rmp_serde::to_vec_named(self)?;
27-
std::fs::write(folder.as_ref().join(filename), data)?;
32+
let file = std::fs::File::create(folder.as_ref().join(filename))?;
33+
self.encode_to_writer(file)?;
2834

2935
debug!("Saved {} result to {:?}", Self::name(), folder.as_ref());
3036
Ok(())

0 commit comments

Comments
 (0)