Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions profiling/src/profiling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ static mut PROFILER: OnceLock<Profiler> = OnceLock::new();

pub static STACK_WALK_COUNT: AtomicU64 = AtomicU64::new(0);
pub static STACK_WALK_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
pub static BACKGROUND_THREAD_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
pub static DDPROF_TIME_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
pub static DDPROF_UPLOAD_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);

fn cpu_time_delta_ns(now: ThreadTime, prev: ThreadTime) -> u64 {
match now.as_duration().checked_sub(prev.as_duration()) {
Expand All @@ -73,14 +74,14 @@ fn cpu_time_delta_ns(now: ThreadTime, prev: ThreadTime) -> u64 {
}
}

pub(crate) fn update_background_cpu_time(last: &mut Option<ThreadTime>) {
pub(crate) fn update_cpu_time_counter(last: &mut Option<ThreadTime>, counter: &AtomicU64) {
let Some(prev) = last.take() else {
*last = ThreadTime::try_now().ok();
return;
};
if let Ok(now) = ThreadTime::try_now() {
let elapsed_ns = cpu_time_delta_ns(now, prev);
BACKGROUND_THREAD_CPU_TIME_NS.fetch_add(elapsed_ns, Ordering::Relaxed);
counter.fetch_add(elapsed_ns, Ordering::Relaxed);
*last = Some(now);
} else {
*last = Some(prev);
Expand Down Expand Up @@ -595,6 +596,7 @@ impl TimeCollector {
let upload_tick = crossbeam_channel::tick(self.upload_period);
let never = crossbeam_channel::never();
let mut running = true;
let mut last_cpu = ThreadTime::try_now().ok();

while running {
// The crossbeam_channel::select! doesn't have the ability to
Expand All @@ -620,6 +622,7 @@ impl TimeCollector {
Self::handle_resource_message(message, &mut profiles),
ProfilerMessage::Cancel => {
// flush what we have before exiting
update_cpu_time_counter(&mut last_cpu, &DDPROF_TIME_CPU_TIME_NS);
last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
running = false;
},
Expand Down Expand Up @@ -657,6 +660,7 @@ impl TimeCollector {

recv(upload_tick) -> message => {
if message.is_ok() {
update_cpu_time_counter(&mut last_cpu, &DDPROF_TIME_CPU_TIME_NS);
last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
}
},
Expand Down
39 changes: 29 additions & 10 deletions profiling/src/profiling/uploader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::AgentEndpoint;
use crate::profiling::{
update_background_cpu_time, UploadMessage, UploadRequest, BACKGROUND_THREAD_CPU_TIME_NS,
STACK_WALK_COUNT, STACK_WALK_CPU_TIME_NS,
update_cpu_time_counter, UploadMessage, UploadRequest, DDPROF_TIME_CPU_TIME_NS,
DDPROF_UPLOAD_CPU_TIME_NS, STACK_WALK_COUNT, STACK_WALK_CPU_TIME_NS,
};
use crate::{PROFILER_NAME_STR, PROFILER_VERSION_STR};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -48,7 +48,7 @@ impl Uploader {
/// This function will not only create the internal metadata JSON representation, but is also
/// in charge to reset all those counters back to 0.
fn create_internal_metadata() -> Option<serde_json::Value> {
let capacity = 3 + cfg!(feature = "debug_stats") as usize * 3;
let capacity = 4 + cfg!(feature = "debug_stats") as usize * 3;
let mut metadata = serde_json::Map::with_capacity(capacity);
metadata.insert(
"stack_walk_count".to_string(),
Expand All @@ -59,8 +59,12 @@ impl Uploader {
json!(STACK_WALK_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
);
metadata.insert(
"background_threads_cpu_time_ns".to_string(),
json!(BACKGROUND_THREAD_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
"ddprof_time_cpu_time_ns".to_string(),
json!(DDPROF_TIME_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
);
metadata.insert(
"ddprof_upload_cpu_time_ns".to_string(),
json!(DDPROF_UPLOAD_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
);
#[cfg(feature = "debug_stats")]
{
Expand Down Expand Up @@ -89,7 +93,11 @@ impl Uploader {
Some(metadata)
}

fn upload(&self, message: Box<UploadRequest>) -> anyhow::Result<u16> {
fn upload(
&self,
message: Box<UploadRequest>,
last_cpu: &mut Option<ThreadTime>,
) -> anyhow::Result<u16> {
let index = message.index;
let profile = message.profile;

Expand All @@ -110,6 +118,11 @@ impl Uploader {
let serialized =
profile.serialize_into_compressed_pprof(Some(message.end_time), message.duration)?;
exporter.set_timeout(10000); // 10 seconds in milliseconds

// Capture CPU time up to this point. Note: metadata generation, exporter
// building, and HTTP request time will be attributed to the next profile.
update_cpu_time_counter(last_cpu, &DDPROF_UPLOAD_CPU_TIME_NS);

let request = exporter.build(
serialized,
&[],
Expand Down Expand Up @@ -147,7 +160,6 @@ impl Uploader {
},

Ok(UploadMessage::Upload(request)) => {
update_background_cpu_time(&mut last_cpu);
match pprof_filename {
Some(filename) => {
let filename_prefix = filename.as_ref();
Expand All @@ -157,7 +169,7 @@ impl Uploader {
std::fs::write(&name, r.buffer).expect("write to succeed");
info!("Successfully wrote profile to {name}");
},
None => match self.upload(request) {
None => match self.upload(request, &mut last_cpu) {
Ok(status) => {
if status >= 400 {
warn!("Unexpected HTTP status when sending profile (HTTP {status}).")
Expand Down Expand Up @@ -196,7 +208,8 @@ mod tests {
// Set up all counters with known values
STACK_WALK_COUNT.store(7, Ordering::Relaxed);
STACK_WALK_CPU_TIME_NS.store(9000, Ordering::Relaxed);
BACKGROUND_THREAD_CPU_TIME_NS.store(1234, Ordering::Relaxed);
DDPROF_TIME_CPU_TIME_NS.store(1234, Ordering::Relaxed);
DDPROF_UPLOAD_CPU_TIME_NS.store(5678, Ordering::Relaxed);
EXCEPTION_PROFILING_EXCEPTION_COUNT.store(42, Ordering::Relaxed);
ALLOCATION_PROFILING_COUNT.store(100, Ordering::Relaxed);
ALLOCATION_PROFILING_SIZE.store(1024, Ordering::Relaxed);
Expand All @@ -221,10 +234,16 @@ mod tests {
);
assert_eq!(
metadata
.get("background_threads_cpu_time_ns")
.get("ddprof_time_cpu_time_ns")
.and_then(|v| v.as_u64()),
Some(1234)
);
assert_eq!(
metadata
.get("ddprof_upload_cpu_time_ns")
.and_then(|v| v.as_u64()),
Some(5678)
);

assert_eq!(
metadata.get("exceptions_count").and_then(|v| v.as_u64()),
Expand Down
Loading