Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ch04/c-kqueue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "c-kqueue"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
15 changes: 15 additions & 0 deletions ch04/c-kqueue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# c-kqueue

This create contains a kqueue adaptation of the example code for chapter 4.
It should work if you're running on macOS or BSD, but I've only tested
on macOS.

Rather than attempt to duplicate, or re-explain the basic principles the
comments in this code attempt to explain the differences between the epoll
implementation and this one. My objective here was to enable the reader (and
myself) to understand this code in the context of the book rather than
standalone.

You can run the example by simply writing `cargo run`.

Otherwise see the [../a-epoll/README.md](../a-epoll/README.md) for further details.
56 changes: 56 additions & 0 deletions ch04/c-kqueue/src/ffi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/// FFI Functions for interacting with kqueue.
/// Details on constants, functions, and parameters can be found on the
/// kqueue(2) manual page.
/// https://man.freebsd.org/cgi/man.cgi?query=kevent&sektion=2&manpath=macOS+15.7

pub const EVFILT_READ: i16 = -1; // Analagous to the EPOLLIN flag in the original.
pub const EV_ADD: u16 = 0x1; // Add the event to the queue, analagous to EPOLL_CTL_ADD in the original.
pub const EV_CLEAR: u16 = 0x20; // Reset the state after the user retrieves it, similar to EPOLLET.

#[derive(Debug)]
#[repr(C)]
// Timespec struct is a required input to the kevent syscall.
// Used for setting timeout values.
pub struct Timespec {
pub tv_sec: isize,
pub tv_nsec: usize,
}

#[derive(Debug, Clone, Default)]
#[repr(C)]
// We use a Kevent for kqueue instead of the Event struct for epoll.
// Consistent with Issue #5 in the repository this is not a packed struct on ARM Macs.
pub struct Kevent {
pub ident: u64,
pub filter: i16,
pub flags: u16,
pub fflags: u32,
pub data: isize,
pub udata: u64,
}

impl Kevent {
pub fn token(&self) -> Option<usize> {
// udata stores the token, analagous to the epoll_data field in the original.
Some(self.udata as usize)
}
}

#[link(name = "c")]
extern "C" {
// Creates the event queue, analagous to epoll_create in the original.
pub fn kqueue() -> i32;
// Registers events with the queue and also used to check for pending events.
// This performs the functionality of epoll_ctl and epoll_wait in the original,
// depending on the parameters. See poll.rs for usage.
pub fn kevent(
kq: i32,
changelist: *const Kevent,
nchanges: i32,
eventlist: *mut Kevent,
nevents: i32,
timeout: *const Timespec,
) -> i32;
// Closes the event queue, analagous to close in the original.
pub fn close(d: i32) -> i32;
}
114 changes: 114 additions & 0 deletions ch04/c-kqueue/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/// From a pedagogical perspective the changes in this file from the book are uninteresting,
/// they're simply adapted to the new ffi and poll implementations.
use std::{
collections::HashSet,
env,
io::{self, Read, Result, Write},
net::TcpStream,
};

use ffi::Kevent;
use poll::Poll;

mod ffi;
mod poll;

/// Not the entire url, but everyhing after the domain addr
/// i.e. http://localhost/1000/hello => /1000/hello
fn get_req(path: &str) -> String {
format!(
"GET {path} HTTP/1.1\r\n\
Host: localhost\r\n\
Connection: close\r\n\
\r\n"
)
}

fn handle_events(
events: &[Kevent],
streams: &mut [TcpStream],
handled: &mut HashSet<usize>,
) -> Result<usize> {
let mut handled_events = 0;
for event in events {
let index = event.token().unwrap();
let mut data = vec![0u8; 4096];

loop {
match streams[index].read(&mut data) {
Ok(0) => {
// FIX #4
// `insert` returns false if the value already existed in the set.
if !handled.insert(index) {
break;
}
handled_events += 1;
break;
}
Ok(n) => {
let txt = String::from_utf8_lossy(&data[..n]);

println!("RECEIVED: {:?}", event);
println!("{txt}\n------\n");
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
// this was not in the book example, but it's a error condition
// you probably want to handle in some way (either by breaking
// out of the loop or trying a new read call immediately)
Err(e) if e.kind() == io::ErrorKind::Interrupted => break,
Err(e) => return Err(e),
}
}
}

Ok(handled_events)
}

fn main() -> Result<()> {
let mut poll = Poll::new()?;
let n_events = 5;

let mut streams = vec![];

// FIX #19: Allow to override the base URL by passing it as a command line argument
let base_url = env::args()
.nth(1)
.unwrap_or_else(|| String::from("localhost"));

let addr = format!("{}:8080", &base_url);

for i in 0..n_events {
let delay = (n_events - i) * 1000;
let url_path = format!("/{delay}/request-{i}");
let request = get_req(&url_path);
let mut stream = std::net::TcpStream::connect(&addr)?;
stream.set_nonblocking(true)?;

stream.write_all(request.as_bytes())?;
// NB! Token is equal to index in Vec
poll.registry()
.register(&stream, i, (ffi::EV_ADD | ffi::EV_CLEAR) as i32)?;

streams.push(stream);
}

// FIX #4: store the handled IDs
let mut handled_ids = HashSet::new();

let mut handled_events = 0;
while handled_events < n_events {
let mut events = Vec::with_capacity(10);
poll.poll(&mut events, None)?;

if events.is_empty() {
println!("TIMEOUT (OR SPURIOUS EVENT NOTIFICATION)");
continue;
}

// ------------------------------------------------------⌄ FIX #4 (new signature)
handled_events += handle_events(&events, &mut streams, &mut handled_ids)?;
}

println!("FINISHED");
Ok(())
}
129 changes: 129 additions & 0 deletions ch04/c-kqueue/src/poll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::{
io::{self, Result},
net::TcpStream,
os::fd::AsRawFd,
};

use crate::ffi;

type Events = Vec<ffi::Kevent>;

pub struct Poll {
registry: Registry,
}

impl Poll {
pub fn new() -> Result<Self> {
let res = unsafe { ffi::kqueue() };
if res < 0 {
return Err(io::Error::last_os_error());
}

Ok(Self {
registry: Registry { raw_fd: res },
})
}

pub fn registry(&self) -> &Registry {
&self.registry
}

/// Makes a blocking call to the OS parking the calling thread. It will wake up
/// when one or more events we've registered interest in have occurred or
/// the timeout duration has elapsed, whichever occurs first.
///
/// # Note
/// If the number of events returned is 0, the wakeup was due to an elapsed
/// timeout
pub fn poll(&mut self, events: &mut Events, timeout: Option<i32>) -> Result<()> {
let fd = self.registry.raw_fd;

// Timeout differs in kqueue from epoll in that it is a timespec struct
// Instead of sending -1 for no timeout we send the null pointer.
let timeout = timeout.map(|t| ffi::Timespec {
tv_sec: t as isize / 1000,
tv_nsec: (t as usize % 1000) * 1_000_000,
});

let timeout_ptr = match timeout {
Some(ref t) => t as *const ffi::Timespec,
None => std::ptr::null(),
};

let max_events = events.capacity() as i32;

// To poll with kevent we need to call kevent
// with a null changelist, and 0 changes.
// Critically we provide an out pointer to the
// eventlist where kevent will store the events
// for which we're being notified.
let res = unsafe {
ffi::kevent(
fd,
std::ptr::null(),
0,
events.as_mut_ptr(),
max_events,
timeout_ptr,
)
};

if res < 0 {
return Err(io::Error::last_os_error());
};

// This is safe because kevent ensures that `res` events are assigned.
unsafe { events.set_len(res as usize) };
Ok(())
}
}

pub struct Registry {
raw_fd: i32,
}

impl Registry {
pub fn register(&self, source: &TcpStream, token: usize, interests: i32) -> Result<()> {
let flags = interests as u16;

let event = ffi::Kevent {
ident: source.as_raw_fd() as u64,
filter: ffi::EVFILT_READ,
flags,
fflags: 0,
data: 0,
udata: token as u64,
};

// To register an event we need to call kevent
// with a Kevent struct, 1 change and null eventlist
// and timeout.
// This is analagous to the epoll_ctl call in the original.
let res = unsafe {
ffi::kevent(
self.raw_fd,
&event,
1,
std::ptr::null_mut(),
0,
std::ptr::null(),
)
};

if res < 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
}

impl Drop for Registry {
fn drop(&mut self) {
let res = unsafe { ffi::close(self.raw_fd) };

if res < 0 {
let err = io::Error::last_os_error();
println!("ERROR: {err:?}");
}
}
}