From f226dfd6e4fa7a67a4906145bbacca0ed7f23ae0 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 25 Jun 2026 17:55:06 -0400 Subject: [PATCH] Fix gRPC telemetry exporters --- quickwit/Cargo.lock | 2 +- quickwit/Cargo.toml | 13 ++++++++++++- quickwit/quickwit-cli/src/main.rs | 4 +--- quickwit/quickwit-telemetry-exporters/Cargo.toml | 9 +++++++-- quickwit/quickwit-telemetry-exporters/src/lib.rs | 15 ++++++++++++++- .../src/otlp/config.rs | 13 ++++++++----- .../quickwit-telemetry-exporters/src/otlp/logs.rs | 7 +++++-- .../src/otlp/metrics.rs | 6 +++++- .../src/otlp/traces.rs | 6 ++++-- 9 files changed, 57 insertions(+), 18 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 608c3204836..7d9ac914da6 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -9468,6 +9468,7 @@ dependencies = [ "quickwit-metrics", "serde_json", "time", + "tokio", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -10069,7 +10070,6 @@ checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" dependencies = [ "base64 0.22.1", "bytes", - "futures-channel", "futures-core", "futures-util", "http 1.4.0", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 547177f3638..c9f0fc98f0d 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -189,7 +189,18 @@ openssl-probe = "0.2" opentelemetry = "0.32" opentelemetry-appender-tracing = "0.32" opentelemetry_sdk = { version = "0.32", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.32", features = ["grpc-tonic", "http-json"] } +# `default-features = false` drops the default `reqwest-blocking-client`: +# see https://github.com/quickwit-oss/quickwit/pull/6558 for more details. +opentelemetry-otlp = { version = "0.32", default-features = false, features = [ + "grpc-tonic", + "http-json", + "http-proto", + "internal-logs", + "logs", + "metrics", + "reqwest-client", + "trace", +] } ouroboros = "0.18" parquet = { version = "58", default-features = false, features = ["arrow", "experimental", "snap", "variant_experimental", "zstd"] } percent-encoding = "2.3" diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 9f8662ccf7e..efcb1ea1d6f 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -147,9 +147,7 @@ async fn main_impl() -> anyhow::Result<()> { } else { 0 }; - - telemetry_handle.shutdown()?; - + telemetry_handle.shutdown().await?; std::process::exit(return_code) } diff --git a/quickwit/quickwit-telemetry-exporters/Cargo.toml b/quickwit/quickwit-telemetry-exporters/Cargo.toml index 616ca25972b..a1973b4dbd8 100644 --- a/quickwit/quickwit-telemetry-exporters/Cargo.toml +++ b/quickwit/quickwit-telemetry-exporters/Cargo.toml @@ -13,15 +13,20 @@ license.workspace = true [dependencies] anyhow = { workspace = true } metrics = { workspace = true } -metrics-opentelemetry = { workspace = true } metrics-exporter-prometheus = { workspace = true } +metrics-opentelemetry = { workspace = true } metrics-util = { workspace = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["experimental-grpc-retry", "experimental-http-retry"] } -opentelemetry_sdk = { workspace = true } +opentelemetry_sdk = { workspace = true, features = [ + "experimental_logs_batch_log_processor_with_async_runtime", + "experimental_metrics_periodicreader_with_async_runtime", + "experimental_trace_batch_span_processor_with_async_runtime", +] } serde_json = { workspace = true } time = { workspace = true, features = ["parsing"] } +tokio = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/quickwit/quickwit-telemetry-exporters/src/lib.rs b/quickwit/quickwit-telemetry-exporters/src/lib.rs index 3b8a93ead22..838d954f35b 100644 --- a/quickwit/quickwit-telemetry-exporters/src/lib.rs +++ b/quickwit/quickwit-telemetry-exporters/src/lib.rs @@ -48,7 +48,20 @@ pub struct TelemetryHandle { } impl TelemetryHandle { - pub fn shutdown(self) -> anyhow::Result<()> { + /// Shuts down the OpenTelemetry providers, flushing any pending telemetry. + /// + /// The providers are driven by processors spawned onto the Tokio runtime, so the underlying + /// `shutdown` blocks while it hands a flush message to those tasks and waits for the + /// acknowledgement. We run it on the blocking pool rather than inline so it never occupies a + /// runtime worker: the flush tasks can then make progress even when the runtime is configured + /// with a single worker. + pub async fn shutdown(self) -> anyhow::Result<()> { + tokio::task::spawn_blocking(move || self.shutdown_blocking()) + .await + .context("failed to join telemetry shutdown task")? + } + + fn shutdown_blocking(self) -> anyhow::Result<()> { if let Some(tracer_provider) = self.tracer_provider { tracer_provider .shutdown() diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/config.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/config.rs index e8edef94cac..48fe64aaee8 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/config.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/config.rs @@ -23,9 +23,9 @@ pub const QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY: &str = "QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER"; const OTEL_EXPORTER_OTLP_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_PROTOCOL"; -const OTEL_EXPORTER_OTLP_TRACES_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"; const OTEL_EXPORTER_OTLP_LOGS_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_LOGS_PROTOCOL"; const OTEL_EXPORTER_OTLP_METRICS_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"; +const OTEL_EXPORTER_OTLP_TRACES_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum OtlpProtocol { @@ -39,17 +39,20 @@ impl FromStr for OtlpProtocol { fn from_str(protocol_str: &str) -> anyhow::Result { const OTLP_PROTOCOL_GRPC: &str = "grpc"; - const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf"; const OTLP_PROTOCOL_HTTP_JSON: &str = "http/json"; + const OTLP_PROTOCOL_HTTP_PROTO: &str = "http/proto"; + const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf"; match protocol_str { OTLP_PROTOCOL_GRPC => Ok(OtlpProtocol::Grpc), - OTLP_PROTOCOL_HTTP_PROTOBUF => Ok(OtlpProtocol::HttpProtobuf), OTLP_PROTOCOL_HTTP_JSON => Ok(OtlpProtocol::HttpJson), + OTLP_PROTOCOL_HTTP_PROTO | OTLP_PROTOCOL_HTTP_PROTOBUF => { + Ok(OtlpProtocol::HttpProtobuf) + } other => anyhow::bail!( "unsupported OTLP protocol `{other}`, supported values are \ - `{OTLP_PROTOCOL_GRPC}`, `{OTLP_PROTOCOL_HTTP_PROTOBUF}` and \ - `{OTLP_PROTOCOL_HTTP_JSON}`" + `{OTLP_PROTOCOL_GRPC}`, `{OTLP_PROTOCOL_HTTP_JSON}`, and \ + `{OTLP_PROTOCOL_HTTP_PROTO}`" ), } } diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs index 4f2949bb612..bc02e260731 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs @@ -16,8 +16,9 @@ use anyhow::Context; use opentelemetry_otlp::{ LogExporter, Protocol as OtlpWireProtocol, WithExportConfig, WithHttpConfig, WithTonicConfig, }; -use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor; +use opentelemetry_sdk::{Resource, runtime}; use crate::otlp::{OtlpExporterConfig, OtlpProtocol}; @@ -43,14 +44,16 @@ impl OtlpProtocol { } } +/// Builds the OTLP logger provider. pub(crate) fn init_logger_provider( otlp_config: &OtlpExporterConfig, resource: Resource, ) -> anyhow::Result { let logs_protocol = otlp_config.logs_protocol()?; let log_exporter = logs_protocol.log_exporter()?; + let log_processor = BatchLogProcessor::builder(log_exporter, runtime::Tokio).build(); Ok(SdkLoggerProvider::builder() .with_resource(resource) - .with_batch_exporter(log_exporter) + .with_log_processor(log_processor) .build()) } diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs index d43eb3dceb0..793b8b56b98 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs @@ -19,6 +19,8 @@ use opentelemetry_otlp::{ MetricExporter, Protocol as OtlpWireProtocol, WithExportConfig, WithHttpConfig, WithTonicConfig, }; use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader; +use opentelemetry_sdk::runtime; use crate::otlp::{OtlpExporterConfig, OtlpProtocol, quickwit_resource}; @@ -44,15 +46,17 @@ impl OtlpProtocol { } } +/// Builds the OTLP metrics recorder and its meter provider. pub(crate) fn build_recorder( service_version: &str, otlp_config: &OtlpExporterConfig, ) -> anyhow::Result<(OpenTelemetryRecorder, SdkMeterProvider)> { let metrics_protocol = otlp_config.metrics_protocol()?; let metric_exporter = metrics_protocol.metric_exporter()?; + let metric_reader = PeriodicReader::builder(metric_exporter, runtime::Tokio).build(); let metrics_provider = SdkMeterProvider::builder() .with_resource(quickwit_resource(service_version)) - .with_periodic_exporter(metric_exporter) + .with_reader(metric_reader) .build(); let meter = metrics_provider.meter("quickwit"); diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs index 67b5db32e18..7bb6879779f 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs @@ -16,8 +16,9 @@ use anyhow::Context; use opentelemetry_otlp::{ Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, WithHttpConfig, WithTonicConfig, }; +use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor; use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; -use opentelemetry_sdk::{Resource, trace}; +use opentelemetry_sdk::{Resource, runtime}; use crate::otlp::{OtlpExporterConfig, OtlpProtocol}; @@ -43,13 +44,14 @@ impl OtlpProtocol { } } +/// Builds the OTLP tracer provider. pub(crate) fn init_tracer_provider( otlp_config: &OtlpExporterConfig, resource: Resource, ) -> anyhow::Result { let traces_protocol = otlp_config.traces_protocol()?; let span_exporter = traces_protocol.span_exporter()?; - let span_processor = trace::BatchSpanProcessor::builder(span_exporter) + let span_processor = BatchSpanProcessor::builder(span_exporter, runtime::Tokio) .with_batch_config( BatchConfigBuilder::default() // Quickwit can generate a lot of spans, especially in debug mode, and the