From bb6215b1208d90cfaad0c3b42092f1452e3aa22c Mon Sep 17 00:00:00 2001 From: Cody Rioux Date: Tue, 18 Nov 2025 17:44:42 -0400 Subject: [PATCH] ch04: Implement kqueue poll for macOS/BSD --- ch04/c-kqueue/Cargo.toml | 8 +++ ch04/c-kqueue/README.md | 15 +++++ ch04/c-kqueue/src/ffi.rs | 56 +++++++++++++++++ ch04/c-kqueue/src/main.rs | 114 +++++++++++++++++++++++++++++++++ ch04/c-kqueue/src/poll.rs | 129 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 322 insertions(+) create mode 100644 ch04/c-kqueue/Cargo.toml create mode 100644 ch04/c-kqueue/README.md create mode 100644 ch04/c-kqueue/src/ffi.rs create mode 100644 ch04/c-kqueue/src/main.rs create mode 100644 ch04/c-kqueue/src/poll.rs diff --git a/ch04/c-kqueue/Cargo.toml b/ch04/c-kqueue/Cargo.toml new file mode 100644 index 0000000..53e2fc6 --- /dev/null +++ b/ch04/c-kqueue/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "c-kqueue" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/ch04/c-kqueue/README.md b/ch04/c-kqueue/README.md new file mode 100644 index 0000000..ebe28bf --- /dev/null +++ b/ch04/c-kqueue/README.md @@ -0,0 +1,15 @@ +# c-kqueue + +This create contains a kqueue adaptation of the example code for chapter 4. +It should work if you're running on macOS or BSD, but I've only tested +on macOS. + +Rather than attempt to duplicate, or re-explain the basic principles the +comments in this code attempt to explain the differences between the epoll +implementation and this one. My objective here was to enable the reader (and +myself) to understand this code in the context of the book rather than +standalone. + +You can run the example by simply writing `cargo run`. + +Otherwise see the [../a-epoll/README.md](../a-epoll/README.md) for further details. diff --git a/ch04/c-kqueue/src/ffi.rs b/ch04/c-kqueue/src/ffi.rs new file mode 100644 index 0000000..f04fbf6 --- /dev/null +++ b/ch04/c-kqueue/src/ffi.rs @@ -0,0 +1,56 @@ +/// FFI Functions for interacting with kqueue. +/// Details on constants, functions, and parameters can be found on the +/// kqueue(2) manual page. +/// https://man.freebsd.org/cgi/man.cgi?query=kevent&sektion=2&manpath=macOS+15.7 + +pub const EVFILT_READ: i16 = -1; // Analagous to the EPOLLIN flag in the original. +pub const EV_ADD: u16 = 0x1; // Add the event to the queue, analagous to EPOLL_CTL_ADD in the original. +pub const EV_CLEAR: u16 = 0x20; // Reset the state after the user retrieves it, similar to EPOLLET. + +#[derive(Debug)] +#[repr(C)] +// Timespec struct is a required input to the kevent syscall. +// Used for setting timeout values. +pub struct Timespec { + pub tv_sec: isize, + pub tv_nsec: usize, +} + +#[derive(Debug, Clone, Default)] +#[repr(C)] +// We use a Kevent for kqueue instead of the Event struct for epoll. +// Consistent with Issue #5 in the repository this is not a packed struct on ARM Macs. +pub struct Kevent { + pub ident: u64, + pub filter: i16, + pub flags: u16, + pub fflags: u32, + pub data: isize, + pub udata: u64, +} + +impl Kevent { + pub fn token(&self) -> Option { + // udata stores the token, analagous to the epoll_data field in the original. + Some(self.udata as usize) + } +} + +#[link(name = "c")] +extern "C" { + // Creates the event queue, analagous to epoll_create in the original. + pub fn kqueue() -> i32; + // Registers events with the queue and also used to check for pending events. + // This performs the functionality of epoll_ctl and epoll_wait in the original, + // depending on the parameters. See poll.rs for usage. + pub fn kevent( + kq: i32, + changelist: *const Kevent, + nchanges: i32, + eventlist: *mut Kevent, + nevents: i32, + timeout: *const Timespec, + ) -> i32; + // Closes the event queue, analagous to close in the original. + pub fn close(d: i32) -> i32; +} diff --git a/ch04/c-kqueue/src/main.rs b/ch04/c-kqueue/src/main.rs new file mode 100644 index 0000000..639005d --- /dev/null +++ b/ch04/c-kqueue/src/main.rs @@ -0,0 +1,114 @@ +/// From a pedagogical perspective the changes in this file from the book are uninteresting, +/// they're simply adapted to the new ffi and poll implementations. +use std::{ + collections::HashSet, + env, + io::{self, Read, Result, Write}, + net::TcpStream, +}; + +use ffi::Kevent; +use poll::Poll; + +mod ffi; +mod poll; + +/// Not the entire url, but everyhing after the domain addr +/// i.e. http://localhost/1000/hello => /1000/hello +fn get_req(path: &str) -> String { + format!( + "GET {path} HTTP/1.1\r\n\ + Host: localhost\r\n\ + Connection: close\r\n\ + \r\n" + ) +} + +fn handle_events( + events: &[Kevent], + streams: &mut [TcpStream], + handled: &mut HashSet, +) -> Result { + let mut handled_events = 0; + for event in events { + let index = event.token().unwrap(); + let mut data = vec![0u8; 4096]; + + loop { + match streams[index].read(&mut data) { + Ok(0) => { + // FIX #4 + // `insert` returns false if the value already existed in the set. + if !handled.insert(index) { + break; + } + handled_events += 1; + break; + } + Ok(n) => { + let txt = String::from_utf8_lossy(&data[..n]); + + println!("RECEIVED: {:?}", event); + println!("{txt}\n------\n"); + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + // this was not in the book example, but it's a error condition + // you probably want to handle in some way (either by breaking + // out of the loop or trying a new read call immediately) + Err(e) if e.kind() == io::ErrorKind::Interrupted => break, + Err(e) => return Err(e), + } + } + } + + Ok(handled_events) +} + +fn main() -> Result<()> { + let mut poll = Poll::new()?; + let n_events = 5; + + let mut streams = vec![]; + + // FIX #19: Allow to override the base URL by passing it as a command line argument + let base_url = env::args() + .nth(1) + .unwrap_or_else(|| String::from("localhost")); + + let addr = format!("{}:8080", &base_url); + + for i in 0..n_events { + let delay = (n_events - i) * 1000; + let url_path = format!("/{delay}/request-{i}"); + let request = get_req(&url_path); + let mut stream = std::net::TcpStream::connect(&addr)?; + stream.set_nonblocking(true)?; + + stream.write_all(request.as_bytes())?; + // NB! Token is equal to index in Vec + poll.registry() + .register(&stream, i, (ffi::EV_ADD | ffi::EV_CLEAR) as i32)?; + + streams.push(stream); + } + + // FIX #4: store the handled IDs + let mut handled_ids = HashSet::new(); + + let mut handled_events = 0; + while handled_events < n_events { + let mut events = Vec::with_capacity(10); + poll.poll(&mut events, None)?; + + if events.is_empty() { + println!("TIMEOUT (OR SPURIOUS EVENT NOTIFICATION)"); + continue; + } + + // ------------------------------------------------------⌄ FIX #4 (new signature) + handled_events += handle_events(&events, &mut streams, &mut handled_ids)?; + } + + println!("FINISHED"); + Ok(()) +} diff --git a/ch04/c-kqueue/src/poll.rs b/ch04/c-kqueue/src/poll.rs new file mode 100644 index 0000000..9b2039e --- /dev/null +++ b/ch04/c-kqueue/src/poll.rs @@ -0,0 +1,129 @@ +use std::{ + io::{self, Result}, + net::TcpStream, + os::fd::AsRawFd, +}; + +use crate::ffi; + +type Events = Vec; + +pub struct Poll { + registry: Registry, +} + +impl Poll { + pub fn new() -> Result { + let res = unsafe { ffi::kqueue() }; + if res < 0 { + return Err(io::Error::last_os_error()); + } + + Ok(Self { + registry: Registry { raw_fd: res }, + }) + } + + pub fn registry(&self) -> &Registry { + &self.registry + } + + /// Makes a blocking call to the OS parking the calling thread. It will wake up + /// when one or more events we've registered interest in have occurred or + /// the timeout duration has elapsed, whichever occurs first. + /// + /// # Note + /// If the number of events returned is 0, the wakeup was due to an elapsed + /// timeout + pub fn poll(&mut self, events: &mut Events, timeout: Option) -> Result<()> { + let fd = self.registry.raw_fd; + + // Timeout differs in kqueue from epoll in that it is a timespec struct + // Instead of sending -1 for no timeout we send the null pointer. + let timeout = timeout.map(|t| ffi::Timespec { + tv_sec: t as isize / 1000, + tv_nsec: (t as usize % 1000) * 1_000_000, + }); + + let timeout_ptr = match timeout { + Some(ref t) => t as *const ffi::Timespec, + None => std::ptr::null(), + }; + + let max_events = events.capacity() as i32; + + // To poll with kevent we need to call kevent + // with a null changelist, and 0 changes. + // Critically we provide an out pointer to the + // eventlist where kevent will store the events + // for which we're being notified. + let res = unsafe { + ffi::kevent( + fd, + std::ptr::null(), + 0, + events.as_mut_ptr(), + max_events, + timeout_ptr, + ) + }; + + if res < 0 { + return Err(io::Error::last_os_error()); + }; + + // This is safe because kevent ensures that `res` events are assigned. + unsafe { events.set_len(res as usize) }; + Ok(()) + } +} + +pub struct Registry { + raw_fd: i32, +} + +impl Registry { + pub fn register(&self, source: &TcpStream, token: usize, interests: i32) -> Result<()> { + let flags = interests as u16; + + let event = ffi::Kevent { + ident: source.as_raw_fd() as u64, + filter: ffi::EVFILT_READ, + flags, + fflags: 0, + data: 0, + udata: token as u64, + }; + + // To register an event we need to call kevent + // with a Kevent struct, 1 change and null eventlist + // and timeout. + // This is analagous to the epoll_ctl call in the original. + let res = unsafe { + ffi::kevent( + self.raw_fd, + &event, + 1, + std::ptr::null_mut(), + 0, + std::ptr::null(), + ) + }; + + if res < 0 { + return Err(io::Error::last_os_error()); + } + Ok(()) + } +} + +impl Drop for Registry { + fn drop(&mut self) { + let res = unsafe { ffi::close(self.raw_fd) }; + + if res < 0 { + let err = io::Error::last_os_error(); + println!("ERROR: {err:?}"); + } + } +}