From e9a52c0923552ce8a9ba8a35583bf504e42cabc5 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Wed, 20 May 2026 22:39:09 +0000 Subject: [PATCH] Add wakeup watermark --- one_collect/src/perf_event/rb/mod.rs | 32 ++++- one_collect/src/perf_event/rb/source.rs | 160 ++++++++++++++++++++++++ 2 files changed, 190 insertions(+), 2 deletions(-) diff --git a/one_collect/src/perf_event/rb/mod.rs b/one_collect/src/perf_event/rb/mod.rs index 6de5e89..8115397 100644 --- a/one_collect/src/perf_event/rb/mod.rs +++ b/one_collect/src/perf_event/rb/mod.rs @@ -111,6 +111,21 @@ pub fn cpu_count() -> u32 { } } +/// Size in bytes of the per-CPU ring's data area for a user-supplied page +/// count. +/// +/// The kernel requires the data area to be a power-of-two number of +/// pages, so the user's `page_count` is rounded up via +/// `next_power_of_two()`. The actual `mmap` is one additional metadata +/// page on top of this (`ring_data_bytes(n) + PAGE_SIZE`). +/// +/// This is the bound the `wakeup_events_watermark` must be strictly +/// smaller than for the kernel to ever wake the perf fd. +pub(super) fn ring_data_bytes(page_count: usize) -> usize { + let page_size = unsafe { sysconf(_SC_PAGE_SIZE) as usize }; + page_count.next_power_of_two() * page_size +} + pub fn perf_timestamp( attr: &perf_event_attr) -> u64 { unsafe { @@ -287,6 +302,19 @@ impl RingBufBuilder { } } +impl RingBufBuilder { + /// Configure the kernel to wake up the perf fd only after the ring + /// buffer has accumulated at least `bytes` of data. + /// + /// Sets `FLAG_WATERMARK` and `wakeup_events_watermark` on the + /// underlying `perf_event_attr`. A value of `0` keeps the kernel + /// default of waking on every event. + pub(crate) fn set_wakeup_watermark(&mut self, bytes: u32) { + self.attributes.flags |= FLAG_WATERMARK; + self.attributes.wakeup_events_watermark = bytes; + } +} + impl RingBufOptions for RingBufBuilder { fn clone_options(&self) -> Self { Self { @@ -843,11 +871,11 @@ impl CpuRingBuf { "Ring buffer is not open.")); } - let page_count = page_count.next_power_of_two() + 1; + let data_bytes = ring_data_bytes(page_count); unsafe { let page_size = sysconf(_SC_PAGE_SIZE) as usize; - let pages_len = page_count * page_size; + let pages_len = data_bytes + page_size; /* + metadata page */ let pages = mmap( std::ptr::null_mut::() as *mut c_void, diff --git a/one_collect/src/perf_event/rb/source.rs b/one_collect/src/perf_event/rb/source.rs index 62be1e7..4d8454f 100644 --- a/one_collect/src/perf_event/rb/source.rs +++ b/one_collect/src/perf_event/rb/source.rs @@ -36,6 +36,7 @@ pub struct RingBufSessionBuilder { pages: usize, target_pids: Option>, target_cpus: Option>, + wakeup_watermark: Option, kernel_builder: Option>, event_builder: Option>, profiling_builder: Option>, @@ -58,6 +59,7 @@ impl RingBufSessionBuilder { pages: 1, target_pids: None, target_cpus: None, + wakeup_watermark: None, kernel_builder: None, event_builder: None, profiling_builder: None, @@ -88,6 +90,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: pids, target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -118,6 +121,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: cpus, + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -136,6 +140,48 @@ impl RingBufSessionBuilder { pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, + kernel_builder: self.kernel_builder.take(), + event_builder: self.event_builder.take(), + profiling_builder: self.profiling_builder.take(), + cswitch_builder: self.cswitch_builder.take(), + bpf_builder: self.bpf_builder.take(), + soft_page_faults_builder: self.soft_page_faults_builder.take(), + hard_page_faults_builder: self.hard_page_faults_builder.take(), + hooks: self.hooks.take(), + } + } + + /// Configure the kernel to defer marking a per-CPU perf fd readable + /// until at least `bytes` of data have accumulated in its ring + /// buffer. + /// + /// By default the kernel marks the fd readable on every event. + /// Raising the watermark amortizes wakeup cost in exchange for + /// additional latency. + /// + /// This setting only takes effect for consumers that wait on the + /// per-CPU perf fds (for example via `epoll(2)` or + /// `tokio::io::unix::AsyncFd`). The default [`PerfSession`] read + /// loop sleeps on a timeout and does not consult the fd's readable + /// state, so it sees no behavior change from this method. + /// + /// `bytes` must be strictly smaller than the per-CPU ring data area + /// configured by [`Self::with_page_count`]; otherwise the kernel + /// would never reach the watermark and the fd would never be marked + /// readable. [`Self::build`] returns an error if this constraint is + /// violated. + /// + /// A value of `0` keeps the kernel default of marking the fd + /// readable on every event. + pub fn with_wakeup_watermark( + &mut self, + bytes: u32) -> Self { + Self { + pages: self.pages, + target_pids: self.target_pids.take(), + target_cpus: self.target_cpus.take(), + wakeup_watermark: Some(bytes), kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -154,6 +200,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: Some(builder), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -183,6 +230,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: Some(builder), profiling_builder: self.profiling_builder.take(), @@ -212,6 +260,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: Some(builder), @@ -241,6 +290,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -270,6 +320,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -299,6 +350,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -328,6 +380,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -365,6 +418,7 @@ impl RingBufSessionBuilder { pages: self.pages, target_pids: self.target_pids.take(), target_cpus: self.target_cpus.take(), + wakeup_watermark: self.wakeup_watermark, kernel_builder: self.kernel_builder.take(), event_builder: self.event_builder.take(), profiling_builder: self.profiling_builder.take(), @@ -392,6 +446,41 @@ impl RingBufSessionBuilder { } } + /* Apply wakeup watermark to the kernel/leader perf_event_attr. The + * leader fd is the one the kernel wakes when data lands in its + * per-CPU ring; all other event types redirect into the same + * ring, so the leader's watermark covers them. + * + * The watermark must be strictly smaller than the per-CPU ring + * data area (see `ring_data_bytes`). The kernel does not validate + * this at `perf_event_open()` time and silently never wakes the fd + * if the watermark cannot be reached, so we reject the + * misconfiguration here. */ + if let Some(bytes) = self.wakeup_watermark.take() { + let ring_bytes = ring_data_bytes(self.pages); + + if (bytes as usize) >= ring_bytes { + let msg = format!( + "wakeup_watermark ({} bytes) must be smaller than the \ + per-CPU ring data area ({} bytes from {} pages); the \ + kernel would never wake the perf fds", + bytes, ring_bytes, self.pages); + warn!("RingBufSessionBuilder::build: {}", msg); + return Err(io_error(&msg)); + } + + let mut kernel = self.kernel_builder + .take() + .unwrap_or_else(RingBufBuilder::for_kernel); + + kernel.set_wakeup_watermark(bytes); + self.kernel_builder = Some(kernel); + + debug!( + "RingBufSessionBuilder::build: wakeup_watermark applied, bytes={}, ring_bytes={}", + bytes, ring_bytes); + } + let mut source = RingBufDataSource::new( self.pages, self.target_pids.take(), @@ -1340,4 +1429,75 @@ mod tests { println!("Got {} samples", count); assert!(count >= 100); } + + #[test] + fn wakeup_watermark_rejected_when_equal_to_ring_size() { + /* A watermark equal to the data area must be rejected by build(); + * the kernel would otherwise never reach it. */ + let pages = 1; + let ring_bytes = ring_data_bytes(pages); + let watermark = ring_bytes as u32; + let err = match RingBufSessionBuilder::new() + .with_page_count(pages) + .with_wakeup_watermark(watermark) + .build() + { + Ok(_) => panic!("build() must reject watermark == ring size"), + Err(err) => err, + }; + + let expected = format!( + "wakeup_watermark ({} bytes) must be smaller than the per-CPU \ + ring data area ({} bytes from {} pages); the kernel would \ + never wake the perf fds", + watermark, ring_bytes, pages); + assert_eq!(format!("{}", err), expected); + assert_eq!(err.kind(), std::io::ErrorKind::Other); + } + + #[test] + fn wakeup_watermark_rejected_when_above_ring_size() { + /* A watermark strictly larger than the data area must also be + * rejected. */ + let pages = 1; + let ring_bytes = ring_data_bytes(pages); + let watermark = (ring_bytes + 1) as u32; + let err = match RingBufSessionBuilder::new() + .with_page_count(pages) + .with_wakeup_watermark(watermark) + .build() + { + Ok(_) => panic!("build() must reject watermark > ring size"), + Err(err) => err, + }; + + let expected = format!( + "wakeup_watermark ({} bytes) must be smaller than the per-CPU \ + ring data area ({} bytes from {} pages); the kernel would \ + never wake the perf fds", + watermark, ring_bytes, pages); + assert_eq!(format!("{}", err), expected); + assert_eq!(err.kind(), std::io::ErrorKind::Other); + } + + #[test] + fn wakeup_watermark_accepted_when_below_ring_size() { + /* A watermark strictly smaller than the data area must pass the + * builder-level validation. */ + let pages = 1; + let ring_bytes = ring_data_bytes(pages); + let watermark = (ring_bytes - 1) as u32; + if let Err(err) = RingBufSessionBuilder::new() + .with_page_count(pages) + .with_wakeup_watermark(watermark) + .build() + { + let msg = format!("{}", err); + assert!( + !msg.contains("wakeup_watermark"), + "watermark < ring size must not trigger validation error, \ + got: {}", + msg); + } + } }