Skip to content

Commit 936bb32

Browse files
committed
feat(memtrack): serialize events serially to allow streamed decoding
1 parent 78e2e13 commit 936bb32

File tree

2 files changed

+81
-5
lines changed

2 files changed

+81
-5
lines changed

crates/runner-shared/src/artifacts/memtrack.rs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,31 @@
11
use libc::pid_t;
22
use serde::{Deserialize, Serialize};
3+
use std::io::{Read, Write};
34

45
#[derive(Debug, Clone, Serialize, Deserialize)]
56
pub struct MemtrackArtifact {
67
pub events: Vec<MemtrackEvent>,
78
}
8-
impl super::ArtifactExt for MemtrackArtifact {}
9+
impl super::ArtifactExt for MemtrackArtifact {
10+
fn encode_to_writer<W: Write>(&self, writer: W) -> anyhow::Result<()> {
11+
// This is required for `decode_streamed`:
12+
let mut serializer = rmp_serde::Serializer::new(writer);
13+
for event in &self.events {
14+
event.serialize(&mut serializer)?;
15+
}
16+
Ok(())
17+
}
18+
}
19+
20+
impl MemtrackArtifact {
21+
pub fn decode_streamed<R: std::io::Read>(reader: R) -> anyhow::Result<MemtrackEventStream<R>> {
22+
Ok(MemtrackEventStream {
23+
deserializer: rmp_serde::Deserializer::new(reader),
24+
})
25+
}
26+
}
927

10-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
28+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
1129
pub struct MemtrackEvent {
1230
pub pid: pid_t,
1331
pub tid: pid_t,
@@ -17,7 +35,7 @@ pub struct MemtrackEvent {
1735
pub kind: MemtrackEventKind,
1836
}
1937

20-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
38+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
2139
#[serde(tag = "type")]
2240
pub enum MemtrackEventKind {
2341
Malloc { size: u64 },
@@ -29,3 +47,55 @@ pub enum MemtrackEventKind {
2947
Munmap { size: u64 },
3048
Brk { size: u64 },
3149
}
50+
51+
pub struct MemtrackEventStream<R: Read> {
52+
deserializer: rmp_serde::Deserializer<rmp_serde::decode::ReadReader<R>>,
53+
}
54+
55+
impl<R: Read> Iterator for MemtrackEventStream<R> {
56+
type Item = MemtrackEvent;
57+
58+
fn next(&mut self) -> Option<Self::Item> {
59+
MemtrackEvent::deserialize(&mut self.deserializer).ok()
60+
}
61+
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
use crate::artifacts::ArtifactExt;
66+
67+
use super::*;
68+
use std::io::Cursor;
69+
70+
#[test]
71+
fn test_decode_streamed() -> anyhow::Result<()> {
72+
let events = vec![
73+
MemtrackEvent {
74+
pid: 1,
75+
tid: 11,
76+
timestamp: 100,
77+
addr: 0x10,
78+
kind: MemtrackEventKind::Malloc { size: 64 },
79+
},
80+
MemtrackEvent {
81+
pid: 1,
82+
tid: 12,
83+
timestamp: 200,
84+
addr: 0x20,
85+
kind: MemtrackEventKind::Free,
86+
},
87+
];
88+
89+
let artifact = MemtrackArtifact {
90+
events: events.clone(),
91+
};
92+
let mut buf = Vec::new();
93+
artifact.encode_to_writer(&mut buf)?;
94+
95+
let stream = MemtrackArtifact::decode_streamed(Cursor::new(buf))?;
96+
let collected: Vec<_> = stream.collect();
97+
assert_eq!(collected, events);
98+
99+
Ok(())
100+
}
101+
}

crates/runner-shared/src/artifacts/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@ where
1717
std::any::type_name::<Self>().rsplit("::").next().unwrap()
1818
}
1919

20+
fn encode_to_writer<W: std::io::Write>(&self, mut writer: W) -> anyhow::Result<()> {
21+
let encoded = rmp_serde::to_vec_named(self)?;
22+
writer.write_all(&encoded)?;
23+
Ok(())
24+
}
25+
2026
fn save_file_to<P: AsRef<std::path::Path>>(
2127
&self,
2228
folder: P,
2329
filename: &str,
2430
) -> anyhow::Result<()> {
2531
std::fs::create_dir_all(folder.as_ref())?;
26-
let data = rmp_serde::to_vec_named(self)?;
27-
std::fs::write(folder.as_ref().join(filename), data)?;
32+
let file = std::fs::File::create(folder.as_ref().join(filename))?;
33+
self.encode_to_writer(file)?;
2834

2935
debug!("Saved {} result to {:?}", Self::name(), folder.as_ref());
3036
Ok(())

0 commit comments

Comments
 (0)