diff --git a/Cargo.lock b/Cargo.lock index 328f2fa9..1cde54cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1579,6 +1579,15 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memfile" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64636fdb65a5f0740f920c4281f3dbb76a71e25e25914b6d27000739897d40e" +dependencies = [ + "libc", +] + [[package]] name = "memmap2" version = "0.9.7" @@ -2250,6 +2259,7 @@ dependencies = [ "itertools", "js-sys", "lz4_flex", + "memfile", "parking_lot", "ruzstd", "serde", diff --git a/puffin/Cargo.toml b/puffin/Cargo.toml index 348aed97..234fbd9f 100644 --- a/puffin/Cargo.toml +++ b/puffin/Cargo.toml @@ -60,6 +60,9 @@ web-time = { version = "0.2", optional = true } [dev-dependencies] criterion = "0.5" +[target.'cfg(target_os = "linux")'.dev-dependencies] +memfile = "0.3" + [[bench]] name = "benchmark" harness = false diff --git a/puffin/src/frame_data.rs b/puffin/src/frame_data.rs index d1fa0992..b071b407 100644 --- a/puffin/src/frame_data.rs +++ b/puffin/src/frame_data.rs @@ -569,16 +569,13 @@ impl FrameData { pub fn write_into( &self, scope_collection: Option<&crate::ScopeCollection>, - write: &mut impl std::io::Write, + mut write: &mut impl std::io::Write, ) -> anyhow::Result<()> { use bincode::Options as _; - use byteorder::{LE, WriteBytesExt as _}; + use byteorder::WriteBytesExt as _; - let meta_serialized = bincode::options().serialize(&self.meta)?; - - write.write_all(b"PFD4")?; - write.write_all(&(meta_serialized.len() as u32).to_le_bytes())?; - write.write_all(&meta_serialized)?; + write.write_all(b"PFD5")?; + bincode::options().serialize_into(&mut write, &self.meta)?; self.create_packed(); let packed_streams_lock = self.data.read(); @@ -588,15 +585,12 @@ impl FrameData { write.write_u8(packed_streams.compression_kind as u8)?; write.write_all(&packed_streams.bytes)?; - let to_serialize_scopes: Vec<_> = if let Some(scope_collection) = scope_collection { - scope_collection.scopes_by_id().values().cloned().collect() + if let Some(scope_collection) = scope_collection { + bincode::options().serialize_into(&mut write, &scope_collection.serializable())?; } else { - self.scope_delta.clone() - }; + bincode::options().serialize_into(write, &self.scope_delta)?; + } - let serialized_scopes = bincode::options().serialize(&to_serialize_scopes)?; - write.write_u32::(serialized_scopes.len() as u32)?; - write.write_all(&serialized_scopes)?; Ok(()) } @@ -605,7 +599,7 @@ impl FrameData { /// [`None`] is returned if the end of the stream is reached (EOF), /// or an end-of-stream sentinel of `0u32` is read. #[cfg(feature = "serialization")] - pub fn read_next(read: &mut impl std::io::Read) -> anyhow::Result> { + pub fn read_next(mut read: &mut impl std::io::Read) -> anyhow::Result> { use anyhow::Context as _; use bincode::Options as _; use byteorder::{LE, ReadBytesExt}; @@ -785,6 +779,34 @@ impl FrameData { scope_delta: new_scopes, full_delta: false, })) + } else if &header == b"PFD5" { + // Added 2024-12-22: remove useless manual sequence size serialization and temporary vector. + let meta = { + bincode::options() + .deserialize_from(&mut read) + .context("bincode deserialize")? + }; + + let streams_compressed_length = read.read_u32::()? as usize; + let compression_kind = CompressionKind::from_u8(read.read_u8()?)?; + let streams_compressed = { + let mut streams_compressed = vec![0_u8; streams_compressed_length]; + read.read_exact(&mut streams_compressed)?; + PackedStreams::new(compression_kind, streams_compressed) + }; + + let deserialized_scopes: Vec> = { + bincode::options() + .deserialize_from(read) // serialized_scopes.as_slice() + .context("Can not deserialize scope details")? + }; + + Ok(Some(Self { + meta, + data: RwLock::new(FrameDataState::Packed(streams_compressed)), + scope_delta: deserialized_scopes, + full_delta: false, + })) } else { anyhow::bail!( "Failed to decode: this data is newer than this reader. Please update your puffin version!" diff --git a/puffin/src/profile_view.rs b/puffin/src/profile_view.rs index 21999b48..6bb46182 100644 --- a/puffin/src/profile_view.rs +++ b/puffin/src/profile_view.rs @@ -441,3 +441,87 @@ impl FrameStats { self.total_ram_used = 0; } } + +#[cfg(all(test, feature = "serialization"))] +mod tests { + use std::{io::Seek, ops::DerefMut, sync::Arc, thread, time::Duration}; + + use memfile::MemFile; + use parking_lot::Mutex; + + use crate::{GlobalProfiler, profile_scope, set_scopes_on}; + + use super::FrameView; + + #[test] + fn read_pfd4_file() -> anyhow::Result<()> { + let mut file = std::fs::File::open("tests/data/capture_PFD4.puffin")?; + let _ = FrameView::read(&mut file)?; + Ok(()) + } + + #[test] + fn read_pfd3_file() -> anyhow::Result<()> { + let mut file = std::fs::File::open("tests/data/capture_PFD3.puffin")?; + let _ = FrameView::read(&mut file)?; + Ok(()) + } + + #[test] + fn read_pfd2_file() -> anyhow::Result<()> { + let mut file = std::fs::File::open("tests/data/capture_PFD2.puffin")?; + let _ = FrameView::read(&mut file)?; + Ok(()) + } + + #[test] + fn read_pfd1_file() -> anyhow::Result<()> { + let mut file = std::fs::File::open("tests/data/capture_PFD1.puffin")?; + let _ = FrameView::read(&mut file)?; + Ok(()) + } + + fn run_write(file: MemFile) { + // Init profiler sink with sync wrinting + let writer = Arc::new(Mutex::new(file)); + let sink = GlobalProfiler::lock().add_sink(Box::new(move |frame_data| { + let mut writer = writer.lock(); + frame_data.write_into(None, writer.deref_mut()).unwrap(); + })); + + set_scopes_on(true); // need this to enable capture + // run frames + for idx in 0..4 { + profile_scope!("main", idx.to_string()); + { + profile_scope!("sleep 1ms"); + let sleep_duration = Duration::from_millis(1); + thread::sleep(sleep_duration); + } + { + profile_scope!("sleep 2ms"); + let sleep_duration = Duration::from_millis(2); + thread::sleep(sleep_duration); + } + GlobalProfiler::lock().new_frame(); + } + + set_scopes_on(false); + GlobalProfiler::lock().new_frame(); //Force to get last frame + GlobalProfiler::lock().remove_sink(sink); + } + + fn run_read(mut file: MemFile) { + file.rewind().unwrap(); + let _ = FrameView::read(&mut file).expect("read :"); + } + + #[test] + #[cfg(target_os = "linux")] + fn deserialize_serialized() { + let file = MemFile::create_default("deserialize_serialized.puffin").unwrap(); + run_write(file.try_clone().unwrap()); + thread::sleep(Duration::from_secs(1)); + run_read(file); + } +} diff --git a/puffin/src/scope_details.rs b/puffin/src/scope_details.rs index f85e9da6..55549422 100644 --- a/puffin/src/scope_details.rs +++ b/puffin/src/scope_details.rs @@ -58,6 +58,28 @@ impl ScopeCollection { pub fn scopes_by_id(&self) -> &HashMap> { &self.0.scope_id_to_details } + + /// A wrapper than allow `Serialize` all the scopes values of `ScopeCollection`. + #[cfg(feature = "serialization")] + pub fn serializable(&self) -> Serializable<'_> { + Serializable(self) + } +} + +/// A wrapper than impl `Serialize` for `ScopeCollection`. +/// This `struct` is created by the [`serializable`] method on `ScopeCollection`. +#[cfg(feature = "serialization")] +pub struct Serializable<'a>(&'a crate::ScopeCollection); + +#[cfg(feature = "serialization")] +impl serde::Serialize for Serializable<'_> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let iter = self.0.scopes_by_id().values(); + serializer.collect_seq(iter) + } } /// Scopes are identified by user-provided name while functions are identified by the function name. diff --git a/puffin/tests/common/mod.rs b/puffin/tests/common/mod.rs new file mode 100644 index 00000000..6d0e0d4a --- /dev/null +++ b/puffin/tests/common/mod.rs @@ -0,0 +1,111 @@ +#![allow(dead_code)] + +use std::{ + io::Write, + sync::Arc, + thread::{self, JoinHandle}, + time::Duration, +}; + +use parking_lot::Mutex; +#[cfg(feature = "serialization")] +use puffin::FrameData; +use puffin::{FrameSinkId, GlobalProfiler}; + +pub fn process_1() { + puffin::profile_function!(); + sub_process_1_1(); + (0..2).for_each(|_| sub_process_1_2()); +} + +fn sub_process_1_1() { + puffin::profile_function!(); + thread::sleep(Duration::from_millis(1)); +} + +fn sub_process_1_2() { + puffin::profile_function!(); + thread::sleep(Duration::from_micros(2)); +} + +pub fn example_run() { + for idx in 0..4 { + puffin::profile_scope!("main", idx.to_string()); + + { + puffin::profile_scope!("sleep 1ms"); + let sleep_duration = Duration::from_millis(1); + thread::sleep(sleep_duration); + } + + { + puffin::profile_scope!("sleep 2ms"); + let sleep_duration = Duration::from_millis(2); + thread::sleep(sleep_duration); + } + //println!("before new_frame {idx}"); + puffin::GlobalProfiler::lock().new_frame(); + //println!("after new_frame {idx}"); + } +} + +pub struct FrameWriterImpl { + writer: Arc>, +} + +impl FrameWriterImpl { + pub fn from_writer(mut writer: W) -> Self { + writer.write_all(b"PUF0").unwrap(); //Hack: should not be duplicated + Self { + writer: Arc::new(Mutex::new(writer)), + } + } + + #[cfg(feature = "serialization")] + fn write_frame(&self, frame_data: Arc) { + use std::ops::DerefMut; + + let mut writer = self.writer.lock(); + frame_data.write_into(None, writer.deref_mut()).unwrap(); + } +} + +pub struct FrameWriterSink { + sink_id: FrameSinkId, + write_thread: Option>, +} +impl Drop for FrameWriterSink { + fn drop(&mut self) { + GlobalProfiler::lock().remove_sink(self.sink_id); + if let Some(write_handle) = self.write_thread.take() { + let _ = write_handle.join(); + } + } +} + +#[cfg(feature = "serialization")] +#[must_use] +pub fn init_frames_writer(writer: impl Write + Send + 'static) -> FrameWriterSink { + use std::sync::mpsc; + + let frame_writer = FrameWriterImpl::from_writer(writer); + let (frame_sender, frames_recv) = mpsc::channel(); + + let write_thread = thread::Builder::new() + .name("frame_writer".into()) + .spawn(move || { + while let Ok(frame_data) = frames_recv.recv() { + frame_writer.write_frame(frame_data); + } + }) + .unwrap(); + + // Init profiler sink and enable capture + let sink_id = GlobalProfiler::lock().add_sink(Box::new(move |frame_data| { + frame_sender.send(frame_data).unwrap() + })); + FrameWriterSink { + sink_id, + write_thread: Some(write_thread), + } +} diff --git a/puffin/tests/data/capture_PFD1.puffin b/puffin/tests/data/capture_PFD1.puffin new file mode 100644 index 00000000..033786d3 Binary files /dev/null and b/puffin/tests/data/capture_PFD1.puffin differ diff --git a/puffin/tests/data/capture_PFD2.puffin b/puffin/tests/data/capture_PFD2.puffin new file mode 100644 index 00000000..5204245c Binary files /dev/null and b/puffin/tests/data/capture_PFD2.puffin differ diff --git a/puffin/tests/data/capture_PFD3.puffin b/puffin/tests/data/capture_PFD3.puffin new file mode 100644 index 00000000..772a44c7 Binary files /dev/null and b/puffin/tests/data/capture_PFD3.puffin differ diff --git a/puffin/tests/data/capture_PFD4.puffin b/puffin/tests/data/capture_PFD4.puffin new file mode 100644 index 00000000..572a6855 Binary files /dev/null and b/puffin/tests/data/capture_PFD4.puffin differ diff --git a/puffin/tests/frame_serialization.rs b/puffin/tests/frame_serialization.rs new file mode 100644 index 00000000..dd8594dc --- /dev/null +++ b/puffin/tests/frame_serialization.rs @@ -0,0 +1,17 @@ +mod common; + +#[cfg(feature = "serialization")] +#[test] +fn frame_serialization() { + let frame_data = Vec::new(); + let _frame_writer = common::init_frames_writer(frame_data); + + //println!("set_scopes_on(true)"); + puffin::set_scopes_on(true); // need this to enable capture + + common::example_run(); + + //println!("set_scopes_on(false)"); + puffin::set_scopes_on(false); + puffin::GlobalProfiler::lock().new_frame(); //Force to get last frame +} diff --git a/puffin/tests/multiple_frames.rs b/puffin/tests/multiple_frames.rs new file mode 100644 index 00000000..6f27abef --- /dev/null +++ b/puffin/tests/multiple_frames.rs @@ -0,0 +1,39 @@ +mod common; + +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +use puffin::{FrameData, GlobalProfiler}; + +#[test] +fn multiple_frames() { + const NB_LOOP: usize = 10; + fn profiler_sink(frame_data: Arc, frame_count: Arc) { + let previous_count = frame_count.fetch_add(1, Ordering::Relaxed); + let frame_meta = frame_data.meta(); + assert_eq!(frame_meta.frame_index, previous_count as u64); + assert_eq!(frame_meta.num_scopes, 4); + } + + // Init profiler sink and enable capture + let frame_count = Arc::new(AtomicUsize::default()); + let frame_count_clone = frame_count.clone(); + let sink_id = GlobalProfiler::lock().add_sink(Box::new(move |frame_data| { + profiler_sink(frame_data, frame_count_clone.clone()); + })); + puffin::set_scopes_on(true); + + // Run process + std::iter::repeat_n((), NB_LOOP).for_each(|_| { + common::process_1(); + puffin::GlobalProfiler::lock().new_frame(); + }); + + let frame_count = frame_count.load(Ordering::Relaxed); + assert_eq!(frame_count, NB_LOOP); + + // End frame, and uninit profiler + GlobalProfiler::lock().remove_sink(sink_id); +} diff --git a/puffin/tests/single_frame.rs b/puffin/tests/single_frame.rs new file mode 100644 index 00000000..1a090733 --- /dev/null +++ b/puffin/tests/single_frame.rs @@ -0,0 +1,25 @@ +mod common; + +use std::sync::Arc; + +use puffin::{FrameData, GlobalProfiler}; + +#[test] +fn single_frame() { + fn profiler_sink(frame_data: Arc) { + let frame_meta = frame_data.meta(); + assert_eq!(frame_meta.frame_index, 0); + assert_eq!(frame_meta.num_scopes, 4); + } + + // Init profiler sink and enable capture + let sink_id = GlobalProfiler::lock().add_sink(Box::new(profiler_sink)); + puffin::set_scopes_on(true); + + // Run process + common::process_1(); + + // End frame, and uninit profiler + puffin::GlobalProfiler::lock().new_frame(); + GlobalProfiler::lock().remove_sink(sink_id); +} diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 1b78df44..62fdb101 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -1,4 +1,5 @@ use anyhow::Context as _; +use crossbeam_channel::TryRecvError; use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection}; use std::{ io::Write as _, @@ -7,6 +8,7 @@ use std::{ Arc, atomic::{AtomicUsize, Ordering}, }, + time::Duration, }; /// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough. @@ -261,13 +263,22 @@ impl Server { scope_collection: Default::default(), }; - while let Ok(frame) = rx.recv() { + loop { if let Err(err) = server_impl.accept_new_clients() { log::warn!("puffin server failure: {err}"); } - - if let Err(err) = server_impl.send(&frame) { - log::warn!("puffin server failure: {err}"); + match rx.try_recv() { + Ok(frame) => { + if let Err(err) = server_impl.send(&frame) { + log::warn!("puffin server failure: {err}"); + } + } + Err(TryRecvError::Empty) => { + std::thread::sleep(Duration::from_millis(1)); + } + Err(TryRecvError::Disconnected) => { + break; + } } } })