diff --git a/Cargo.lock b/Cargo.lock index 7b6e9fbe57..7c69112752 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2859,6 +2859,7 @@ dependencies = [ name = "libdd-capabilities-impl" version = "2.0.0" dependencies = [ + "anyhow", "bytes", "http", "http-body-util", @@ -2943,6 +2944,8 @@ dependencies = [ "goblin", "http", "libc", + "libdd-capabilities", + "libdd-capabilities-impl", "libdd-common", "libdd-libunwind-sys", "libdd-telemetry", @@ -3026,6 +3029,7 @@ dependencies = [ "tokio-util", "tracing", "uuid", + "web-time", ] [[package]] @@ -3339,11 +3343,13 @@ dependencies = [ "base64 0.22.1", "bytes", "futures", + "getrandom 0.2.15", "hashbrown 0.15.1", "http", - "http-body-util", "httpmock", "libc", + "libdd-capabilities", + "libdd-capabilities-impl", "libdd-common", "libdd-ddsketch", "libdd-shared-runtime", @@ -3355,6 +3361,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid", + "web-time", "winver", ] @@ -3365,6 +3372,7 @@ dependencies = [ "build_common", "function_name", "libc", + "libdd-capabilities-impl", "libdd-common", "libdd-common-ffi", "libdd-telemetry", @@ -3457,6 +3465,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "web-time", ] [[package]] @@ -6532,6 +6541,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-root-certs" version = "1.0.5" diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 0e794bc021..ef04646f30 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -498,6 +498,7 @@ wasm-bindgen-macro,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/m wasm-bindgen-macro-support,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/macro-support,MIT OR Apache-2.0,The wasm-bindgen Developers 
wasm-bindgen-shared,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/shared,MIT OR Apache-2.0,The wasm-bindgen Developers web-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/web-sys,MIT OR Apache-2.0,The wasm-bindgen Developers +web-time,https://github.com/daxpedda/web-time,MIT OR Apache-2.0,The web-time Authors webpki-root-certs,https://github.com/rustls/webpki-roots,CDLA-Permissive-2.0,The webpki-root-certs Authors webpki-roots,https://github.com/rustls/webpki-roots,CDLA-Permissive-2.0,The webpki-roots Authors widestring,https://github.com/VoidStarKat/widestring-rs,MIT OR Apache-2.0,The widestring Authors diff --git a/datadog-sidecar/src/self_telemetry.rs b/datadog-sidecar/src/self_telemetry.rs index 14f94ebf7e..f82c468619 100644 --- a/datadog-sidecar/src/self_telemetry.rs +++ b/datadog-sidecar/src/self_telemetry.rs @@ -4,12 +4,15 @@ use crate::config::Config; use crate::log; use crate::service::SidecarServer; use crate::watchdog::WatchdogHandle; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::{tag, tag::Tag, MutexExt}; use libdd_telemetry::data::metrics::{MetricNamespace, MetricType}; use libdd_telemetry::metrics::ContextKey; -use libdd_telemetry::worker::{ - LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerHandle, -}; +use libdd_telemetry::worker::{LifecycleAction, TelemetryActions, TelemetryWorkerBuilder}; + +/// The sidecar runs the telemetry worker on native, so its handle is pinned to +/// [`NativeCapabilities`]. 
+type TelemetryWorkerHandle = libdd_telemetry::worker::TelemetryWorkerHandle<NativeCapabilities>; use manual_future::ManualFuture; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -205,7 +208,7 @@ impl SelfTelemetry { crate::sidecar_version!().to_string(), ); builder.config = self.config.clone(); - let (worker, join_handle) = builder.spawn(); + let (worker, join_handle) = builder.spawn::<NativeCapabilities>(); let metrics = MetricData { worker: &worker, diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 3e29f1c95d..02d6d5c8c0 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant}; use tokio::task::JoinHandle; use zwohash::ZwoHasher; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::tag::Tag; use libdd_telemetry::worker::TelemetryWorkerBuilder; use serde::{Deserialize, Serialize}; @@ -33,7 +34,11 @@ use std::time::SystemTime; use libdd_telemetry::config::Config; use libdd_telemetry::data::{self, Integration}; use libdd_telemetry::metrics::{ContextKey, MetricContext}; -use libdd_telemetry::worker::{LifecycleAction, TelemetryActions, TelemetryWorkerHandle}; +use libdd_telemetry::worker::{LifecycleAction, TelemetryActions}; + +/// Sidecar's telemetry worker is native-only, so its handle is pinned to +/// [`NativeCapabilities`]. 
+type TelemetryWorkerHandle = libdd_telemetry::worker::TelemetryWorkerHandle<NativeCapabilities>; use manual_future::ManualFuture; use serde_with::{serde_as, VecSkipError}; use tokio::time::{sleep, sleep_until, Instant as TokioInstant}; diff --git a/libdd-capabilities-impl/Cargo.toml b/libdd-capabilities-impl/Cargo.toml index 498989972e..9156cdeaa6 100644 --- a/libdd-capabilities-impl/Cargo.toml +++ b/libdd-capabilities-impl/Cargo.toml @@ -16,6 +16,7 @@ crate-type = ["lib"] bench = false [dependencies] +anyhow = "1" bytes = "1" http = "1" libdd-capabilities = { path = "../libdd-capabilities", version = "2.0.0" } diff --git a/libdd-capabilities-impl/src/http.rs b/libdd-capabilities-impl/src/http.rs index e6af5b853c..6ac252cb5a 100644 --- a/libdd-capabilities-impl/src/http.rs +++ b/libdd-capabilities-impl/src/http.rs @@ -4,6 +4,8 @@ //! Native HTTP client implementation backed by hyper. mod native { + use std::fs::OpenOptions; + use std::io::Write; use std::sync::{Arc, OnceLock}; use libdd_capabilities::http::{HttpClientCapability, HttpError}; @@ -26,6 +28,32 @@ mod native { } } + /// Write `body` as a newline-terminated record to the file referenced by `uri` (which must + /// have a `file://` scheme), then return a synthetic 202 response. The same on-disk format + /// the pre-capability telemetry worker used when configured with a `file://` endpoint, so + /// downstream tests that diff against the recorded payload bytes keep working. 
+ fn write_to_file_endpoint( + uri: &http::Uri, + body: bytes::Bytes, + ) -> Result<http::Response<bytes::Bytes>, HttpError> { + let path = libdd_common::decode_uri_path_in_authority(uri) + .map_err(|e| HttpError::Other(anyhow::anyhow!("invalid file:// URI: {e}")))?; + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .map_err(|e| HttpError::Other(anyhow::anyhow!("opening {path:?}: {e}")))?; + let mut record = body.to_vec(); + record.push(b'\n'); + file.write_all(&record) + .map_err(|e| HttpError::Other(anyhow::anyhow!("writing {path:?}: {e}")))?; + + http::Response::builder() + .status(http::StatusCode::ACCEPTED) + .body(bytes::Bytes::new()) + .map_err(|e| HttpError::Other(e.into())) + } + impl HttpClientCapability for NativeHttpClient { fn new_client() -> Self { Self { @@ -39,8 +67,15 @@ mod native { req: http::Request<bytes::Bytes>, ) -> impl std::future::Future<Output = Result<http::Response<bytes::Bytes>, HttpError>> + MaybeSend { - let client = self.client.get_or_init(new_default_client).clone(); + let client_lock = self.client.clone(); async move { + // file:// URIs short-circuit to the on-disk recorder used by tests. 
+ if req.uri().scheme_str() == Some("file") { + let (parts, body) = req.into_parts(); + return write_to_file_endpoint(&parts.uri, body); + } + + let client = client_lock.get_or_init(new_default_client).clone(); let hyper_req = req.map(Body::from_bytes); let response = client diff --git a/libdd-crashtracker/Cargo.toml b/libdd-crashtracker/Cargo.toml index 5e88a070c4..d42d101f67 100644 --- a/libdd-crashtracker/Cargo.toml +++ b/libdd-crashtracker/Cargo.toml @@ -49,6 +49,8 @@ anyhow = "1.0" chrono = {version = "0.4", default-features = false, features = ["std", "clock", "serde"]} cxx = { version = "1.0", optional = true } errno = "0.3" +libdd-capabilities = { version = "2.0.0", path = "../libdd-capabilities" } +libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl" } libdd-common = { version = "5.0.0", path = "../libdd-common" } libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry" } http = "1.1" diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs index 3d92b6efa8..a6aa3895ad 100644 --- a/libdd-crashtracker/src/crash_info/telemetry.rs +++ b/libdd-crashtracker/src/crash_info/telemetry.rs @@ -7,6 +7,8 @@ use crate::{ErrorKind, SigInfo}; use super::{CrashInfo, Metadata, TARGET_TRIPLE}; use anyhow::Context; use chrono::{DateTime, Utc}; +use libdd_capabilities::HttpClientCapability; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::Endpoint; use libdd_telemetry::{ build_host, @@ -352,9 +354,9 @@ impl TelemetryCrashUploader { self.send_telemetry_payload(&payload).await } - /// Helper to perform actual HTTP (or file) submission via configured telemetry client + /// Helper to perform actual HTTP submission via the native HTTP capability. 
async fn send_telemetry_payload(&self, payload: &data::Telemetry<'_>) -> anyhow::Result<()> { - let client = libdd_telemetry::worker::http_client::from_config(&self.cfg); + let client = NativeCapabilities::new_client(); let req = request_builder(&self.cfg)? .method(http::Method::POST) .header( @@ -369,19 +371,20 @@ impl TelemetryCrashUploader { libdd_telemetry::worker::http_client::header::REQUEST_TYPE, "logs", ) - .body(serde_json::to_string(&payload)?.into())?; - - tokio::time::timeout( - std::time::Duration::from_millis({ - if let Some(endp) = self.cfg.endpoint() { - endp.timeout_ms - } else { - Endpoint::DEFAULT_TIMEOUT - } - }), - client.request(req), - ) - .await??; + .body(libdd_capabilities::Bytes::from(serde_json::to_vec( + &payload, + )?))?; + + let timeout = std::time::Duration::from_millis({ + if let Some(endp) = self.cfg.endpoint() { + endp.timeout_ms + } else { + Endpoint::DEFAULT_TIMEOUT + } + }); + tokio::time::timeout(timeout, client.request(req)) + .await + .map_err(|_| anyhow::anyhow!("Telemetry crash report timed out"))??; Ok(()) } diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index 46bc35a720..b25381a3d5 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -46,6 +46,7 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [ "bytes_string", "serialization", ] } +web-time = "1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1.23", features = ["time", "test-util"], default-features = false } diff --git a/libdd-data-pipeline/src/otlp/metrics.rs b/libdd-data-pipeline/src/otlp/metrics.rs index 46104b12f1..2a81293579 100644 --- a/libdd-data-pipeline/src/otlp/metrics.rs +++ b/libdd-data-pipeline/src/otlp/metrics.rs @@ -16,8 +16,9 @@ use libdd_trace_utils::otlp_encoder::mapper::status_code; use libdd_trace_utils::otlp_encoder::OtlpResourceInfo; use serde_json::{json, Value}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, 
SystemTime}; +use std::time::Duration; use tracing::error; +use web_time::SystemTime; const METRIC_NAME: &str = "traces.span.sdk.metrics.duration"; const NANOS_PER_SECOND: f64 = 1_000_000_000.0; diff --git a/libdd-data-pipeline/src/telemetry/metrics.rs b/libdd-data-pipeline/src/telemetry/metrics.rs index f610c7e619..c003dfbecd 100644 --- a/libdd-data-pipeline/src/telemetry/metrics.rs +++ b/libdd-data-pipeline/src/telemetry/metrics.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 //! Provides an abstraction layer to hold metrics that comes from 'SendDataResult'. +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_common::tag; use libdd_telemetry::data::metrics::{MetricNamespace, MetricType}; use libdd_telemetry::metrics::ContextKey; @@ -146,7 +147,9 @@ impl Index for Metrics { impl Metrics { /// Creates a new Metrics instance - pub fn new(worker: &TelemetryWorkerHandle) -> Self { + pub fn new( + worker: &TelemetryWorkerHandle, + ) -> Self { let mut keys = Vec::new(); for metric in METRICS { let key = worker.register_metric_context( @@ -171,6 +174,7 @@ impl Metrics { #[cfg(test)] mod tests { use super::*; + use libdd_capabilities_impl::NativeCapabilities; use libdd_telemetry::worker::TelemetryWorkerBuilder; #[cfg_attr(miri, ignore)] @@ -182,7 +186,7 @@ mod tests { "0.1".to_string(), "1.0".to_string(), ) - .spawn(); + .spawn::(); let metrics = Metrics::new(&worker); diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 70bd51e32c..3ddeee5927 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -6,6 +6,7 @@ pub mod error; pub mod metrics; use crate::telemetry::error::TelemetryError; use crate::telemetry::metrics::Metrics; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_common::tag::Tag; use libdd_telemetry::worker::{ LifecycleAction, TelemetryActions, TelemetryWorker, 
TelemetryWorkerBuilder, @@ -120,13 +121,28 @@ impl TelemetryClientBuilder { } /// Builds the telemetry client. - pub fn build(self) -> (TelemetryClient, TelemetryWorker) { - #[allow(clippy::unwrap_used)] + /// + /// `C` is the capability bundle (`NativeCapabilities` on native, `WasmCapabilities` on wasm). + pub fn build( + self, + ) -> Result<(TelemetryClient, TelemetryWorker), TelemetryError> { + let service_name = self + .service_name + .ok_or_else(|| TelemetryError::Builder("service_name is required".into()))?; + let language = self + .language + .ok_or_else(|| TelemetryError::Builder("language is required".into()))?; + let language_version = self + .language_version + .ok_or_else(|| TelemetryError::Builder("language_version is required".into()))?; + let tracer_version = self + .tracer_version + .ok_or_else(|| TelemetryError::Builder("tracer_version is required".into()))?; let mut builder = TelemetryWorkerBuilder::new_fetch_host( - self.service_name.unwrap(), - self.language.unwrap(), - self.language_version.unwrap(), - self.tracer_version.unwrap(), + service_name, + language, + language_version, + tracer_version, ); builder.config = self.config; // Send only metrics and logs and drop lifecycle events @@ -138,23 +154,40 @@ impl TelemetryClientBuilder { builder.runtime_id = Some(id); } - let (worker_handle, worker) = builder.build_worker(None); + // No cancellation runtime handle: telemetry workers driven by SharedRuntime + // handle shutdown via WorkerHandle::stop, not the per-handle deadline path. + #[cfg(not(target_arch = "wasm32"))] + let (worker_handle, worker) = builder.build_worker::(None); + #[cfg(target_arch = "wasm32")] + let (worker_handle, worker) = builder.build_worker::(); - ( + Ok(( TelemetryClient { metrics: Metrics::new(&worker_handle), worker: worker_handle, }, worker, - ) + )) } } -/// Telemetry handle used to send metrics to the agent -#[derive(Debug)] -pub struct TelemetryClient { +/// Telemetry handle used to send metrics to the agent. 
+/// +/// `C` is the capability bundle (`NativeCapabilities` on native, `WasmCapabilities` on wasm). +pub struct TelemetryClient { metrics: Metrics, - worker: TelemetryWorkerHandle, + worker: TelemetryWorkerHandle, +} + +impl std::fmt::Debug + for TelemetryClient +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TelemetryClient") + .field("metrics", &self.metrics) + .field("worker", &self.worker) + .finish() + } } /// Telemetry describing the sending of a trace payload @@ -240,7 +273,7 @@ impl SendPayloadTelemetry { } } -impl TelemetryClient { +impl TelemetryClient { /// Sends metrics to the agent using a telemetry worker handle. /// /// # Arguments: @@ -321,12 +354,16 @@ impl TelemetryClient { Ok(()) } - /// Starts the client - pub async fn start(&self) { - _ = self - .worker - .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) - .await; + /// Starts the client. + /// + /// Sync-by-design: `Start` is dispatched via `try_send_msg` so the same + /// call site works from non-async constructors (e.g. wasm-bindgen's + /// `#[wasm_bindgen(constructor)]`, which cannot be async). The mailbox is + /// sized at `mpsc::channel(5000)` so a sync send is safe under normal load. 
+ pub fn start(&self) -> Result<(), TelemetryError> { + self.worker + .try_send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))?; + Ok(()) } } @@ -337,6 +374,7 @@ mod tests { use httpmock::Method::POST; use httpmock::MockServer; use libdd_capabilities::HttpError; + use libdd_capabilities_impl::NativeCapabilities; use libdd_shared_runtime::{BlockingRuntime, ForkSafeRuntime, SharedRuntime, WorkerHandle}; use libdd_trace_utils::test_utils::poll_for_mock_hits; @@ -346,7 +384,10 @@ mod tests { use regex::Regex; use tokio::time::sleep; - fn get_test_client(url: &str, runtime: &ForkSafeRuntime) -> (TelemetryClient, WorkerHandle) { + fn get_test_client( + url: &str, + runtime: &ForkSafeRuntime, + ) -> (TelemetryClient, WorkerHandle) { let (client, worker) = TelemetryClientBuilder::default() .set_service_name("test_service") .set_service_version("test_version") @@ -358,7 +399,8 @@ mod tests { .set_url(url) .set_heartbeat(100) .set_debug_enabled(true) - .build(); + .build::() + .expect("TelemetryClientBuilder::build failed"); let handle = runtime .spawn_worker(worker, true) .expect("Failed to spawn worker"); @@ -411,7 +453,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -442,7 +484,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -473,7 +515,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed 
sleep(Duration::from_millis(100)).await; @@ -504,7 +546,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -535,7 +577,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -566,7 +608,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -597,7 +639,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -628,7 +670,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -658,7 +700,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); client.send_client_side_stats_drops(3, 5).unwrap(); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -689,7 +731,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = 
client.start(); let _ = client.send(&data); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -834,7 +876,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); client .send(&SendPayloadTelemetry { requests_count: 1, @@ -867,7 +909,7 @@ mod tests { let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); client .send(&SendPayloadTelemetry { requests_count: 1, @@ -917,13 +959,14 @@ mod tests { .set_session_id("sess-e2e") .set_root_session_id("root-e2e") .set_parent_session_id("parent-e2e") - .build(); + .build::() + .expect("TelemetryClientBuilder::build failed"); let handle = shared_runtime .spawn_worker(worker, true) .expect("Failed to spawn worker"); shared_runtime .block_on(async { - client.start().await; + let _ = client.start(); client .send(&SendPayloadTelemetry { requests_count: 1, diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 9c2970a475..78493e9edd 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -5,14 +5,13 @@ use crate::agent_info::AgentInfoFetcher; use crate::agentless::config::{AgentlessTraceConfig, DEFAULT_AGENTLESS_TIMEOUT}; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig}; -#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] +#[cfg(feature = "telemetry")] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; use crate::trace_exporter::error::BuilderErrorKind; use crate::trace_exporter::log_writer::DEFAULT_LOG_MAX_LINE_SIZE; -#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] +#[cfg(feature 
= "telemetry")] use crate::trace_exporter::TelemetryConfig; -#[cfg(not(target_arch = "wasm32"))] use crate::trace_exporter::TraceExporterWorkers; use crate::trace_exporter::{ add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter, @@ -609,31 +608,23 @@ impl TraceExporterBuilder { })?, ) }; - // The handle is currently only tracked for shutdown on native; on wasm - // it is dropped here (the worker keeps running on the JS event loop - // until the page/module is torn down). - #[cfg(target_arch = "wasm32")] - drop(info_fetcher_handle); + // The handle is stored in TraceExporterWorkers on both native and wasm so + // that shutdown_async can stop the worker symmetrically on both targets. - #[allow(unused_mut)] let mut stats = StatsComputationStatus::Disabled; - #[cfg(not(target_arch = "wasm32"))] if let Some(bucket_size) = self.stats_bucket_size { stats = StatsComputationStatus::DisabledByAgent { bucket_size }; } - #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + #[cfg(feature = "telemetry")] let (telemetry_client, telemetry_handle) = { let sessions = self.telemetry_instrumentation_sessions; + // Telemetry talks to the agent; disable it in agentless and log-export modes. 
let telemetry = self .telemetry - .filter(|_| { - // no agent endpoint to talk to, so we skip the - // telemetry worker - !(agentless_enabled || self.output_to_log) - }) - .map(|telemetry_config| { - let mut builder = TelemetryClientBuilder::default() + .filter(|_| !(agentless_enabled || self.output_to_log)) + .map(|telemetry_config| -> Result<_, TraceExporterError> { + let mut tb = TelemetryClientBuilder::default() .set_language(&self.language) .set_language_version(&self.language_version) .set_service_name(&self.service) @@ -644,30 +635,36 @@ impl TraceExporterBuilder { .set_url(base_url) .set_debug_enabled(telemetry_config.debug_enabled); if let Some(id) = telemetry_config.runtime_id { - builder = builder.set_runtime_id(&id); + tb = tb.set_runtime_id(&id); } if let Some(ref id) = sessions.session_id { - builder = builder.set_session_id(id); + tb = tb.set_session_id(id); } if let Some(ref id) = sessions.root_session_id { - builder = builder.set_root_session_id(id); + tb = tb.set_root_session_id(id); } if let Some(ref id) = sessions.parent_session_id { - builder = builder.set_parent_session_id(id); + tb = tb.set_parent_session_id(id); } - Ok(builder.build()) - }); + tb.build::().map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + }) + }) + .transpose()?; match telemetry { - Some(Ok((client_tel, worker))) => { + Some((client_tel, worker)) => { let handle = shared_runtime.spawn_worker(worker, false).map_err(|e| { TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( e.to_string(), )) })?; - client_tel.start().await; + if let Err(e) = client_tel.start() { + tracing::warn!("Failed to start telemetry: {e}"); + } (Some(client_tel), Some(handle)) } - Some(Err(e)) => return Err(e), None => (None, None), } }; @@ -718,9 +715,7 @@ impl TraceExporterBuilder { // OTLP metrics + stats bucket size: start the concentrator unconditionally (bypass the // agent gate) so `check_agent_info` cannot later disable 
stats. - #[allow(unused_mut)] let mut otlp_stats_enabled = false; - #[cfg(not(target_arch = "wasm32"))] if let (Some(metrics_config), Some(bucket_size)) = (otlp_metrics_config.clone(), self.stats_bucket_size) { @@ -733,7 +728,7 @@ impl TraceExporterBuilder { .collect(); let concentrator = Arc::new(Mutex::new(SpanConcentrator::new( bucket_size, - std::time::SystemTime::now(), + web_time::SystemTime::now(), span_kinds, self.peer_tags.clone(), None, @@ -816,11 +811,10 @@ impl TraceExporterBuilder { }, previous_info_state: arc_swap::ArcSwapOption::new(None), info_response_observer, - #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + #[cfg(feature = "telemetry")] telemetry: telemetry_client, health_metrics_enabled: self.health_metrics_enabled, capabilities, - #[cfg(not(target_arch = "wasm32"))] workers: TraceExporterWorkers { info_fetcher: info_fetcher_handle, #[cfg(feature = "telemetry")] diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 52e42dcd67..6291c47d3f 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -57,6 +57,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Once}; use std::time::Duration; use std::{borrow::Borrow, str::FromStr}; +#[cfg(not(target_arch = "wasm32"))] use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; @@ -186,7 +187,6 @@ fn add_path(url: &Uri, path: &str) -> Uri { pub use libdd_trace_utils::tracer_metadata::TracerMetadata; /// Handles for the background workers owned by a [`TraceExporter`]. 
-#[cfg(not(target_arch = "wasm32"))] #[derive(Debug)] pub(crate) struct TraceExporterWorkers { /// `None` when no background `/info` fetcher is started (agentless trace @@ -257,14 +257,12 @@ pub struct TraceExporter< common_stats_tags: Vec, client_computed_top_level: bool, client_side_stats: StatsComputationConfig, - #[cfg_attr(target_arch = "wasm32", allow(dead_code))] previous_info_state: ArcSwapOption, info_response_observer: ResponseObserver, - #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] - telemetry: Option, + #[cfg(feature = "telemetry")] + telemetry: Option>, health_metrics_enabled: bool, capabilities: C, - #[cfg(not(target_arch = "wasm32"))] workers: TraceExporterWorkers, agent_payload_response_version: Option, /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. @@ -276,7 +274,6 @@ pub struct TraceExporter< /// When true, span stats are computed and exported as OTLP metrics. The concentrator is /// started at build time, so agent-driven stats (de)activation in `check_agent_info` is /// skipped. - #[cfg_attr(target_arch = "wasm32", allow(dead_code))] otlp_stats_enabled: bool, /// When `Some(max_line_size)`, traces are written as newline-delimited JSON /// through the [`LogWriterCapability`] (the Datadog Forwarder "log exporter" @@ -317,53 +314,66 @@ impl< /// # Errors /// Returns [`TraceExporterError::Shutdown(ShutdownError::TimedOut)`] if a timeout was /// given and elapsed before all workers finished. 
- #[cfg(not(target_arch = "wasm32"))] pub async fn shutdown_async(self, timeout: Option) -> Result<(), TraceExporterError> { - if let Some(timeout) = timeout { - match tokio::time::timeout(timeout, self.shutdown_workers()).await { - Ok(()) => Ok(()), - Err(_) => Err(TraceExporterError::Shutdown(ShutdownError::TimedOut( - timeout, - ))), - } - } else { + let Some(timeout) = timeout else { self.shutdown_workers().await; - Ok(()) + return Ok(()); + }; + // Runtime-agnostic timeout: race the shutdown work against a capability-driven + // sleep, same pattern as `worker::send_request` / `agent_info::fetcher`. + // `tokio::time::timeout` would tie us to a tokio reactor we don't have on wasm. + let sleeper = ::new(); + tokio::select! { + biased; + _ = self.shutdown_workers() => Ok(()), + _ = sleeper.sleep(timeout) => Err(TraceExporterError::Shutdown( + ShutdownError::TimedOut(timeout), + )), } } + #[cfg(not(target_arch = "wasm32"))] async fn shutdown_workers(self) { - #[cfg(not(target_arch = "wasm32"))] + let mut join_set = JoinSet::new(); + + if let StatsComputationStatus::Enabled { worker_handle, .. } = + &**self.client_side_stats.status.load() { - let mut join_set = JoinSet::new(); + let handle = worker_handle.clone(); + join_set.spawn(async move { handle.stop().await }); + } - // Extract the stats handle before moving other fields. - if let StatsComputationStatus::Enabled { worker_handle, .. 
} = - &**self.client_side_stats.status.load() - { - let handle = worker_handle.clone(); - join_set.spawn(async move { handle.stop().await }); - } + if let Some(info_fetcher) = self.workers.info_fetcher { + join_set.spawn(async move { info_fetcher.stop().await }); + } - if let Some(info_fetcher) = self.workers.info_fetcher { - join_set.spawn(async move { info_fetcher.stop().await }); - } + #[cfg(feature = "telemetry")] + if let Some(telemetry) = self.workers.telemetry { + join_set.spawn(async move { telemetry.stop().await }); + } - #[cfg(feature = "telemetry")] - if let Some(telemetry) = self.workers.telemetry { - join_set.spawn(async move { telemetry.stop().await }); + while let Some(result) = join_set.join_next().await { + if let Ok(Err(e)) = result { + error!("Worker failed to shutdown: {:?}", e); } + } + } - while let Some(result) = join_set.join_next().await { - if let Ok(Err(e)) = result { - error!("Worker failed to shutdown: {:?}", e); - } + // wasm32: no Send + no shared reactor, so stops run sequentially + #[cfg(target_arch = "wasm32")] + async fn shutdown_workers(self) { + if let Some(info_fetcher) = self.workers.info_fetcher { + if let Err(e) = info_fetcher.stop().await { + error!("Info fetcher failed to shutdown: {:?}", e); } } - // On wasm32 workers are no-ops, nothing to stop. - #[cfg(target_arch = "wasm32")] - let _ = self; + #[cfg(feature = "telemetry")] + if let Some(telemetry) = self.workers.telemetry { + if let Err(e) = telemetry.stop().await { + error!("Telemetry worker failed to shutdown: {:?}", e); + } + } } /// Send msgpack serialized traces to the agent. @@ -421,7 +431,6 @@ impl< Ok(res) } - #[cfg(not(target_arch = "wasm32"))] /// Check if agent info state has changed fn has_agent_info_state_changed(&self, agent_info: &Arc) -> bool { Some(agent_info.state_hash.as_str()) @@ -434,7 +443,6 @@ impl< /// Reconcile in-process stats state with the latest agent info. /// Async so the `Enabled` arm can await a stats-worker shutdown without `block_on`. 
- #[cfg(not(target_arch = "wasm32"))] async fn check_agent_info(&self) { let Some(agent_info) = agent_info::get_agent_info() else { return; @@ -492,18 +500,11 @@ impl< .store(Some(agent_info.state_hash.clone().into())) } - #[cfg(target_arch = "wasm32")] - async fn check_agent_info(&self) { - // No background workers on wasm — agent info is never fetched, stats are - // never computed. This is intentionally a no-op. - } - /// Reconcile `v1_active` with the agent's currently-advertised endpoints. Called only when /// V1 is configured and the agent info state has changed, so transitions are logged at most /// once per change. Note: `v1_active` can also transition `true → false` outside this path, /// via the fail-closed hook in `send_trace_chunks_inner` when the agent returns 404 on /// `/v1.0/traces` (the agent does not bump its state hash on 404). - #[cfg(not(target_arch = "wasm32"))] fn refresh_v1_active(&self, agent_info: &Arc) { let supports_v1 = agent_info .info @@ -728,7 +729,7 @@ impl< ) .await; - #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + #[cfg(feature = "telemetry")] if let Some(telemetry) = &self.telemetry { if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result( &result, @@ -799,7 +800,7 @@ impl< &self.client_side_stats.status, self.client_computed_top_level, &self.trace_filterer.load(), - #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + #[cfg(feature = "telemetry")] self.telemetry.as_ref(), ); diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 81996006d6..4384b3a5d6 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -7,14 +7,11 @@ //! including starting/stopping stats workers, managing the span concentrator, //! and processing traces for stats collection. 
-#[cfg(not(target_arch = "wasm32"))] use super::add_path; use super::TracerMetadata; -#[cfg(not(target_arch = "wasm32"))] use crate::agent_info::schema::AgentInfo; use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; -#[cfg(not(target_arch = "wasm32"))] use libdd_common::Endpoint; use libdd_common::MutexExt; use libdd_shared_runtime::{SharedRuntime, WorkerHandle}; @@ -23,18 +20,16 @@ use libdd_trace_stats::span_concentrator::SpanConcentrator; use libdd_trace_stats::span_concentrator::{ SharedStatsComputationObfuscationConfig, StatsComputationObfuscationConfig, }; -#[cfg(not(target_arch = "wasm32"))] use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; use libdd_trace_utils::trace_filter::TraceFilterer; use std::sync::{Arc, Mutex}; use std::time::Duration; -#[cfg(not(target_arch = "wasm32"))] use tracing::{debug, error}; +// std::time::SystemTime::now() panics on wasm32. +use web_time::SystemTime; -#[cfg(not(target_arch = "wasm32"))] pub(crate) const DEFAULT_STATS_ELIGIBLE_SPAN_KINDS: [&str; 4] = ["client", "server", "producer", "consumer"]; -#[cfg(not(target_arch = "wasm32"))] pub(crate) const STATS_ENDPOINT: &str = "/v0.6/stats"; /// The maximum obfuscation version this tracer supports. 
@@ -43,7 +38,6 @@ pub(crate) const SUPPORTED_OBFUSCATION_VERSION: u32 = 1; #[cfg(feature = "stats-obfuscation")] pub(crate) const SUPPORTED_OBFUSCATION_VERSION_STR: &str = "1"; -#[cfg(not(target_arch = "wasm32"))] /// Context struct that groups immutable parameters used by stats functions pub(crate) struct StatsContext<'a, R: SharedRuntime> { pub metadata: &'a TracerMetadata, @@ -52,7 +46,6 @@ pub(crate) struct StatsContext<'a, R: SharedRuntime> { } #[derive(Debug)] -#[cfg_attr(target_arch = "wasm32", allow(dead_code))] pub(crate) enum StatsComputationStatus { /// Client-side stats has been disabled by the tracer Disabled, @@ -68,7 +61,6 @@ pub(crate) enum StatsComputationStatus { } #[derive(Debug)] -#[cfg_attr(target_arch = "wasm32", allow(dead_code))] pub(crate) struct StatsComputationConfig { pub(crate) status: ArcSwap, #[cfg(feature = "stats-obfuscation")] @@ -84,7 +76,6 @@ pub(crate) struct StatsComputationConfig { /// This requires: /// - `client_drop_p0s` to be enabled on the agent, /// - the `/v0.6/stats` endpoint to be advertised by the agent. 
-#[cfg(not(target_arch = "wasm32"))] fn is_stats_computation_supported(agent_info: &AgentInfo) -> bool { agent_info.info.client_drop_p0s.is_some_and(|v| v) && agent_info @@ -103,7 +94,6 @@ fn is_obfuscation_active(agent_info: &AgentInfo) -> bool { .is_some_and(|v| v >= 1 && v <= SUPPORTED_OBFUSCATION_VERSION) } -#[cfg(not(target_arch = "wasm32"))] /// Get span kinds for stats computation with default fallback fn get_span_kinds_for_stats(agent_info: &Arc) -> Vec { agent_info @@ -113,7 +103,6 @@ fn get_span_kinds_for_stats(agent_info: &Arc) -> Vec { .unwrap_or_else(|| DEFAULT_STATS_ELIGIBLE_SPAN_KINDS.map(String::from).to_vec()) } -#[cfg(not(target_arch = "wasm32"))] /// Start the stats exporter and enable stats computation /// /// Should only be used if the agent enabled stats computation @@ -132,7 +121,7 @@ pub(crate) fn start_stats_computation< { let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new( bucket_size, - std::time::SystemTime::now(), + SystemTime::now(), span_kinds, peer_tags, None, @@ -144,7 +133,6 @@ pub(crate) fn start_stats_computation< Ok(()) } -#[cfg(not(target_arch = "wasm32"))] /// Create stats exporter and worker, start the worker, and update the state fn create_and_start_stats_worker< C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, @@ -186,7 +174,6 @@ fn create_and_start_stats_worker< Ok(()) } -#[cfg(not(target_arch = "wasm32"))] /// Transition from `Enabled` to `DisabledByAgent`, awaiting the stats worker shutdown. pub(crate) async fn stop_stats_computation(client_side_stats: &ArcSwap) { // load_full() avoids holding an ArcSwap Guard (!Send) across .await. 
@@ -207,7 +194,6 @@ pub(crate) async fn stop_stats_computation(client_side_stats: &ArcSwap, client_side_stats: &StatsComputationConfig, @@ -270,7 +255,6 @@ fn update_obfuscation_config( } } -#[cfg(not(target_arch = "wasm32"))] pub(crate) async fn handle_stats_enabled( agent_info: &Arc, stats_concentrator: &Arc>, @@ -309,15 +293,20 @@ fn add_spans_to_stats( /// /// If a telemetry client is provided and stats are enabled, dropped P0 counts /// will be sent to telemetry. -pub(crate) fn process_traces_for_stats( +pub(crate) fn process_traces_for_stats< + T: libdd_trace_utils::span::TraceData, + #[cfg(feature = "telemetry")] C: libdd_capabilities::HttpClientCapability + + libdd_capabilities::SleepCapability + + libdd_capabilities::MaybeSend + + Sync + + 'static, +>( traces: &mut Vec>>, header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, client_side_stats: &ArcSwap, client_computed_top_level: bool, trace_filterer: &TraceFilterer, - #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] telemetry: Option< - &crate::telemetry::TelemetryClient, - >, + #[cfg(feature = "telemetry")] telemetry: Option<&crate::telemetry::TelemetryClient>, ) { let status = client_side_stats.load(); if let StatsComputationStatus::Enabled { @@ -344,7 +333,7 @@ pub(crate) fn process_traces_for_stats( header_tags.dropped_p0_spans = dropped_p0_stats.dropped_p0_spans; // Send dropped P0 stats directly to telemetry if available - #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + #[cfg(feature = "telemetry")] if let Some(telemetry_client) = telemetry { if let Err(e) = telemetry_client.send_client_side_stats_drops( dropped_p0_stats.dropped_p0_traces, diff --git a/libdd-telemetry-ffi/Cargo.toml b/libdd-telemetry-ffi/Cargo.toml index 2c00428880..38ff6bf0a0 100644 --- a/libdd-telemetry-ffi/Cargo.toml +++ b/libdd-telemetry-ffi/Cargo.toml @@ -23,6 +23,7 @@ regex-lite = ["libdd-common-ffi/regex-lite"] build_common = { path = "../build-common" } [dependencies] 
+libdd-capabilities-impl = { path = "../libdd-capabilities-impl" } libdd-telemetry = { path = "../libdd-telemetry" } libdd-common = { path = "../libdd-common" } libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false } diff --git a/libdd-telemetry-ffi/src/builder.rs b/libdd-telemetry-ffi/src/builder.rs index d8d132b978..ff29b8ce7d 100644 --- a/libdd-telemetry-ffi/src/builder.rs +++ b/libdd-telemetry-ffi/src/builder.rs @@ -2,13 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use ffi::slice::AsBytes; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common_ffi as ffi; use libdd_telemetry::{ config, data, - worker::{TelemetryWorkerBuilder, TelemetryWorkerFlavor, TelemetryWorkerHandle}, + worker::{TelemetryWorkerBuilder, TelemetryWorkerFlavor}, }; use std::ptr::NonNull; +/// FFI-facing alias: the C ABI surface is native-only, so the worker handle is +/// always pinned to [`NativeCapabilities`]. +type TelemetryWorkerHandle = libdd_telemetry::worker::TelemetryWorkerHandle; + use ffi::MaybeError; #[cfg(not(feature = "expanded_builder_macros"))] @@ -132,7 +137,7 @@ pub unsafe extern "C" fn ddog_telemetry_builder_run( ) -> MaybeError { out_handle .as_ptr() - .write(Box::new(crate::try_c!(builder.run()))); + .write(Box::new(crate::try_c!(builder.run::()))); MaybeError::None } @@ -150,7 +155,7 @@ pub unsafe extern "C" fn ddog_telemetry_builder_run_metric_logs( builder.flavor = TelemetryWorkerFlavor::MetricsLogs; out_handle .as_ptr() - .write(Box::new(crate::try_c!(builder.run()))); + .write(Box::new(crate::try_c!(builder.run::()))); MaybeError::None } diff --git a/libdd-telemetry-ffi/src/lib.rs b/libdd-telemetry-ffi/src/lib.rs index 3f0bda6a2f..a9b7abbed3 100644 --- a/libdd-telemetry-ffi/src/lib.rs +++ b/libdd-telemetry-ffi/src/lib.rs @@ -111,16 +111,21 @@ mod tests { use crate::{builder::*, worker_handle::*}; use ffi::tags::{ddog_Vec_Tag_new, ddog_Vec_Tag_push, PushTagResult}; use ffi::MaybeError; + use 
libdd_capabilities_impl::NativeCapabilities; use libdd_common_ffi as ffi; use libdd_telemetry::{ data::{ metrics::{MetricNamespace, MetricType}, LogLevel, }, - worker::{TelemetryWorkerBuilder, TelemetryWorkerHandle}, + worker::TelemetryWorkerBuilder, }; use std::{mem::MaybeUninit, ptr::NonNull}; + /// Test-side alias: matches the FFI surface, which pins `C` to + /// [`NativeCapabilities`]. + type TelemetryWorkerHandle = libdd_telemetry::worker::TelemetryWorkerHandle; + /// Spins up a worker backed by a file:// endpoint, returns (handle, temp_file). /// The caller is responsible for stopping the worker and reading the file. unsafe fn start_file_backed_worker() -> (Box, tempfile::NamedTempFile) { diff --git a/libdd-telemetry-ffi/src/worker_handle.rs b/libdd-telemetry-ffi/src/worker_handle.rs index 16380a971c..b3eb93be3e 100644 --- a/libdd-telemetry-ffi/src/worker_handle.rs +++ b/libdd-telemetry-ffi/src/worker_handle.rs @@ -4,14 +4,18 @@ use ffi::slice::AsBytes; use ffi::MaybeError; use function_name::named; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::tag::Tag; use libdd_common_ffi as ffi; use libdd_telemetry::{ data::metrics::{MetricNamespace, MetricType}, metrics::ContextKey, - worker::TelemetryWorkerHandle, }; +/// FFI-facing alias: the C ABI surface is native-only, so the worker handle is +/// always pinned to [`NativeCapabilities`]. 
+type TelemetryWorkerHandle = libdd_telemetry::worker::TelemetryWorkerHandle; + #[allow(clippy::missing_safety_doc)] #[no_mangle] pub unsafe extern "C" fn ddog_telemetry_handle_add_dependency( diff --git a/libdd-telemetry/Cargo.toml b/libdd-telemetry/Cargo.toml index 94ec5ff03f..3e31ed2274 100644 --- a/libdd-telemetry/Cargo.toml +++ b/libdd-telemetry/Cargo.toml @@ -22,7 +22,6 @@ anyhow = { version = "1.0" } async-trait = "0.1" base64 = "0.22" futures = { version = "0.3", default-features = false } -http-body-util = "0.1" http = "1" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } @@ -32,13 +31,19 @@ tracing = { version = "0.1", default-features = false } uuid = { version = "1.3", features = ["v4"] } hashbrown = "0.15" bytes = "1.4" +libdd-capabilities = { version = "2.0.0", path = "../libdd-capabilities" } libdd-common = { version = "5.0.0", path = "../libdd-common", default-features = false } libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", default-features = false } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } +web-time = "1" # cross-platform Instant/SystemTime; shims via js-sys on wasm32 [target.'cfg(not(target_arch = "wasm32"))'.dependencies] sys-info = { version = "0.9.0" } +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"] } +uuid = { version = "1", features = ["js"] } + [target."cfg(unix)".dependencies] libc = "0.2.176" @@ -48,5 +53,6 @@ winver = "1.0.0" [dev-dependencies] tracing-subscriber = "0.3.22" tokio = { version = "1.23", features = ["sync", "io-util", "rt-multi-thread"] } +libdd-capabilities-impl = { path = "../libdd-capabilities-impl" } libdd-common = { path = "../libdd-common", features = ["test-utils"] } httpmock = "0.8.0-alpha.1" diff --git a/libdd-telemetry/examples/tm-metrics-worker-test.rs b/libdd-telemetry/examples/tm-metrics-worker-test.rs index d896d294fc..78b56c2e96 100644 --- 
a/libdd-telemetry/examples/tm-metrics-worker-test.rs +++ b/libdd-telemetry/examples/tm-metrics-worker-test.rs @@ -45,7 +45,7 @@ fn main() -> Result<(), Box> { builder.config.debug_enabled = true; builder.flavor = worker::TelemetryWorkerFlavor::MetricsLogs; - let handle = builder.run()?; + let handle = builder.run::()?; let ping_metric = handle.register_metric_context( "test_telemetry.ping".into(), diff --git a/libdd-telemetry/examples/tm-ping.rs b/libdd-telemetry/examples/tm-ping.rs index de0d7d6d26..fc316491d0 100644 --- a/libdd-telemetry/examples/tm-ping.rs +++ b/libdd-telemetry/examples/tm-ping.rs @@ -6,13 +6,18 @@ use std::{ time::SystemTime, }; +use bytes::Bytes; use http::header::CONTENT_TYPE; +use libdd_capabilities::{HttpClientCapability, SleepCapability}; +use libdd_capabilities_impl::NativeCapabilities; use libdd_telemetry::{ build_host, config::Config, data::{self, AppStarted, Application, Telemetry}, worker::http_client::request_builder, }; +use std::time::Duration; +use tokio::select; fn build_app_started_payload() -> AppStarted { AppStarted { @@ -46,13 +51,26 @@ fn build_request<'a>( pub async fn push_telemetry(telemetry: &Telemetry<'_>) -> anyhow::Result<()> { let config = Config::from_env(); - let client = libdd_telemetry::worker::http_client::from_config(&config); + let timeout = Duration::from_millis( + config + .endpoint() + .map(|e| e.timeout_ms) + .unwrap_or(libdd_common::Endpoint::DEFAULT_TIMEOUT), + ); + let client = NativeCapabilities::new_client(); + let sleeper = ::new(); let req = request_builder(&config)? .method(http::Method::POST) .header(CONTENT_TYPE, libdd_common::header::APPLICATION_JSON) - .body(serde_json::to_string(telemetry)?.into())?; + .body(Bytes::from(serde_json::to_vec(telemetry)?))?; - let resp = client.request(req).await?; + let resp = select! 
{ + biased; + result = client.request(req) => result?, + _ = sleeper.sleep(timeout) => { + return Err(anyhow::anyhow!("Telemetry request timed out")); + } + }; if !resp.status().is_success() { Err(anyhow::Error::msg(format!( diff --git a/libdd-telemetry/examples/tm-send-sketch.rs b/libdd-telemetry/examples/tm-send-sketch.rs index bfd54e119c..48f15ce7cf 100644 --- a/libdd-telemetry/examples/tm-send-sketch.rs +++ b/libdd-telemetry/examples/tm-send-sketch.rs @@ -8,13 +8,18 @@ use std::{ time::SystemTime, }; +use bytes::Bytes; use http::header::CONTENT_TYPE; +use libdd_capabilities::{HttpClientCapability, SleepCapability}; +use libdd_capabilities_impl::NativeCapabilities; use libdd_telemetry::{ build_host, config::Config, data::{self, metrics::Distribution, Application, Telemetry}, worker::http_client::request_builder, }; +use std::time::Duration; +use tokio::select; fn seq_id() -> u64 { static SEQ_ID: AtomicU64 = AtomicU64::new(1); @@ -39,13 +44,26 @@ fn build_request<'a>( } pub async fn push_telemetry(config: &Config, telemetry: &Telemetry<'_>) -> anyhow::Result<()> { - let client = libdd_telemetry::worker::http_client::from_config(config); + let timeout = Duration::from_millis( + config + .endpoint() + .map(|e| e.timeout_ms) + .unwrap_or(libdd_common::Endpoint::DEFAULT_TIMEOUT), + ); + let client = NativeCapabilities::new_client(); + let sleeper = ::new(); let req = request_builder(config)? .method(http::Method::POST) .header(CONTENT_TYPE, libdd_common::header::APPLICATION_JSON) - .body(serde_json::to_string(telemetry)?.into())?; + .body(Bytes::from(serde_json::to_vec(telemetry)?))?; - let resp = client.request(req).await?; + let resp = select! 
{ + biased; + result = client.request(req) => result?, + _ = sleeper.sleep(timeout) => { + return Err(anyhow::anyhow!("Telemetry request timed out")); + } + }; if !resp.status().is_success() { Err(anyhow::Error::msg(format!( diff --git a/libdd-telemetry/examples/tm-worker-test.rs b/libdd-telemetry/examples/tm-worker-test.rs index 5825826ad3..950de8e7f7 100644 --- a/libdd-telemetry/examples/tm-worker-test.rs +++ b/libdd-telemetry/examples/tm-worker-test.rs @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { .unwrap(); builder.config.telemetry_heartbeat_interval = Duration::from_secs(1); - let handle = builder.run()?; + let handle = builder.run::()?; let ping_metric = handle.register_metric_context( "test_telemetry.ping".into(), diff --git a/libdd-telemetry/src/metrics.rs b/libdd-telemetry/src/metrics.rs index 1899f7aabd..c300a1ced9 100644 --- a/libdd-telemetry/src/metrics.rs +++ b/libdd-telemetry/src/metrics.rs @@ -4,8 +4,10 @@ use std::{ collections::HashMap, sync::{Arc, Mutex, MutexGuard}, - time, }; +// `time::SystemTime::UNIX_EPOCH.elapsed()` panics on wasm32-unknown-unknown +// (the host has no `std::time` backend); `web_time` proxies to `Date.now()`. +use web_time as time; use libdd_common::tag::Tag; use libdd_ddsketch::DDSketch; diff --git a/libdd-telemetry/src/worker/http_client.rs b/libdd-telemetry/src/worker/http_client.rs index 5d0274425d..997f236e17 100644 --- a/libdd-telemetry/src/worker/http_client.rs +++ b/libdd-telemetry/src/worker/http_client.rs @@ -1,15 +1,7 @@ // Copyright 2021-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use http_body_util::BodyExt; -use libdd_common::{http_common, HttpRequestBuilder}; -use std::{ - fs::OpenOptions, - future::Future, - io::Write, - pin::Pin, - sync::{Arc, Mutex}, -}; +use libdd_common::HttpRequestBuilder; use crate::config::Config; use tracing::{debug, error}; @@ -56,13 +48,6 @@ pub(crate) fn add_instrumentation_session_headers( builder } -pub type ResponseFuture = - Pin> + Send>>; - -pub trait HttpClient { - fn request(&self, req: http_common::HttpRequest) -> ResponseFuture; -} - pub fn request_builder(c: &Config) -> anyhow::Result { match &c.endpoint { Some(e) => { @@ -74,6 +59,11 @@ pub fn request_builder(c: &Config) -> anyhow::Result { ); let mut builder = e.to_request_builder(concat!("telemetry/", env!("CARGO_PKG_VERSION"))); + // Telemetry sends are heartbeat-paced (tens of seconds apart), longer + // than the agent's HTTP keep-alive, so pooled connections are typically + // half-closed by the next send and EOF on reuse. `Connection: close` + // forces a fresh socket per request. 
+ builder = Ok(builder?.header(http::header::CONNECTION, "close")); if c.debug_enabled { debug!( telemetry.debug_enabled = true, @@ -91,120 +81,3 @@ pub fn request_builder(c: &Config) -> anyhow::Result { } } } - -pub fn from_config(c: &Config) -> Box { - match &c.endpoint { - Some(e) if e.url.scheme_str() == Some("file") => { - #[allow(clippy::expect_used)] - let file_path = libdd_common::decode_uri_path_in_authority(&e.url) - .expect("file urls should always have been encoded in authority"); - debug!( - file.path = ?file_path, - "Using file-based mock telemetry client" - ); - return Box::new(MockClient { - #[allow(clippy::expect_used)] - file: Arc::new(Mutex::new(Box::new( - OpenOptions::new() - .create(true) - .append(true) - .open(file_path.as_path()) - .expect("Couldn't open mock client file"), - ))), - }); - } - Some(e) => { - debug!( - endpoint.url = %e.url, - endpoint.timeout_ms = e.timeout_ms, - "Using HTTP telemetry client" - ); - } - None => { - debug!( - endpoint = "default", - "No telemetry endpoint configured, using default HTTP client" - ); - } - }; - Box::new(HyperClient { - inner: http_common::new_client_periodic(), - }) -} - -pub struct HyperClient { - inner: libdd_common::HttpClient, -} - -impl HttpClient for HyperClient { - fn request(&self, req: http_common::HttpRequest) -> ResponseFuture { - let resp = self.inner.request(req); - Box::pin(async move { - match resp.await { - Ok(response) => Ok(http_common::into_response(response)), - Err(e) => Err(http_common::Error::Client(e.into())), - } - }) - } -} - -#[derive(Clone)] -pub struct MockClient { - file: Arc>>, -} - -impl HttpClient for MockClient { - fn request(&self, req: http_common::HttpRequest) -> ResponseFuture { - let s = self.clone(); - Box::pin(async move { - debug!("MockClient writing request to file"); - let mut body = req.collect().await?.to_bytes().to_vec(); - body.push(b'\n'); - - { - #[allow(clippy::expect_used)] - let mut writer = s.file.lock().expect("mutex poisoned"); - - match 
writer.write_all(body.as_ref()) { - Ok(()) => debug!( - file.bytes_written = body.len(), - "Successfully wrote payload to mock file" - ), - Err(e) => { - error!( - error = %e, - "Failed to write to mock file" - ); - return Err(http_common::Error::from(e)); - } - } - } - - debug!(http.status = 202, "MockClient returning success response"); - http_common::empty_response(http::Response::builder().status(202)) - }) - } -} - -#[cfg(test)] -mod tests { - use libdd_common::HttpRequestBuilder; - - use super::*; - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_mock_client() { - let output: Vec = Vec::new(); - let c = MockClient { - file: Arc::new(Mutex::new(Box::new(output))), - }; - c.request( - HttpRequestBuilder::new() - .body(http_common::Body::from("hello world\n")) - .unwrap(), - ) - .await - .unwrap(); - } -} diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index e6549f2b94..90ab175c16 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -12,10 +12,13 @@ use crate::{ }; use async_trait::async_trait; -use libdd_common::{http_common, tag::Tag}; +use bytes::Bytes; +use libdd_capabilities::{HttpClientCapability, HttpError, MaybeSend, SleepCapability}; +use libdd_common::tag::Tag; use libdd_shared_runtime::Worker; use std::iter::Sum; +use std::marker::PhantomData; use std::ops::Add; use std::{ collections::hash_map::DefaultHasher, @@ -23,11 +26,18 @@ use std::{ ops::ControlFlow, sync::{ atomic::{AtomicU64, Ordering}, - Arc, Condvar, Mutex, + Arc, }, - time, }; use std::{collections::HashSet, fmt::Debug, time::Duration}; +// `web_time` re-exports `std::time::Instant`/`SystemTime` on native and +// provides Performance.now()/Date.now()-backed shims on wasm32. We use +// `time::Instant` and `time::SystemTime` through this module-local alias so +// the wasm runtime doesn't hit `time not implemented on this platform`. 
+use web_time as time; + +#[cfg(not(target_arch = "wasm32"))] +use std::sync::{Condvar, Mutex}; use crate::metrics::MetricBucketStats; use futures::{ @@ -36,11 +46,9 @@ use futures::{ }; use http::{header, HeaderValue}; use serde::{Deserialize, Serialize}; -use tokio::{ - runtime::{self, Handle}, - sync::mpsc, - task::JoinHandle, -}; +use tokio::sync::mpsc; +#[cfg(not(target_arch = "wasm32"))] +use tokio::{runtime, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::debug; @@ -48,8 +56,7 @@ const CONTINUE: ControlFlow<()> = ControlFlow::Continue(()); const BREAK: ControlFlow<()> = ControlFlow::Break(()); fn time_now() -> f64 { - #[allow(clippy::unwrap_used)] - std::time::SystemTime::UNIX_EPOCH + time::SystemTime::UNIX_EPOCH .elapsed() .unwrap_or_default() .as_secs_f64() @@ -131,21 +138,26 @@ struct TelemetryWorkerData { app: Application, } -pub struct TelemetryWorker { +/// `C` is the capability bundle. Leaf crates pin it to a concrete type +/// (`NativeCapabilities` on native, `WasmCapabilities` on wasm). 
+pub struct TelemetryWorker { flavor: TelemetryWorkerFlavor, config: Config, mailbox: mpsc::Receiver, cancellation_token: CancellationToken, seq_id: AtomicU64, runtime_id: String, - client: Box, + capabilities: C, metrics_flush_interval: Duration, deadlines: scheduler::Scheduler, data: TelemetryWorkerData, next_action: Option, stopped: bool, } -impl Debug for TelemetryWorker { + +impl Debug + for TelemetryWorker +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("TelemetryWorker") .field("flavor", &self.flavor) @@ -161,8 +173,11 @@ impl Debug for TelemetryWorker { } } -#[async_trait] -impl Worker for TelemetryWorker { +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Worker + for TelemetryWorker +{ async fn trigger(&mut self) { if self.next_action.is_some() { // An action is already available and hasn't been executed @@ -290,25 +305,24 @@ mod serialize { } } -impl TelemetryWorker { +impl TelemetryWorker { fn log_err(&self, err: &anyhow::Error) { telemetry_worker_log!(self, ERROR, "{}", err); } async fn recv_next_action(&mut self) -> TelemetryActions { let action = if let Some((deadline, deadline_action)) = self.deadlines.next_deadline() { + let deadline_action = *deadline_action; // If deadline passed, directly return associated action - if deadline - .checked_duration_since(time::Instant::now()) - .is_none() - { - return TelemetryActions::Lifecycle(*deadline_action); + let Some(remaining) = deadline.checked_duration_since(time::Instant::now()) else { + return TelemetryActions::Lifecycle(deadline_action); }; - // Otherwise run it in a timeout against the mailbox - match tokio::time::timeout_at(deadline.into(), self.mailbox.recv()).await { - Ok(mailbox_action) => mailbox_action, - Err(_) => Some(TelemetryActions::Lifecycle(*deadline_action)), + let sleeper = ::new(); + tokio::select! 
{ + biased; + mailbox_action = self.mailbox.recv() => mailbox_action, + _ = sleeper.sleep(remaining) => Some(TelemetryActions::Lifecycle(deadline_action)), } } else { self.mailbox.recv().await @@ -765,7 +779,7 @@ impl TelemetryWorker { Ok(()) } - fn build_request(&self, payload: &data::Payload) -> anyhow::Result { + fn build_request(&self, payload: &data::Payload) -> anyhow::Result> { let seq_id = self.next_seq_id(); let tel = Telemetry { api_version: data::ApiVersion::V2, @@ -808,19 +822,20 @@ impl TelemetryWorker { self.config.root_session_id.as_deref(), ); - let body = http_common::Body::from(serialize::serialize(&tel)?); + let body = Bytes::from(serialize::serialize(&tel)?); Ok(req.body(body)?) } async fn send_request( &self, - req: http_common::HttpRequest, - ) -> Result { + req: http::Request, + ) -> Result, HttpError> { let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() { endpoint.timeout_ms } else { libdd_common::Endpoint::DEFAULT_TIMEOUT }; + let timeout = time::Duration::from_millis(timeout_ms); debug!( worker.runtime_id = %self.runtime_id, @@ -828,32 +843,24 @@ impl TelemetryWorker { "Sending HTTP request" ); + let sleeper = ::new(); tokio::select! 
{ _ = self.cancellation_token.cancelled() => { debug!( worker.runtime_id = %self.runtime_id, "Telemetry request cancelled" ); - Err(http_common::Error::Other(anyhow::anyhow!("Request cancelled"))) + Err(HttpError::Other(anyhow::anyhow!("Request cancelled"))) }, - _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => { + _ = sleeper.sleep(timeout) => { debug!( worker.runtime_id = %self.runtime_id, http.timeout_ms = timeout_ms, "Telemetry request timed out" ); - Err(http_common::Error::Other(anyhow::anyhow!("Request timed out"))) + Err(HttpError::Other(anyhow::anyhow!("Request timed out"))) }, - r = self.client.request(req) => { - match r { - Ok(resp) => { - Ok(resp) - } - Err(e) => { - Err(e) - }, - } - } + r = self.capabilities.request(req) => r, } } @@ -925,12 +932,14 @@ impl TelemetryWorker { } } +#[cfg(not(target_arch = "wasm32"))] #[derive(Debug)] struct InnerTelemetryShutdown { is_shutdown: Mutex, condvar: Condvar, } +#[cfg(not(target_arch = "wasm32"))] impl InnerTelemetryShutdown { fn wait_for_shutdown(&self) { drop( @@ -950,26 +959,70 @@ impl InnerTelemetryShutdown { } } -#[derive(Clone, Debug)] /// TelemetryWorkerHandle is a handle which allows interactions with the telemetry worker. /// The handle is safe to use across threads. /// /// The worker won't send data to the agent until you call `TelemetryWorkerHandle::send_start` /// /// To stop the worker, call `TelemetryWorkerHandle::send_stop` which trigger flush asynchronously -/// then `TelemetryWorkerHandle::wait_for_shutdown` -pub struct TelemetryWorkerHandle { +/// then `TelemetryWorkerHandle::wait_for_shutdown` (native only — wasm callers rely on the +/// SharedRuntime worker JoinHandle instead). +pub struct TelemetryWorkerHandle< + C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, +> { sender: mpsc::Sender, + #[cfg(not(target_arch = "wasm32"))] shutdown: Arc, cancellation_token: CancellationToken, - // Used to spawn cancellation tasks. 
Should be None when running as a SharedRuntime worker, - // since the runtime is not guaranteed to exist for the lifetime of the worker. + #[cfg(not(target_arch = "wasm32"))] runtime: Option, - contexts: MetricContexts, + _phantom: PhantomData C>, } -impl TelemetryWorkerHandle { +impl Clone + for TelemetryWorkerHandle +{ + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + #[cfg(not(target_arch = "wasm32"))] + shutdown: self.shutdown.clone(), + cancellation_token: self.cancellation_token.clone(), + #[cfg(not(target_arch = "wasm32"))] + runtime: self.runtime.clone(), + contexts: self.contexts.clone(), + _phantom: PhantomData, + } + } +} + +impl Debug + for TelemetryWorkerHandle +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TelemetryWorkerHandle") + .field("sender", &self.sender) + .field("cancellation_token", &self.cancellation_token) + .finish() + } +} + +#[cfg(not(target_arch = "wasm32"))] +fn schedule_deferred_cancel(runtime: Option<&runtime::Handle>, future: F) +where + F: core::future::Future + Send + 'static, +{ + let Some(rt) = runtime else { + tracing::error!("Cannot schedule cancellation deadline: no runtime handle available"); + return; + }; + rt.spawn(future); +} + +impl + TelemetryWorkerHandle +{ pub fn register_metric_context( &self, name: String, @@ -1021,19 +1074,22 @@ impl TelemetryWorkerHandle { .try_send(TelemetryActions::Lifecycle(LifecycleAction::Stop))?) } - fn cancel_requests_with_deadline(&self, deadline: time::Instant) { - let Some(runtime) = &self.runtime else { - tracing::error!("Cannot schedule cancellation deadline: no runtime handle available"); - return; - }; + /// Schedule a deferred `CancellationToken::cancel()` to fire after `deadline`. 
+ #[cfg(not(target_arch = "wasm32"))] + pub fn cancel_requests_with_deadline(&self, deadline: time::Instant) { let token = self.cancellation_token.clone(); - let f = async move { - tokio::time::sleep_until(deadline.into()).await; - token.cancel() + let remaining = deadline.saturating_duration_since(time::Instant::now()); + let sleeper = ::new(); + let future = async move { + sleeper.sleep(remaining).await; + token.cancel(); }; - runtime.spawn(f); + schedule_deferred_cancel(self.runtime.as_ref(), future); } + /// Sync wrapper: schedule a cancellation deadline and block the current + /// thread until shutdown finishes. + #[cfg(not(target_arch = "wasm32"))] pub fn wait_for_shutdown_deadline(&self, deadline: time::Instant) { self.cancel_requests_with_deadline(deadline); self.wait_for_shutdown() @@ -1104,6 +1160,7 @@ impl TelemetryWorkerHandle { Ok(()) } + #[cfg(not(target_arch = "wasm32"))] pub fn wait_for_shutdown(&self) { self.shutdown.wait_for_shutdown(); } @@ -1196,15 +1253,12 @@ impl TelemetryWorkerBuilder { } /// Build the corresponding worker and its handle. - /// - /// The optional runtime handle is stored in the worker handle and should be the one used to run - /// the worker task cancellation deadlines. Pass `None` when the worker will be run via a - /// [`SharedRuntime`](libdd_shared_runtime::SharedRuntime). 
- pub fn build_worker( + pub fn build_worker( self, - tokio_runtime: Option, - ) -> (TelemetryWorkerHandle, TelemetryWorker) { + #[cfg(not(target_arch = "wasm32"))] tokio_runtime: Option, + ) -> (TelemetryWorkerHandle, TelemetryWorker) { let (tx, mailbox) = mpsc::channel(5000); + #[cfg(not(target_arch = "wasm32"))] let shutdown = Arc::new(InnerTelemetryShutdown { is_shutdown: Mutex::new(false), condvar: Condvar::new(), @@ -1214,12 +1268,11 @@ impl TelemetryWorkerBuilder { let config = self.config; let telemetry_heartbeat_interval = config.telemetry_heartbeat_interval; let telemetry_extended_heartbeat_interval = config.telemetry_extended_heartbeat_interval; - let client = http_client::from_config(&config); + let capabilities = C::new_client(); let metrics_flush_interval = telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL); - #[allow(clippy::unwrap_used)] let worker = TelemetryWorker { flavor: self.flavor, data: TelemetryWorkerData { @@ -1240,7 +1293,7 @@ impl TelemetryWorkerBuilder { runtime_id: self .runtime_id .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), - client, + capabilities, metrics_flush_interval, deadlines: scheduler::Scheduler::new(vec![ (metrics_flush_interval, LifecycleAction::FlushMetricAggr), @@ -1258,34 +1311,41 @@ impl TelemetryWorkerBuilder { ( TelemetryWorkerHandle { sender: tx, + #[cfg(not(target_arch = "wasm32"))] shutdown, cancellation_token: token, + #[cfg(not(target_arch = "wasm32"))] runtime: tokio_runtime, - contexts, + _phantom: PhantomData, }, worker, ) } - /// Spawns a telemetry worker task in the current tokio runtime - /// The worker will capture a reference to the runtime and use it to run its tasks - pub fn spawn(self) -> (TelemetryWorkerHandle, JoinHandle<()>) { + /// Spawns a telemetry worker task in the current tokio runtime. 
+ #[cfg(not(target_arch = "wasm32"))] + pub fn spawn( + self, + ) -> (TelemetryWorkerHandle, JoinHandle<()>) { let tokio_runtime = tokio::runtime::Handle::current(); - let (worker_handle, worker) = self.build_worker(Some(tokio_runtime.clone())); + let (worker_handle, worker) = self.build_worker::(Some(tokio_runtime.clone())); let join_handle = tokio_runtime.spawn(async move { worker.run_loop().await }); (worker_handle, join_handle) } - /// Spawns a telemetry worker in a new thread and returns a handle to interact with it - pub fn run(self) -> anyhow::Result { + /// Spawns a telemetry worker in a new thread and returns a handle to interact with it. + #[cfg(not(target_arch = "wasm32"))] + pub fn run( + self, + ) -> anyhow::Result> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - let (handle, worker) = self.build_worker(Some(runtime.handle().clone())); + let (handle, worker) = self.build_worker::(Some(runtime.handle().clone())); let notify_shutdown = handle.shutdown.clone(); std::thread::spawn(move || { runtime.block_on(worker.run_loop()); @@ -1308,7 +1368,7 @@ mod tests { LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder, TelemetryWorkerFlavor, TelemetryWorkerHandle, }; - use libdd_common::http_common; + use libdd_capabilities_impl::NativeCapabilities; use tokio::runtime::Runtime; fn is_send(_: T) {} @@ -1317,16 +1377,16 @@ mod tests { #[test] fn test_handle_sync_send() { #[allow(clippy::redundant_closure)] - let _ = |h: TelemetryWorkerHandle| is_send(h); + let _ = |h: TelemetryWorkerHandle| is_send(h); #[allow(clippy::redundant_closure)] - let _ = |h: TelemetryWorkerHandle| is_sync(h); + let _ = |h: TelemetryWorkerHandle| is_sync(h); } fn test_worker( session_id: Option, root_session_id: Option, parent_session_id: Option, - ) -> TelemetryWorker { + ) -> TelemetryWorker { let mut b = TelemetryWorkerBuilder::new( "h".into(), "svc".into(), @@ -1345,7 +1405,8 @@ mod tests { b.config.parent_session_id = 
parent_session_id; b.config.root_session_id = root_session_id; let rt = Runtime::new().unwrap(); - b.build_worker(Some(rt.handle().clone())).1 + b.build_worker::(Some(rt.handle().clone())) + .1 } #[test] @@ -1411,7 +1472,7 @@ mod tests { #[test] fn telemetry_http_omits_session_family_without_valid_session_id() { - let assert_no_session_headers = |req: &http_common::HttpRequest| { + let assert_no_session_headers = |req: &http::Request| { assert!(req.headers().get(DD_SESSION_ID).is_none()); assert!(req.headers().get(DD_ROOT_SESSION_ID).is_none()); assert!(req.headers().get(DD_PARENT_SESSION_ID).is_none()); @@ -1463,7 +1524,9 @@ mod tests { ); } - fn build_test_worker_with_flavor(flavor: TelemetryWorkerFlavor) -> TelemetryWorker { + fn build_test_worker_with_flavor( + flavor: TelemetryWorkerFlavor, + ) -> TelemetryWorker { let mut b = TelemetryWorkerBuilder::new( "h".into(), "svc".into(), @@ -1479,7 +1542,8 @@ mod tests { .unwrap(); b.runtime_id = Some("rid".into()); b.flavor = flavor; - b.build_worker(Some(tokio::runtime::Handle::current())).1 + b.build_worker::(Some(tokio::runtime::Handle::current())) + .1 } /// Every event with a delay must be scheduled on Start; otherwise it sits in @@ -1577,9 +1641,13 @@ mod tests { metrics::{MetricNamespace, MetricType}, Configuration, ConfigurationOrigin, Dependency, Endpoint, Integration, Log, LogLevel, }; + use libdd_capabilities_impl::NativeCapabilities; use libdd_shared_runtime::Worker; - fn build_test_worker() -> (TelemetryWorkerHandle, TelemetryWorker) { + fn build_test_worker() -> ( + TelemetryWorkerHandle, + TelemetryWorker, + ) { let builder = TelemetryWorkerBuilder::new( "hostname".to_string(), "test-service".to_string(), @@ -1588,7 +1656,7 @@ mod tests { "1.0.0".to_string(), ); // build_worker requires a tokio Handle; tests using this must be #[tokio::test] - builder.build_worker(Some(tokio::runtime::Handle::current())) + builder.build_worker::(Some(tokio::runtime::Handle::current())) } fn make_log(id: u64, message: 
&str) -> (LogIdentifier, Log) { @@ -1833,7 +1901,8 @@ mod tests { let runtime_handle = shared_runtime .block_on(async { tokio::runtime::Handle::current() }) .expect("runtime handle"); - let (telemetry_handle, worker) = builder.build_worker(Some(runtime_handle)); + let (telemetry_handle, worker) = + builder.build_worker::(Some(runtime_handle)); let _worker_handle = shared_runtime .spawn_worker(worker, false) diff --git a/libdd-telemetry/src/worker/scheduler.rs b/libdd-telemetry/src/worker/scheduler.rs index 4ccdd0a893..3c678d4b51 100644 --- a/libdd-telemetry/src/worker/scheduler.rs +++ b/libdd-telemetry/src/worker/scheduler.rs @@ -1,7 +1,8 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::time::{Duration, Instant}; +use std::time::Duration; +use web_time::Instant; #[derive(Debug)] pub struct Scheduler { diff --git a/libdd-trace-stats/Cargo.toml b/libdd-trace-stats/Cargo.toml index 09f913e6e8..83531c2501 100644 --- a/libdd-trace-stats/Cargo.toml +++ b/libdd-trace-stats/Cargo.toml @@ -29,6 +29,8 @@ tokio = { version = "1.23", features = ["macros", "time"], default-features = fa tokio-util = "0.7.11" tracing = { version = "0.1", default-features = false } async-trait = "0.1.85" +# Cross-platform `SystemTime`; `std::time::SystemTime::now()` panics on wasm32. +web-time = "1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl", default-features = false } diff --git a/libdd-trace-stats/src/lib.rs b/libdd-trace-stats/src/lib.rs index 8e5793c8bb..855e8ce088 100644 --- a/libdd-trace-stats/src/lib.rs +++ b/libdd-trace-stats/src/lib.rs @@ -10,5 +10,4 @@ //! This crate provides utilities to compute stats from traces. 
pub mod span_concentrator; -#[cfg(not(target_arch = "wasm32"))] pub mod stats_exporter; diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index f1beb5eab9..37036e5d5a 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -2,8 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 //! This module implements the SpanConcentrator used to aggregate spans into stats use std::collections::HashMap; -use std::time::{self, Duration, SystemTime}; +use std::time::Duration; use tracing::debug; +// std::time::SystemTime panics on wasm32. +use web_time::{SystemTime, UNIX_EPOCH}; use libdd_trace_protobuf::pb; @@ -35,7 +37,7 @@ impl FlushableConcentrator for SpanConcentrator { /// Return a Duration between t and the unix epoch /// If t is before the unix epoch return 0 fn system_time_to_unix_duration(t: SystemTime) -> Duration { - t.duration_since(time::UNIX_EPOCH) + t.duration_since(UNIX_EPOCH) .unwrap_or(Duration::from_nanos(0)) } diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index c771fd5b2b..adaa09677e 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -86,7 +86,7 @@ impl From for StatsMetadata { /// concrete type (`NativeCapabilities` or `WasmCapabilities`). #[derive(Debug)] pub struct StatsExporter< - Cap: HttpClientCapability + SleepCapability, + Cap: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, Con: FlushableConcentrator = SpanConcentrator, > { flush_interval: time::Duration, @@ -102,7 +102,7 @@ pub struct StatsExporter< /// Optional telemetry handle and context key. #[cfg(feature = "telemetry")] telemetry: Option<( - libdd_telemetry::worker::TelemetryWorkerHandle, + libdd_telemetry::worker::TelemetryWorkerHandle, libdd_telemetry::metrics::ContextKey, )>, /// Optional DogStatsD client. 
@@ -110,8 +110,10 @@ pub struct StatsExporter< dogstatsd: Option>, } -impl - StatsExporter +impl< + Cap: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, + Con: FlushableConcentrator, + > StatsExporter { /// Return a new StatsExporter /// @@ -131,7 +133,7 @@ impl obfuscation_config: SharedStatsComputationObfuscationConfig, #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version: &'static str, #[cfg(feature = "telemetry")] telemetry: Option< - libdd_telemetry::worker::TelemetryWorkerHandle, + libdd_telemetry::worker::TelemetryWorkerHandle, >, #[cfg(feature = "dogstatsd")] dogstatsd: Option>, ) -> Self {