From 9f0303ba523182d0ff3cd46e7e7e5fb513e9d134 Mon Sep 17 00:00:00 2001 From: bruingineer Date: Sun, 4 Jan 2026 01:31:44 -0800 Subject: [PATCH] refactor recv to a loop instead of recursion --- src/receive.rs | 205 +++++++++++++++++++++++-------------------------- 1 file changed, 95 insertions(+), 110 deletions(-) diff --git a/src/receive.rs b/src/receive.rs index 3e15eee..5d2990b 100644 --- a/src/receive.rs +++ b/src/receive.rs @@ -32,10 +32,10 @@ use uuid::Uuid; use std::cmp::{Ordering, max}; use std::collections::HashMap; -use std::fmt; use std::io::Read; use std::net::{Ipv4Addr, SocketAddr}; use std::time::{Duration, Instant}; +use std::{fmt, io}; /// Extra net imports required for the IPv6 handling on the linux side. #[cfg(not(target_os = "windows"))] @@ -504,128 +504,113 @@ impl SacnReceiver { return Err(SacnError::NoDataUniversesRegistered()); } - self.sequences.check_timeouts(self.announce_timeout)?; - self.check_waiting_data_timeouts(); - + // if timeout is 0, then it's time to return if timeout == Some(Duration::from_secs(0)) { - if cfg!(target_os = "windows") { + // always check timeouts + self.sequences.check_timeouts(self.announce_timeout)?; + self.check_waiting_data_timeouts(); + return Err(io::Error::new( // Use the right expected error for the operating system. - return Err(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "No data available in given timeout", - ) - .into()); - } else { - return Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "No data available in given timeout", - ) - .into()); - } + if cfg!(target_os = "windows") { + io::ErrorKind::TimedOut + } else { + io::ErrorKind::WouldBlock + }, + "No data available in given timeout", + ) + .into()); } - // Forces the actual timeout used for receiving from the underlying network to never exceed E131_NETWORK_DATA_LOSS_TIMEOUT. - // This means that the timeouts for the sequence numbers will be checked at least every E131_NETWORK_DATA_LOSS_TIMEOUT even if - // recv is called with a longer timeout. - let actual_timeout = - if timeout.is_some() && timeout.unwrap() < E131_NETWORK_DATA_LOSS_TIMEOUT { - timeout - } else { - Some(E131_NETWORK_DATA_LOSS_TIMEOUT) - }; - - self.receiver.set_timeout(actual_timeout)?; // "Failed to sent a timeout value for the receiver" - let start_time = Instant::now(); + // Fixed instant that should return the whole recv call + let deadline = timeout.and_then(|t| Instant::now().checked_add(t)); + // shared buf through loop iterations let mut buf: [u8; RCV_BUF_DEFAULT_SIZE] = [0; RCV_BUF_DEFAULT_SIZE]; - match self.receiver.recv(&mut buf) { - Ok(pkt) => { - let pdu: E131RootLayer = pkt.pdu; - let data: E131RootLayerData = pdu.data; - let res = match data { - DataPacket(d) => self.handle_data_packet(pdu.cid, d)?, - SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?, - UniverseDiscoveryPacket(u) => { - let discovered_src: Option = - self.handle_universe_discovery_packet(pdu.cid, u); - if let Some(src) = discovered_src - && self.announce_source_discovery - { - return Err(SacnError::SourceDiscovered(src)); - }; - None + + loop { + self.sequences.check_timeouts(self.announce_timeout)?; + self.check_waiting_data_timeouts(); + + // In the case of `timeout` being longer than `E131_NETWORK_DATA_LOSS_TIMEOUT`: + // Forces the actual timeout used for receiving from the underlying network to never exceed E131_NETWORK_DATA_LOSS_TIMEOUT. + // This means that the timeouts for the sequence numbers will be checked at least every E131_NETWORK_DATA_LOSS_TIMEOUT even if + // recv is called with a longer timeout. + let remaining = match deadline { + None => None, // set to data loss timeout below so timeouts are checked again. + Some(dl) => { + let now = Instant::now(); + if now >= dl { + // timeout expired + return Err(io::Error::new( + if cfg!(target_os = "windows") { + io::ErrorKind::TimedOut + } else { + io::ErrorKind::WouldBlock + }, + "No data available in given timeout", + ) + .into()); } - }; + Some(dl - now) + } + }; + + let actual_timeout = if let Some(rem) = remaining { + rem.min(E131_NETWORK_DATA_LOSS_TIMEOUT) + } else { + E131_NETWORK_DATA_LOSS_TIMEOUT + }; - match res { - Some(r) => Ok(r), - None => { - // Indicates that there is no data ready to pass up yet even if a packet was received. - // To stop recv blocking forever with a non-None timeout due to packets being received consistently (that reset the timeout) - // within the receive timeout (e.g. universe discovery packets if the discovery interval < timeout) the timeout needs to be - // adjusted to account for the time already taken. - if let Some(timeout) = timeout { - let elapsed = start_time.elapsed(); - match timeout.checked_sub(elapsed) { - None => { - // Indicates that elapsed is bigger than timeout so its time to return. - Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "No data available in given timeout", - ) - .into()) - } - Some(new_timeout) => self.recv(Some(new_timeout)), + self.receiver.set_timeout(Some(actual_timeout))?; // "Failed to set a timeout value for the receiver" + + // Zero out the buffer before receiving. This may be redundant since recv should pack the whole buffer. + buf.fill(0); + + match self.receiver.recv(&mut buf) { + Ok(pkt) => { + let pdu: E131RootLayer = pkt.pdu; + let data: E131RootLayerData = pdu.data; + let res = match data { + DataPacket(d) => self.handle_data_packet(pdu.cid, d)?, + SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?, + UniverseDiscoveryPacket(u) => { + let discovered_src: Option = + self.handle_universe_discovery_packet(pdu.cid, u); + if let Some(src) = discovered_src + && self.announce_source_discovery + { + return Err(SacnError::SourceDiscovered(src)); } - } else { - // If the timeout was none then would keep looping till data is returned as the method should keep blocking till then. - self.recv(timeout) + None } + }; + + // return the data, otherwise continue if no data is ready + if let Some(r) = res { + return Ok(r); + } else { + continue; } } - } - Err(err) => { - match err { - SacnError::Io(ref s) => { - match s.kind() { - // Windows and Unix use different error types (WouldBlock/TimedOut) for the same error. - std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { - if let Some(timeout) = timeout { - let elapsed = start_time.elapsed(); - match timeout.checked_sub(elapsed) { - None => { - // Indicates that elapsed is bigger than timeout so its time to return. - if cfg!(target_os = "windows") { - // Use the right expected error for the operating system. - Err(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "No data available in given timeout", - ) - .into()) - } else { - Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "No data available in given timeout", - ) - .into()) - } - } - Some(new_timeout) => self.recv(Some(new_timeout)), - } - } else { - // If the timeout was none then would keep looping till data is returned as the method should keep blocking till then. - self.recv(timeout) - } - } - _ => { - // Not a timeout/wouldblock error meaning the recv should stop with the given error. - Err(err) - } + + Err(err) => + // This could be the socket-level timeout error or other socket recv error. + { + match err { + // Windows and Unix use different error types (WouldBlock/TimedOut) for the same error. + SacnError::Io(ref s) + if matches!( + s.kind(), + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut + ) => + { + // socket read timedout. start new loop to compute new remaining which will return if deadline has passed + continue; + } + _ => { + // Not a timeout/wouldblock error meaning the recv should stop with the given error. + return Err(err); } - } - _ => { - // Not a timeout/wouldblock error meaning the recv should stop with the given error. - Err(err) } } }