Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions one_collect/src/perf_event/rb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,21 @@ pub fn cpu_count() -> u32 {
}
}

/// Size in bytes of the per-CPU ring's data area for a user-supplied page
/// count.
///
/// The kernel requires the data area to be a power-of-two number of
/// pages, so the user's `page_count` is rounded up via
/// `next_power_of_two()`. The actual `mmap` is one additional metadata
/// page on top of this (`ring_data_bytes(n) + PAGE_SIZE`).
///
/// This is the bound the `wakeup_events_watermark` must be strictly
/// smaller than for the kernel to ever wake the perf fd.
pub(super) fn ring_data_bytes(page_count: usize) -> usize {
let page_size = unsafe { sysconf(_SC_PAGE_SIZE) as usize };
page_count.next_power_of_two() * page_size
}

pub fn perf_timestamp(
attr: &perf_event_attr) -> u64 {
unsafe {
Expand Down Expand Up @@ -287,6 +302,19 @@ impl RingBufBuilder {
}
}

impl<T> RingBufBuilder<T> {
/// Configure the kernel to wake up the perf fd only after the ring
/// buffer has accumulated at least `bytes` of data.
///
/// Sets `FLAG_WATERMARK` and `wakeup_events_watermark` on the
/// underlying `perf_event_attr`. A value of `0` keeps the kernel
/// default of waking on every event.
pub(crate) fn set_wakeup_watermark(&mut self, bytes: u32) {
self.attributes.flags |= FLAG_WATERMARK;
self.attributes.wakeup_events_watermark = bytes;
}
}

impl RingBufOptions for RingBufBuilder<Profiling> {
fn clone_options(&self) -> Self {
Self {
Expand Down Expand Up @@ -843,11 +871,11 @@ impl CpuRingBuf {
"Ring buffer is not open."));
}

let page_count = page_count.next_power_of_two() + 1;
let data_bytes = ring_data_bytes(page_count);

unsafe {
let page_size = sysconf(_SC_PAGE_SIZE) as usize;
let pages_len = page_count * page_size;
let pages_len = data_bytes + page_size; /* + metadata page */

let pages = mmap(
std::ptr::null_mut::<u8>() as *mut c_void,
Expand Down
160 changes: 160 additions & 0 deletions one_collect/src/perf_event/rb/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct RingBufSessionBuilder {
pages: usize,
target_pids: Option<Vec<i32>>,
target_cpus: Option<Vec<u16>>,
wakeup_watermark: Option<u32>,
kernel_builder: Option<RingBufBuilder<Kernel>>,
event_builder: Option<RingBufBuilder<Tracepoint>>,
profiling_builder: Option<RingBufBuilder<Profiling>>,
Expand All @@ -58,6 +59,7 @@ impl RingBufSessionBuilder {
pages: 1,
target_pids: None,
target_cpus: None,
wakeup_watermark: None,
kernel_builder: None,
event_builder: None,
profiling_builder: None,
Expand Down Expand Up @@ -88,6 +90,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: pids,
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -118,6 +121,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: cpus,
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand All @@ -136,6 +140,48 @@ impl RingBufSessionBuilder {
pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
cswitch_builder: self.cswitch_builder.take(),
bpf_builder: self.bpf_builder.take(),
soft_page_faults_builder: self.soft_page_faults_builder.take(),
hard_page_faults_builder: self.hard_page_faults_builder.take(),
hooks: self.hooks.take(),
}
}

/// Configure the kernel to defer marking a per-CPU perf fd readable
/// until at least `bytes` of data have accumulated in its ring
/// buffer.
///
/// By default the kernel marks the fd readable on every event.
/// Raising the watermark amortizes wakeup cost in exchange for
/// additional latency.
///
/// This setting only takes effect for consumers that wait on the
/// per-CPU perf fds (for example via `epoll(2)` or
/// `tokio::io::unix::AsyncFd`). The default [`PerfSession`] read
/// loop sleeps on a timeout and does not consult the fd's readable
/// state, so it sees no behavior change from this method.
///
/// `bytes` must be strictly smaller than the per-CPU ring data area
/// configured by [`Self::with_page_count`]; otherwise the kernel
/// would never reach the watermark and the fd would never be marked
/// readable. [`Self::build`] returns an error if this constraint is
/// violated.
///
/// A value of `0` keeps the kernel default of marking the fd
/// readable on every event.
pub fn with_wakeup_watermark(
&mut self,
bytes: u32) -> Self {
Self {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: Some(bytes),
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand All @@ -154,6 +200,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: Some(builder),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -183,6 +230,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: Some(builder),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -212,6 +260,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: Some(builder),
Expand Down Expand Up @@ -241,6 +290,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -270,6 +320,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -299,6 +350,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -328,6 +380,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -365,6 +418,7 @@ impl RingBufSessionBuilder {
pages: self.pages,
target_pids: self.target_pids.take(),
target_cpus: self.target_cpus.take(),
wakeup_watermark: self.wakeup_watermark,
kernel_builder: self.kernel_builder.take(),
event_builder: self.event_builder.take(),
profiling_builder: self.profiling_builder.take(),
Expand Down Expand Up @@ -392,6 +446,41 @@ impl RingBufSessionBuilder {
}
}

/* Apply wakeup watermark to the kernel/leader perf_event_attr. The
* leader fd is the one the kernel wakes when data lands in its
* per-CPU ring; all other event types redirect into the same
* ring, so the leader's watermark covers them.
*
* The watermark must be strictly smaller than the per-CPU ring
* data area (see `ring_data_bytes`). The kernel does not validate
* this at `perf_event_open()` time and silently never wakes the fd
* if the watermark cannot be reached, so we reject the
* misconfiguration here. */
if let Some(bytes) = self.wakeup_watermark.take() {
let ring_bytes = ring_data_bytes(self.pages);

if (bytes as usize) >= ring_bytes {
let msg = format!(
"wakeup_watermark ({} bytes) must be smaller than the \
per-CPU ring data area ({} bytes from {} pages); the \
kernel would never wake the perf fds",
bytes, ring_bytes, self.pages);
warn!("RingBufSessionBuilder::build: {}", msg);
return Err(io_error(&msg));
}

let mut kernel = self.kernel_builder
.take()
.unwrap_or_else(RingBufBuilder::for_kernel);

kernel.set_wakeup_watermark(bytes);
self.kernel_builder = Some(kernel);

debug!(
"RingBufSessionBuilder::build: wakeup_watermark applied, bytes={}, ring_bytes={}",
bytes, ring_bytes);
}

let mut source = RingBufDataSource::new(
self.pages,
self.target_pids.take(),
Expand Down Expand Up @@ -1340,4 +1429,75 @@ mod tests {
println!("Got {} samples", count);
assert!(count >= 100);
}

#[test]
fn wakeup_watermark_rejected_when_equal_to_ring_size() {
/* A watermark equal to the data area must be rejected by build();
* the kernel would otherwise never reach it. */
let pages = 1;
let ring_bytes = ring_data_bytes(pages);
let watermark = ring_bytes as u32;
let err = match RingBufSessionBuilder::new()
.with_page_count(pages)
.with_wakeup_watermark(watermark)
.build()
{
Ok(_) => panic!("build() must reject watermark == ring size"),
Err(err) => err,
};

let expected = format!(
"wakeup_watermark ({} bytes) must be smaller than the per-CPU \
ring data area ({} bytes from {} pages); the kernel would \
never wake the perf fds",
watermark, ring_bytes, pages);
assert_eq!(format!("{}", err), expected);
assert_eq!(err.kind(), std::io::ErrorKind::Other);
}

#[test]
fn wakeup_watermark_rejected_when_above_ring_size() {
/* A watermark strictly larger than the data area must also be
* rejected. */
let pages = 1;
let ring_bytes = ring_data_bytes(pages);
let watermark = (ring_bytes + 1) as u32;
let err = match RingBufSessionBuilder::new()
.with_page_count(pages)
.with_wakeup_watermark(watermark)
.build()
{
Ok(_) => panic!("build() must reject watermark > ring size"),
Err(err) => err,
};

let expected = format!(
"wakeup_watermark ({} bytes) must be smaller than the per-CPU \
ring data area ({} bytes from {} pages); the kernel would \
never wake the perf fds",
watermark, ring_bytes, pages);
assert_eq!(format!("{}", err), expected);
assert_eq!(err.kind(), std::io::ErrorKind::Other);
}

#[test]
fn wakeup_watermark_accepted_when_below_ring_size() {
/* A watermark strictly smaller than the data area must pass the
* builder-level validation. */
let pages = 1;
let ring_bytes = ring_data_bytes(pages);
let watermark = (ring_bytes - 1) as u32;
if let Err(err) = RingBufSessionBuilder::new()
.with_page_count(pages)
.with_wakeup_watermark(watermark)
.build()
{
let msg = format!("{}", err);
assert!(
!msg.contains("wakeup_watermark"),
"watermark < ring size must not trigger validation error, \
got: {}",
msg);
}
}
}
Loading