Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 95 additions & 110 deletions src/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ use uuid::Uuid;

use std::cmp::{Ordering, max};
use std::collections::HashMap;
use std::fmt;
use std::io::Read;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::{Duration, Instant};
use std::{fmt, io};

/// Extra net imports required for the IPv6 handling on the linux side.
#[cfg(not(target_os = "windows"))]
Expand Down Expand Up @@ -504,128 +504,113 @@ impl SacnReceiver {
return Err(SacnError::NoDataUniversesRegistered());
}

self.sequences.check_timeouts(self.announce_timeout)?;
self.check_waiting_data_timeouts();

// if timeout is 0, then it's time to return
if timeout == Some(Duration::from_secs(0)) {
if cfg!(target_os = "windows") {
// always check timeouts
self.sequences.check_timeouts(self.announce_timeout)?;
self.check_waiting_data_timeouts();
return Err(io::Error::new(
// Use the right expected error for the operating system.
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"No data available in given timeout",
)
.into());
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"No data available in given timeout",
)
.into());
}
if cfg!(target_os = "windows") {
io::ErrorKind::TimedOut
} else {
io::ErrorKind::WouldBlock
},
"No data available in given timeout",
)
.into());
}

// Forces the actual timeout used for receiving from the underlying network to never exceed E131_NETWORK_DATA_LOSS_TIMEOUT.
// This means that the timeouts for the sequence numbers will be checked at least every E131_NETWORK_DATA_LOSS_TIMEOUT even if
// recv is called with a longer timeout.
let actual_timeout =
if timeout.is_some() && timeout.unwrap() < E131_NETWORK_DATA_LOSS_TIMEOUT {
timeout
} else {
Some(E131_NETWORK_DATA_LOSS_TIMEOUT)
};

self.receiver.set_timeout(actual_timeout)?; // "Failed to sent a timeout value for the receiver"
let start_time = Instant::now();
// Fixed instant that should return the whole recv call
let deadline = timeout.and_then(|t| Instant::now().checked_add(t));

// shared buf through loop iterations
let mut buf: [u8; RCV_BUF_DEFAULT_SIZE] = [0; RCV_BUF_DEFAULT_SIZE];
match self.receiver.recv(&mut buf) {
Ok(pkt) => {
let pdu: E131RootLayer = pkt.pdu;
let data: E131RootLayerData = pdu.data;
let res = match data {
DataPacket(d) => self.handle_data_packet(pdu.cid, d)?,
SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?,
UniverseDiscoveryPacket(u) => {
let discovered_src: Option<String> =
self.handle_universe_discovery_packet(pdu.cid, u);
if let Some(src) = discovered_src
&& self.announce_source_discovery
{
return Err(SacnError::SourceDiscovered(src));
};
None

loop {
self.sequences.check_timeouts(self.announce_timeout)?;
self.check_waiting_data_timeouts();

// In the case of `timeout` being longer than `E131_NETWORK_DATA_LOSS_TIMEOUT`:
// Forces the actual timeout used for receiving from the underlying network to never exceed E131_NETWORK_DATA_LOSS_TIMEOUT.
// This means that the timeouts for the sequence numbers will be checked at least every E131_NETWORK_DATA_LOSS_TIMEOUT even if
// recv is called with a longer timeout.
let remaining = match deadline {
None => None, // set to data loss timeout below so timeouts are checked again.
Some(dl) => {
let now = Instant::now();
if now >= dl {
// timeout expired
return Err(io::Error::new(
if cfg!(target_os = "windows") {
io::ErrorKind::TimedOut
} else {
io::ErrorKind::WouldBlock
},
"No data available in given timeout",
)
.into());
}
};
Some(dl - now)
}
};

let actual_timeout = if let Some(rem) = remaining {
rem.min(E131_NETWORK_DATA_LOSS_TIMEOUT)
} else {
E131_NETWORK_DATA_LOSS_TIMEOUT
};

match res {
Some(r) => Ok(r),
None => {
// Indicates that there is no data ready to pass up yet even if a packet was received.
// To stop recv blocking forever with a non-None timeout due to packets being received consistently (that reset the timeout)
// within the receive timeout (e.g. universe discovery packets if the discovery interval < timeout) the timeout needs to be
// adjusted to account for the time already taken.
if let Some(timeout) = timeout {
let elapsed = start_time.elapsed();
match timeout.checked_sub(elapsed) {
None => {
// Indicates that elapsed is bigger than timeout so its time to return.
Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"No data available in given timeout",
)
.into())
}
Some(new_timeout) => self.recv(Some(new_timeout)),
self.receiver.set_timeout(Some(actual_timeout))?; // "Failed to set a timeout value for the receiver"

// Zero out the buffer before receiving. This may be redundant since recv should pack the whole buffer.
buf.fill(0);

match self.receiver.recv(&mut buf) {
Ok(pkt) => {
let pdu: E131RootLayer = pkt.pdu;
let data: E131RootLayerData = pdu.data;
let res = match data {
DataPacket(d) => self.handle_data_packet(pdu.cid, d)?,
SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?,
UniverseDiscoveryPacket(u) => {
let discovered_src: Option<String> =
self.handle_universe_discovery_packet(pdu.cid, u);
if let Some(src) = discovered_src
&& self.announce_source_discovery
{
return Err(SacnError::SourceDiscovered(src));
}
} else {
// If the timeout was none then would keep looping till data is returned as the method should keep blocking till then.
self.recv(timeout)
None
}
};

// return the data, otherwise continue if no data is ready
if let Some(r) = res {
return Ok(r);
} else {
continue;
}
}
}
Err(err) => {
match err {
SacnError::Io(ref s) => {
match s.kind() {
// Windows and Unix use different error types (WouldBlock/TimedOut) for the same error.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
if let Some(timeout) = timeout {
let elapsed = start_time.elapsed();
match timeout.checked_sub(elapsed) {
None => {
// Indicates that elapsed is bigger than timeout so its time to return.
if cfg!(target_os = "windows") {
// Use the right expected error for the operating system.
Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"No data available in given timeout",
)
.into())
} else {
Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"No data available in given timeout",
)
.into())
}
}
Some(new_timeout) => self.recv(Some(new_timeout)),
}
} else {
// If the timeout was none then would keep looping till data is returned as the method should keep blocking till then.
self.recv(timeout)
}
}
_ => {
// Not a timeout/wouldblock error meaning the recv should stop with the given error.
Err(err)
}

Err(err) =>
// This could be the socket-level timeout error or other socket recv error.
{
match err {
// Windows and Unix use different error types (WouldBlock/TimedOut) for the same error.
SacnError::Io(ref s)
if matches!(
s.kind(),
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
) =>
{
// socket read timedout. start new loop to compute new remaining which will return if deadline has passed
continue;
}
_ => {
// Not a timeout/wouldblock error meaning the recv should stop with the given error.
return Err(err);
}
}
_ => {
// Not a timeout/wouldblock error meaning the recv should stop with the given error.
Err(err)
}
}
}
Expand Down