From 88e927e13d4cef5872d7fb90b295fa5ea2737340 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Thu, 20 Oct 2022 15:45:56 +0200 Subject: [PATCH 01/25] use flume instead of crossbeam-channel --- Cargo.lock | 55 ++++++++++++++++++++++++++++++++------- puffin_http/Cargo.toml | 2 +- puffin_http/src/server.rs | 17 ++++++------ 3 files changed, 54 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94eff0a0..4cdc1720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,15 +548,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -873,6 +864,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -909,6 +912,18 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + [[package]] name = "gdk-pixbuf-sys" version = "0.15.10" @@ -956,8 +971,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1403,6 +1420,15 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "natord" version = "1.0.9" @@ -1852,7 +1878,7 @@ name = "puffin_http" version = "0.16.1" dependencies = [ "anyhow", - "crossbeam-channel", + "flume", "log", "once_cell", "parking_lot", @@ -2187,6 +2213,15 @@ dependencies = [ "serde", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/puffin_http/Cargo.toml b/puffin_http/Cargo.toml index 58f038a8..ed18b8aa 100644 --- a/puffin_http/Cargo.toml +++ b/puffin_http/Cargo.toml @@ -15,7 +15,7 @@ include = ["**/*.rs", "Cargo.toml", "README.md"] [dependencies] anyhow = "1.0" -crossbeam-channel = "0.5" +flume = "0.11" log = "0.4" parking_lot = "0.12" puffin = { version = "0.19.1", path = "../puffin", features = [ diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 4af136cc..a1393828 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -231,12 +231,11 @@ impl Server { .set_nonblocking(true) .context("TCP set_nonblocking")?; - // We use crossbeam_channel instead of `mpsc`, + // We use flume instead of `mpsc`, // because on shutdown we want all frames to be sent. // `mpsc::Receiver` stops receiving as soon as the `Sender` is dropped, - // but `crossbeam_channel` will continue until the channel is empty. - let (tx, rx): (crossbeam_channel::Sender>, _) = - crossbeam_channel::unbounded(); + // but `flume` will continue until the channel is empty. + let (tx, rx): (flume::Sender>, _) = flume::unbounded(); let num_clients = Arc::new(AtomicUsize::default()); let num_clients_cloned = num_clients.clone(); @@ -300,7 +299,7 @@ type Packet = Arc<[u8]>; struct Client { client_addr: SocketAddr, - packet_tx: Option>, + packet_tx: Option>, join_handle: Option>, } @@ -339,7 +338,7 @@ impl PuffinServerImpl { log::info!("{} connected", client_addr); - let (packet_tx, packet_rx) = crossbeam_channel::bounded(MAX_FRAMES_IN_QUEUE); + let (packet_tx, packet_rx) = flume::bounded(MAX_FRAMES_IN_QUEUE); let join_handle = std::thread::Builder::new() .name("puffin-server-client".to_owned()) @@ -393,8 +392,8 @@ impl PuffinServerImpl { None => false, Some(packet_tx) => match packet_tx.try_send(packet.clone()) { Ok(()) => true, - Err(crossbeam_channel::TrySendError::Disconnected(_)) => false, - Err(crossbeam_channel::TrySendError::Full(_)) => { + Err(flume::TrySendError::Disconnected(_)) => false, + Err(flume::TrySendError::Full(_)) => { log::info!( "puffin client {} is not accepting data fast enough; dropping a frame", client.client_addr @@ -410,7 +409,7 @@ impl PuffinServerImpl { } fn client_loop( - packet_rx: crossbeam_channel::Receiver, + packet_rx: flume::Receiver, client_addr: SocketAddr, mut tcp_stream: TcpStream, ) { From 9990504b4d6d67897ed693d832fd6959ee422186 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 18:56:26 +0200 Subject: [PATCH 02/25] split client connection and frame send management use Arc RwLock container to share clients list --- puffin_http/src/server.rs | 57 ++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index a1393828..d1c62fc0 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -5,7 +5,7 @@ use std::{ net::{SocketAddr, TcpListener, TcpStream}, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, RwLock, }, }; @@ -237,23 +237,28 @@ impl Server { // but `flume` will continue until the channel is empty. let (tx, rx): (flume::Sender>, _) = flume::unbounded(); + let clients = Arc::new(RwLock::new(Vec::new())); let num_clients = Arc::new(AtomicUsize::default()); let num_clients_cloned = num_clients.clone(); let join_handle = std::thread::Builder::new() .name("puffin-server".to_owned()) .spawn(move || { - let mut server_impl = PuffinServerImpl { + let mut ps_connection = PuffinServerConnection { tcp_listener, - clients: Default::default(), + clients: clients.clone(), + num_clients: num_clients_cloned.clone(), + }; + let mut server_impl = PuffinServerImpl { + clients, num_clients: num_clients_cloned, - send_all_scopes: false, + //TODO: send_all_scopes: false, frame_view: Default::default(), }; while let Ok(frame) = rx.recv() { server_impl.frame_view.add_frame(frame.clone()); - if let Err(err) = server_impl.accept_new_clients() { + if let Err(err) = ps_connection.accept_new_clients() { log::warn!("puffin server failure: {}", err); } @@ -318,16 +323,12 @@ impl Drop for Client { } /// Listens for incoming connections -/// and streams them puffin profiler data. -struct PuffinServerImpl { +struct PuffinServerConnection { tcp_listener: TcpListener, - clients: Vec, + clients: Arc>>, num_clients: Arc, - send_all_scopes: bool, - frame_view: FrameView, } - -impl PuffinServerImpl { +impl PuffinServerConnection { fn accept_new_clients(&mut self) -> anyhow::Result<()> { loop { match self.tcp_listener.accept() { @@ -346,13 +347,15 @@ impl PuffinServerImpl { .context("Couldn't spawn thread")?; // Send all scopes when new client connects. - self.send_all_scopes = true; - self.clients.push(Client { + // TODO: send all previous scopes at connection, not on regular send + //self.send_all_scopes = true; + self.clients.write().unwrap().push(Client { client_addr, packet_tx: Some(packet_tx), join_handle: Some(join_handle), }); - self.num_clients.store(self.clients.len(), Ordering::SeqCst); + self.num_clients + .store(self.clients.read().unwrap().len(), Ordering::SeqCst); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { break; // Nothing to do for now. @@ -364,9 +367,18 @@ impl PuffinServerImpl { } Ok(()) } +} +/// streams to client puffin profiler data. +struct PuffinServerImpl { + clients: Arc>>, + num_clients: Arc, + frame_view: FrameView, +} + +impl PuffinServerImpl { pub fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { - if self.clients.is_empty() { + if self.clients.read().unwrap().is_empty() { return Ok(()); } puffin::profile_function!(); @@ -378,17 +390,14 @@ impl PuffinServerImpl { .unwrap(); frame - .write_into( - self.frame_view.scope_collection(), - self.send_all_scopes, - &mut packet, - ) + .write_into(self.frame_view.scope_collection(), false, &mut packet) .context("Encode puffin frame")?; - self.send_all_scopes = false; + //TODO self.send_all_scopes = false; let packet: Packet = packet.into(); - self.clients.retain(|client| match &client.packet_tx { + let mut clients = self.clients.write().unwrap(); + clients.retain(|client| match &client.packet_tx { None => false, Some(packet_tx) => match packet_tx.try_send(packet.clone()) { Ok(()) => true, @@ -402,7 +411,7 @@ impl PuffinServerImpl { } }, }); - self.num_clients.store(self.clients.len(), Ordering::SeqCst); + self.num_clients.store(clients.len(), Ordering::SeqCst); Ok(()) } From 1af94fc01f6027f9f721f8d574687ebca0a5b224 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 18:56:32 +0200 Subject: [PATCH 03/25] rename 'puffin server impl' into 'puffin server send' this reflect change done when split management --- puffin_http/src/server.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index d1c62fc0..023dd6b3 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -249,7 +249,7 @@ impl Server { clients: clients.clone(), num_clients: num_clients_cloned.clone(), }; - let mut server_impl = PuffinServerImpl { + let mut ps_send = PuffinServerSend { clients, num_clients: num_clients_cloned, //TODO: send_all_scopes: false, @@ -257,12 +257,12 @@ impl Server { }; while let Ok(frame) = rx.recv() { - server_impl.frame_view.add_frame(frame.clone()); + ps_send.frame_view.add_frame(frame.clone()); if let Err(err) = ps_connection.accept_new_clients() { log::warn!("puffin server failure: {}", err); } - if let Err(err) = server_impl.send(&frame) { + if let Err(err) = ps_send.send(&frame) { log::warn!("puffin server failure: {}", err); } } @@ -370,13 +370,13 @@ impl PuffinServerConnection { } /// streams to client puffin profiler data. -struct PuffinServerImpl { +struct PuffinServerSend { clients: Arc>>, num_clients: Arc, frame_view: FrameView, } -impl PuffinServerImpl { +impl PuffinServerSend { pub fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { if self.clients.read().unwrap().is_empty() { return Ok(()); From d1f27144a070c0c5c2b0b6cef492dcd705a04b22 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 18:56:35 +0200 Subject: [PATCH 04/25] add async with async-std used for client connection and frame send first introduction, not used ideally --- Cargo.lock | 245 +++++++++++++++++++++++++++++++++++++- puffin_http/Cargo.toml | 1 + puffin_http/src/server.rs | 42 ++++--- 3 files changed, 267 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4cdc1720..e4772fe7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,6 +156,119 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "175571dd1d178ced59193a6fc02dde1b972eb0bc56c892cde9beeceac5bf0f6b" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ca9a001c1e8ba5149f91a74362376cc6bc5b919d92d988668657bd570bdcec" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.3.1", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-std" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "atk-sys" version = "0.15.1" @@ -244,6 +357,19 @@ dependencies = [ "objc2 0.5.2", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel 2.3.1", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -456,9 +582,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ "crossbeam-utils", ] @@ -845,6 +971,39 @@ dependencies = [ "str-buf", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" +dependencies = [ + "event-listener 5.3.1", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fdeflate" version = "0.3.3" @@ -912,12 +1071,40 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", +] + [[package]] name = "futures-core" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -1011,6 +1198,18 @@ dependencies = [ "system-deps", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "glow" version = "0.13.0" @@ -1272,6 +1471,15 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc" +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1352,6 +1560,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] [[package]] name = "lz4_flex" @@ -1715,6 +1926,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1756,6 +1973,23 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.28" @@ -1878,6 +2112,7 @@ name = "puffin_http" version = "0.16.1" dependencies = [ "anyhow", + "async-std", "flume", "log", "once_cell", @@ -2471,6 +2706,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "value-bag" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" + [[package]] name = "vec1" version = "1.10.1" diff --git a/puffin_http/Cargo.toml b/puffin_http/Cargo.toml index ed18b8aa..d3ebcf10 100644 --- a/puffin_http/Cargo.toml +++ b/puffin_http/Cargo.toml @@ -15,6 +15,7 @@ include = ["**/*.rs", "Cargo.toml", "README.md"] [dependencies] anyhow = "1.0" +async-std = "1.13" flume = "0.11" log = "0.4" parking_lot = "0.12" diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 023dd6b3..ba438af0 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -1,8 +1,12 @@ use anyhow::Context as _; +use async_std::{ + io::WriteExt, + net::{TcpListener, TcpStream}, + task, +}; use puffin::{FrameSinkId, FrameView, GlobalProfiler}; use std::{ - io::Write, - net::{SocketAddr, TcpListener, TcpStream}, + net::SocketAddr, sync::{ atomic::{AtomicUsize, Ordering}, Arc, RwLock, @@ -226,10 +230,11 @@ impl Server { sink_install: fn(puffin::FrameSink) -> FrameSinkId, sink_remove: fn(FrameSinkId) -> (), ) -> anyhow::Result { - let tcp_listener = TcpListener::bind(bind_addr).context("binding server TCP socket")?; - tcp_listener - .set_nonblocking(true) - .context("TCP set_nonblocking")?; + let tcp_listener = task::block_on(async { + TcpListener::bind(bind_addr) + .await + .context("binding server TCP socket") + })?; // We use flume instead of `mpsc`, // because on shutdown we want all frames to be sent. @@ -258,11 +263,11 @@ impl Server { while let Ok(frame) = rx.recv() { ps_send.frame_view.add_frame(frame.clone()); - if let Err(err) = ps_connection.accept_new_clients() { + if let Err(err) = task::block_on(ps_connection.accept_new_clients()) { log::warn!("puffin server failure: {}", err); } - if let Err(err) = ps_send.send(&frame) { + if let Err(err) = task::block_on(ps_send.send(&frame)) { log::warn!("puffin server failure: {}", err); } } @@ -329,21 +334,19 @@ struct PuffinServerConnection { num_clients: Arc, } impl PuffinServerConnection { - fn accept_new_clients(&mut self) -> anyhow::Result<()> { + async fn accept_new_clients(&mut self) -> anyhow::Result<()> { loop { - match self.tcp_listener.accept() { + match self.tcp_listener.accept().await { Ok((tcp_stream, client_addr)) => { - tcp_stream - .set_nonblocking(false) - .context("stream.set_nonblocking")?; - log::info!("{} connected", client_addr); let (packet_tx, packet_rx) = flume::bounded(MAX_FRAMES_IN_QUEUE); let join_handle = std::thread::Builder::new() .name("puffin-server-client".to_owned()) - .spawn(move || client_loop(packet_rx, client_addr, tcp_stream)) + .spawn(move || { + task::block_on(client_loop(packet_rx, client_addr, tcp_stream)); + }) .context("Couldn't spawn thread")?; // Send all scopes when new client connects. @@ -377,7 +380,7 @@ struct PuffinServerSend { } impl PuffinServerSend { - pub fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { + pub async fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { if self.clients.read().unwrap().is_empty() { return Ok(()); } @@ -387,6 +390,7 @@ impl PuffinServerSend { packet .write_all(&crate::PROTOCOL_VERSION.to_le_bytes()) + .await .unwrap(); frame @@ -417,13 +421,13 @@ impl PuffinServerSend { } } -fn client_loop( +async fn client_loop( packet_rx: flume::Receiver, client_addr: SocketAddr, mut tcp_stream: TcpStream, ) { - while let Ok(packet) = packet_rx.recv() { - if let Err(err) = tcp_stream.write_all(&packet) { + while let Ok(packet) = packet_rx.recv_async().await { + if let Err(err) = tcp_stream.write_all(&packet).await { log::info!( "puffin server failed sending to {}: {} (kind: {:?})", client_addr, From 5d82cb10d05bca9b1a2c16672c5b8803e0a8fcd3 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 18:56:38 +0200 Subject: [PATCH 05/25] split puffin-server thread into tasks ps-connect : to manage client connection ps-send : to manage frame send --- puffin_http/src/server.rs | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index ba438af0..93995410 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -23,7 +23,7 @@ const MAX_FRAMES_IN_QUEUE: usize = 30; #[must_use = "When Server is dropped, the server is closed, so keep it around!"] pub struct Server { sink_id: FrameSinkId, - join_handle: Option>, + join_handle: Option>, num_clients: Arc, sink_remove: fn(FrameSinkId) -> (), } @@ -243,17 +243,28 @@ impl Server { let (tx, rx): (flume::Sender>, _) = flume::unbounded(); let clients = Arc::new(RwLock::new(Vec::new())); + let clients_cloned = clients.clone(); let num_clients = Arc::new(AtomicUsize::default()); let num_clients_cloned = num_clients.clone(); - let join_handle = std::thread::Builder::new() - .name("puffin-server".to_owned()) - .spawn(move || { + task::Builder::new() + .name("ps-connect".to_owned()) + .spawn(async move { let mut ps_connection = PuffinServerConnection { tcp_listener, - clients: clients.clone(), - num_clients: num_clients_cloned.clone(), + clients: clients_cloned, + num_clients: num_clients_cloned, }; + if let Err(err) = ps_connection.accept_new_clients().await { + log::warn!("puffin server failure: {}", err); + } + }) + .context("Couldn't spawn ps-connect task")?; + + let num_clients_cloned = num_clients.clone(); + let join_handle = task::Builder::new() + .name("ps-send".to_owned()) + .spawn(async move { let mut ps_send = PuffinServerSend { clients, num_clients: num_clients_cloned, @@ -261,18 +272,14 @@ impl Server { frame_view: Default::default(), }; - while let Ok(frame) = rx.recv() { + while let Ok(frame) = rx.recv_async().await { ps_send.frame_view.add_frame(frame.clone()); - if let Err(err) = task::block_on(ps_connection.accept_new_clients()) { - log::warn!("puffin server failure: {}", err); - } - - if let Err(err) = task::block_on(ps_send.send(&frame)) { + if let Err(err) = ps_send.send(&frame).await { log::warn!("puffin server failure: {}", err); } } }) - .context("Couldn't spawn thread")?; + .context("Couldn't spawn ps-send task")?; // Call the `install` function to add ourselves as a sink let sink_id = sink_install(Box::new(move |frame| { @@ -300,7 +307,7 @@ impl Drop for Server { // Take care to send everything before we shut down: if let Some(join_handle) = self.join_handle.take() { - join_handle.join().ok(); + task::block_on(join_handle); //.ok ? } } } @@ -384,7 +391,7 @@ impl PuffinServerSend { if self.clients.read().unwrap().is_empty() { return Ok(()); } - puffin::profile_function!(); + //puffin::profile_function!(); //TODO: enable again later let mut packet = vec![]; From f0b55335d3f8f8f5fe3bd11d7da83ef1a2d03bbc Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 18:56:40 +0200 Subject: [PATCH 06/25] use task instead of thread for Client --- puffin_http/src/server.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 93995410..f21f003c 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -317,7 +317,7 @@ type Packet = Arc<[u8]>; struct Client { client_addr: SocketAddr, packet_tx: Option>, - join_handle: Option>, + join_handle: Option>, } impl Drop for Client { @@ -329,7 +329,7 @@ impl Drop for Client { // Wait for the shutdown: if let Some(join_handle) = self.join_handle.take() { - join_handle.join().ok(); + task::block_on(join_handle); // .ok() } } } @@ -349,12 +349,12 @@ impl PuffinServerConnection { let (packet_tx, packet_rx) = flume::bounded(MAX_FRAMES_IN_QUEUE); - let join_handle = std::thread::Builder::new() - .name("puffin-server-client".to_owned()) - .spawn(move || { - task::block_on(client_loop(packet_rx, client_addr, tcp_stream)); + let join_handle = task::Builder::new() + .name("ps-client".to_owned()) + .spawn(async move { + client_loop(packet_rx, client_addr, tcp_stream).await; }) - .context("Couldn't spawn thread")?; + .context("Couldn't spawn ps-client task")?; // Send all scopes when new client connects. // TODO: send all previous scopes at connection, not on regular send From f6c00e5f08907d332ba136f6c946aa62fedf5781 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:32 +0200 Subject: [PATCH 07/25] add run into Server and use thread to process it --- Cargo.lock | 5 +- puffin_http/Cargo.toml | 1 + puffin_http/src/server.rs | 96 +++++++++++++++++++++++---------------- 3 files changed, 62 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4772fe7..362d610f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1094,9 +1094,9 @@ checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" dependencies = [ "fastrand", "futures-core", @@ -2114,6 +2114,7 @@ dependencies = [ "anyhow", "async-std", "flume", + "futures-lite", "log", "once_cell", "parking_lot", diff --git a/puffin_http/Cargo.toml b/puffin_http/Cargo.toml index d3ebcf10..ce9340d4 100644 --- a/puffin_http/Cargo.toml +++ b/puffin_http/Cargo.toml @@ -17,6 +17,7 @@ include = ["**/*.rs", "Cargo.toml", "README.md"] anyhow = "1.0" async-std = "1.13" flume = "0.11" +futures-lite = { version = "2.6", default-features = false } log = "0.4" parking_lot = "0.12" puffin = { version = "0.19.1", path = "../puffin", features = [ diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index f21f003c..43fbe836 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -4,6 +4,7 @@ use async_std::{ net::{TcpListener, TcpStream}, task, }; +use futures_lite::future; use puffin::{FrameSinkId, FrameView, GlobalProfiler}; use std::{ net::SocketAddr, @@ -23,7 +24,7 @@ const MAX_FRAMES_IN_QUEUE: usize = 30; #[must_use = "When Server is dropped, the server is closed, so keep it around!"] pub struct Server { sink_id: FrameSinkId, - join_handle: Option>, + join_handle: Option>, num_clients: Arc, sink_remove: fn(FrameSinkId) -> (), } @@ -230,11 +231,7 @@ impl Server { sink_install: fn(puffin::FrameSink) -> FrameSinkId, sink_remove: fn(FrameSinkId) -> (), ) -> anyhow::Result { - let tcp_listener = task::block_on(async { - TcpListener::bind(bind_addr) - .await - .context("binding server TCP socket") - })?; + let bind_addr = String::from(bind_addr); // We use flume instead of `mpsc`, // because on shutdown we want all frames to be sent. @@ -242,12 +239,45 @@ impl Server { // but `flume` will continue until the channel is empty. let (tx, rx): (flume::Sender>, _) = flume::unbounded(); + let num_clients = Arc::new(AtomicUsize::default()); + let num_clients_cloned = num_clients.clone(); + let join_handle = std::thread::Builder::new() + .name("puffin-server".to_owned()) + .spawn(move || { + Server::run(bind_addr, rx, num_clients_cloned).unwrap(); + }) + .context("Can't start puffin-server thread.")?; + + // Call the `install` function to add ourselves as a sink + let sink_id = sink_install(Box::new(move |frame| { + tx.send(frame).ok(); + })); + + Ok(Server { + sink_id, + join_handle: Some(join_handle), + num_clients, + sink_remove, + }) + } + + /// start and run puffin server service + pub fn run( + bind_addr: String, + rx: flume::Receiver>, + num_clients: Arc, + ) -> anyhow::Result<()> { + let tcp_listener = task::block_on(async { + TcpListener::bind(bind_addr) + .await + .context("binding server TCP socket") + })?; + let clients = Arc::new(RwLock::new(Vec::new())); let clients_cloned = clients.clone(); - let num_clients = Arc::new(AtomicUsize::default()); let num_clients_cloned = num_clients.clone(); - task::Builder::new() + let psconnect_handle = task::Builder::new() .name("ps-connect".to_owned()) .spawn(async move { let mut ps_connection = PuffinServerConnection { @@ -255,19 +285,16 @@ impl Server { clients: clients_cloned, num_clients: num_clients_cloned, }; - if let Err(err) = ps_connection.accept_new_clients().await { - log::warn!("puffin server failure: {}", err); - } + ps_connection.accept_new_clients().await }) .context("Couldn't spawn ps-connect task")?; - let num_clients_cloned = num_clients.clone(); - let join_handle = task::Builder::new() + let pssend_handle = task::Builder::new() .name("ps-send".to_owned()) .spawn(async move { let mut ps_send = PuffinServerSend { clients, - num_clients: num_clients_cloned, + num_clients, //TODO: send_all_scopes: false, frame_view: Default::default(), }; @@ -281,17 +308,8 @@ impl Server { }) .context("Couldn't spawn ps-send task")?; - // Call the `install` function to add ourselves as a sink - let sink_id = sink_install(Box::new(move |frame| { - tx.send(frame).ok(); - })); - - Ok(Server { - sink_id, - join_handle: Some(join_handle), - num_clients, - sink_remove, - }) + let (con_res, _) = future::block_on(future::zip(psconnect_handle, pssend_handle)); + con_res.context("Client connection management task") } /// Number of clients currently connected. @@ -307,7 +325,7 @@ impl Drop for Server { // Take care to send everything before we shut down: if let Some(join_handle) = self.join_handle.take() { - task::block_on(join_handle); //.ok ? + join_handle.join().ok(); } } } @@ -408,23 +426,25 @@ impl PuffinServerSend { let packet: Packet = packet.into(); let mut clients = self.clients.write().unwrap(); - clients.retain(|client| match &client.packet_tx { + clients.retain(|client| { + task::block_on(async { Self::send_to_client(client, packet.clone()).await }) + }); + self.num_clients.store(clients.len(), Ordering::SeqCst); + + Ok(()) + } + + async fn send_to_client(client: &Client, packet: Packet) -> bool { + match &client.packet_tx { None => false, - Some(packet_tx) => match packet_tx.try_send(packet.clone()) { + Some(packet_tx) => match packet_tx.send_async(packet).await { Ok(()) => true, - Err(flume::TrySendError::Disconnected(_)) => false, - Err(flume::TrySendError::Full(_)) => { - log::info!( - "puffin client {} is not accepting data fast enough; dropping a frame", - client.client_addr - ); + Err(err) => { + log::info!("puffin send error: {} for '{}'", err, client.client_addr); true } }, - }); - self.num_clients.store(clients.len(), Ordering::SeqCst); - - Ok(()) + } } } From 611b0568de178a6005803c6b4a5765408c7339b4 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:34 +0200 Subject: [PATCH 08/25] use async Arc and RwLock instead of sync from async-std instead of from std --- puffin_http/src/server.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 43fbe836..c2441784 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -2,16 +2,14 @@ use anyhow::Context as _; use async_std::{ io::WriteExt, net::{TcpListener, TcpStream}, + sync::{Arc, RwLock}, task, }; use futures_lite::future; use puffin::{FrameSinkId, FrameView, GlobalProfiler}; use std::{ net::SocketAddr, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, RwLock, - }, + sync::atomic::{AtomicUsize, Ordering}, }; /// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough. @@ -377,13 +375,13 @@ impl PuffinServerConnection { // Send all scopes when new client connects. // TODO: send all previous scopes at connection, not on regular send //self.send_all_scopes = true; - self.clients.write().unwrap().push(Client { + self.clients.write().await.push(Client { client_addr, packet_tx: Some(packet_tx), join_handle: Some(join_handle), }); self.num_clients - .store(self.clients.read().unwrap().len(), Ordering::SeqCst); + .store(self.clients.read().await.len(), Ordering::SeqCst); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { break; // Nothing to do for now. @@ -406,7 +404,7 @@ struct PuffinServerSend { impl PuffinServerSend { pub async fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { - if self.clients.read().unwrap().is_empty() { + if self.clients.read().await.is_empty() { return Ok(()); } //puffin::profile_function!(); //TODO: enable again later @@ -425,7 +423,7 @@ impl PuffinServerSend { let packet: Packet = packet.into(); - let mut clients = self.clients.write().unwrap(); + let mut clients = self.clients.write().await; clients.retain(|client| { task::block_on(async { Self::send_to_client(client, packet.clone()).await }) }); From 76c22d57e7f07e46dab58a3ce7c3d74f3f95fe11 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:35 +0200 Subject: [PATCH 09/25] use net::SocketAddr from async-std instead of std --- puffin_http/src/server.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index c2441784..e275a9b3 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -1,16 +1,13 @@ use anyhow::Context as _; use async_std::{ io::WriteExt, - net::{TcpListener, TcpStream}, + net::{SocketAddr, TcpListener, TcpStream}, sync::{Arc, RwLock}, task, }; use futures_lite::future; use puffin::{FrameSinkId, FrameView, GlobalProfiler}; -use std::{ - net::SocketAddr, - sync::atomic::{AtomicUsize, Ordering}, -}; +use std::sync::atomic::{AtomicUsize, Ordering}; /// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough. const MAX_FRAMES_IN_QUEUE: usize = 30; From 9a47860fb70d28510ad6670823e828e12410220b Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:36 +0200 Subject: [PATCH 10/25] move tcp_listener creation into async task ps-connect --- puffin_http/src/server.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index e275a9b3..1feed451 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -262,12 +262,6 @@ impl Server { rx: flume::Receiver>, num_clients: Arc, ) -> anyhow::Result<()> { - let tcp_listener = task::block_on(async { - TcpListener::bind(bind_addr) - .await - .context("binding server TCP socket") - })?; - let clients = Arc::new(RwLock::new(Vec::new())); let clients_cloned = clients.clone(); let num_clients_cloned = num_clients.clone(); @@ -275,6 +269,11 @@ impl Server { let psconnect_handle = task::Builder::new() .name("ps-connect".to_owned()) .spawn(async move { + let tcp_listener = TcpListener::bind(bind_addr) + .await + .context("binding server TCP socket") + .unwrap(); //TODO use ? + let mut ps_connection = PuffinServerConnection { tcp_listener, clients: clients_cloned, From c4b86ce320d9e925c661ee3d7419276e9c4df7b5 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:36 +0200 Subject: [PATCH 11/25] remove use of retain in PuffinServerSend::send this remove use of async function in retain. Async function don't work in retain with local executor. --- puffin_http/src/server.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 1feed451..23608c10 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -419,9 +419,16 @@ impl PuffinServerSend { let packet: Packet = packet.into(); + // Send frame to clients, remove disconnected clients and update num_clients var let mut clients = self.clients.write().await; - clients.retain(|client| { - task::block_on(async { Self::send_to_client(client, packet.clone()).await }) + let mut idx_to_remove = Vec::new(); + for (idx, client) in clients.iter().enumerate() { + if !Self::send_to_client(client, packet.clone()).await { + idx_to_remove.push(idx); + } + } + idx_to_remove.iter().rev().for_each(|idx| { + clients.remove(*idx); }); self.num_clients.store(clients.len(), Ordering::SeqCst); From 36b1954abc0bf2ee060b30f02735fe8421b89926 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:17 +0200 Subject: [PATCH 12/25] use dedicated LocalExecutor for puffin_http server this avoid to spawn multiple threads for management of async task of puffin server. Now all puffin server task are executed in the same thread, a dedecated thread. --- Cargo.lock | 1 + puffin_http/Cargo.toml | 1 + puffin_http/src/server.rs | 61 +++++++++++++++++++++++---------------- 3 files changed, 38 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 362d610f..a393e1ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2112,6 +2112,7 @@ name = "puffin_http" version = "0.16.1" dependencies = [ "anyhow", + "async-executor", "async-std", "flume", "futures-lite", diff --git a/puffin_http/Cargo.toml b/puffin_http/Cargo.toml index ce9340d4..8aaaafca 100644 --- a/puffin_http/Cargo.toml +++ b/puffin_http/Cargo.toml @@ -15,6 +15,7 @@ include = ["**/*.rs", "Cargo.toml", "README.md"] [dependencies] anyhow = "1.0" +async-executor = "1.13" async-std = "1.13" flume = "0.11" futures-lite = { version = "2.6", default-features = false } diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 23608c10..642f21ec 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -1,13 +1,16 @@ use anyhow::Context as _; +use async_executor::{LocalExecutor, Task}; use async_std::{ io::WriteExt, net::{SocketAddr, TcpListener, TcpStream}, sync::{Arc, RwLock}, - task, }; use futures_lite::future; use puffin::{FrameSinkId, FrameView, GlobalProfiler}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{ + rc::Rc, + sync::atomic::{AtomicUsize, Ordering}, +}; /// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough. const MAX_FRAMES_IN_QUEUE: usize = 30; @@ -262,30 +265,33 @@ impl Server { rx: flume::Receiver>, num_clients: Arc, ) -> anyhow::Result<()> { + let executor = Rc::new(LocalExecutor::new()); + let clients = Arc::new(RwLock::new(Vec::new())); let clients_cloned = clients.clone(); let num_clients_cloned = num_clients.clone(); - let psconnect_handle = task::Builder::new() - .name("ps-connect".to_owned()) - .spawn(async move { + let executor_cloned = executor.clone(); + let psconnect_handle = //task::Builder::new() + //.name("ps-connect".to_owned()) + executor.spawn(async move { + log::trace!("Start listening on {bind_addr}"); let tcp_listener = TcpListener::bind(bind_addr) .await - .context("binding server TCP socket") - .unwrap(); //TODO use ? + .context("binding server TCP socket")?; let mut ps_connection = PuffinServerConnection { + executor: executor_cloned, tcp_listener, clients: clients_cloned, num_clients: num_clients_cloned, }; ps_connection.accept_new_clients().await - }) - .context("Couldn't spawn ps-connect task")?; + }); - let pssend_handle = task::Builder::new() - .name("ps-send".to_owned()) - .spawn(async move { + let pssend_handle = //task::Builder::new() + //.name("ps-send".to_owned()) + executor.spawn(async move { let mut ps_send = PuffinServerSend { clients, num_clients, @@ -293,16 +299,19 @@ impl Server { frame_view: Default::default(), }; + log::trace!("Wait frame to send"); while let Ok(frame) = rx.recv_async().await { ps_send.frame_view.add_frame(frame.clone()); if let Err(err) = ps_send.send(&frame).await { log::warn!("puffin server failure: {}", err); } } - }) - .context("Couldn't spawn ps-send task")?; + }); + //.context("Couldn't spawn ps-send task")?; - let (con_res, _) = future::block_on(future::zip(psconnect_handle, pssend_handle)); + let (con_res, _) = future::block_on( + executor.run(async { future::zip(psconnect_handle, pssend_handle).await }), + ); con_res.context("Client connection management task") } @@ -329,7 +338,7 @@ type Packet = Arc<[u8]>; struct Client { client_addr: SocketAddr, packet_tx: Option>, - join_handle: Option>, + join_handle: Option>, } impl Drop for Client { @@ -341,18 +350,20 @@ impl Drop for Client { // Wait for the shutdown: if let Some(join_handle) = self.join_handle.take() { - task::block_on(join_handle); // .ok() + future::block_on(join_handle); // .ok() } } } /// Listens for incoming connections -struct PuffinServerConnection { +struct PuffinServerConnection<'a> { + executor: Rc>, tcp_listener: TcpListener, clients: Arc>>, num_clients: Arc, } -impl PuffinServerConnection { + +impl<'a> PuffinServerConnection<'a> { async fn accept_new_clients(&mut self) -> anyhow::Result<()> { loop { match self.tcp_listener.accept().await { @@ -361,12 +372,12 @@ impl PuffinServerConnection { let (packet_tx, packet_rx) = flume::bounded(MAX_FRAMES_IN_QUEUE); - let join_handle = task::Builder::new() - .name("ps-client".to_owned()) - .spawn(async move { + let join_handle = //task::Builder::new() + //.name("ps-client".to_owned()) + self.executor.spawn(async move { client_loop(packet_rx, client_addr, tcp_stream).await; - }) - .context("Couldn't spawn ps-client task")?; + }); + //.context("Couldn't spawn ps-client task")?; // Send all scopes when new client connects. // TODO: send all previous scopes at connection, not on regular send @@ -403,7 +414,7 @@ impl PuffinServerSend { if self.clients.read().await.is_empty() { return Ok(()); } - //puffin::profile_function!(); //TODO: enable again later + puffin::profile_function!(); let mut packet = vec![]; From 8e490703d53afaa6e91b46b6c69c378a4a8cd133 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:33 +0200 Subject: [PATCH 13/25] Add log for error in client loop print in log::info when packet channel receiver return an error --- puffin_http/src/server.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 642f21ec..7cc49a44 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -465,15 +465,23 @@ async fn client_loop( client_addr: SocketAddr, mut tcp_stream: TcpStream, ) { - while let Ok(packet) = packet_rx.recv_async().await { - if let Err(err) = tcp_stream.write_all(&packet).await { - log::info!( - "puffin server failed sending to {}: {} (kind: {:?})", - client_addr, - err, - err.kind() - ); - break; + loop { + match packet_rx.recv_async().await { + Ok(packet) => { + if let Err(err) = tcp_stream.write_all(&packet).await { + log::info!( + "puffin server failed sending to {}: {} (kind: {:?})", + client_addr, + err, + err.kind() + ); + break; + } + } + Err(err) => { + log::info!("Error in client_loop: {}", err); + break; + } } } } From 070ca0ea4a908ef21f7c8e5d6e51c657f088c23a Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Fri, 12 Aug 2022 19:00:33 +0200 Subject: [PATCH 14/25] add some profile scope and function markers - "accept_client" - send_to_client - "write frame to client" --- puffin_http/src/server.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 7cc49a44..9e54b349 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -368,6 +368,7 @@ impl<'a> PuffinServerConnection<'a> { loop { match self.tcp_listener.accept().await { Ok((tcp_stream, client_addr)) => { + puffin::profile_scope!("accept_client"); log::info!("{} connected", client_addr); let (packet_tx, packet_rx) = flume::bounded(MAX_FRAMES_IN_QUEUE); @@ -447,6 +448,7 @@ impl PuffinServerSend { } async fn send_to_client(client: &Client, packet: Packet) -> bool { + puffin::profile_function!(); match &client.packet_tx { None => false, Some(packet_tx) => match packet_tx.send_async(packet).await { @@ -468,6 +470,7 @@ async fn client_loop( loop { match packet_rx.recv_async().await { Ok(packet) => { + puffin::profile_scope!("write frame to client"); if let Err(err) = tcp_stream.write_all(&packet).await { log::info!( "puffin server failed sending to {}: {} (kind: {:?})", From a3c6113479b88fff38df43e155c32e0e0d3e3b4e Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Thu, 24 Nov 2022 14:47:41 +0100 Subject: [PATCH 15/25] Clean: change unneeded Arc into simple RC Allowed because all task run in an unique Thread --- puffin_http/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 9e54b349..3756b67d 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -333,7 +333,7 @@ impl Drop for Server { } } -type Packet = Arc<[u8]>; +type Packet = Rc<[u8]>; struct Client { client_addr: SocketAddr, From 4864b739a6b9ed052484aa682d8303d3c7aae02e Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Thu, 24 Nov 2022 22:31:33 +0100 Subject: [PATCH 16/25] remove useless atomic to client variable Use Rc & RefCell instead of Arc & RwLock. With LocalExecutor, no need of Sync & Send trait --- puffin_http/src/server.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 3756b67d..bbf6b414 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -3,11 +3,12 @@ use async_executor::{LocalExecutor, Task}; use async_std::{ io::WriteExt, net::{SocketAddr, TcpListener, TcpStream}, - sync::{Arc, RwLock}, + sync::Arc, }; use futures_lite::future; use puffin::{FrameSinkId, FrameView, GlobalProfiler}; use std::{ + cell::RefCell, rc::Rc, sync::atomic::{AtomicUsize, Ordering}, }; @@ -267,7 +268,7 @@ impl Server { ) -> anyhow::Result<()> { let executor = Rc::new(LocalExecutor::new()); - let clients = Arc::new(RwLock::new(Vec::new())); + let clients = Rc::new(RefCell::new(Vec::new())); let clients_cloned = clients.clone(); let num_clients_cloned = num_clients.clone(); @@ -359,7 +360,7 @@ impl Drop for Client { struct PuffinServerConnection<'a> { executor: Rc>, tcp_listener: TcpListener, - clients: Arc>>, + clients: Rc>>, num_clients: Arc, } @@ -383,13 +384,13 @@ impl<'a> PuffinServerConnection<'a> { // Send all scopes when new client connects. // TODO: send all previous scopes at connection, not on regular send //self.send_all_scopes = true; - self.clients.write().await.push(Client { + self.clients.borrow_mut().push(Client { client_addr, packet_tx: Some(packet_tx), join_handle: Some(join_handle), }); self.num_clients - .store(self.clients.read().await.len(), Ordering::SeqCst); + .store(self.clients.borrow().len(), Ordering::SeqCst); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { break; // Nothing to do for now. @@ -405,14 +406,14 @@ impl<'a> PuffinServerConnection<'a> { /// streams to client puffin profiler data. struct PuffinServerSend { - clients: Arc>>, + clients: Rc>>, num_clients: Arc, frame_view: FrameView, } impl PuffinServerSend { pub async fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { - if self.clients.read().await.is_empty() { + if self.clients.borrow().is_empty() { return Ok(()); } puffin::profile_function!(); @@ -432,7 +433,7 @@ impl PuffinServerSend { let packet: Packet = packet.into(); // Send frame to clients, remove disconnected clients and update num_clients var - let mut clients = self.clients.write().await; + let mut clients = self.clients.borrow_mut(); let mut idx_to_remove = Vec::new(); for (idx, client) in clients.iter().enumerate() { if !Self::send_to_client(client, packet.clone()).await { From ea19e69af633ef0e398f894c4ce8aa1e48c071ee Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sun, 11 Dec 2022 17:38:59 +0100 Subject: [PATCH 17/25] dont borrow_mut cliens during send_to_client calls --- puffin_http/src/server.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index bbf6b414..d774a57b 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -433,13 +433,15 @@ impl PuffinServerSend { let packet: Packet = packet.into(); // Send frame to clients, remove disconnected clients and update num_clients var - let mut clients = self.clients.borrow_mut(); + let clients = self.clients.borrow(); let mut idx_to_remove = Vec::new(); for (idx, client) in clients.iter().enumerate() { if !Self::send_to_client(client, packet.clone()).await { idx_to_remove.push(idx); } } + + let mut clients = self.clients.borrow_mut(); idx_to_remove.iter().rev().for_each(|idx| { clients.remove(*idx); }); From 99a9a6f13866a41b3037c89ac8b9de3709ebee79 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sat, 21 Dec 2024 20:16:31 +0100 Subject: [PATCH 18/25] wip: test dep only needed async crates net, future-lite and async-executor --- Cargo.lock | 108 +++----------------------------------- puffin_http/Cargo.toml | 3 +- puffin_http/src/server.rs | 18 +++---- 3 files changed, 17 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a393e1ca..9be29b18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,17 +156,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "175571dd1d178ced59193a6fc02dde1b972eb0bc56c892cde9beeceac5bf0f6b" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - [[package]] name = "async-channel" version = "2.3.1" @@ -192,21 +181,6 @@ dependencies = [ "slab", ] -[[package]] -name = "async-global-executor" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" -dependencies = [ - "async-channel 2.3.1", - "async-executor", - "async-io", - "async-lock", - "blocking", - "futures-lite", - "once_cell", -] - [[package]] name = "async-io" version = "2.3.3" @@ -232,35 +206,20 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener 5.3.1", + "event-listener", "event-listener-strategy", "pin-project-lite", ] [[package]] -name = "async-std" -version = "1.13.0" +name = "async-net" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" +checksum = "b948000fad4873c1c9339d60f2623323a0cfd3816e5181033c6a5cb68b2accf7" dependencies = [ - "async-channel 1.9.0", - "async-global-executor", "async-io", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", + "blocking", "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", ] [[package]] @@ -363,7 +322,7 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" dependencies = [ - "async-channel 2.3.1", + "async-channel", "async-task", "futures-io", "futures-lite", @@ -971,12 +930,6 @@ dependencies = [ "str-buf", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "event-listener" version = "5.3.1" @@ -994,7 +947,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" dependencies = [ - "event-listener 5.3.1", + "event-listener", "pin-project-lite", ] @@ -1071,15 +1024,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures-channel" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" -dependencies = [ - "futures-core", -] - [[package]] name = "futures-core" version = "0.3.31" @@ -1198,18 +1142,6 @@ dependencies = [ "system-deps", ] -[[package]] -name = "gloo-timers" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "glow" version = "0.13.0" @@ -1471,15 +1403,6 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc" -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -1560,9 +1483,6 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -dependencies = [ - "value-bag", -] [[package]] name = "lz4_flex" @@ -1973,12 +1893,6 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "piper" version = "0.2.4" @@ -2113,7 +2027,7 @@ version = "0.16.1" dependencies = [ "anyhow", "async-executor", - "async-std", + "async-net", "flume", "futures-lite", "log", @@ -2708,12 +2622,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "value-bag" -version = "1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" - [[package]] name = "vec1" version = "1.10.1" diff --git a/puffin_http/Cargo.toml b/puffin_http/Cargo.toml index 8aaaafca..6bccbb90 100644 --- a/puffin_http/Cargo.toml +++ b/puffin_http/Cargo.toml @@ -16,7 +16,7 @@ include = ["**/*.rs", "Cargo.toml", "README.md"] [dependencies] anyhow = "1.0" async-executor = "1.13" -async-std = "1.13" +async-net = "2.0.0" flume = "0.11" futures-lite = { version = "2.6", default-features = false } log = "0.4" @@ -26,6 +26,7 @@ puffin = { version = "0.19.1", path = "../puffin", features = [ "lz4", "serialization", ] } +#smol = "2.0" [dev-dependencies] simple_logger = "4.2" diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index d774a57b..eadfdff0 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -1,16 +1,15 @@ use anyhow::Context as _; use async_executor::{LocalExecutor, Task}; -use async_std::{ - io::WriteExt, - net::{SocketAddr, TcpListener, TcpStream}, - sync::Arc, -}; -use futures_lite::future; +use async_net::{SocketAddr, TcpListener, TcpStream}; +use futures_lite::{future, AsyncWriteExt}; use puffin::{FrameSinkId, FrameView, GlobalProfiler}; use std::{ cell::RefCell, rc::Rc, - sync::atomic::{AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; /// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough. @@ -420,10 +419,7 @@ impl PuffinServerSend { let mut packet = vec![]; - packet - .write_all(&crate::PROTOCOL_VERSION.to_le_bytes()) - .await - .unwrap(); + std::io::Write::write_all(&mut packet, &crate::PROTOCOL_VERSION.to_le_bytes()).unwrap(); frame .write_into(self.frame_view.scope_collection(), false, &mut packet) From 364ba88923b8cfabd79b412e72d0ae27159db55f Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sun, 22 Dec 2024 18:48:14 +0100 Subject: [PATCH 19/25] style: move meta_serialized creation juste before use --- puffin/src/frame_data.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/puffin/src/frame_data.rs b/puffin/src/frame_data.rs index ed29d188..ca41d890 100644 --- a/puffin/src/frame_data.rs +++ b/puffin/src/frame_data.rs @@ -568,9 +568,10 @@ impl FrameData { use bincode::Options as _; use byteorder::{WriteBytesExt as _, LE}; - let meta_serialized = bincode::options().serialize(&self.meta)?; write.write_all(b"PFD4")?; + + let meta_serialized = bincode::options().serialize(&self.meta)?; write.write_all(&(meta_serialized.len() as u32).to_le_bytes())?; write.write_all(&meta_serialized)?; From 5e1fab41a39996b0f942c27da5360b9f53032129 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sun, 22 Dec 2024 19:51:34 +0100 Subject: [PATCH 20/25] refactor(server_http): replace FrameView by ScopeCollection The frame_view member is only added for use scope_collection member. Directly use ScopeCollection struct. --- puffin_http/src/server.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index eadfdff0..68a35f23 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -2,7 +2,7 @@ use anyhow::Context as _; use async_executor::{LocalExecutor, Task}; use async_net::{SocketAddr, TcpListener, TcpStream}; use futures_lite::{future, AsyncWriteExt}; -use puffin::{FrameSinkId, FrameView, GlobalProfiler}; +use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection}; use std::{ cell::RefCell, rc::Rc, @@ -295,13 +295,12 @@ impl Server { let mut ps_send = PuffinServerSend { clients, num_clients, - //TODO: send_all_scopes: false, - frame_view: Default::default(), + send_all_scopes: false, + scope_collection: Default::default(), }; log::trace!("Wait frame to send"); while let Ok(frame) = rx.recv_async().await { - ps_send.frame_view.add_frame(frame.clone()); if let Err(err) = ps_send.send(&frame).await { log::warn!("puffin server failure: {}", err); } @@ -407,7 +406,8 @@ impl<'a> PuffinServerConnection<'a> { struct PuffinServerSend { clients: Rc>>, num_clients: Arc, - frame_view: FrameView, + send_all_scopes: bool, + scope_collection: ScopeCollection, } impl PuffinServerSend { @@ -417,14 +417,19 @@ impl PuffinServerSend { } puffin::profile_function!(); + // Keep scope_collection up-to-date + frame.scope_delta.iter().for_each(|new_scope| { + self.scope_collection.insert(new_scope.clone()); + }); + let mut packet = vec![]; std::io::Write::write_all(&mut packet, &crate::PROTOCOL_VERSION.to_le_bytes()).unwrap(); frame - .write_into(self.frame_view.scope_collection(), false, &mut packet) + .write_into(&self.scope_collection, self.send_all_scopes, &mut packet) .context("Encode puffin frame")?; - //TODO self.send_all_scopes = false; + self.send_all_scopes = false; let packet: Packet = packet.into(); From 8c7dc62d72912555ddb331ef9c8bc02667bf56a2 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sun, 22 Dec 2024 18:49:51 +0100 Subject: [PATCH 21/25] perf!: remove manual seq length serialization this is useless as already done by serde serialization. BREAKING CHANGE: the frame serialization format change. Introducing PFD5 --- puffin/src/frame_data.rs | 44 +++++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/puffin/src/frame_data.rs b/puffin/src/frame_data.rs index ca41d890..08badf9d 100644 --- a/puffin/src/frame_data.rs +++ b/puffin/src/frame_data.rs @@ -566,13 +566,11 @@ impl FrameData { write: &mut impl std::io::Write, ) -> anyhow::Result<()> { use bincode::Options as _; - use byteorder::{WriteBytesExt as _, LE}; + use byteorder::WriteBytesExt as _; - - write.write_all(b"PFD4")?; + write.write_all(b"PFD5")?; let meta_serialized = bincode::options().serialize(&self.meta)?; - write.write_all(&(meta_serialized.len() as u32).to_le_bytes())?; write.write_all(&meta_serialized)?; self.create_packed(); @@ -590,7 +588,6 @@ impl FrameData { }; let serialized_scopes = bincode::options().serialize(&to_serialize_scopes)?; - write.write_u32::(serialized_scopes.len() as u32)?; write.write_all(&serialized_scopes)?; Ok(()) } @@ -769,6 +766,43 @@ impl FrameData { .context("Can not deserialize scope details")? }; + let new_scopes: Vec<_> = deserialized_scopes + .into_iter() + .map(|x| Arc::new(x.clone())) + .collect(); + + Ok(Some(Self { + meta, + data: RwLock::new(FrameDataState::Packed(streams_compressed)), + scope_delta: new_scopes, + full_delta: false, + })) + } else if &header == b"PFD5" { + // Added 2024-12-22: remove useless manual sequence size serialization. + let meta = { + let mut meta = Vec::new(); + read.read_exact(&mut meta)?; + bincode::options() + .deserialize(&meta) + .context("bincode deserialize")? + }; + + let streams_compressed_length = read.read_u32::()? as usize; + let compression_kind = CompressionKind::from_u8(read.read_u8()?)?; + let streams_compressed = { + let mut streams_compressed = vec![0_u8; streams_compressed_length]; + read.read_exact(&mut streams_compressed)?; + PackedStreams::new(compression_kind, streams_compressed) + }; + + let deserialized_scopes: Vec = { + let mut serialized_scopes = Vec::new(); + read.read_exact(&mut serialized_scopes)?; + bincode::options() + .deserialize_from(serialized_scopes.as_slice()) + .context("Can not deserialize scope details")? + }; + let new_scopes: Vec<_> = deserialized_scopes .into_iter() .map(|x| Arc::new(x.clone())) From 3c64fded50300c4b5127a5e8c4278e48f9b1570b Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sun, 22 Dec 2024 19:31:30 +0100 Subject: [PATCH 22/25] perf: avoid vector cloning for scope_delta during serialization --- puffin/src/frame_data.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/puffin/src/frame_data.rs b/puffin/src/frame_data.rs index 08badf9d..7e0d6183 100644 --- a/puffin/src/frame_data.rs +++ b/puffin/src/frame_data.rs @@ -581,13 +581,17 @@ impl FrameData { write.write_u8(packed_streams.compression_kind as u8)?; write.write_all(&packed_streams.bytes)?; - let to_serialize_scopes: Vec<_> = if send_all_scopes { - scope_collection.scopes_by_id().values().cloned().collect() + let serialized_scopes: Vec<_> = if send_all_scopes { + let to_serialize_scopes = scope_collection + .scopes_by_id() + .values() + .cloned() + .collect::>(); + bincode::options().serialize(&to_serialize_scopes) } else { - self.scope_delta.clone() - }; + bincode::options().serialize(&self.scope_delta) + }?; - let serialized_scopes = bincode::options().serialize(&to_serialize_scopes)?; write.write_all(&serialized_scopes)?; Ok(()) } From 9b44746cab5bb6a4e85121bb6283f9c633c3c61f Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sat, 21 Dec 2024 22:32:19 +0100 Subject: [PATCH 23/25] wip: rework scope_collection send ony once --- puffin/src/frame_data.rs | 5 ++--- puffin/src/profile_view.rs | 2 +- puffin_http/src/server.rs | 33 +++++++++++++++++++++++++-------- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/puffin/src/frame_data.rs b/puffin/src/frame_data.rs index 7e0d6183..8efbdb0b 100644 --- a/puffin/src/frame_data.rs +++ b/puffin/src/frame_data.rs @@ -561,8 +561,7 @@ impl FrameData { #[cfg(feature = "serialization")] pub fn write_into( &self, - scope_collection: &crate::ScopeCollection, - send_all_scopes: bool, + scope_collection: Option<&crate::ScopeCollection>, write: &mut impl std::io::Write, ) -> anyhow::Result<()> { use bincode::Options as _; @@ -581,7 +580,7 @@ impl FrameData { write.write_u8(packed_streams.compression_kind as u8)?; write.write_all(&packed_streams.bytes)?; - let serialized_scopes: Vec<_> = if send_all_scopes { + let serialized_scopes: Vec<_> = if let Some(scope_collection) = scope_collection { let to_serialize_scopes = scope_collection .scopes_by_id() .values() diff --git a/puffin/src/profile_view.rs b/puffin/src/profile_view.rs index 7882f3c4..07c64ab6 100644 --- a/puffin/src/profile_view.rs +++ b/puffin/src/profile_view.rs @@ -229,7 +229,7 @@ impl FrameView { write.write_all(b"PUF0")?; for frame in self.all_uniq() { - frame.write_into(&self.scope_collection, false, write)?; + frame.write_into(None, write)?; } Ok(()) } diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 68a35f23..28527614 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -295,7 +295,6 @@ impl Server { let mut ps_send = PuffinServerSend { clients, num_clients, - send_all_scopes: false, scope_collection: Default::default(), }; @@ -338,6 +337,7 @@ struct Client { client_addr: SocketAddr, packet_tx: Option>, join_handle: Option>, + send_all_scopes: bool, } impl Drop for Client { @@ -386,6 +386,7 @@ impl<'a> PuffinServerConnection<'a> { client_addr, packet_tx: Some(packet_tx), join_handle: Some(join_handle), + send_all_scopes: true, }); self.num_clients .store(self.clients.borrow().len(), Ordering::SeqCst); @@ -406,7 +407,6 @@ impl<'a> PuffinServerConnection<'a> { struct PuffinServerSend { clients: Rc>>, num_clients: Arc, - send_all_scopes: bool, scope_collection: ScopeCollection, } @@ -422,22 +422,36 @@ impl PuffinServerSend { self.scope_collection.insert(new_scope.clone()); }); - let mut packet = vec![]; - + let mut packet = Vec::with_capacity(2048); std::io::Write::write_all(&mut packet, &crate::PROTOCOL_VERSION.to_le_bytes()).unwrap(); frame - .write_into(&self.scope_collection, self.send_all_scopes, &mut packet) + .write_into(None, &mut packet) .context("Encode puffin frame")?; - self.send_all_scopes = false; - let packet: Packet = packet.into(); + //TODO: compute packet_all_scopes only if a client need it + let mut packet_all_scopes = Vec::with_capacity(2048); + std::io::Write::write_all( + &mut packet_all_scopes, + &crate::PROTOCOL_VERSION.to_le_bytes(), + ) + .unwrap(); + frame + .write_into(Some(&self.scope_collection), &mut packet_all_scopes) + .context("Encode puffin frame")?; + let packet_all_scopes: Packet = packet_all_scopes.into(); + // Send frame to clients, remove disconnected clients and update num_clients var let clients = self.clients.borrow(); let mut idx_to_remove = Vec::new(); for (idx, client) in clients.iter().enumerate() { - if !Self::send_to_client(client, packet.clone()).await { + let packet = if client.send_all_scopes { + packet_all_scopes.clone() + } else { + packet.clone() + }; + if !Self::send_to_client(client, packet).await { idx_to_remove.push(idx); } } @@ -446,6 +460,9 @@ impl PuffinServerSend { idx_to_remove.iter().rev().for_each(|idx| { clients.remove(*idx); }); + clients + .iter_mut() + .for_each(|client| client.send_all_scopes = false); self.num_clients.store(clients.len(), Ordering::SeqCst); Ok(()) From 5cf766bb5997766dc028423274132127e3dab79e Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Sun, 22 Dec 2024 16:50:28 +0100 Subject: [PATCH 24/25] wip: improve frames (de)serialization - remove useless vec size serialisation, already done by seq serialization. > breaking change: introduce new frames format - remove some temporary vec allocation during frames (de)serialization --- puffin/src/frame_data.rs | 56 +++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/puffin/src/frame_data.rs b/puffin/src/frame_data.rs index 8efbdb0b..01eaef1a 100644 --- a/puffin/src/frame_data.rs +++ b/puffin/src/frame_data.rs @@ -562,15 +562,13 @@ impl FrameData { pub fn write_into( &self, scope_collection: Option<&crate::ScopeCollection>, - write: &mut impl std::io::Write, + mut write: &mut impl std::io::Write, ) -> anyhow::Result<()> { use bincode::Options as _; use byteorder::WriteBytesExt as _; write.write_all(b"PFD5")?; - - let meta_serialized = bincode::options().serialize(&self.meta)?; - write.write_all(&meta_serialized)?; + bincode::options().serialize_into(&mut write, &self.meta)?; self.create_packed(); let packed_streams_lock = self.data.read(); @@ -580,18 +578,25 @@ impl FrameData { write.write_u8(packed_streams.compression_kind as u8)?; write.write_all(&packed_streams.bytes)?; - let serialized_scopes: Vec<_> = if let Some(scope_collection) = scope_collection { - let to_serialize_scopes = scope_collection - .scopes_by_id() - .values() - .cloned() - .collect::>(); - bincode::options().serialize(&to_serialize_scopes) - } else { - bincode::options().serialize(&self.scope_delta) - }?; + struct Wrapper<'a>(&'a crate::ScopeCollection); //TODO: rename, and move + + impl<'a> serde::Serialize for Wrapper<'a> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let iter = self.0.scopes_by_id().values(); + serializer.collect_seq(iter) + } + } + + scope_collection + .map(|collection| { + let scopes_wrapper = Wrapper(collection); + bincode::options().serialize_into(&mut write, &scopes_wrapper) + }) + .unwrap_or_else(|| bincode::options().serialize_into(write, &self.scope_delta))?; - write.write_all(&serialized_scopes)?; Ok(()) } @@ -600,7 +605,7 @@ impl FrameData { /// [`None`] is returned if the end of the stream is reached (EOF), /// or an end-of-stream sentinel of `0u32` is read. #[cfg(feature = "serialization")] - pub fn read_next(read: &mut impl std::io::Read) -> anyhow::Result> { + pub fn read_next(mut read: &mut impl std::io::Read) -> anyhow::Result> { use anyhow::Context as _; use bincode::Options as _; use byteorder::{ReadBytesExt, LE}; @@ -781,12 +786,10 @@ impl FrameData { full_delta: false, })) } else if &header == b"PFD5" { - // Added 2024-12-22: remove useless manual sequence size serialization. + // Added 2024-12-22: remove useless manual read seq length, and intermediate vector. let meta = { - let mut meta = Vec::new(); - read.read_exact(&mut meta)?; bincode::options() - .deserialize(&meta) + .deserialize_from(&mut read) .context("bincode deserialize")? }; @@ -798,23 +801,16 @@ impl FrameData { PackedStreams::new(compression_kind, streams_compressed) }; - let deserialized_scopes: Vec = { - let mut serialized_scopes = Vec::new(); - read.read_exact(&mut serialized_scopes)?; + let deserialized_scopes: Vec> = { bincode::options() - .deserialize_from(serialized_scopes.as_slice()) + .deserialize_from(read) // serialized_scopes.as_slice() .context("Can not deserialize scope details")? }; - let new_scopes: Vec<_> = deserialized_scopes - .into_iter() - .map(|x| Arc::new(x.clone())) - .collect(); - Ok(Some(Self { meta, data: RwLock::new(FrameDataState::Packed(streams_compressed)), - scope_delta: new_scopes, + scope_delta: deserialized_scopes, full_delta: false, })) } else { From 80dad3e7b1e5d7ff95d7f79eefeeab6cc64b2306 Mon Sep 17 00:00:00 2001 From: Gwen Lg Date: Tue, 14 Jan 2025 21:23:03 +0100 Subject: [PATCH 25/25] wip: use async unsend::RwLock for clients list --- Cargo.lock | 10 ++++++++++ puffin_http/Cargo.toml | 1 + puffin_http/src/server.rs | 21 +++++++++++---------- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9be29b18..5082c5fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2036,6 +2036,7 @@ dependencies = [ "paste", "puffin", "simple_logger", + "unsend", ] [[package]] @@ -2611,6 +2612,15 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" +[[package]] +name = "unsend" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a53ed0f41cab22aa8d4aee9d2357a1b21cb365537d7349066c76fcd67bc3121d" +dependencies = [ + "pin-project-lite", +] + [[package]] name = "url" version = "2.5.0" diff --git a/puffin_http/Cargo.toml b/puffin_http/Cargo.toml index 6bccbb90..8d7fd93d 100644 --- a/puffin_http/Cargo.toml +++ b/puffin_http/Cargo.toml @@ -26,6 +26,7 @@ puffin = { version = "0.19.1", path = "../puffin", features = [ "lz4", "serialization", ] } +unsend = { version = "0.2.1", default-features = false } #smol = "2.0" [dev-dependencies] diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 28527614..97964ac3 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -1,16 +1,17 @@ use anyhow::Context as _; use async_executor::{LocalExecutor, Task}; use async_net::{SocketAddr, TcpListener, TcpStream}; + use futures_lite::{future, AsyncWriteExt}; use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection}; use std::{ - cell::RefCell, rc::Rc, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, }; +use unsend::lock::RwLock; /// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough. const MAX_FRAMES_IN_QUEUE: usize = 30; @@ -267,7 +268,7 @@ impl Server { ) -> anyhow::Result<()> { let executor = Rc::new(LocalExecutor::new()); - let clients = Rc::new(RefCell::new(Vec::new())); + let clients = Rc::new(RwLock::new(Vec::new())); let clients_cloned = clients.clone(); let num_clients_cloned = num_clients.clone(); @@ -304,6 +305,7 @@ impl Server { log::warn!("puffin server failure: {}", err); } } + log::trace!("End to Wait frame to send"); }); //.context("Couldn't spawn ps-send task")?; @@ -358,7 +360,7 @@ impl Drop for Client { struct PuffinServerConnection<'a> { executor: Rc>, tcp_listener: TcpListener, - clients: Rc>>, + clients: Rc>>, num_clients: Arc, } @@ -382,14 +384,14 @@ impl<'a> PuffinServerConnection<'a> { // Send all scopes when new client connects. // TODO: send all previous scopes at connection, not on regular send //self.send_all_scopes = true; - self.clients.borrow_mut().push(Client { + self.clients.write().await.push(Client { client_addr, packet_tx: Some(packet_tx), join_handle: Some(join_handle), send_all_scopes: true, }); self.num_clients - .store(self.clients.borrow().len(), Ordering::SeqCst); + .store(self.clients.read().await.len(), Ordering::SeqCst); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { break; // Nothing to do for now. @@ -405,14 +407,14 @@ impl<'a> PuffinServerConnection<'a> { /// streams to client puffin profiler data. struct PuffinServerSend { - clients: Rc>>, + clients: Rc>>, num_clients: Arc, scope_collection: ScopeCollection, } impl PuffinServerSend { pub async fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { - if self.clients.borrow().is_empty() { + if self.clients.read().await.is_empty() { return Ok(()); } puffin::profile_function!(); @@ -443,9 +445,8 @@ impl PuffinServerSend { let packet_all_scopes: Packet = packet_all_scopes.into(); // Send frame to clients, remove disconnected clients and update num_clients var - let clients = self.clients.borrow(); let mut idx_to_remove = Vec::new(); - for (idx, client) in clients.iter().enumerate() { + for (idx, client) in self.clients.read().await.iter().enumerate() { let packet = if client.send_all_scopes { packet_all_scopes.clone() } else { @@ -456,7 +457,7 @@ impl PuffinServerSend { } } - let mut clients = self.clients.borrow_mut(); + let mut clients = self.clients.write().await; idx_to_remove.iter().rev().for_each(|idx| { clients.remove(*idx); });