Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ wasm-bindgen-macro,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/m
wasm-bindgen-macro-support,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/macro-support,MIT OR Apache-2.0,The wasm-bindgen Developers
wasm-bindgen-shared,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/shared,MIT OR Apache-2.0,The wasm-bindgen Developers
web-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/web-sys,MIT OR Apache-2.0,The wasm-bindgen Developers
web-time,https://github.com/daxpedda/web-time,MIT OR Apache-2.0,The web-time Authors
webpki-root-certs,https://github.com/rustls/webpki-roots,CDLA-Permissive-2.0,The webpki-root-certs Authors
webpki-roots,https://github.com/rustls/webpki-roots,CDLA-Permissive-2.0,The webpki-roots Authors
widestring,https://github.com/VoidStarKat/widestring-rs,MIT OR Apache-2.0,The widestring Authors
Expand Down
11 changes: 7 additions & 4 deletions datadog-sidecar/src/self_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use crate::config::Config;
use crate::log;
use crate::service::SidecarServer;
use crate::watchdog::WatchdogHandle;
use libdd_capabilities_impl::NativeCapabilities;
use libdd_common::{tag, tag::Tag, MutexExt};
use libdd_telemetry::data::metrics::{MetricNamespace, MetricType};
use libdd_telemetry::metrics::ContextKey;
use libdd_telemetry::worker::{
LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerHandle,
};
use libdd_telemetry::worker::{LifecycleAction, TelemetryActions, TelemetryWorkerBuilder};

/// The sidecar runs the telemetry worker on native, so its handle is pinned to
/// [`NativeCapabilities`].
type TelemetryWorkerHandle = libdd_telemetry::worker::TelemetryWorkerHandle<NativeCapabilities>;
use manual_future::ManualFuture;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
Expand Down Expand Up @@ -205,7 +208,7 @@ impl SelfTelemetry {
crate::sidecar_version!().to_string(),
);
builder.config = self.config.clone();
let (worker, join_handle) = builder.spawn();
let (worker, join_handle) = builder.spawn::<NativeCapabilities>();

let metrics = MetricData {
worker: &worker,
Expand Down
7 changes: 6 additions & 1 deletion datadog-sidecar/src/service/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use zwohash::ZwoHasher;

use libdd_capabilities_impl::NativeCapabilities;
use libdd_common::tag::Tag;
use libdd_telemetry::worker::TelemetryWorkerBuilder;
use serde::{Deserialize, Serialize};
Expand All @@ -33,7 +34,11 @@ use std::time::SystemTime;
use libdd_telemetry::config::Config;
use libdd_telemetry::data::{self, Integration};
use libdd_telemetry::metrics::{ContextKey, MetricContext};
use libdd_telemetry::worker::{LifecycleAction, TelemetryActions, TelemetryWorkerHandle};
use libdd_telemetry::worker::{LifecycleAction, TelemetryActions};

/// Sidecar's telemetry worker is native-only, so its handle is pinned to
/// [`NativeCapabilities`].
type TelemetryWorkerHandle = libdd_telemetry::worker::TelemetryWorkerHandle<NativeCapabilities>;
use manual_future::ManualFuture;
use serde_with::{serde_as, VecSkipError};
use tokio::time::{sleep, sleep_until, Instant as TokioInstant};
Expand Down
1 change: 1 addition & 0 deletions libdd-capabilities-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ crate-type = ["lib"]
bench = false

[dependencies]
anyhow = "1"
bytes = "1"
http = "1"
libdd-capabilities = { path = "../libdd-capabilities", version = "2.0.0" }
Expand Down
37 changes: 36 additions & 1 deletion libdd-capabilities-impl/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! Native HTTP client implementation backed by hyper.

mod native {
use std::fs::OpenOptions;
use std::io::Write;
use std::sync::{Arc, OnceLock};

use libdd_capabilities::http::{HttpClientCapability, HttpError};
Expand All @@ -26,6 +28,32 @@ mod native {
}
}

/// Write `body` as a newline-terminated record to the file referenced by `uri` (which must
/// have a `file://` scheme), then return a synthetic 202 response. The same on-disk format
/// the pre-capability telemetry worker used when configured with a `file://` endpoint, so
/// downstream tests that diff against the recorded payload bytes keep working.
Comment on lines +33 to +34

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: too specific

fn write_to_file_endpoint(
uri: &http::Uri,
body: bytes::Bytes,
) -> Result<http::Response<bytes::Bytes>, HttpError> {
let path = libdd_common::decode_uri_path_in_authority(uri)
.map_err(|e| HttpError::Other(anyhow::anyhow!("invalid file:// URI: {e}")))?;
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.map_err(|e| HttpError::Other(anyhow::anyhow!("opening {path:?}: {e}")))?;
let mut record = body.to_vec();
record.push(b'\n');
file.write_all(&record)
.map_err(|e| HttpError::Other(anyhow::anyhow!("writing {path:?}: {e}")))?;

http::Response::builder()
.status(http::StatusCode::ACCEPTED)
.body(bytes::Bytes::new())
.map_err(|e| HttpError::Other(e.into()))
}

impl HttpClientCapability for NativeHttpClient {
fn new_client() -> Self {
Self {
Expand All @@ -39,8 +67,15 @@ mod native {
req: http::Request<bytes::Bytes>,
) -> impl std::future::Future<Output = Result<http::Response<bytes::Bytes>, HttpError>> + MaybeSend
{
let client = self.client.get_or_init(new_default_client).clone();
let client_lock = self.client.clone();
async move {
// file:// URIs short-circuit to the on-disk recorder used by tests.
if req.uri().scheme_str() == Some("file") {
let (parts, body) = req.into_parts();
return write_to_file_endpoint(&parts.uri, body);
}
Comment on lines +73 to +76

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking since there not using capabilities, but it seems that crashtracker and the reqwest client also bind custom behavior on the file:// scheme so I wonder if patching the telemetry custom behavior in capabilities is the right solution


let client = client_lock.get_or_init(new_default_client).clone();
let hyper_req = req.map(Body::from_bytes);

let response = client
Expand Down
2 changes: 2 additions & 0 deletions libdd-crashtracker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ anyhow = "1.0"
chrono = {version = "0.4", default-features = false, features = ["std", "clock", "serde"]}
cxx = { version = "1.0", optional = true }
errno = "0.3"
libdd-capabilities = { version = "2.0.0", path = "../libdd-capabilities" }
libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl" }
libdd-common = { version = "5.0.0", path = "../libdd-common" }
libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry" }
http = "1.1"
Expand Down
33 changes: 18 additions & 15 deletions libdd-crashtracker/src/crash_info/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::{ErrorKind, SigInfo};
use super::{CrashInfo, Metadata, TARGET_TRIPLE};
use anyhow::Context;
use chrono::{DateTime, Utc};
use libdd_capabilities::HttpClientCapability;
use libdd_capabilities_impl::NativeCapabilities;
use libdd_common::Endpoint;
use libdd_telemetry::{
build_host,
Expand Down Expand Up @@ -352,9 +354,9 @@ impl TelemetryCrashUploader {
self.send_telemetry_payload(&payload).await
}

/// Helper to perform actual HTTP (or file) submission via configured telemetry client
/// Helper to perform actual HTTP submission via the native HTTP capability.
async fn send_telemetry_payload(&self, payload: &data::Telemetry<'_>) -> anyhow::Result<()> {
let client = libdd_telemetry::worker::http_client::from_config(&self.cfg);
let client = NativeCapabilities::new_client();
let req = request_builder(&self.cfg)?
.method(http::Method::POST)
.header(
Expand All @@ -369,19 +371,20 @@ impl TelemetryCrashUploader {
libdd_telemetry::worker::http_client::header::REQUEST_TYPE,
"logs",
)
.body(serde_json::to_string(&payload)?.into())?;

tokio::time::timeout(
std::time::Duration::from_millis({
if let Some(endp) = self.cfg.endpoint() {
endp.timeout_ms
} else {
Endpoint::DEFAULT_TIMEOUT
}
}),
client.request(req),
)
.await??;
.body(libdd_capabilities::Bytes::from(serde_json::to_vec(
&payload,
)?))?;

let timeout = std::time::Duration::from_millis({
if let Some(endp) = self.cfg.endpoint() {
endp.timeout_ms
} else {
Endpoint::DEFAULT_TIMEOUT
}
});
tokio::time::timeout(timeout, client.request(req))
.await
.map_err(|_| anyhow::anyhow!("Telemetry crash report timed out"))??;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [
"bytes_string",
"serialization",
] }
web-time = "1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.23", features = ["time", "test-util"], default-features = false }
Expand Down
3 changes: 2 additions & 1 deletion libdd-data-pipeline/src/otlp/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use libdd_trace_utils::otlp_encoder::mapper::status_code;
use libdd_trace_utils::otlp_encoder::OtlpResourceInfo;
use serde_json::{json, Value};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use std::time::Duration;
use tracing::error;
use web_time::SystemTime;

const METRIC_NAME: &str = "traces.span.sdk.metrics.duration";
const NANOS_PER_SECOND: f64 = 1_000_000_000.0;
Expand Down
8 changes: 6 additions & 2 deletions libdd-data-pipeline/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

//! Provides an abstraction layer to hold metrics that comes from 'SendDataResult'.
use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
use libdd_common::tag;
use libdd_telemetry::data::metrics::{MetricNamespace, MetricType};
use libdd_telemetry::metrics::ContextKey;
Expand Down Expand Up @@ -146,7 +147,9 @@ impl Index<MetricKind> for Metrics {

impl Metrics {
/// Creates a new Metrics instance
pub fn new(worker: &TelemetryWorkerHandle) -> Self {
pub fn new<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>(
worker: &TelemetryWorkerHandle<C>,
) -> Self {
let mut keys = Vec::new();
for metric in METRICS {
let key = worker.register_metric_context(
Expand All @@ -171,6 +174,7 @@ impl Metrics {
#[cfg(test)]
mod tests {
use super::*;
use libdd_capabilities_impl::NativeCapabilities;
use libdd_telemetry::worker::TelemetryWorkerBuilder;

#[cfg_attr(miri, ignore)]
Expand All @@ -182,7 +186,7 @@ mod tests {
"0.1".to_string(),
"1.0".to_string(),
)
.spawn();
.spawn::<NativeCapabilities>();

let metrics = Metrics::new(&worker);

Expand Down
Loading
Loading