From a9df7accf70afd25e27d6232b67f24b4a6f1c631 Mon Sep 17 00:00:00 2001 From: Nicolas Coiffier Date: Thu, 26 Mar 2026 11:56:02 +0100 Subject: [PATCH] Extract W3C traceparent header and set it as Span parent --- quickwit/Cargo.lock | 5 +++++ quickwit/Cargo.toml | 1 + quickwit/quickwit-proto/src/lib.rs | 18 ++++++++++++++++++ quickwit/quickwit-serve/Cargo.toml | 5 +++++ quickwit/quickwit-serve/src/rest.rs | 19 ++++++++++++++++++- 5 files changed, 47 insertions(+), 1 deletion(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6204f0ae0a1..9b3d0ed7391 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7394,6 +7394,8 @@ dependencies = [ "mime_guess", "mockall", "once_cell", + "opentelemetry", + "opentelemetry_sdk", "percent-encoding", "pprof", "prost 0.14.1", @@ -7438,6 +7440,8 @@ dependencies = [ "tower 0.5.2", "tower-http", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "utoipa", "warp", "zstd", @@ -10132,6 +10136,7 @@ dependencies = [ "tower 0.5.2", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 2b61b579fbf..f726f5e59bd 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -278,6 +278,7 @@ tower-http = { version = "0.6", features = [ "compression-gzip", "compression-zstd", "cors", + "trace", ] } tracing = "0.1" tracing-opentelemetry = "0.32" diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index f89fdb97687..2f68625c3d4 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -203,6 +203,24 @@ pub fn set_parent_span_from_request_metadata(request_metadata: &tonic::metadata: let _ = Span::current().set_parent(parent_cx); } +/// `HeaderMap` extracts OpenTelemetry tracing keys from HTTP headers. +struct HeaderMap<'a>(&'a http::HeaderMap); + +impl Extractor for HeaderMap<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|key| key.as_str()).collect() + } +} + +/// Extracts an OpenTelemetry context from HTTP [`http::HeaderMap`]. +pub fn extract_context_from_request_headers(headers: &http::HeaderMap) -> ::opentelemetry::Context { + global::get_text_map_propagator(|prop| prop.extract(&HeaderMap(headers))) +} + impl search::SortOrder { #[inline(always)] pub fn compare_opt(&self, this: &Option, other: &Option) -> Ordering { diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 2721aa719f3..2fd33c79e62 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -53,6 +53,7 @@ tonic-reflection = { workspace = true } tower = { workspace = true, features = ["limit"] } tower-http = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } utoipa = { workspace = true } warp = { workspace = true, features = ["server"] } zstd = { workspace = true } @@ -85,10 +86,14 @@ assert-json-diff = { workspace = true } http = { workspace = true } itertools = { workspace = true } mockall = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } quickwit-actors = { workspace = true, features = ["testsuite"] } quickwit-cluster = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 3f193783b04..6033510e057 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -21,6 +21,7 @@ use hyper_util::server::conn::auto::Builder; use hyper_util::service::TowerToHyperService; use quickwit_common::tower::BoxFutureInfaillible; use quickwit_config::{disable_ingest_v1, enable_ingest_v2}; +use quickwit_proto::extract_context_from_request_headers; use quickwit_search::SearchService; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; @@ -30,7 +31,9 @@ use tower::ServiceBuilder; use tower_http::compression::CompressionLayer; use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove}; use tower_http::cors::{AllowOrigin, CorsLayer}; -use tracing::{error, info}; +use tower_http::trace::TraceLayer; +use tracing::{Level, error, info}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use warp::filters::log::Info; use warp::hyper::http::HeaderValue; use warp::hyper::{Method, StatusCode, http}; @@ -208,7 +211,21 @@ pub(crate) async fn start_rest_server( let compression_predicate = CompressionPredicate::from_env().and(NotForContentType::IMAGES); let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins); + let trace_layer = TraceLayer::new_for_http().make_span_with(|request: &http::Request<_>| { + let span = tracing::span!( + Level::INFO, + "http_request", + otel.kind = "Server", + http.method = %request.method(), + http.target = %request.uri(), + ); + let ctx = extract_context_from_request_headers(request.headers()); + let _ = span.set_parent(ctx); + span + }); + let service = ServiceBuilder::new() + .layer(trace_layer) .layer( CompressionLayer::new() .zstd(true)