Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 72 additions & 34 deletions puffin_http/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use anyhow::Context as _;
use crossbeam_channel::{Receiver, Sender};
use parking_lot::RwLock;
use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection};
use std::{
io::Write as _,
Expand All @@ -22,6 +24,7 @@ pub struct Server {
join_handle: Option<std::thread::JoinHandle<()>>,
num_clients: Arc<AtomicUsize>,
sink_remove: fn(FrameSinkId) -> (),
wait_client: Receiver<()>,
}

impl Server {
Expand Down Expand Up @@ -246,32 +249,46 @@ impl Server {
// but `crossbeam_channel` will continue until the channel is empty.
let (tx, rx): (crossbeam_channel::Sender<Arc<puffin::FrameData>>, _) =
crossbeam_channel::unbounded();
let (client_connected, wait_client) = crossbeam_channel::bounded(0);

let clients = Arc::new(RwLock::new(Vec::new()));
let num_clients = Arc::new(AtomicUsize::default());
let num_clients_cloned = num_clients.clone();

let clients_send = clients.clone();
let num_clients_send = num_clients.clone();
let join_handle = std::thread::Builder::new()
.name("puffin-server".to_owned())
.name("pf-server-send".to_owned())
.spawn(move || {
let mut server_impl = PuffinServerImpl {
tcp_listener,
clients: Default::default(),
num_clients: num_clients_cloned,
send_all_scopes: false,
clients: clients_send,
num_clients: num_clients_send,
//send_all_scopes: false,
scope_collection: Default::default(),
};

while let Ok(frame) = rx.recv() {
if let Err(err) = server_impl.accept_new_clients() {
log::warn!("puffin server failure: {err}");
}

if let Err(err) = server_impl.send(&frame) {
log::warn!("puffin server failure: {err}");
}
}
})
.context("Couldn't spawn thread")?;
.context("spawn thread `pf-server-send`")?;

let num_clients_wait = num_clients.clone();
let _handle_accept = std::thread::Builder::new()
.name("pf-server-client".to_owned())
.spawn(move || {
let rdv_channel_client = client_connected;
let ps_connection = PuffinServerConnection {
tcp_listener,
clients: clients.clone(),
num_clients: num_clients_wait,
};
if let Err(err) = ps_connection.accept_new_clients(&rdv_channel_client) {
log::warn!("pf-server-client failure: {err}");
}
})
.context("spawn pf-server-client thread")?;

// Call the `install` function to add ourselves as a sink
let sink_id = sink_install(Box::new(move |frame| {
Expand All @@ -283,13 +300,23 @@ impl Server {
join_handle: Some(join_handle),
num_clients,
sink_remove,
wait_client,
})
}

/// Number of clients currently connected.
pub fn num_clients(&self) -> usize {
self.num_clients.load(Ordering::SeqCst)
}

/// Block thread to wait at least a puffin client.
///
/// # Errors
/// Return error from channel.
pub fn wait_client(&self) -> Result<(), crossbeam_channel::RecvError> {
log::info!("Wait puffin_http client");
self.wait_client.recv()
}
}

impl Drop for Server {
Expand Down Expand Up @@ -327,17 +354,13 @@ impl Drop for Client {
}

/// Listens for incoming connections
/// and streams them puffin profiler data.
struct PuffinServerImpl {
struct PuffinServerConnection {
tcp_listener: TcpListener,
clients: Vec<Client>,
clients: Arc<RwLock<Vec<Client>>>,
num_clients: Arc<AtomicUsize>,
send_all_scopes: bool,
scope_collection: ScopeCollection,
}

impl PuffinServerImpl {
fn accept_new_clients(&mut self) -> anyhow::Result<()> {
impl PuffinServerConnection {
fn accept_new_clients(&self, rdv_channel: &Sender<()>) -> anyhow::Result<()> {
loop {
match self.tcp_listener.accept() {
Ok((tcp_stream, client_addr)) => {
Expand All @@ -350,30 +373,42 @@ impl PuffinServerImpl {
let (packet_tx, packet_rx) = crossbeam_channel::bounded(MAX_FRAMES_IN_QUEUE);

let join_handle = std::thread::Builder::new()
.name("puffin-server-client".to_owned())
.name("pf-server-client".to_owned())
.spawn(move || client_loop(packet_rx, client_addr, tcp_stream))
.context("Couldn't spawn thread")?;
.context("Couldn't spawn thread `pf-server-client`")?;

// Send all scopes when new client connects.
self.send_all_scopes = true;
self.clients.push(Client {
// TODO: send all previous scopes at connection, not on regular send
//self.send_all_scopes = true;
self.clients.write().push(Client {
client_addr,
packet_tx: Some(packet_tx),
join_handle: Some(join_handle),
});
self.num_clients.store(self.clients.len(), Ordering::SeqCst);
self.num_clients
.store(self.clients.read().len(), Ordering::SeqCst);
rdv_channel.send(())?;
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
break; // Nothing to do for now.
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// Nothing to do for now. Continue looping
}
Err(e) => {
anyhow::bail!("puffin server TCP error: {:?}", e);
Err(err) => {
log::error!("puffin server TCP error: {err:?}");
}
}
}
Ok(())
}
}

/// streams to client puffin profiler data.
struct PuffinServerImpl {
clients: Arc<RwLock<Vec<Client>>>,
num_clients: Arc<AtomicUsize>,
//send_all_scopes: bool,
scope_collection: ScopeCollection,
}

impl PuffinServerImpl {
pub fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> {
puffin::profile_function!();

Expand All @@ -383,7 +418,7 @@ impl PuffinServerImpl {
}

// Nothing to send if no clients => Early return.
if self.clients.is_empty() {
if self.clients.read().is_empty() {
return Ok(());
}

Expand All @@ -393,7 +428,8 @@ impl PuffinServerImpl {
.write_all(&crate::PROTOCOL_VERSION.to_le_bytes())
.context("Encode puffin `PROTOCOL_VERSION` in packet to be send to client.")?;

let scope_collection = if self.send_all_scopes {
let scope_collection = if true {
// self.send_all_scopes
Some(&self.scope_collection)
} else {
None
Expand All @@ -402,11 +438,12 @@ impl PuffinServerImpl {
frame
.write_into(scope_collection, &mut packet)
.context("Encode puffin frame")?;
self.send_all_scopes = false;
//self.send_all_scopes = false;

let packet: Packet = packet.into();

self.clients.retain(|client| match &client.packet_tx {
let mut clients = self.clients.write();
clients.retain(|client| match &client.packet_tx {
None => false,
Some(packet_tx) => match packet_tx.try_send(packet.clone()) {
Ok(()) => true,
Expand All @@ -420,7 +457,8 @@ impl PuffinServerImpl {
}
},
});
self.num_clients.store(self.clients.len(), Ordering::SeqCst);
self.num_clients.store(clients.len(), Ordering::SeqCst);
drop(clients);

Ok(())
}
Expand Down
Loading