diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 717b037b09..65d8105e7a 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -105,6 +105,7 @@ jobs: # shellcheck disable=SC2086 cargo fmt $FMT_PACKAGES -- --check fi + clippy: needs: setup if: needs.setup.outputs.crates-count != '0' @@ -157,6 +158,67 @@ jobs: cargo clippy $CLIPPY_PACKAGES --all-targets --all-features -- -D warnings fi + clippy-wasm: + needs: setup + if: needs.setup.outputs.crates-count != '0' + name: "clippy #wasm ${{ matrix.rust_version }}" + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + rust_version: ["msrv", "stable", "nightly"] + steps: + - name: Checkout sources + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # 4.2.2 + with: + submodules: recursive + - name: Install ${{ matrix.rust_version }} toolchain and clippy + id: install-toolchain + shell: bash + run: | + VERSION="${{ matrix.rust_version }}" + if [ "$VERSION" = "msrv" ]; then + VERSION="${{ needs.setup.outputs.rust-version }}" + elif [ "$VERSION" = "nightly" ]; then + VERSION="${{ needs.setup.outputs.nightly-version }}" + fi + rustup set profile minimal + rustup install "$VERSION" + rustup component add clippy --toolchain "$VERSION" + rustup target add wasm32-unknown-unknown --toolchain "$VERSION" + echo "version=$VERSION" >> "$GITHUB_OUTPUT" + - name: Cache [rust] + uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # 2.8.1 + with: + cache-targets: true # cache build artifacts + cache-bin: true # cache the ~/.cargo/bin directory + - name: Run clippy on wasm ${{ matrix.rust_version }} + shell: bash + env: + CLIPPY_PACKAGES: ${{ needs.setup.outputs.clippy-packages }} + # Override rust-toolchain.toml so each matrix entry actually runs on + # its chosen toolchain instead of falling back to the workspace MSRV. + RUSTUP_TOOLCHAIN: ${{ steps.install-toolchain.outputs.version }} + run: | + # List of crates that are meant to be compilable for wasm. + WASM_ALLOWLIST="build_common cc_utils datadog-ffe datadog-ffe-test-suite datadog-ipc-macros datadog-sidecar-macros libdd-capabilities libdd-common libdd-data-pipeline libdd-ddsketch libdd-dogstatsd-client libdd-log libdd-otel-thread-ctx libdd-otel-thread-ctx-ffi libdd-profiling-protobuf libdd-remote-config libdd-sampling libdd-shared-runtime libdd-tinybytes libdd-trace-normalization libdd-trace-obfuscation libdd-trace-protobuf libdd-trace-stats libdd-trace-utils sidecar_mockgen" + if [[ -z "$CLIPPY_PACKAGES" ]]; then + WASM_PACKAGES=$(for p in $WASM_ALLOWLIST; do echo "-p $p"; done | xargs) + else + WASM_PACKAGES="" + for pkg in $WASM_ALLOWLIST; do + if echo "$CLIPPY_PACKAGES" | grep -qw -- "$pkg"; then + WASM_PACKAGES="$WASM_PACKAGES -p $pkg" + fi + done + fi + if [[ -z "$WASM_PACKAGES" ]]; then + echo "No wasm-compatible crates affected by this change; skipping." + else + # shellcheck disable=SC2086 + cargo clippy $WASM_PACKAGES --no-default-features --target wasm32-unknown-unknown -- -D warnings + fi + licensecheck: needs: setup if: needs.setup.outputs.crates-count != '0' diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 0bda6b1b7e..1be29ceba5 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -31,7 +31,10 @@ pub mod config; pub mod exporter; pub mod metrics; -pub use config::{OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig}; +#[cfg(not(target_arch = "wasm32"))] +pub use config::OtlpMetricsConfig; +pub use config::{OtlpProtocol, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; +#[cfg(not(target_arch = "wasm32"))] pub use metrics::OtlpStatsExporter; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 9c2970a475..41dca6f312 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -4,7 +4,9 @@ use crate::agent_info::AgentInfoFetcher; use crate::agentless::config::{AgentlessTraceConfig, DEFAULT_AGENTLESS_TIMEOUT}; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; -use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig}; +use crate::otlp::OtlpTraceConfig; +#[cfg(not(target_arch = "wasm32"))] +use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo}; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -23,6 +25,8 @@ use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; use libdd_common::{parse_uri, tag, Endpoint}; use libdd_dogstatsd_client::new; +#[cfg(target_arch = "wasm32")] +use libdd_shared_runtime::LocalRuntime; use libdd_shared_runtime::SharedRuntime; #[cfg(not(target_arch = "wasm32"))] use libdd_shared_runtime::{BlockingRuntime, ForkSafeRuntime}; @@ -113,6 +117,13 @@ impl Default for TraceExporterBuilder { } } +#[cfg(target_arch = "wasm32")] +impl Default for TraceExporterBuilder { + fn default() -> Self { + Self::new() + } +} + impl TraceExporterBuilder { /// Construct a builder with all fields at their initial / `None` state. /// @@ -704,6 +715,7 @@ impl TraceExporterBuilder { otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, }); + #[cfg(not(target_arch = "wasm32"))] let otlp_metrics_config = self.otlp_metrics_endpoint.map(|url| OtlpMetricsConfig { endpoint_url: url, headers: build_otlp_header_map(self.otlp_metrics_headers), diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 52e42dcd67..7cac2cd0fe 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -18,6 +18,8 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::ResponseObserver; +#[cfg(not(target_arch = "wasm32"))] +use crate::agent_info::{self, schema::AgentInfo}; use crate::agentless::{send_agentless_traces_http, AgentlessTraceConfig}; use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; #[cfg(feature = "telemetry")] @@ -25,12 +27,11 @@ use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION, }; -use crate::trace_exporter::error::{ - InternalErrorKind, RequestError, ShutdownError, TraceExporterError, -}; +#[cfg(not(target_arch = "wasm32"))] +use crate::trace_exporter::error::ShutdownError; +use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; use crate::trace_exporter::stats::StatsComputationConfig; use crate::{ - agent_info::{self, schema::AgentInfo}, health_metrics, health_metrics::{HealthMetric, SendResult, TransportErrorType}, }; @@ -45,7 +46,9 @@ use libdd_common::Endpoint; use libdd_dogstatsd_client::Client; #[cfg(not(target_arch = "wasm32"))] use libdd_shared_runtime::BlockingRuntime; -use libdd_shared_runtime::{SharedRuntime, WorkerHandle}; +use libdd_shared_runtime::SharedRuntime; +#[cfg(not(target_arch = "wasm32"))] +use libdd_shared_runtime::WorkerHandle; use libdd_trace_utils::msgpack_decoder; use libdd_trace_utils::send_with_retry::{ send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult, @@ -55,9 +58,12 @@ use libdd_trace_utils::trace_utils::TracerHeaderTags; use std::io; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Once}; +#[cfg(any(not(target_arch = "wasm32"), feature = "test-utils"))] use std::time::Duration; use std::{borrow::Borrow, str::FromStr}; +#[cfg(not(target_arch = "wasm32"))] use tokio::task::JoinSet; +#[cfg(not(target_arch = "wasm32"))] use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; @@ -249,8 +255,10 @@ pub struct TraceExporter< /// Used to emit a one-shot warning when V1 is requested by the SDK but the agent never /// advertises `/v1.0/traces`. Without it we'd either spam the warning on every `/info` /// poll or stay silent and leave SDK authors without a signal. + #[cfg_attr(target_arch = "wasm32", allow(dead_code))] v1_unavailable_logged: Once, serializer: TraceSerializer, + #[cfg_attr(target_arch = "wasm32", allow(dead_code))] shared_runtime: Arc, /// None if dogstatsd is disabled dogstatsd: Option, @@ -332,38 +340,32 @@ impl< } } + #[cfg(not(target_arch = "wasm32"))] async fn shutdown_workers(self) { - #[cfg(not(target_arch = "wasm32"))] - { - let mut join_set = JoinSet::new(); + let mut join_set = JoinSet::new(); - // Extract the stats handle before moving other fields. - if let StatsComputationStatus::Enabled { worker_handle, .. } = - &**self.client_side_stats.status.load() - { - let handle = worker_handle.clone(); - join_set.spawn(async move { handle.stop().await }); - } + // Extract the stats handle before moving other fields. + if let StatsComputationStatus::Enabled { worker_handle, .. } = + &**self.client_side_stats.status.load() + { + let handle = worker_handle.clone(); + join_set.spawn(async move { handle.stop().await }); + } - if let Some(info_fetcher) = self.workers.info_fetcher { - join_set.spawn(async move { info_fetcher.stop().await }); - } + if let Some(info_fetcher) = self.workers.info_fetcher { + join_set.spawn(async move { info_fetcher.stop().await }); + } - #[cfg(feature = "telemetry")] - if let Some(telemetry) = self.workers.telemetry { - join_set.spawn(async move { telemetry.stop().await }); - } + #[cfg(feature = "telemetry")] + if let Some(telemetry) = self.workers.telemetry { + join_set.spawn(async move { telemetry.stop().await }); + } - while let Some(result) = join_set.join_next().await { - if let Ok(Err(e)) = result { - error!("Worker failed to shutdown: {:?}", e); - } + while let Some(result) = join_set.join_next().await { + if let Ok(Err(e)) = result { + error!("Worker failed to shutdown: {:?}", e); } } - - // On wasm32 workers are no-ops, nothing to stop. - #[cfg(target_arch = "wasm32")] - let _ = self; } /// Send msgpack serialized traces to the agent. diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 81996006d6..8396ffa743 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -9,15 +9,19 @@ #[cfg(not(target_arch = "wasm32"))] use super::add_path; +#[cfg(not(target_arch = "wasm32"))] use super::TracerMetadata; #[cfg(not(target_arch = "wasm32"))] use crate::agent_info::schema::AgentInfo; use arc_swap::ArcSwap; +#[cfg(not(target_arch = "wasm32"))] use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; #[cfg(not(target_arch = "wasm32"))] use libdd_common::Endpoint; use libdd_common::MutexExt; -use libdd_shared_runtime::{SharedRuntime, WorkerHandle}; +#[cfg(not(target_arch = "wasm32"))] +use libdd_shared_runtime::SharedRuntime; +use libdd_shared_runtime::WorkerHandle; use libdd_trace_stats::span_concentrator::SpanConcentrator; #[cfg(feature = "stats-obfuscation")] use libdd_trace_stats::span_concentrator::{ @@ -324,6 +328,10 @@ pub(crate) fn process_traces_for_stats( stats_concentrator, .. } = &**status { + #[cfg_attr( + any(target_arch = "wasm32", not(feature = "telemetry")), + allow(unused_variables, reason = "FIXME: add telemetry on wasm") + )] let dropped_by_trace_filter = trace_filterer.filter_traces(traces); if !client_computed_top_level { diff --git a/libdd-shared-runtime/src/shared_runtime/local.rs b/libdd-shared-runtime/src/shared_runtime/local.rs index c02730d0c8..df4221a086 100644 --- a/libdd-shared-runtime/src/shared_runtime/local.rs +++ b/libdd-shared-runtime/src/shared_runtime/local.rs @@ -4,13 +4,14 @@ use crate::worker::Worker; use futures::stream::{FuturesUnordered, StreamExt}; use libdd_common::MutexExt; +use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use tracing::{debug, error}; use super::{ pausable_worker::PausableWorker, BoxedWorker, SharedRuntime, SharedRuntimeError, WorkerEntry, - WorkerHandle, + WorkerHandle, WorkerRegistry, }; /// Single-threaded local executor runtime for wasm32. @@ -22,7 +23,7 @@ use super::{ /// [`crate::BasicRuntime`] (caller-provided tokio runtime) instead. #[derive(Debug)] pub struct LocalRuntime { - workers: Arc>>, + workers: WorkerRegistry, next_worker_id: AtomicU64, } @@ -31,12 +32,10 @@ impl LocalRuntime { &self, workers_guard: &mut std::sync::MutexGuard>, pausable_worker: PausableWorker, - restart_on_fork: bool, ) -> WorkerHandle { let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed); workers_guard.push(WorkerEntry { id: worker_id, - restart_on_fork, worker: pausable_worker, }); WorkerHandle { @@ -49,7 +48,7 @@ impl LocalRuntime { impl SharedRuntime for LocalRuntime { fn new() -> Result { Ok(Self { - workers: Arc::new(Mutex::new(Vec::new())), + workers: Rc::new(Mutex::new(Vec::new())), next_worker_id: AtomicU64::new(1), }) } @@ -57,7 +56,8 @@ impl SharedRuntime for LocalRuntime { fn spawn_worker( &self, worker: T, - restart_on_fork: bool, + // LocalRuntime has no fork protocol. + _restart_on_fork: bool, ) -> Result { let boxed_worker: BoxedWorker = Box::new(worker); debug!(?boxed_worker, "Spawning worker on LocalRuntime"); @@ -71,7 +71,7 @@ impl SharedRuntime for LocalRuntime { Box::pin(async { Ok(handle.await) }) })?; - Ok(self.push_worker(&mut workers_guard, pausable_worker, restart_on_fork)) + Ok(self.push_worker(&mut workers_guard, pausable_worker)) } async fn shutdown_async(&self) { diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index 852283179f..d2d1d67ffb 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -24,15 +24,25 @@ use crate::worker::Worker; use libdd_capabilities::MaybeSend; use libdd_common::MutexExt; use pausable_worker::{PausableWorker, PausableWorkerError}; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use std::{fmt, io}; +// Shared, reference-counted registry of live workers. On native targets it must be `Arc` because +// runtimes and their `WorkerHandle`s are shared across threads. On wasm the runtime is +// single-threaded and its workers are `!Send`, so `Rc` is both correct and avoids the +// `arc_with_non_send_sync` lint. +#[cfg(not(target_arch = "wasm32"))] +pub(crate) type WorkerRegistry = std::sync::Arc>>; +#[cfg(target_arch = "wasm32")] +pub(crate) type WorkerRegistry = std::rc::Rc>>; + /// A worker registered on a [`SharedRuntime`]. pub(crate) type BoxedWorker = Box; #[derive(Debug)] pub(crate) struct WorkerEntry { pub(crate) id: u64, + #[cfg(not(target_arch = "wasm32"))] pub(crate) restart_on_fork: bool, pub(crate) worker: PausableWorker, } @@ -97,7 +107,7 @@ pub trait BlockingRuntime: SharedRuntime { #[derive(Clone, Debug)] pub struct WorkerHandle { pub(crate) worker_id: u64, - pub(crate) workers: Arc>>, + pub(crate) workers: WorkerRegistry, } #[derive(Debug)] diff --git a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs index e39291b48f..8498328a29 100644 --- a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs +++ b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs @@ -191,6 +191,7 @@ impl PausableWorker { } /// Reset the worker state (e.g. in a fork child). + #[cfg(not(target_arch = "wasm32"))] pub fn reset(&mut self) { if let PausableWorker::Paused { worker } = self { worker.reset();