Skip to content

Commit af1f5e9

Browse files
committed
feat(memtrack): serialize events to allow streamed decoding
1 parent 78e2e13 commit af1f5e9

2 files changed

Lines changed: 84 additions & 5 deletions

File tree

crates/runner-shared/src/artifacts/memtrack.rs

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,38 @@
11
use libc::pid_t;
22
use serde::{Deserialize, Serialize};
3+
use std::io::Read;
34

45
#[derive(Debug, Clone, Serialize, Deserialize)]
56
pub struct MemtrackArtifact {
67
pub events: Vec<MemtrackEvent>,
78
}
8-
impl super::ArtifactExt for MemtrackArtifact {}
9+
impl super::ArtifactExt for MemtrackArtifact {
10+
fn encode_to_vec(&self) -> Vec<u8> {
11+
// This is required for `decode_streamed`:
12+
let mut buf = Vec::new();
13+
let mut serializer = rmp_serde::Serializer::new(&mut buf);
14+
for event in &self.events {
15+
event.serialize(&mut serializer).unwrap();
16+
}
17+
buf
18+
}
19+
}
20+
21+
impl MemtrackArtifact {
22+
pub fn decode_streamed<R: std::io::Read>(reader: R) -> anyhow::Result<MemtrackEventStream<R>> {
23+
Ok(MemtrackEventStream {
24+
deserializer: rmp_serde::Deserializer::new(reader),
25+
})
26+
}
27+
28+
pub fn encode_streamed(&self) -> impl Iterator<Item = Vec<u8>> + '_ {
29+
self.events
30+
.iter()
31+
.map(|event| rmp_serde::to_vec_named(event).unwrap())
32+
}
33+
}
934

10-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
35+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
1136
pub struct MemtrackEvent {
1237
pub pid: pid_t,
1338
pub tid: pid_t,
@@ -17,7 +42,7 @@ pub struct MemtrackEvent {
1742
pub kind: MemtrackEventKind,
1843
}
1944

20-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
45+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
2146
#[serde(tag = "type")]
2247
pub enum MemtrackEventKind {
2348
Malloc { size: u64 },
@@ -29,3 +54,54 @@ pub enum MemtrackEventKind {
2954
Munmap { size: u64 },
3055
Brk { size: u64 },
3156
}
57+
58+
pub struct MemtrackEventStream<R: Read> {
59+
deserializer: rmp_serde::Deserializer<rmp_serde::decode::ReadReader<R>>,
60+
}
61+
62+
impl<R: Read> Iterator for MemtrackEventStream<R> {
63+
type Item = MemtrackEvent;
64+
65+
fn next(&mut self) -> Option<Self::Item> {
66+
MemtrackEvent::deserialize(&mut self.deserializer).ok()
67+
}
68+
}
69+
70+
#[cfg(test)]
71+
mod tests {
72+
use crate::artifacts::ArtifactExt;
73+
74+
use super::*;
75+
use std::io::Cursor;
76+
77+
#[test]
78+
fn decode_streamed_reads_until_eof() -> anyhow::Result<()> {
79+
let events = vec![
80+
MemtrackEvent {
81+
pid: 1,
82+
tid: 11,
83+
timestamp: 100,
84+
addr: 0x10,
85+
kind: MemtrackEventKind::Malloc { size: 64 },
86+
},
87+
MemtrackEvent {
88+
pid: 1,
89+
tid: 12,
90+
timestamp: 200,
91+
addr: 0x20,
92+
kind: MemtrackEventKind::Free,
93+
},
94+
];
95+
96+
let artifact = MemtrackArtifact {
97+
events: events.clone(),
98+
};
99+
let buf = artifact.encode_to_vec();
100+
101+
let stream = MemtrackArtifact::decode_streamed(Cursor::new(buf))?;
102+
let collected: Vec<_> = stream.collect();
103+
assert_eq!(collected, events);
104+
105+
Ok(())
106+
}
107+
}

crates/runner-shared/src/artifacts/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@ where
1717
std::any::type_name::<Self>().rsplit("::").next().unwrap()
1818
}
1919

20+
fn encode_to_vec(&self) -> Vec<u8> {
21+
rmp_serde::to_vec_named(self).expect("Failed to encode type")
22+
}
23+
2024
fn save_file_to<P: AsRef<std::path::Path>>(
2125
&self,
2226
folder: P,
2327
filename: &str,
2428
) -> anyhow::Result<()> {
2529
std::fs::create_dir_all(folder.as_ref())?;
26-
let data = rmp_serde::to_vec_named(self)?;
27-
std::fs::write(folder.as_ref().join(filename), data)?;
30+
std::fs::write(folder.as_ref().join(filename), self.encode_to_vec())?;
2831

2932
debug!("Saved {} result to {:?}", Self::name(), folder.as_ref());
3033
Ok(())

0 commit comments

Comments
 (0)