Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ jobs:
# shellcheck disable=SC2086
cargo fmt $FMT_PACKAGES -- --check
fi

clippy:
needs: setup
if: needs.setup.outputs.crates-count != '0'
Expand Down Expand Up @@ -157,6 +158,67 @@ jobs:
cargo clippy $CLIPPY_PACKAGES --all-targets --all-features -- -D warnings
fi

clippy-wasm:
needs: setup
if: needs.setup.outputs.crates-count != '0'
name: "clippy #wasm ${{ matrix.rust_version }}"
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
rust_version: ["msrv", "stable", "nightly"]
steps:
- name: Checkout sources
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # 4.2.2
with:
submodules: recursive
- name: Install ${{ matrix.rust_version }} toolchain and clippy
id: install-toolchain
shell: bash
run: |
VERSION="${{ matrix.rust_version }}"
if [ "$VERSION" = "msrv" ]; then
VERSION="${{ needs.setup.outputs.rust-version }}"
elif [ "$VERSION" = "nightly" ]; then
VERSION="${{ needs.setup.outputs.nightly-version }}"
fi
rustup set profile minimal
rustup install "$VERSION"
rustup component add clippy --toolchain "$VERSION"
rustup target add wasm32-unknown-unknown --toolchain "$VERSION"
echo "version=$VERSION" >> "$GITHUB_OUTPUT"
- name: Cache [rust]
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # 2.8.1
with:
cache-targets: true # cache build artifacts
cache-bin: true # cache the ~/.cargo/bin directory
- name: Run clippy on wasm ${{ matrix.rust_version }}
shell: bash
env:
CLIPPY_PACKAGES: ${{ needs.setup.outputs.clippy-packages }}
# Override rust-toolchain.toml so each matrix entry actually runs on
# its chosen toolchain instead of falling back to the workspace MSRV.
RUSTUP_TOOLCHAIN: ${{ steps.install-toolchain.outputs.version }}
run: |
# List of crates that are meant to be compilable for wasm.
WASM_ALLOWLIST="build_common cc_utils datadog-ffe datadog-ffe-test-suite datadog-ipc-macros datadog-sidecar-macros libdd-capabilities libdd-common libdd-data-pipeline libdd-ddsketch libdd-dogstatsd-client libdd-log libdd-otel-thread-ctx libdd-otel-thread-ctx-ffi libdd-profiling-protobuf libdd-remote-config libdd-sampling libdd-shared-runtime libdd-tinybytes libdd-trace-normalization libdd-trace-obfuscation libdd-trace-protobuf libdd-trace-stats libdd-trace-utils sidecar_mockgen"
if [[ -z "$CLIPPY_PACKAGES" ]]; then
WASM_PACKAGES=$(for p in $WASM_ALLOWLIST; do echo "-p $p"; done | xargs)
else
WASM_PACKAGES=""
for pkg in $WASM_ALLOWLIST; do
if echo "$CLIPPY_PACKAGES" | grep -qw -- "$pkg"; then
WASM_PACKAGES="$WASM_PACKAGES -p $pkg"
fi
done
fi
if [[ -z "$WASM_PACKAGES" ]]; then
echo "No wasm-compatible crates affected by this change; skipping."
else
# shellcheck disable=SC2086
cargo clippy $WASM_PACKAGES --no-default-features --target wasm32-unknown-unknown -- -D warnings
fi

licensecheck:
needs: setup
if: needs.setup.outputs.crates-count != '0'
Expand Down
5 changes: 4 additions & 1 deletion libdd-data-pipeline/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ pub mod config;
pub mod exporter;
pub mod metrics;

pub use config::{OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig};
#[cfg(not(target_arch = "wasm32"))]
pub use config::OtlpMetricsConfig;
pub use config::{OtlpProtocol, OtlpTraceConfig};
pub use exporter::send_otlp_traces_http;
pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo};
#[cfg(not(target_arch = "wasm32"))]
pub use metrics::OtlpStatsExporter;
14 changes: 13 additions & 1 deletion libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
use crate::agent_info::AgentInfoFetcher;
use crate::agentless::config::{AgentlessTraceConfig, DEFAULT_AGENTLESS_TIMEOUT};
use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT};
use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig};
use crate::otlp::OtlpTraceConfig;
#[cfg(not(target_arch = "wasm32"))]
use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo};
#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
use crate::telemetry::TelemetryClientBuilder;
use crate::trace_exporter::agent_response::AgentResponsePayloadVersion;
Expand All @@ -23,6 +25,8 @@ use arc_swap::ArcSwap;
use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability};
use libdd_common::{parse_uri, tag, Endpoint};
use libdd_dogstatsd_client::new;
#[cfg(target_arch = "wasm32")]
use libdd_shared_runtime::LocalRuntime;
use libdd_shared_runtime::SharedRuntime;
#[cfg(not(target_arch = "wasm32"))]
use libdd_shared_runtime::{BlockingRuntime, ForkSafeRuntime};
Expand Down Expand Up @@ -113,6 +117,13 @@ impl Default for TraceExporterBuilder<ForkSafeRuntime> {
}
}

#[cfg(target_arch = "wasm32")]
impl Default for TraceExporterBuilder<LocalRuntime> {
fn default() -> Self {
Self::new()
}
}

impl<R: SharedRuntime> TraceExporterBuilder<R> {
/// Construct a builder with all fields at their initial / `None` state.
///
Expand Down Expand Up @@ -704,6 +715,7 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
otel_trace_semantics_enabled: self.otel_trace_semantics_enabled,
});

#[cfg(not(target_arch = "wasm32"))]
let otlp_metrics_config = self.otlp_metrics_endpoint.map(|url| OtlpMetricsConfig {
endpoint_url: url,
headers: build_otlp_header_map(self.otlp_metrics_headers),
Expand Down
62 changes: 32 additions & 30 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ use self::metrics::MetricsEmitter;
use self::stats::StatsComputationStatus;
use self::trace_serializer::TraceSerializer;
use crate::agent_info::ResponseObserver;
#[cfg(not(target_arch = "wasm32"))]
use crate::agent_info::{self, schema::AgentInfo};
use crate::agentless::{send_agentless_traces_http, AgentlessTraceConfig};
use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig};
#[cfg(feature = "telemetry")]
use crate::telemetry::{SendPayloadTelemetry, TelemetryClient};
use crate::trace_exporter::agent_response::{
AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION,
};
use crate::trace_exporter::error::{
InternalErrorKind, RequestError, ShutdownError, TraceExporterError,
};
#[cfg(not(target_arch = "wasm32"))]
use crate::trace_exporter::error::ShutdownError;
use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
use crate::trace_exporter::stats::StatsComputationConfig;
use crate::{
agent_info::{self, schema::AgentInfo},
health_metrics,
health_metrics::{HealthMetric, SendResult, TransportErrorType},
};
Expand All @@ -45,7 +46,9 @@ use libdd_common::Endpoint;
use libdd_dogstatsd_client::Client;
#[cfg(not(target_arch = "wasm32"))]
use libdd_shared_runtime::BlockingRuntime;
use libdd_shared_runtime::{SharedRuntime, WorkerHandle};
use libdd_shared_runtime::SharedRuntime;
#[cfg(not(target_arch = "wasm32"))]
use libdd_shared_runtime::WorkerHandle;
use libdd_trace_utils::msgpack_decoder;
use libdd_trace_utils::send_with_retry::{
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
Expand All @@ -55,9 +58,12 @@ use libdd_trace_utils::trace_utils::TracerHeaderTags;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Once};
#[cfg(any(not(target_arch = "wasm32"), feature = "test-utils"))]
use std::time::Duration;
use std::{borrow::Borrow, str::FromStr};
#[cfg(not(target_arch = "wasm32"))]
use tokio::task::JoinSet;
#[cfg(not(target_arch = "wasm32"))]
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -249,8 +255,10 @@ pub struct TraceExporter<
/// Used to emit a one-shot warning when V1 is requested by the SDK but the agent never
/// advertises `/v1.0/traces`. Without it we'd either spam the warning on every `/info`
/// poll or stay silent and leave SDK authors without a signal.
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
v1_unavailable_logged: Once,
serializer: TraceSerializer,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
shared_runtime: Arc<R>,
/// None if dogstatsd is disabled
dogstatsd: Option<Client>,
Expand Down Expand Up @@ -332,38 +340,32 @@ impl<
}
}

#[cfg(not(target_arch = "wasm32"))]
async fn shutdown_workers(self) {
#[cfg(not(target_arch = "wasm32"))]
{
let mut join_set = JoinSet::new();
let mut join_set = JoinSet::new();

// Extract the stats handle before moving other fields.
if let StatsComputationStatus::Enabled { worker_handle, .. } =
&**self.client_side_stats.status.load()
{
let handle = worker_handle.clone();
join_set.spawn(async move { handle.stop().await });
}
// Extract the stats handle before moving other fields.
if let StatsComputationStatus::Enabled { worker_handle, .. } =
&**self.client_side_stats.status.load()
{
let handle = worker_handle.clone();
join_set.spawn(async move { handle.stop().await });
}

if let Some(info_fetcher) = self.workers.info_fetcher {
join_set.spawn(async move { info_fetcher.stop().await });
}
if let Some(info_fetcher) = self.workers.info_fetcher {
join_set.spawn(async move { info_fetcher.stop().await });
}

#[cfg(feature = "telemetry")]
if let Some(telemetry) = self.workers.telemetry {
join_set.spawn(async move { telemetry.stop().await });
}
#[cfg(feature = "telemetry")]
if let Some(telemetry) = self.workers.telemetry {
join_set.spawn(async move { telemetry.stop().await });
}

while let Some(result) = join_set.join_next().await {
if let Ok(Err(e)) = result {
error!("Worker failed to shutdown: {:?}", e);
}
while let Some(result) = join_set.join_next().await {
if let Ok(Err(e)) = result {
error!("Worker failed to shutdown: {:?}", e);
}
}

// On wasm32 workers are no-ops, nothing to stop.
#[cfg(target_arch = "wasm32")]
let _ = self;
}

/// Send msgpack serialized traces to the agent.
Expand Down
10 changes: 9 additions & 1 deletion libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@

#[cfg(not(target_arch = "wasm32"))]
use super::add_path;
#[cfg(not(target_arch = "wasm32"))]
use super::TracerMetadata;
#[cfg(not(target_arch = "wasm32"))]
use crate::agent_info::schema::AgentInfo;
use arc_swap::ArcSwap;
#[cfg(not(target_arch = "wasm32"))]
use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
#[cfg(not(target_arch = "wasm32"))]
use libdd_common::Endpoint;
use libdd_common::MutexExt;
use libdd_shared_runtime::{SharedRuntime, WorkerHandle};
#[cfg(not(target_arch = "wasm32"))]
use libdd_shared_runtime::SharedRuntime;
use libdd_shared_runtime::WorkerHandle;
use libdd_trace_stats::span_concentrator::SpanConcentrator;
#[cfg(feature = "stats-obfuscation")]
use libdd_trace_stats::span_concentrator::{
Expand Down Expand Up @@ -324,6 +328,10 @@ pub(crate) fn process_traces_for_stats<T: libdd_trace_utils::span::TraceData>(
stats_concentrator, ..
} = &**status
{
#[cfg_attr(
any(target_arch = "wasm32", not(feature = "telemetry")),
allow(unused_variables, reason = "FIXME: add telemetry on wasm")
)]
let dropped_by_trace_filter = trace_filterer.filter_traces(traces);

if !client_computed_top_level {
Expand Down
16 changes: 8 additions & 8 deletions libdd-shared-runtime/src/shared_runtime/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
use crate::worker::Worker;
use futures::stream::{FuturesUnordered, StreamExt};
use libdd_common::MutexExt;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use tracing::{debug, error};

use super::{
pausable_worker::PausableWorker, BoxedWorker, SharedRuntime, SharedRuntimeError, WorkerEntry,
WorkerHandle,
WorkerHandle, WorkerRegistry,
};

/// Single-threaded local executor runtime for wasm32.
Expand All @@ -22,7 +23,7 @@ use super::{
/// [`crate::BasicRuntime`] (caller-provided tokio runtime) instead.
#[derive(Debug)]
pub struct LocalRuntime {
workers: Arc<Mutex<Vec<WorkerEntry>>>,
workers: WorkerRegistry,
next_worker_id: AtomicU64,
}

Expand All @@ -31,12 +32,10 @@ impl LocalRuntime {
&self,
workers_guard: &mut std::sync::MutexGuard<Vec<WorkerEntry>>,
pausable_worker: PausableWorker<BoxedWorker>,
restart_on_fork: bool,
) -> WorkerHandle {
let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed);
workers_guard.push(WorkerEntry {
id: worker_id,
restart_on_fork,
worker: pausable_worker,
});
WorkerHandle {
Expand All @@ -49,15 +48,16 @@ impl LocalRuntime {
impl SharedRuntime for LocalRuntime {
fn new() -> Result<Self, SharedRuntimeError> {
Ok(Self {
workers: Arc::new(Mutex::new(Vec::new())),
workers: Rc::new(Mutex::new(Vec::new())),
next_worker_id: AtomicU64::new(1),
})
}

fn spawn_worker<T: Worker + Sync + 'static>(
&self,
worker: T,
restart_on_fork: bool,
// LocalRuntime has no fork protocol.
_restart_on_fork: bool,
) -> Result<WorkerHandle, SharedRuntimeError> {
let boxed_worker: BoxedWorker = Box::new(worker);
debug!(?boxed_worker, "Spawning worker on LocalRuntime");
Expand All @@ -71,7 +71,7 @@ impl SharedRuntime for LocalRuntime {
Box::pin(async { Ok(handle.await) })
})?;

Ok(self.push_worker(&mut workers_guard, pausable_worker, restart_on_fork))
Ok(self.push_worker(&mut workers_guard, pausable_worker))
}

async fn shutdown_async(&self) {
Expand Down
14 changes: 12 additions & 2 deletions libdd-shared-runtime/src/shared_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,25 @@ use crate::worker::Worker;
use libdd_capabilities::MaybeSend;
use libdd_common::MutexExt;
use pausable_worker::{PausableWorker, PausableWorkerError};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use std::{fmt, io};

// Shared, reference-counted registry of live workers. On native targets it must be `Arc` because
// runtimes and their `WorkerHandle`s are shared across threads. On wasm the runtime is
// single-threaded and its workers are `!Send`, so `Rc` is both correct and avoids the
// `arc_with_non_send_sync` lint.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type WorkerRegistry = std::sync::Arc<Mutex<Vec<WorkerEntry>>>;
#[cfg(target_arch = "wasm32")]
pub(crate) type WorkerRegistry = std::rc::Rc<Mutex<Vec<WorkerEntry>>>;

/// A worker registered on a [`SharedRuntime`].
pub(crate) type BoxedWorker = Box<dyn Worker + Sync>;

#[derive(Debug)]
pub(crate) struct WorkerEntry {
pub(crate) id: u64,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) restart_on_fork: bool,
pub(crate) worker: PausableWorker<BoxedWorker>,
}
Expand Down Expand Up @@ -97,7 +107,7 @@ pub trait BlockingRuntime: SharedRuntime {
#[derive(Clone, Debug)]
pub struct WorkerHandle {
pub(crate) worker_id: u64,
pub(crate) workers: Arc<Mutex<Vec<WorkerEntry>>>,
pub(crate) workers: WorkerRegistry,
}

#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl<T: Worker + MaybeSend + Sync + 'static> PausableWorker<T> {
}

/// Reset the worker state (e.g. in a fork child).
#[cfg(not(target_arch = "wasm32"))]
pub fn reset(&mut self) {
if let PausableWorker::Paused { worker } = self {
worker.reset();
Expand Down
Loading