diff --git a/examples/cxx/profiling.cpp b/examples/cxx/profiling.cpp index 32ece1610..fe62faac3 100644 --- a/examples/cxx/profiling.cpp +++ b/examples/cxx/profiling.cpp @@ -184,6 +184,11 @@ int main() { // Export to Datadog std::cout << "\n=== Exporting to Datadog ===" << std::endl; + // Create a cancellation token for the export + // In a real application, you could clone this and cancel from another thread + // Example: auto token_clone = cancel_token->clone_token(); token_clone->cancel(); + auto cancel_token = new_cancellation_token(); + try { // Example: Create an additional file to attach (e.g., application metadata) std::string app_metadata = R"({ @@ -217,7 +222,7 @@ int main() { std::cout << "Exporting profile to Datadog with additional metadata..." << std::endl; - exporter->send_profile( + exporter->send_profile_with_cancellation( *profile, // Files to compress and attach {AttachmentFile{ @@ -234,7 +239,8 @@ int main() { // Internal metadata (JSON string) R"({"profiler_version": "1.0", "custom_field": "demo"})", // System info (JSON string) - R"({"os": "macos", "arch": "arm64", "cores": 8})" + R"({"os": "macos", "arch": "arm64", "cores": 8})", + *cancel_token ); std::cout << "✅ Profile exported successfully!" << std::endl; } else { @@ -256,7 +262,7 @@ int main() { std::cout << "Exporting profile to Datadog with additional metadata..." << std::endl; - exporter->send_profile( + exporter->send_profile_with_cancellation( *profile, // Files to compress and attach {AttachmentFile{ @@ -273,7 +279,8 @@ int main() { // Internal metadata (JSON string) R"({"profiler_version": "1.0", "custom_field": "demo"})", // System info (JSON string) - R"({"os": "macos", "arch": "arm64", "cores": 8})" + R"({"os": "macos", "arch": "arm64", "cores": 8})", + *cancel_token ); std::cout << "✅ Profile exported successfully!" << std::endl; } diff --git a/libdd-profiling-ffi/cbindgen.toml b/libdd-profiling-ffi/cbindgen.toml index 187d1b8c8..d58ab11da 100644 --- a/libdd-profiling-ffi/cbindgen.toml +++ b/libdd-profiling-ffi/cbindgen.toml @@ -103,7 +103,7 @@ renaming_overrides_prefixing = true "Handle_EncodedProfile" = "ddog_prof_EncodedProfile" "Result_HandleEncodedProfile" = "ddog_prof_EncodedProfile_Result" -"CancellationToken" = "struct ddog_OpaqueCancellationToken" +"CancellationToken" = "ddog_OpaqueCancellationToken" "Handle_TokioCancellationToken" = "ddog_CancellationToken" "ArcHandle_ProfilesDictionary" = "ddog_prof_ProfilesDictionaryHandle" diff --git a/libdd-profiling/src/cxx.rs b/libdd-profiling/src/cxx.rs index 9c332f042..5432cd88d 100644 --- a/libdd-profiling/src/cxx.rs +++ b/libdd-profiling/src/cxx.rs @@ -74,6 +74,13 @@ pub mod ffi { extern "Rust" { type Profile; type ProfileExporter; + type CancellationToken; + + // CancellationToken factory and methods + fn new_cancellation_token() -> Box; + fn clone_token(self: &CancellationToken) -> Box; + fn cancel(self: &CancellationToken); + fn is_cancelled(self: &CancellationToken) -> bool; // Static factory methods for Profile #[Self = "Profile"] @@ -162,6 +169,36 @@ pub mod ffi { internal_metadata: &str, info: &str, ) -> Result<()>; + + /// Sends a profile to Datadog with cancellation support. + /// + /// This is the same as `send_profile`, but allows cancelling the operation from another + /// thread using a cancellation token. + /// + /// # Arguments + /// * `profile` - Profile to send (will be reset after sending) + /// * `files_to_compress` - Additional files to compress and attach (e.g., heap dumps) + /// * `additional_tags` - Per-profile tags (in addition to exporter-level tags) + /// * `process_tags` - Process-level tags as comma-separated string (e.g., + /// "runtime:native,profiler_version:1.0") Pass empty string "" if not needed + /// * `internal_metadata` - Internal metadata as JSON string (e.g., `{"key": "value"}`) See + /// Datadog-internal "RFC: Attaching internal metadata to pprof profiles" Pass empty + /// string "" if not needed + /// * `info` - System/environment info as JSON string (e.g., `{"os": "linux", "arch": + /// "x86_64"}`) See Datadog-internal "RFC: Pprof System Info Support" Pass empty string "" + /// if not needed + /// * `cancel` - Cancellation token to cancel the send operation + #[allow(clippy::too_many_arguments)] + fn send_profile_with_cancellation( + self: &ProfileExporter, + profile: &mut Profile, + files_to_compress: Vec, + additional_tags: Vec, + process_tags: &str, + internal_metadata: &str, + info: &str, + cancel: &CancellationToken, + ) -> Result<()>; } } @@ -245,6 +282,50 @@ impl<'a> TryFrom<&ffi::Tag<'a>> for exporter::Tag { } } +// ============================================================================ +// CancellationToken - Wrapper around tokio_util::sync::CancellationToken +// ============================================================================ + +pub struct CancellationToken { + inner: tokio_util::sync::CancellationToken, +} + +/// Creates a new cancellation token. +pub fn new_cancellation_token() -> Box { + Box::new(CancellationToken { + inner: tokio_util::sync::CancellationToken::new(), + }) +} + +impl CancellationToken { + /// Clones the cancellation token. + /// + /// A cloned token is connected to the original token - either can be used + /// to cancel or check cancellation status. The useful part is that they have + /// independent lifetimes and can be dropped separately. + /// + /// This is useful for multi-threaded scenarios where one thread performs the + /// send operation while another thread can cancel it. + pub fn clone_token(&self) -> Box { + Box::new(CancellationToken { + inner: self.inner.clone(), + }) + } + + /// Cancels the token. + /// + /// Note that cancellation is a terminal state; calling cancel multiple times + /// has no additional effect. + pub fn cancel(&self) { + self.inner.cancel(); + } + + /// Returns true if the token has been cancelled. + pub fn is_cancelled(&self) -> bool { + self.inner.is_cancelled() + } +} + // ============================================================================ // Profile - Wrapper around internal::Profile // ============================================================================ @@ -455,6 +536,63 @@ impl ProfileExporter { process_tags: &str, internal_metadata: &str, info: &str, + ) -> anyhow::Result<()> { + self.send_profile_impl( + profile, + files_to_compress, + additional_tags, + process_tags, + internal_metadata, + info, + None, + ) + } + + /// Sends a profile to Datadog with cancellation support. + /// + /// # Arguments + /// * `profile` - Profile to send (will be reset after sending) + /// * `files_to_compress` - Additional files to compress and attach + /// * `additional_tags` - Per-profile tags (in addition to exporter-level tags) + /// * `process_tags` - Process-level tags as comma-separated string. Empty string if not needed. + /// * `internal_metadata` - Internal metadata as JSON string. Empty string if not needed. + /// Example: `{"custom_field": "value", "version": "1.0"}` + /// * `info` - System/environment info as JSON string. Empty string if not needed. Example: + /// `{"os": "linux", "arch": "x86_64", "kernel": "5.15.0"}` + /// * `cancel` - Cancellation token to cancel the send operation + #[allow(clippy::too_many_arguments)] + pub fn send_profile_with_cancellation( + &self, + profile: &mut Profile, + files_to_compress: Vec, + additional_tags: Vec, + process_tags: &str, + internal_metadata: &str, + info: &str, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + self.send_profile_impl( + profile, + files_to_compress, + additional_tags, + process_tags, + internal_metadata, + info, + Some(&cancel.inner), + ) + } + + /// Internal implementation shared by send_profile and send_profile_with_cancellation + #[allow(clippy::too_many_arguments)] + fn send_profile_impl( + &self, + profile: &mut Profile, + files_to_compress: Vec, + additional_tags: Vec, + process_tags: &str, + internal_metadata: &str, + info: &str, + cancel: Option<&tokio_util::sync::CancellationToken>, ) -> anyhow::Result<()> { // Reset the profile and get the old one to export let old_profile = profile.inner.reset_and_return_previous()?; @@ -506,7 +644,7 @@ impl ProfileExporter { internal_metadata_json, info_json, )?; - let response = self.inner.send(request, None)?; + let response = self.inner.send(request, cancel)?; // Check response status if !response.status().is_success() {