Skip to content

Commit bb6215b

Browse files
committed
ch04: Implement kqueue poll for macOS/BSD
1 parent 47b4f7d commit bb6215b

File tree

5 files changed

+322
-0
lines changed

5 files changed

+322
-0
lines changed

ch04/c-kqueue/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "c-kqueue"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]

ch04/c-kqueue/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# c-kqueue
2+
3+
This create contains a kqueue adaptation of the example code for chapter 4.
4+
It should work if you're running on macOS or BSD, but I've only tested
5+
on macOS.
6+
7+
Rather than attempt to duplicate, or re-explain the basic principles the
8+
comments in this code attempt to explain the differences between the epoll
9+
implementation and this one. My objective here was to enable the reader (and
10+
myself) to understand this code in the context of the book rather than
11+
standalone.
12+
13+
You can run the example by simply writing `cargo run`.
14+
15+
Otherwise see the [../a-epoll/README.md](../a-epoll/README.md) for further details.

ch04/c-kqueue/src/ffi.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/// FFI Functions for interacting with kqueue.
2+
/// Details on constants, functions, and parameters can be found on the
3+
/// kqueue(2) manual page.
4+
/// https://man.freebsd.org/cgi/man.cgi?query=kevent&sektion=2&manpath=macOS+15.7
5+
6+
pub const EVFILT_READ: i16 = -1; // Analagous to the EPOLLIN flag in the original.
7+
pub const EV_ADD: u16 = 0x1; // Add the event to the queue, analagous to EPOLL_CTL_ADD in the original.
8+
pub const EV_CLEAR: u16 = 0x20; // Reset the state after the user retrieves it, similar to EPOLLET.
9+
10+
#[derive(Debug)]
11+
#[repr(C)]
12+
// Timespec struct is a required input to the kevent syscall.
13+
// Used for setting timeout values.
14+
pub struct Timespec {
15+
pub tv_sec: isize,
16+
pub tv_nsec: usize,
17+
}
18+
19+
#[derive(Debug, Clone, Default)]
20+
#[repr(C)]
21+
// We use a Kevent for kqueue instead of the Event struct for epoll.
22+
// Consistent with Issue #5 in the repository this is not a packed struct on ARM Macs.
23+
pub struct Kevent {
24+
pub ident: u64,
25+
pub filter: i16,
26+
pub flags: u16,
27+
pub fflags: u32,
28+
pub data: isize,
29+
pub udata: u64,
30+
}
31+
32+
impl Kevent {
33+
pub fn token(&self) -> Option<usize> {
34+
// udata stores the token, analagous to the epoll_data field in the original.
35+
Some(self.udata as usize)
36+
}
37+
}
38+
39+
#[link(name = "c")]
40+
extern "C" {
41+
// Creates the event queue, analagous to epoll_create in the original.
42+
pub fn kqueue() -> i32;
43+
// Registers events with the queue and also used to check for pending events.
44+
// This performs the functionality of epoll_ctl and epoll_wait in the original,
45+
// depending on the parameters. See poll.rs for usage.
46+
pub fn kevent(
47+
kq: i32,
48+
changelist: *const Kevent,
49+
nchanges: i32,
50+
eventlist: *mut Kevent,
51+
nevents: i32,
52+
timeout: *const Timespec,
53+
) -> i32;
54+
// Closes the event queue, analagous to close in the original.
55+
pub fn close(d: i32) -> i32;
56+
}

ch04/c-kqueue/src/main.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/// From a pedagogical perspective the changes in this file from the book are uninteresting,
2+
/// they're simply adapted to the new ffi and poll implementations.
3+
use std::{
4+
collections::HashSet,
5+
env,
6+
io::{self, Read, Result, Write},
7+
net::TcpStream,
8+
};
9+
10+
use ffi::Kevent;
11+
use poll::Poll;
12+
13+
mod ffi;
14+
mod poll;
15+
16+
/// Not the entire url, but everyhing after the domain addr
17+
/// i.e. http://localhost/1000/hello => /1000/hello
18+
fn get_req(path: &str) -> String {
19+
format!(
20+
"GET {path} HTTP/1.1\r\n\
21+
Host: localhost\r\n\
22+
Connection: close\r\n\
23+
\r\n"
24+
)
25+
}
26+
27+
fn handle_events(
28+
events: &[Kevent],
29+
streams: &mut [TcpStream],
30+
handled: &mut HashSet<usize>,
31+
) -> Result<usize> {
32+
let mut handled_events = 0;
33+
for event in events {
34+
let index = event.token().unwrap();
35+
let mut data = vec![0u8; 4096];
36+
37+
loop {
38+
match streams[index].read(&mut data) {
39+
Ok(0) => {
40+
// FIX #4
41+
// `insert` returns false if the value already existed in the set.
42+
if !handled.insert(index) {
43+
break;
44+
}
45+
handled_events += 1;
46+
break;
47+
}
48+
Ok(n) => {
49+
let txt = String::from_utf8_lossy(&data[..n]);
50+
51+
println!("RECEIVED: {:?}", event);
52+
println!("{txt}\n------\n");
53+
}
54+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
55+
// this was not in the book example, but it's a error condition
56+
// you probably want to handle in some way (either by breaking
57+
// out of the loop or trying a new read call immediately)
58+
Err(e) if e.kind() == io::ErrorKind::Interrupted => break,
59+
Err(e) => return Err(e),
60+
}
61+
}
62+
}
63+
64+
Ok(handled_events)
65+
}
66+
67+
fn main() -> Result<()> {
68+
let mut poll = Poll::new()?;
69+
let n_events = 5;
70+
71+
let mut streams = vec![];
72+
73+
// FIX #19: Allow to override the base URL by passing it as a command line argument
74+
let base_url = env::args()
75+
.nth(1)
76+
.unwrap_or_else(|| String::from("localhost"));
77+
78+
let addr = format!("{}:8080", &base_url);
79+
80+
for i in 0..n_events {
81+
let delay = (n_events - i) * 1000;
82+
let url_path = format!("/{delay}/request-{i}");
83+
let request = get_req(&url_path);
84+
let mut stream = std::net::TcpStream::connect(&addr)?;
85+
stream.set_nonblocking(true)?;
86+
87+
stream.write_all(request.as_bytes())?;
88+
// NB! Token is equal to index in Vec
89+
poll.registry()
90+
.register(&stream, i, (ffi::EV_ADD | ffi::EV_CLEAR) as i32)?;
91+
92+
streams.push(stream);
93+
}
94+
95+
// FIX #4: store the handled IDs
96+
let mut handled_ids = HashSet::new();
97+
98+
let mut handled_events = 0;
99+
while handled_events < n_events {
100+
let mut events = Vec::with_capacity(10);
101+
poll.poll(&mut events, None)?;
102+
103+
if events.is_empty() {
104+
println!("TIMEOUT (OR SPURIOUS EVENT NOTIFICATION)");
105+
continue;
106+
}
107+
108+
// ------------------------------------------------------⌄ FIX #4 (new signature)
109+
handled_events += handle_events(&events, &mut streams, &mut handled_ids)?;
110+
}
111+
112+
println!("FINISHED");
113+
Ok(())
114+
}

ch04/c-kqueue/src/poll.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
use std::{
2+
io::{self, Result},
3+
net::TcpStream,
4+
os::fd::AsRawFd,
5+
};
6+
7+
use crate::ffi;
8+
9+
type Events = Vec<ffi::Kevent>;
10+
11+
pub struct Poll {
12+
registry: Registry,
13+
}
14+
15+
impl Poll {
16+
pub fn new() -> Result<Self> {
17+
let res = unsafe { ffi::kqueue() };
18+
if res < 0 {
19+
return Err(io::Error::last_os_error());
20+
}
21+
22+
Ok(Self {
23+
registry: Registry { raw_fd: res },
24+
})
25+
}
26+
27+
pub fn registry(&self) -> &Registry {
28+
&self.registry
29+
}
30+
31+
/// Makes a blocking call to the OS parking the calling thread. It will wake up
32+
/// when one or more events we've registered interest in have occurred or
33+
/// the timeout duration has elapsed, whichever occurs first.
34+
///
35+
/// # Note
36+
/// If the number of events returned is 0, the wakeup was due to an elapsed
37+
/// timeout
38+
pub fn poll(&mut self, events: &mut Events, timeout: Option<i32>) -> Result<()> {
39+
let fd = self.registry.raw_fd;
40+
41+
// Timeout differs in kqueue from epoll in that it is a timespec struct
42+
// Instead of sending -1 for no timeout we send the null pointer.
43+
let timeout = timeout.map(|t| ffi::Timespec {
44+
tv_sec: t as isize / 1000,
45+
tv_nsec: (t as usize % 1000) * 1_000_000,
46+
});
47+
48+
let timeout_ptr = match timeout {
49+
Some(ref t) => t as *const ffi::Timespec,
50+
None => std::ptr::null(),
51+
};
52+
53+
let max_events = events.capacity() as i32;
54+
55+
// To poll with kevent we need to call kevent
56+
// with a null changelist, and 0 changes.
57+
// Critically we provide an out pointer to the
58+
// eventlist where kevent will store the events
59+
// for which we're being notified.
60+
let res = unsafe {
61+
ffi::kevent(
62+
fd,
63+
std::ptr::null(),
64+
0,
65+
events.as_mut_ptr(),
66+
max_events,
67+
timeout_ptr,
68+
)
69+
};
70+
71+
if res < 0 {
72+
return Err(io::Error::last_os_error());
73+
};
74+
75+
// This is safe because kevent ensures that `res` events are assigned.
76+
unsafe { events.set_len(res as usize) };
77+
Ok(())
78+
}
79+
}
80+
81+
pub struct Registry {
82+
raw_fd: i32,
83+
}
84+
85+
impl Registry {
86+
pub fn register(&self, source: &TcpStream, token: usize, interests: i32) -> Result<()> {
87+
let flags = interests as u16;
88+
89+
let event = ffi::Kevent {
90+
ident: source.as_raw_fd() as u64,
91+
filter: ffi::EVFILT_READ,
92+
flags,
93+
fflags: 0,
94+
data: 0,
95+
udata: token as u64,
96+
};
97+
98+
// To register an event we need to call kevent
99+
// with a Kevent struct, 1 change and null eventlist
100+
// and timeout.
101+
// This is analagous to the epoll_ctl call in the original.
102+
let res = unsafe {
103+
ffi::kevent(
104+
self.raw_fd,
105+
&event,
106+
1,
107+
std::ptr::null_mut(),
108+
0,
109+
std::ptr::null(),
110+
)
111+
};
112+
113+
if res < 0 {
114+
return Err(io::Error::last_os_error());
115+
}
116+
Ok(())
117+
}
118+
}
119+
120+
impl Drop for Registry {
121+
fn drop(&mut self) {
122+
let res = unsafe { ffi::close(self.raw_fd) };
123+
124+
if res < 0 {
125+
let err = io::Error::last_os_error();
126+
println!("ERROR: {err:?}");
127+
}
128+
}
129+
}

0 commit comments

Comments
 (0)