diff --git a/Cargo.lock b/Cargo.lock index 94eff0a0..5082c5fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,6 +156,78 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "175571dd1d178ced59193a6fc02dde1b972eb0bc56c892cde9beeceac5bf0f6b" +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ca9a001c1e8ba5149f91a74362376cc6bc5b919d92d988668657bd570bdcec" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-io" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-net" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b948000fad4873c1c9339d60f2623323a0cfd3816e5181033c6a5cb68b2accf7" +dependencies = [ + "async-io", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "atk-sys" version = "0.15.1" @@ -244,6 +316,19 @@ dependencies = [ "objc2 0.5.2", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -456,9 +541,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ "crossbeam-utils", ] @@ -548,15 +633,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -854,6 +930,33 @@ dependencies = [ "str-buf", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" +dependencies = [ + "event-listener", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fdeflate" version = "0.3.3" @@ -873,6 +976,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -909,6 +1024,37 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + [[package]] name = "gdk-pixbuf-sys" version = "0.15.10" @@ -956,8 +1102,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1403,6 +1551,15 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "natord" version = "1.0.9" @@ -1689,6 +1846,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1730,6 +1893,17 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.28" @@ -1852,13 +2026,17 @@ name = "puffin_http" version = "0.16.1" dependencies = [ "anyhow", - "crossbeam-channel", + "async-executor", + "async-net", + "flume", + "futures-lite", "log", "once_cell", "parking_lot", "paste", "puffin", "simple_logger", + "unsend", ] [[package]] @@ -2187,6 +2365,15 @@ dependencies = [ "serde", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -2425,6 +2612,15 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" +[[package]] +name = "unsend" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a53ed0f41cab22aa8d4aee9d2357a1b21cb365537d7349066c76fcd67bc3121d" +dependencies = [ + "pin-project-lite", +] + [[package]] name = "url" version = "2.5.0" diff --git a/puffin/src/frame_data.rs b/puffin/src/frame_data.rs index ed29d188..01eaef1a 100644 --- a/puffin/src/frame_data.rs +++ b/puffin/src/frame_data.rs @@ -561,18 +561,14 @@ impl FrameData { #[cfg(feature = "serialization")] pub fn write_into( &self, - scope_collection: &crate::ScopeCollection, - send_all_scopes: bool, - write: &mut impl std::io::Write, + scope_collection: Option<&crate::ScopeCollection>, + mut write: &mut impl std::io::Write, ) -> anyhow::Result<()> { use bincode::Options as _; - use byteorder::{WriteBytesExt as _, LE}; + use byteorder::WriteBytesExt as _; - let meta_serialized = bincode::options().serialize(&self.meta)?; - - write.write_all(b"PFD4")?; - write.write_all(&(meta_serialized.len() as u32).to_le_bytes())?; - write.write_all(&meta_serialized)?; + write.write_all(b"PFD5")?; + bincode::options().serialize_into(&mut write, &self.meta)?; self.create_packed(); let packed_streams_lock = self.data.read(); @@ -582,15 +578,25 @@ impl FrameData { write.write_u8(packed_streams.compression_kind as u8)?; write.write_all(&packed_streams.bytes)?; - let to_serialize_scopes: Vec<_> = if send_all_scopes { - scope_collection.scopes_by_id().values().cloned().collect() - } else { - self.scope_delta.clone() - }; + struct Wrapper<'a>(&'a crate::ScopeCollection); //TODO: rename, and move + + impl<'a> serde::Serialize for Wrapper<'a> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let iter = self.0.scopes_by_id().values(); + serializer.collect_seq(iter) + } + } + + scope_collection + .map(|collection| { + let scopes_wrapper = Wrapper(collection); + bincode::options().serialize_into(&mut write, &scopes_wrapper) + }) + .unwrap_or_else(|| bincode::options().serialize_into(write, &self.scope_delta))?; - let serialized_scopes = bincode::options().serialize(&to_serialize_scopes)?; - write.write_u32::(serialized_scopes.len() as u32)?; - write.write_all(&serialized_scopes)?; Ok(()) } @@ -599,7 +605,7 @@ impl FrameData { /// [`None`] is returned if the end of the stream is reached (EOF), /// or an end-of-stream sentinel of `0u32` is read. #[cfg(feature = "serialization")] - pub fn read_next(read: &mut impl std::io::Read) -> anyhow::Result> { + pub fn read_next(mut read: &mut impl std::io::Read) -> anyhow::Result> { use anyhow::Context as _; use bincode::Options as _; use byteorder::{ReadBytesExt, LE}; @@ -779,6 +785,34 @@ impl FrameData { scope_delta: new_scopes, full_delta: false, })) + } else if &header == b"PFD5" { + // Added 2024-12-22: remove useless manual read seq length, and intermediate vector. + let meta = { + bincode::options() + .deserialize_from(&mut read) + .context("bincode deserialize")? + }; + + let streams_compressed_length = read.read_u32::()? as usize; + let compression_kind = CompressionKind::from_u8(read.read_u8()?)?; + let streams_compressed = { + let mut streams_compressed = vec![0_u8; streams_compressed_length]; + read.read_exact(&mut streams_compressed)?; + PackedStreams::new(compression_kind, streams_compressed) + }; + + let deserialized_scopes: Vec> = { + bincode::options() + .deserialize_from(read) // serialized_scopes.as_slice() + .context("Can not deserialize scope details")? + }; + + Ok(Some(Self { + meta, + data: RwLock::new(FrameDataState::Packed(streams_compressed)), + scope_delta: deserialized_scopes, + full_delta: false, + })) } else { anyhow::bail!("Failed to decode: this data is newer than this reader. Please update your puffin version!"); } diff --git a/puffin/src/profile_view.rs b/puffin/src/profile_view.rs index 7882f3c4..07c64ab6 100644 --- a/puffin/src/profile_view.rs +++ b/puffin/src/profile_view.rs @@ -229,7 +229,7 @@ impl FrameView { write.write_all(b"PUF0")?; for frame in self.all_uniq() { - frame.write_into(&self.scope_collection, false, write)?; + frame.write_into(None, write)?; } Ok(()) } diff --git a/puffin_http/Cargo.toml b/puffin_http/Cargo.toml index 58f038a8..8d7fd93d 100644 --- a/puffin_http/Cargo.toml +++ b/puffin_http/Cargo.toml @@ -15,7 +15,10 @@ include = ["**/*.rs", "Cargo.toml", "README.md"] [dependencies] anyhow = "1.0" -crossbeam-channel = "0.5" +async-executor = "1.13" +async-net = "2.0.0" +flume = "0.11" +futures-lite = { version = "2.6", default-features = false } log = "0.4" parking_lot = "0.12" puffin = { version = "0.19.1", path = "../puffin", features = [ @@ -23,6 +26,8 @@ puffin = { version = "0.19.1", path = "../puffin", features = [ "lz4", "serialization", ] } +unsend = { version = "0.2.1", default-features = false } +#smol = "2.0" [dev-dependencies] simple_logger = "4.2" diff --git a/puffin_http/src/server.rs b/puffin_http/src/server.rs index 4af136cc..97964ac3 100644 --- a/puffin_http/src/server.rs +++ b/puffin_http/src/server.rs @@ -1,13 +1,17 @@ use anyhow::Context as _; -use puffin::{FrameSinkId, FrameView, GlobalProfiler}; +use async_executor::{LocalExecutor, Task}; +use async_net::{SocketAddr, TcpListener, TcpStream}; + +use futures_lite::{future, AsyncWriteExt}; +use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection}; use std::{ - io::Write, - net::{SocketAddr, TcpListener, TcpStream}, + rc::Rc, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, }; +use unsend::lock::RwLock; /// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough. const MAX_FRAMES_IN_QUEUE: usize = 30; @@ -226,44 +230,22 @@ impl Server { sink_install: fn(puffin::FrameSink) -> FrameSinkId, sink_remove: fn(FrameSinkId) -> (), ) -> anyhow::Result { - let tcp_listener = TcpListener::bind(bind_addr).context("binding server TCP socket")?; - tcp_listener - .set_nonblocking(true) - .context("TCP set_nonblocking")?; + let bind_addr = String::from(bind_addr); - // We use crossbeam_channel instead of `mpsc`, + // We use flume instead of `mpsc`, // because on shutdown we want all frames to be sent. // `mpsc::Receiver` stops receiving as soon as the `Sender` is dropped, - // but `crossbeam_channel` will continue until the channel is empty. - let (tx, rx): (crossbeam_channel::Sender>, _) = - crossbeam_channel::unbounded(); + // but `flume` will continue until the channel is empty. + let (tx, rx): (flume::Sender>, _) = flume::unbounded(); let num_clients = Arc::new(AtomicUsize::default()); let num_clients_cloned = num_clients.clone(); - let join_handle = std::thread::Builder::new() .name("puffin-server".to_owned()) .spawn(move || { - let mut server_impl = PuffinServerImpl { - tcp_listener, - clients: Default::default(), - num_clients: num_clients_cloned, - send_all_scopes: false, - frame_view: Default::default(), - }; - - while let Ok(frame) = rx.recv() { - server_impl.frame_view.add_frame(frame.clone()); - if let Err(err) = server_impl.accept_new_clients() { - log::warn!("puffin server failure: {}", err); - } - - if let Err(err) = server_impl.send(&frame) { - log::warn!("puffin server failure: {}", err); - } - } + Server::run(bind_addr, rx, num_clients_cloned).unwrap(); }) - .context("Couldn't spawn thread")?; + .context("Can't start puffin-server thread.")?; // Call the `install` function to add ourselves as a sink let sink_id = sink_install(Box::new(move |frame| { @@ -278,6 +260,61 @@ impl Server { }) } + /// start and run puffin server service + pub fn run( + bind_addr: String, + rx: flume::Receiver>, + num_clients: Arc, + ) -> anyhow::Result<()> { + let executor = Rc::new(LocalExecutor::new()); + + let clients = Rc::new(RwLock::new(Vec::new())); + let clients_cloned = clients.clone(); + let num_clients_cloned = num_clients.clone(); + + let executor_cloned = executor.clone(); + let psconnect_handle = //task::Builder::new() + //.name("ps-connect".to_owned()) + executor.spawn(async move { + log::trace!("Start listening on {bind_addr}"); + let tcp_listener = TcpListener::bind(bind_addr) + .await + .context("binding server TCP socket")?; + + let mut ps_connection = PuffinServerConnection { + executor: executor_cloned, + tcp_listener, + clients: clients_cloned, + num_clients: num_clients_cloned, + }; + ps_connection.accept_new_clients().await + }); + + let pssend_handle = //task::Builder::new() + //.name("ps-send".to_owned()) + executor.spawn(async move { + let mut ps_send = PuffinServerSend { + clients, + num_clients, + scope_collection: Default::default(), + }; + + log::trace!("Wait frame to send"); + while let Ok(frame) = rx.recv_async().await { + if let Err(err) = ps_send.send(&frame).await { + log::warn!("puffin server failure: {}", err); + } + } + log::trace!("End to Wait frame to send"); + }); + //.context("Couldn't spawn ps-send task")?; + + let (con_res, _) = future::block_on( + executor.run(async { future::zip(psconnect_handle, pssend_handle).await }), + ); + con_res.context("Client connection management task") + } + /// Number of clients currently connected. pub fn num_clients(&self) -> usize { self.num_clients.load(Ordering::SeqCst) @@ -296,12 +333,13 @@ impl Drop for Server { } } -type Packet = Arc<[u8]>; +type Packet = Rc<[u8]>; struct Client { client_addr: SocketAddr, - packet_tx: Option>, - join_handle: Option>, + packet_tx: Option>, + join_handle: Option>, + send_all_scopes: bool, } impl Drop for Client { @@ -313,47 +351,47 @@ impl Drop for Client { // Wait for the shutdown: if let Some(join_handle) = self.join_handle.take() { - join_handle.join().ok(); + future::block_on(join_handle); // .ok() } } } /// Listens for incoming connections -/// and streams them puffin profiler data. -struct PuffinServerImpl { +struct PuffinServerConnection<'a> { + executor: Rc>, tcp_listener: TcpListener, - clients: Vec, + clients: Rc>>, num_clients: Arc, - send_all_scopes: bool, - frame_view: FrameView, } -impl PuffinServerImpl { - fn accept_new_clients(&mut self) -> anyhow::Result<()> { +impl<'a> PuffinServerConnection<'a> { + async fn accept_new_clients(&mut self) -> anyhow::Result<()> { loop { - match self.tcp_listener.accept() { + match self.tcp_listener.accept().await { Ok((tcp_stream, client_addr)) => { - tcp_stream - .set_nonblocking(false) - .context("stream.set_nonblocking")?; - + puffin::profile_scope!("accept_client"); log::info!("{} connected", client_addr); - let (packet_tx, packet_rx) = crossbeam_channel::bounded(MAX_FRAMES_IN_QUEUE); + let (packet_tx, packet_rx) = flume::bounded(MAX_FRAMES_IN_QUEUE); - let join_handle = std::thread::Builder::new() - .name("puffin-server-client".to_owned()) - .spawn(move || client_loop(packet_rx, client_addr, tcp_stream)) - .context("Couldn't spawn thread")?; + let join_handle = //task::Builder::new() + //.name("ps-client".to_owned()) + self.executor.spawn(async move { + client_loop(packet_rx, client_addr, tcp_stream).await; + }); + //.context("Couldn't spawn ps-client task")?; // Send all scopes when new client connects. - self.send_all_scopes = true; - self.clients.push(Client { + // TODO: send all previous scopes at connection, not on regular send + //self.send_all_scopes = true; + self.clients.write().await.push(Client { client_addr, packet_tx: Some(packet_tx), join_handle: Some(join_handle), + send_all_scopes: true, }); - self.num_clients.store(self.clients.len(), Ordering::SeqCst); + self.num_clients + .store(self.clients.read().await.len(), Ordering::SeqCst); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { break; // Nothing to do for now. @@ -365,64 +403,110 @@ impl PuffinServerImpl { } Ok(()) } +} + +/// streams to client puffin profiler data. +struct PuffinServerSend { + clients: Rc>>, + num_clients: Arc, + scope_collection: ScopeCollection, +} - pub fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { - if self.clients.is_empty() { +impl PuffinServerSend { + pub async fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> { + if self.clients.read().await.is_empty() { return Ok(()); } puffin::profile_function!(); - let mut packet = vec![]; + // Keep scope_collection up-to-date + frame.scope_delta.iter().for_each(|new_scope| { + self.scope_collection.insert(new_scope.clone()); + }); - packet - .write_all(&crate::PROTOCOL_VERSION.to_le_bytes()) - .unwrap(); + let mut packet = Vec::with_capacity(2048); + std::io::Write::write_all(&mut packet, &crate::PROTOCOL_VERSION.to_le_bytes()).unwrap(); frame - .write_into( - self.frame_view.scope_collection(), - self.send_all_scopes, - &mut packet, - ) + .write_into(None, &mut packet) .context("Encode puffin frame")?; - self.send_all_scopes = false; - let packet: Packet = packet.into(); - self.clients.retain(|client| match &client.packet_tx { + //TODO: compute packet_all_scopes only if a client need it + let mut packet_all_scopes = Vec::with_capacity(2048); + std::io::Write::write_all( + &mut packet_all_scopes, + &crate::PROTOCOL_VERSION.to_le_bytes(), + ) + .unwrap(); + frame + .write_into(Some(&self.scope_collection), &mut packet_all_scopes) + .context("Encode puffin frame")?; + let packet_all_scopes: Packet = packet_all_scopes.into(); + + // Send frame to clients, remove disconnected clients and update num_clients var + let mut idx_to_remove = Vec::new(); + for (idx, client) in self.clients.read().await.iter().enumerate() { + let packet = if client.send_all_scopes { + packet_all_scopes.clone() + } else { + packet.clone() + }; + if !Self::send_to_client(client, packet).await { + idx_to_remove.push(idx); + } + } + + let mut clients = self.clients.write().await; + idx_to_remove.iter().rev().for_each(|idx| { + clients.remove(*idx); + }); + clients + .iter_mut() + .for_each(|client| client.send_all_scopes = false); + self.num_clients.store(clients.len(), Ordering::SeqCst); + + Ok(()) + } + + async fn send_to_client(client: &Client, packet: Packet) -> bool { + puffin::profile_function!(); + match &client.packet_tx { None => false, - Some(packet_tx) => match packet_tx.try_send(packet.clone()) { + Some(packet_tx) => match packet_tx.send_async(packet).await { Ok(()) => true, - Err(crossbeam_channel::TrySendError::Disconnected(_)) => false, - Err(crossbeam_channel::TrySendError::Full(_)) => { - log::info!( - "puffin client {} is not accepting data fast enough; dropping a frame", - client.client_addr - ); + Err(err) => { + log::info!("puffin send error: {} for '{}'", err, client.client_addr); true } }, - }); - self.num_clients.store(self.clients.len(), Ordering::SeqCst); - - Ok(()) + } } } -fn client_loop( - packet_rx: crossbeam_channel::Receiver, +async fn client_loop( + packet_rx: flume::Receiver, client_addr: SocketAddr, mut tcp_stream: TcpStream, ) { - while let Ok(packet) = packet_rx.recv() { - if let Err(err) = tcp_stream.write_all(&packet) { - log::info!( - "puffin server failed sending to {}: {} (kind: {:?})", - client_addr, - err, - err.kind() - ); - break; + loop { + match packet_rx.recv_async().await { + Ok(packet) => { + puffin::profile_scope!("write frame to client"); + if let Err(err) = tcp_stream.write_all(&packet).await { + log::info!( + "puffin server failed sending to {}: {} (kind: {:?})", + client_addr, + err, + err.kind() + ); + break; + } + } + Err(err) => { + log::info!("Error in client_loop: {}", err); + break; + } } } }