diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index e3421dfd8d..d683e9df0b 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -24,6 +24,7 @@ use datadog_sidecar::config::LogMethod; use datadog_sidecar::crashtracker::crashtracker_unix_socket_path; use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener}; use datadog_sidecar::service::agent_info::AgentInfoReader; +use datadog_sidecar::service::telemetry::InternalTelemetryAction; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, DynamicInstrumentationConfigState, InstanceId, QueueId, RuntimeMetadata, @@ -39,6 +40,8 @@ use libdd_common_ffi::{self as ffi, MaybeError}; #[cfg(windows)] use libdd_crashtracker_ffi::Metadata; use libdd_dogstatsd_client::DogStatsDActionOwned; +use libdd_telemetry::data::metrics::{MetricNamespace, MetricType}; +use libdd_telemetry::metrics::MetricContext; use libdd_telemetry::{ data::{self, Dependency, Integration}, worker::{LifecycleAction, LogIdentifier, TelemetryActions}, @@ -683,6 +686,60 @@ fn char_slice_to_string(slice: CharSlice) -> Result { .map_err(|e| format!("Failed to convert CharSlice to String: {e}")) } +struct TelemetryContext { + instance_id: InstanceId, + service_name: String, + env_name: String, +} + +impl TelemetryContext { + fn from_ffi( + session_id_ffi: CharSlice, + runtime_id_ffi: CharSlice, + service_name_ffi: CharSlice, + env_name_ffi: CharSlice, + ) -> Result { + if session_id_ffi.is_empty() { + return Err("Null or empty session_id".into()); + } + if runtime_id_ffi.is_empty() { + return Err("Null or empty runtime_id".into()); + } + if service_name_ffi.is_empty() { + return Err("Null or empty service_name".into()); + } + if env_name_ffi.is_empty() { + return Err("Null or empty env_name".into()); + } + + Ok(Self { + instance_id: InstanceId::new( + char_slice_to_string(session_id_ffi)?, + char_slice_to_string(runtime_id_ffi)?, + ), + service_name: char_slice_to_string(service_name_ffi)?, + env_name: char_slice_to_string(env_name_ffi)?, + }) + } + + /// Sends a telemetry action through the internal telemetry channel + fn send_action(self, action: InternalTelemetryAction) -> Result<(), String> { + let sender = get_telemetry_action_sender() + .map_err(|e| format!("Failed to get telemetry action sender: {e}"))?; + + let msg = InternalTelemetryActions { + instance_id: self.instance_id, + service_name: self.service_name, + env_name: self.env_name, + actions: vec![action], + }; + + sender + .try_send(msg) + .map_err(|e| format!("Failed to send telemetry action: {e}")) + } +} + #[allow(clippy::too_many_arguments)] fn ddog_sidecar_enqueue_telemetry_log_impl( session_id_ffi: CharSlice, @@ -696,31 +753,19 @@ fn ddog_sidecar_enqueue_telemetry_log_impl( tags_ffi: Option>, is_sensitive: bool, ) -> Result<(), String> { - if session_id_ffi.is_empty() - || runtime_id_ffi.is_empty() - || service_name_ffi.is_empty() - || env_name_ffi.is_empty() - || identifier_ffi.is_empty() - || message_ffi.is_empty() - { + if identifier_ffi.is_empty() || message_ffi.is_empty() { return Err("Null or empty required arguments".into()); } - let sender = match get_telemetry_action_sender() { - Ok(s) => s, - Err(e) => { - return Err(format!("Failed to get telemetry action sender: {e}")); - } - }; + let ctx = TelemetryContext::from_ffi( + session_id_ffi, + runtime_id_ffi, + service_name_ffi, + env_name_ffi, + )?; - let instance_id = InstanceId::new( - char_slice_to_string(session_id_ffi)?, - char_slice_to_string(runtime_id_ffi)?, - ); - let service_name: String = char_slice_to_string(service_name_ffi)?; - let env_name: String = char_slice_to_string(env_name_ffi)?; - let identifier: String = char_slice_to_string(identifier_ffi)?; - let message: String = char_slice_to_string(message_ffi)?; + let identifier = char_slice_to_string(identifier_ffi)?; + let message = char_slice_to_string(message_ffi)?; let stack_trace = stack_trace_ffi .map(|s| char_slice_to_string(*unsafe { s.as_ref() })) @@ -746,17 +791,136 @@ fn ddog_sidecar_enqueue_telemetry_log_impl( }; let log_action = TelemetryActions::AddLog((log_id, log_data)); - let msg = InternalTelemetryActions { - instance_id, - service_name, - env_name, - actions: vec![log_action], + ctx.send_action(InternalTelemetryAction::TelemetryAction(log_action)) +} + +/// Enqueues a telemetry point to be processed internally. +/// +/// # Safety +/// Pointers must be valid, strings must be null-terminated if not null. +#[no_mangle] +pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_point( + session_id_ffi: CharSlice, + runtime_id_ffi: CharSlice, + service_name_ffi: CharSlice, + env_name_ffi: CharSlice, + metric_name_ffi: CharSlice, + value: f64, + tags_ffi: Option>, +) -> MaybeError { + try_c!(ddog_sidecar_enqueue_telemetry_point_impl( + session_id_ffi, + runtime_id_ffi, + service_name_ffi, + env_name_ffi, + metric_name_ffi, + value, + tags_ffi, + )); + MaybeError::None +} + +fn ddog_sidecar_enqueue_telemetry_point_impl( + session_id_ffi: CharSlice, + runtime_id_ffi: CharSlice, + service_name_ffi: CharSlice, + env_name_ffi: CharSlice, + metric_name_ffi: CharSlice, + value: f64, + tags_ffi: Option>, +) -> Result<(), String> { + if metric_name_ffi.is_empty() { + return Err("Null or empty metric_name".into()); + } + + let ctx = TelemetryContext::from_ffi( + session_id_ffi, + runtime_id_ffi, + service_name_ffi, + env_name_ffi, + )?; + + let metric_name = char_slice_to_string(metric_name_ffi)?; + + fn get_tags(tags_slice: CharSlice) -> Result, String> { + let tags = char_slice_to_string(tags_slice)?; + let (tags, error) = libdd_common::tag::parse_tags(tags.as_str()); + if let Some(error) = error { + return Err(error.to_string()); + } + Ok(tags) + } + + let tags = match tags_ffi { + Some(tags_slice) => get_tags(*unsafe { tags_slice.as_ref() })?, + None => Vec::default(), }; - match sender.try_send(msg) { - Ok(_) => Ok(()), - Err(err) => Err(format!("Failed to send telemetry action: {err}")), + ctx.send_action(InternalTelemetryAction::AddMetricPoint(( + value, + metric_name, + tags, + ))) +} + +/// Registers a telemetry metric to be processed internally. +/// +/// # Safety +/// Pointers must be valid, strings must be null-terminated if not null. +#[no_mangle] +pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_metric( + session_id_ffi: CharSlice, + runtime_id_ffi: CharSlice, + service_name_ffi: CharSlice, + env_name_ffi: CharSlice, + metric_name_ffi: CharSlice, + metric_type: MetricType, + metric_namespace: MetricNamespace, +) -> MaybeError { + try_c!(ddog_sidecar_enqueue_telemetry_metric_impl( + session_id_ffi, + runtime_id_ffi, + service_name_ffi, + env_name_ffi, + metric_name_ffi, + metric_type, + metric_namespace, + )); + MaybeError::None +} + +#[allow(clippy::too_many_arguments)] +fn ddog_sidecar_enqueue_telemetry_metric_impl( + session_id_ffi: CharSlice, + runtime_id_ffi: CharSlice, + service_name_ffi: CharSlice, + env_name_ffi: CharSlice, + metric_name_ffi: CharSlice, + metric_type: MetricType, + metric_namespace: MetricNamespace, +) -> Result<(), String> { + if metric_name_ffi.is_empty() { + return Err("Null or empty metric_name".into()); } + + let ctx = TelemetryContext::from_ffi( + session_id_ffi, + runtime_id_ffi, + service_name_ffi, + env_name_ffi, + )?; + + let metric_name = char_slice_to_string(metric_name_ffi)?; + + ctx.send_action(InternalTelemetryAction::RegisterTelemetryMetric( + MetricContext { + name: metric_name, + tags: Vec::default(), + metric_type, + common: true, + namespace: metric_namespace, + }, + )) } /// Sends a trace to the sidecar via shared memory. diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 33bf7a1951..2e7814d53c 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -364,12 +364,19 @@ pub fn path_for_telemetry(service: &str, env: &str) -> CString { CString::new(path).unwrap() } +#[derive(Debug)] +pub enum InternalTelemetryAction { + TelemetryAction(TelemetryActions), + RegisterTelemetryMetric(MetricContext), + AddMetricPoint((f64, String, Vec)), +} + #[derive(Debug)] pub struct InternalTelemetryActions { pub instance_id: InstanceId, pub service_name: String, pub env_name: String, - pub actions: Vec, + pub actions: Vec, } pub fn get_telemetry_action_sender() -> Result> { @@ -398,14 +405,33 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) { ); let client = telemetry_client.lock_or_panic().worker.clone(); - for action in actions.actions { - let action_str = format!("{action:?}"); - match client.send_msg(action).await { - Ok(_) => { - debug!("Sent telemetry action to TelemetryWorker: {action_str}"); + for it_action in actions.actions { + match it_action { + InternalTelemetryAction::TelemetryAction(action) => { + let action_str = format!("{action:?}"); + match client.send_msg(action).await { + Ok(_) => { + debug!("Sent telemetry action to TelemetryWorker: {action_str}"); + } + Err(e) => { + warn!("Failed to send telemetry action {action_str} to TelemetryWorker: {e}"); + } + } } - Err(e) => { - warn!("Failed to send telemetry action {action_str} to TelemetryWorker: {e}"); + InternalTelemetryAction::RegisterTelemetryMetric(metric) => { + debug!("Registered telemetry metric: {metric:?}"); + telemetry_client.lock_or_panic().register_metric(metric); + } + InternalTelemetryAction::AddMetricPoint((value, name, tags)) => { + let actions_point = telemetry_client + .lock_or_panic() + .to_telemetry_point((name, value, tags)); + match client.send_msg(actions_point).await { + Ok(_) => {} + Err(e) => { + warn!("Failed to send telemetry point to TelemetryWorker: {e}"); + } + } } } }