diff --git a/.gitignore b/.gitignore index f8b258817..e7b4be30c 100644 --- a/.gitignore +++ b/.gitignore @@ -34,8 +34,8 @@ semantic.cache sems sems_tests sems-logfile-callextract -sems-stats -apps/monitoring/tools/target/ -apps/monitoring/tools/vendor/ -apps/monitoring/tools/.cargo/ -apps/monitoring/tools/Cargo.lock +/core/plug-in/stats/sems-stats +/target/ +/vendor/ +/.cargo/ +/Cargo.lock diff --git a/CMakeLists.txt b/CMakeLists.txt index a03af8073..25e8a9a4a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -511,6 +511,42 @@ set(CMAKE_SHARED_LIBRARY_PREFIX "") enable_testing() +# Rust toolchain detection (used by tools/sems-stats and +# apps/monitoring/tools/*). A single workspace at the repo root means a +# single detection; SEMS_RUST_OK is plain (non-cache) so it propagates to +# every add_subdirectory() below. +set(SEMS_RUST_OK FALSE) +find_program(CARGO cargo) +if(CARGO) + execute_process( + COMMAND rustc --version + OUTPUT_VARIABLE RUSTC_VERSION_OUTPUT + OUTPUT_STRIP_TRAILING_WHITESPACE + RESULT_VARIABLE RUSTC_RESULT) + if(RUSTC_RESULT EQUAL 0) + string(REGEX MATCH "[0-9]+\\.[0-9]+" RUSTC_VERSION "${RUSTC_VERSION_OUTPUT}") + if(RUSTC_VERSION VERSION_GREATER_EQUAL "1.38") + set(SEMS_RUST_OK TRUE) + message(STATUS "Found cargo: ${CARGO}, rustc ${RUSTC_VERSION}") + else() + message(WARNING "rustc ${RUSTC_VERSION} found but >= 1.38 required; skipping Rust tools") + endif() + endif() +endif() + +# Shared cargo build settings for any subdir that wants to invoke the +# root workspace. CARGO_PROFILE matches the subdir that `target/` lands +# in; CARGO_RELEASE_FLAG is the `--release` flag when in a non-Debug build. +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + set(CARGO_PROFILE "debug") + set(CARGO_RELEASE_FLAG "") +else() + set(CARGO_PROFILE "release") + set(CARGO_RELEASE_FLAG "--release") +endif() +set(SEMS_CARGO_WORKSPACE "${CMAKE_SOURCE_DIR}") +set(SEMS_CARGO_TARGET_DIR "${SEMS_CARGO_WORKSPACE}/target") + add_subdirectory(apps) add_subdirectory(core) add_subdirectory(tools) diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 000000000..a38adb6e4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,11 @@ +[workspace] +resolver = "2" +members = [ + "apps/monitoring/tools/sems-monitoring-lib", + "apps/monitoring/tools/sems-list-calls", + "apps/monitoring/tools/sems-list-active-calls", + "apps/monitoring/tools/sems-list-finished-calls", + "apps/monitoring/tools/sems-get-callproperties", + "apps/monitoring/tools/sems-prometheus-exporter", + "tools/sems-stats", +] diff --git a/apps/monitoring/CMakeLists.txt b/apps/monitoring/CMakeLists.txt index f1343be2e..090e1afce 100644 --- a/apps/monitoring/CMakeLists.txt +++ b/apps/monitoring/CMakeLists.txt @@ -5,37 +5,10 @@ install( tools/sems_list_calls.py tools/sems_list_finished_calls.py DESTINATION ${SEMS_EXEC_PREFIX}/sbin) -# Rust-based monitoring tools (require rustc >= 1.38 for edition 2018) -find_program(CARGO cargo) -set(SEMS_RUST_OK FALSE) -if(CARGO) - execute_process( - COMMAND rustc --version - OUTPUT_VARIABLE RUSTC_VERSION_OUTPUT - OUTPUT_STRIP_TRAILING_WHITESPACE - RESULT_VARIABLE RUSTC_RESULT) - if(RUSTC_RESULT EQUAL 0) - string(REGEX MATCH "[0-9]+\\.[0-9]+" RUSTC_VERSION "${RUSTC_VERSION_OUTPUT}") - if(RUSTC_VERSION VERSION_GREATER_EQUAL "1.38") - set(SEMS_RUST_OK TRUE) - message(STATUS "Found cargo: ${CARGO}, rustc ${RUSTC_VERSION}") - else() - message(WARNING "rustc ${RUSTC_VERSION} found but >= 1.38 required; skipping Rust monitoring tools") - endif() - endif() -endif() - +# Rust-based monitoring tools (require rustc >= 1.38 for edition 2018). +# Rust detection + shared cargo settings live in the root CMakeLists.txt so +# tools/ and apps/monitoring/tools/ can share one workspace and one target/. if(SEMS_RUST_OK) - set(SEMS_MONITORING_DIR ${CMAKE_CURRENT_SOURCE_DIR}/tools) - - if(CMAKE_BUILD_TYPE STREQUAL "Debug") - set(CARGO_PROFILE "debug") - set(CARGO_RELEASE_FLAG "") - else() - set(CARGO_PROFILE "release") - set(CARGO_RELEASE_FLAG "--release") - endif() - set(SEMS_MONITORING_BINARIES sems-list-calls sems-list-active-calls @@ -43,13 +16,19 @@ if(SEMS_RUST_OK) sems-get-callproperties sems-prometheus-exporter) + set(SEMS_MONITORING_CARGO_ARGS "") + foreach(bin ${SEMS_MONITORING_BINARIES}) + list(APPEND SEMS_MONITORING_CARGO_ARGS "-p" "${bin}") + endforeach() + add_custom_target(sems-monitoring-tools ALL COMMAND ${CARGO} build ${CARGO_RELEASE_FLAG} - --manifest-path ${SEMS_MONITORING_DIR}/Cargo.toml + --manifest-path ${SEMS_CARGO_WORKSPACE}/Cargo.toml + ${SEMS_MONITORING_CARGO_ARGS} COMMENT "Building sems monitoring tools (Rust)") foreach(bin ${SEMS_MONITORING_BINARIES}) - install(PROGRAMS ${SEMS_MONITORING_DIR}/target/${CARGO_PROFILE}/${bin} + install(PROGRAMS ${SEMS_CARGO_TARGET_DIR}/${CARGO_PROFILE}/${bin} DESTINATION ${SEMS_EXEC_PREFIX}/sbin) endforeach() else() diff --git a/apps/monitoring/tools/Cargo.toml b/apps/monitoring/tools/Cargo.toml deleted file mode 100644 index d8041d8d7..000000000 --- a/apps/monitoring/tools/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[workspace] -members = [ - "sems-monitoring-lib", - "sems-list-calls", - "sems-list-active-calls", - "sems-list-finished-calls", - "sems-get-callproperties", - "sems-prometheus-exporter", -] diff --git a/core/plug-in/stats/CMakeLists.txt b/core/plug-in/stats/CMakeLists.txt index 937bc2f86..5f94638e0 100644 --- a/core/plug-in/stats/CMakeLists.txt +++ b/core/plug-in/stats/CMakeLists.txt @@ -5,7 +5,13 @@ set(sems_stats_SRCS query_stats.cxx) add_executable(sems-stats ${sems_stats_SRCS}) target_link_libraries(sems-stats ${CMAKE_DL_LIBS} stdc++) -install(TARGETS sems-stats RUNTIME DESTINATION ${SEMS_EXEC_PREFIX}/sbin) +# When the Rust port (tools/sems-stats) is built it installs its own binary +# to the same path, so skip installing this C++ version to avoid the +# collision. The C++ target is still built so it remains a working fallback +# and a reference implementation. SEMS_RUST_OK is set in the root CMakeLists. +if(NOT SEMS_RUST_OK) + install(TARGETS sems-stats RUNTIME DESTINATION ${SEMS_EXEC_PREFIX}/sbin) +endif() # library set(stats_SRCS Statistics.cpp StatsUDPServer.cpp) diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 0845423fa..63eabcd56 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -12,4 +12,25 @@ install( sems-rtp-mux-set-max-frame-age-ms sems-rtp-mux-set-mtu-threshold DESTINATION ${SEMS_EXEC_PREFIX}/sbin) +# sems-stats: Rust port of core/plug-in/stats/query_stats.cxx. Builds as +# part of the root Cargo workspace; the install is only wired when the +# Rust toolchain is available, which is also the signal for +# core/plug-in/stats/CMakeLists.txt to skip its C++ install. +if(SEMS_RUST_OK) + add_custom_target(sems-stats-rust ALL + COMMAND ${CARGO} build ${CARGO_RELEASE_FLAG} + --manifest-path ${SEMS_CARGO_WORKSPACE}/Cargo.toml + -p sems-stats + COMMENT "Building sems-stats (Rust)") + + install(PROGRAMS ${SEMS_CARGO_TARGET_DIR}/${CARGO_PROFILE}/sems-stats + DESTINATION ${SEMS_EXEC_PREFIX}/sbin) + + add_test( + NAME sems-stats-cargo-tests + COMMAND ${CARGO} test ${CARGO_RELEASE_FLAG} + --manifest-path ${SEMS_CARGO_WORKSPACE}/Cargo.toml + -p sems-stats) +endif() + include(${CMAKE_SOURCE_DIR}/cmake/config.rules.txt) diff --git a/tools/sems-stats/Cargo.toml b/tools/sems-stats/Cargo.toml new file mode 100644 index 000000000..50fdd630e --- /dev/null +++ b/tools/sems-stats/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "sems-stats" +version = "0.1.0" +edition = "2018" +description = "UDP client for the SEMS stats plug-in (port of query_stats.cxx)" +license = "GPL-2.0-or-later" + +[[bin]] +name = "sems-stats" +path = "src/main.rs" + +[lib] +name = "sems_stats" +path = "src/lib.rs" diff --git a/tools/sems-stats/src/lib.rs b/tools/sems-stats/src/lib.rs new file mode 100644 index 000000000..f16679c88 --- /dev/null +++ b/tools/sems-stats/src/lib.rs @@ -0,0 +1,297 @@ +//! Pure helpers for sems-stats: argument parsing, reply trimming, and the +//! UDP roundtrip. No global state, no unsafe. + +#![forbid(unsafe_code)] + +use std::fmt; +use std::io; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}; +use std::time::Duration; + +pub const DEFAULT_SERVER: &str = "127.0.0.1"; +pub const DEFAULT_PORT: u16 = 5040; +pub const DEFAULT_CMD: &str = "calls"; +pub const DEFAULT_TIMEOUT_SECS: u64 = 5; +pub const MSG_BUF_SIZE: usize = 2048; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Args { + pub server: String, + pub port: u16, + pub cmd: String, + pub timeout_secs: u64, + pub quiet: bool, + pub help: bool, +} + +impl Default for Args { + fn default() -> Self { + Args { + server: DEFAULT_SERVER.to_string(), + port: DEFAULT_PORT, + cmd: DEFAULT_CMD.to_string(), + timeout_secs: DEFAULT_TIMEOUT_SECS, + quiet: false, + help: false, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ArgError { + MissingValue(String), + Unknown(String), + InvalidPort(String), + InvalidTimeout(String), + InvalidServer(String), +} + +impl fmt::Display for ArgError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ArgError::MissingValue(flag) => { + write!(f, "missing argument for parameter '{}'", flag) + } + ArgError::Unknown(arg) => write!(f, "unknown parameter '{}'", arg), + ArgError::InvalidPort(s) => write!(f, "port '{}' is not a valid port number", s), + ArgError::InvalidTimeout(s) => write!(f, "timeout '{}' not understood", s), + ArgError::InvalidServer(s) => write!(f, "server '{}' is an invalid IP address", s), + } + } +} + +impl std::error::Error for ArgError {} + +/// Parse argv into `Args`. The first element is treated as the program name +/// and skipped, matching the usual Unix convention. +pub fn parse_args(argv: I) -> Result +where + I: IntoIterator, + S: AsRef, +{ + let mut args = Args::default(); + let mut it = argv.into_iter(); + let _progname = it.next(); + + while let Some(raw) = it.next() { + let arg = raw.as_ref(); + match arg { + "-h" | "--help" => args.help = true, + "-q" | "--quiet" => args.quiet = true, + "-s" => { + let v = it + .next() + .ok_or_else(|| ArgError::MissingValue("-s".to_string()))?; + args.server = v.as_ref().to_string(); + } + "-p" => { + let v = it + .next() + .ok_or_else(|| ArgError::MissingValue("-p".to_string()))?; + let s = v.as_ref(); + let port = s + .parse::() + .map_err(|_| ArgError::InvalidPort(s.to_string()))?; + if port == 0 { + return Err(ArgError::InvalidPort(s.to_string())); + } + args.port = port; + } + "-c" => { + let v = it + .next() + .ok_or_else(|| ArgError::MissingValue("-c".to_string()))?; + args.cmd = v.as_ref().to_string(); + } + "-t" => { + let v = it + .next() + .ok_or_else(|| ArgError::MissingValue("-t".to_string()))?; + let s = v.as_ref(); + args.timeout_secs = s + .parse::() + .map_err(|_| ArgError::InvalidTimeout(s.to_string()))?; + } + other => return Err(ArgError::Unknown(other.to_string())), + } + } + + Ok(args) +} + +/// Resolve a user-supplied server string as an IPv4 literal. The SEMS stats +/// server binds an IPv4 socket, so hostnames and IPv6 are intentionally not +/// accepted here (matching the C++ client). +pub fn resolve_server(server: &str) -> Result { + server + .parse::() + .map_err(|_| ArgError::InvalidServer(server.to_string())) +} + +/// Render help text. Kept as a pure function so it can be asserted on in tests. +pub fn usage(progname: &str) -> String { + format!( + "SIP Express Media Server stats query\n\ + \n\ + Syntax: {p} []\n\ + \n\ + where :\n\ + \u{0020}-s : server name|ip (default: {server})\n\ + \u{0020}-p : server port (default: {port})\n\ + \u{0020}-c : command (default: {cmd})\n\ + \u{0020}-t : timeout (default: {timeout}s)\n\ + \u{0020}-q : quiet: print only the server reply\n\ + \u{0020}-h : show this help\n\ + \n\ + Tips:\n\ + \u{0020}o quote the command if it has arguments (e.g. {p} -c \"set_loglevel 1\")\n\ + \u{0020}o \"which\" prints available commands\n", + p = progname, + server = DEFAULT_SERVER, + port = DEFAULT_PORT, + cmd = DEFAULT_CMD, + timeout = DEFAULT_TIMEOUT_SECS, + ) +} + +/// Strip a single trailing NUL byte (the SEMS stats server terminates replies +/// with '\0'). Non-UTF-8 bytes are decoded lossily. +pub fn trim_reply(buf: &[u8]) -> String { + let end = if buf.last() == Some(&0) { + buf.len() - 1 + } else { + buf.len() + }; + String::from_utf8_lossy(&buf[..end]).into_owned() +} + +/// Send `cmd` (plus a trailing newline) to the stats server at `addr` and +/// return the decoded reply. `timeout` applies to the receive step only. +pub fn query(addr: SocketAddrV4, cmd: &str, timeout: Duration) -> io::Result { + let sock = UdpSocket::bind("0.0.0.0:0")?; + sock.set_read_timeout(Some(timeout))?; + + let mut msg = String::with_capacity(cmd.len() + 1); + msg.push_str(cmd); + msg.push('\n'); + sock.send_to(msg.as_bytes(), SocketAddr::V4(addr))?; + + let mut buf = [0u8; MSG_BUF_SIZE]; + let (n, _) = sock.recv_from(&mut buf)?; + Ok(trim_reply(&buf[..n])) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse(v: &[&str]) -> Result { + parse_args(v.iter().copied()) + } + + #[test] + fn defaults_when_no_flags() { + let a = parse(&["sems-stats"]).unwrap(); + assert_eq!(a, Args::default()); + assert_eq!(a.server, DEFAULT_SERVER); + assert_eq!(a.port, DEFAULT_PORT); + assert_eq!(a.cmd, DEFAULT_CMD); + assert_eq!(a.timeout_secs, DEFAULT_TIMEOUT_SECS); + assert!(!a.quiet); + assert!(!a.help); + } + + #[test] + fn parses_all_flags() { + let a = parse(&[ + "sems-stats", "-s", "10.0.0.1", "-p", "6000", "-c", "get_callsavg", "-t", "15", "-q", + ]) + .unwrap(); + assert_eq!(a.server, "10.0.0.1"); + assert_eq!(a.port, 6000); + assert_eq!(a.cmd, "get_callsavg"); + assert_eq!(a.timeout_secs, 15); + assert!(a.quiet); + } + + #[test] + fn long_flags() { + let a = parse(&["sems-stats", "--quiet", "--help"]).unwrap(); + assert!(a.quiet); + assert!(a.help); + } + + #[test] + fn missing_value_errors() { + assert_eq!(parse(&["p", "-s"]), Err(ArgError::MissingValue("-s".into()))); + assert_eq!(parse(&["p", "-p"]), Err(ArgError::MissingValue("-p".into()))); + assert_eq!(parse(&["p", "-c"]), Err(ArgError::MissingValue("-c".into()))); + assert_eq!(parse(&["p", "-t"]), Err(ArgError::MissingValue("-t".into()))); + } + + #[test] + fn unknown_flag_errors() { + let err = parse(&["p", "-Z"]).unwrap_err(); + assert_eq!(err, ArgError::Unknown("-Z".into())); + } + + #[test] + fn invalid_port_rejected() { + assert!(matches!( + parse(&["p", "-p", "abc"]), + Err(ArgError::InvalidPort(_)) + )); + assert!(matches!( + parse(&["p", "-p", "0"]), + Err(ArgError::InvalidPort(_)) + )); + assert!(matches!( + parse(&["p", "-p", "70000"]), + Err(ArgError::InvalidPort(_)) + )); + } + + #[test] + fn invalid_timeout_rejected() { + assert!(matches!( + parse(&["p", "-t", "soon"]), + Err(ArgError::InvalidTimeout(_)) + )); + } + + #[test] + fn resolve_server_ipv4() { + assert_eq!(resolve_server("127.0.0.1").unwrap(), Ipv4Addr::LOCALHOST); + assert_eq!(resolve_server("10.1.2.3").unwrap(), Ipv4Addr::new(10, 1, 2, 3)); + } + + #[test] + fn resolve_server_rejects_hostname_and_ipv6() { + assert!(resolve_server("localhost").is_err()); + assert!(resolve_server("::1").is_err()); + } + + #[test] + fn trim_reply_drops_trailing_nul_only() { + assert_eq!(trim_reply(b"Active calls: 0\n\0"), "Active calls: 0\n"); + assert_eq!(trim_reply(b"hello\0"), "hello"); + assert_eq!(trim_reply(b"no-nul"), "no-nul"); + assert_eq!(trim_reply(b""), ""); + assert_eq!(trim_reply(b"\0"), ""); + } + + #[test] + fn trim_reply_lossy_on_invalid_utf8() { + let bytes = [0xffu8, b'x', 0]; + let s = trim_reply(&bytes); + assert!(s.ends_with('x')); + } + + #[test] + fn usage_mentions_all_switches() { + let u = usage("sems-stats"); + for token in ["-s", "-p", "-c", "-t", "-q", "-h"] { + assert!(u.contains(token), "usage missing `{}`", token); + } + } +} diff --git a/tools/sems-stats/src/main.rs b/tools/sems-stats/src/main.rs new file mode 100644 index 000000000..bf95b408b --- /dev/null +++ b/tools/sems-stats/src/main.rs @@ -0,0 +1,74 @@ +#![forbid(unsafe_code)] + +use std::io; +use std::net::SocketAddrV4; +use std::process::ExitCode; +use std::time::Duration; + +use sems_stats::{parse_args, query, resolve_server, usage, ArgError}; + +fn main() -> ExitCode { + let argv: Vec = std::env::args().collect(); + let progname = argv + .first() + .cloned() + .unwrap_or_else(|| "sems-stats".to_string()); + + let args = match parse_args(&argv) { + Ok(a) => a, + Err(e) => { + eprintln!("{}: {}", progname, e); + eprint!("{}", usage(&progname)); + return ExitCode::from(1); + } + }; + + if args.help { + print!("{}", usage(&progname)); + return ExitCode::from(1); + } + + let ip = match resolve_server(&args.server) { + Ok(ip) => ip, + Err(ArgError::InvalidServer(s)) => { + eprintln!("server '{}' is an invalid IP address", s); + return ExitCode::from(1); + } + Err(e) => { + eprintln!("{}", e); + return ExitCode::from(1); + } + }; + let addr = SocketAddrV4::new(ip, args.port); + + if !args.quiet { + println!("sending '{}\\n' to {}:{}", args.cmd, args.server, args.port); + } + + match query(addr, &args.cmd, Duration::from_secs(args.timeout_secs)) { + Ok(reply) => { + if args.quiet { + // Emit the reply with a single trailing newline so shell + // captures (`$(sems-stats -q ...)`) produce one clean line. + println!("{}", reply.trim_end_matches('\n')); + } else { + println!("received:"); + print!("{}", reply); + if !reply.ends_with('\n') { + println!(); + } + } + ExitCode::from(0) + } + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => { + eprintln!("read timeout!"); + ExitCode::from(1) + } + _ => { + eprintln!("socket error: {}", e); + ExitCode::from(2) + } + }, + } +} diff --git a/tools/sems-stats/tests/udp_roundtrip.rs b/tools/sems-stats/tests/udp_roundtrip.rs new file mode 100644 index 000000000..194dfa862 --- /dev/null +++ b/tools/sems-stats/tests/udp_roundtrip.rs @@ -0,0 +1,79 @@ +//! End-to-end test: spin up a local UDP server mimicking the SEMS stats +//! plug-in and verify that `sems_stats::query` round-trips correctly. + +use std::net::{Ipv4Addr, SocketAddrV4, UdpSocket}; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +use sems_stats::{query, MSG_BUF_SIZE}; + +/// Start a one-shot UDP responder on 127.0.0.1. Returns the bound port and a +/// channel that delivers the request bytes once received. The responder will +/// reply with `reply` (plus a trailing NUL byte, like the real server). +fn spawn_responder(reply: &'static [u8]) -> (u16, mpsc::Receiver>) { + let sock = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) + .expect("bind responder socket"); + let port = sock.local_addr().unwrap().port(); + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || { + sock.set_read_timeout(Some(Duration::from_secs(5))).ok(); + let mut buf = vec![0u8; MSG_BUF_SIZE]; + if let Ok((n, peer)) = sock.recv_from(&mut buf) { + let _ = tx.send(buf[..n].to_vec()); + let mut out = Vec::with_capacity(reply.len() + 1); + out.extend_from_slice(reply); + out.push(0); + let _ = sock.send_to(&out, peer); + } + }); + + (port, rx) +} + +#[test] +fn query_sends_command_and_returns_reply() { + let (port, rx) = spawn_responder(b"Active calls: 42\n"); + let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port); + + let reply = query(addr, "calls", Duration::from_secs(2)).expect("query ok"); + assert_eq!(reply, "Active calls: 42\n"); + + let received = rx + .recv_timeout(Duration::from_secs(2)) + .expect("responder saw request"); + assert_eq!(received, b"calls\n"); +} + +#[test] +fn query_preserves_command_with_spaces() { + let (port, rx) = spawn_responder(b"loglevel set to 1.\n"); + let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port); + + let reply = query(addr, "set_loglevel 1", Duration::from_secs(2)).unwrap(); + assert_eq!(reply, "loglevel set to 1.\n"); + + let received = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + assert_eq!(received, b"set_loglevel 1\n"); +} + +#[test] +fn query_times_out_when_no_reply() { + // Bind a socket but never read from it, so the client's recv must time out. + let sock = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).unwrap(); + let port = sock.local_addr().unwrap().port(); + let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port); + + let err = query(addr, "calls", Duration::from_millis(200)).unwrap_err(); + assert!( + matches!( + err.kind(), + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut + ), + "expected timeout, got {:?}", + err.kind() + ); + + drop(sock); +}