From d861987d0761067f772c9a0dd199ac6371300659 Mon Sep 17 00:00:00 2001 From: Lukas Bindreiter Date: Fri, 22 Aug 2025 16:22:43 +0200 Subject: [PATCH 1/2] Allow multiple otel trace and log exporters --- .../workflows/observability/logging.py | 39 +++------- .../workflows/observability/tracing.py | 76 +++++++++++++------ 2 files changed, 65 insertions(+), 50 deletions(-) diff --git a/tilebox-workflows/tilebox/workflows/observability/logging.py b/tilebox-workflows/tilebox/workflows/observability/logging.py index b217ba7..eb02ad8 100644 --- a/tilebox-workflows/tilebox/workflows/observability/logging.py +++ b/tilebox-workflows/tilebox/workflows/observability/logging.py @@ -83,16 +83,11 @@ def _get_attributes(record: logging.LogRecord) -> _ExtendedAttributes: return attributes -def _otel_handler( - level: int = logging.NOTSET, - service: str | Resource | None = None, +def _otel_log_exporter( endpoint: str | None = None, headers: dict[str, str] | None = None, export_interval: timedelta | None = None, -) -> LoggingHandler: - resource = _get_default_resource(service) - logger_provider = LoggerProvider(resource) - +) -> BatchLogRecordProcessor: if endpoint is None: endpoint = os.environ.get(_OTEL_LOGS_ENDPOINT_ENV_VAR, None) if endpoint is None: @@ -115,19 +110,15 @@ def _otel_handler( headers=headers, ) schedule_delay = int(export_interval.total_seconds() * 1000) if export_interval is not None else None - batch_exporter = BatchLogRecordProcessor(exporter, schedule_delay_millis=schedule_delay) # type: ignore[arg-type] + return BatchLogRecordProcessor(exporter, schedule_delay_millis=schedule_delay) # type: ignore[arg-type] - logger_provider.add_log_record_processor(batch_exporter) - return OTELLoggingHandler(level=level, logger_provider=logger_provider) - -def configure_otel_logging( # noqa: PLR0913 +def configure_otel_logging( service: str | Resource | None = None, level: int = logging.DEBUG, endpoint: str | None = None, headers: dict[str, str] | None = None, export_interval: timedelta | None = None, - reconfigure: bool = True, ) -> None: """ Configure logging to an OTLP compatible endpoint. @@ -154,22 +145,21 @@ def configure_otel_logging( # noqa: PLR0913 export_interval: The interval at which to export logs to the endpoint. If not provided, the environment variable OTEL_EXPORT_INTERVAL will be used. If that is not set either, the default open telemetry export interval of 5s will be used. - reconfigure: Only relevant if configure_otel_logging is called multiple times. If True, any previously - configured OTEL logging handlers will be removed. If False, the existing handlers will be kept. Useful - if you want to log to multiple OTEL endpoints. Raises: ValueError: If no endpoint is provided and no OTEL_LOGS_ENDPOINT environment variable is set. """ - handler = _otel_handler(level, service, endpoint, headers, export_interval) + provider = LoggerProvider(resource=_get_default_resource(service)) + + batch_exporter = _otel_log_exporter(endpoint, headers, export_interval) + provider.add_log_record_processor(batch_exporter) + handler = OTELLoggingHandler(level=level, logger_provider=provider) + root_logger = _root_logger() - # clean up previous handlers: - # remove the default handler if it exists, and all other OtelHandlers if reconfigure is True + # clean up the default handler if it exists handlers_to_remove_indices = [ - i - for i, handler in enumerate(root_logger.handlers) - if hasattr(handler, "_is_default") or (reconfigure and isinstance(handler, OTELLoggingHandler)) + i for i, handler in enumerate(root_logger.handlers) if hasattr(handler, "_is_default") ] for i in reversed(handlers_to_remove_indices): # reversed to avoid index shifting after deletion root_logger.handlers.pop(i) @@ -182,7 +172,6 @@ def configure_otel_logging_axiom( level: int = logging.DEBUG, dataset: str | None = None, api_key: str | None = None, - reconfigure: bool = True, ) -> None: """ Configure opentelemetry logging to Axiom. @@ -205,9 +194,6 @@ def configure_otel_logging_axiom( AXIOM_LOGS_DATASET will be used. If that is not set either, an error will be raised. api_key: The API key to use for authentication. If not provided, the environment variable AXIOM_API_KEY will be used. If that is not set either, an error will be raised. - reconfigure: Only relevant if configure_otel_logging_axiom is called multiple times. If True, any previously - configured OTEL logging handlers will be removed. If False, the existing handlers will be kept. Useful - if you want to log to multiple OTEL endpoints. Raises: ValueError: If no dataset is provided and no AXIOM_LOGS_DATASET environment variable is set @@ -234,7 +220,6 @@ def configure_otel_logging_axiom( level, endpoint=_AXIOM_ENDPOINT, headers={"Authorization": f"Bearer {api_key}", "X-Axiom-Dataset": dataset}, - reconfigure=reconfigure, ) diff --git a/tilebox-workflows/tilebox/workflows/observability/tracing.py b/tilebox-workflows/tilebox/workflows/observability/tracing.py index c5d89e2..f8c48ae 100644 --- a/tilebox-workflows/tilebox/workflows/observability/tracing.py +++ b/tilebox-workflows/tilebox/workflows/observability/tracing.py @@ -12,9 +12,9 @@ ) from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.trace import ProxyTracerProvider, get_current_span, get_tracer_provider, set_tracer_provider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanProcessor from opentelemetry.trace import Span as OTSpan +from opentelemetry.trace import get_current_span from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from tilebox.workflows.data import Job @@ -30,17 +30,44 @@ _OTEL_TRACES_ENDPOINT_ENV_VAR = "OTEL_TRACES_ENDPOINT" _OTEL_EXPORT_INTERVAL_ENV_VAR = "OTEL_EXPORT_INTERVAL" +# the globally configured tilebox opentelemetry tracer provider. +# we explicitly avoid using the global opentelemetry tracer provider, because other libraries might also configure +# that (e.g. pytorch does sometimes), and we don't want to interfere with that. +# as default we don't use a proxy tracer provider, that only returns no-op tracers, because we still want to be able +# to extract trace_ids and spans, in case other runners / workflow clients have tracing configured. +# So instead we use a tracer provider without any exporters, which will still create traces and spans, +# but will not send them anywhere. +_tilebox_tracer_provider = TracerProvider() +_workflow_tracers = [] + + +def _get_tilebox_tracer_provider() -> TracerProvider: + return _tilebox_tracer_provider + + +def _set_tilebox_tracer_provider(provider: TracerProvider) -> None: + global _tilebox_tracer_provider # noqa: PLW0603 + _tilebox_tracer_provider = provider + + for tracer in _workflow_tracers: + tracer._swap_provider(provider) # noqa: SLF001 + class WorkflowTracer: def __init__(self) -> None: """Instantiate a tracer from the currently configured global tracer provider.""" - provider = get_tracer_provider() - if isinstance(provider, ProxyTracerProvider): - # if no tracer provider is configured, we still don't want to use the no-op tracer, because we - # want to be able to extract trace_ids and spans, in case other runners / workflow clients have tracing - # configured. So instead we use a tracer provider without any exporters, which will still create traces - # and spans, but will not send them anywhere. - provider = TracerProvider() + provider = _get_tilebox_tracer_provider() + self._tracer = provider.get_tracer(_INSTRUMENTATION_MODULE_NAME) + + # keep track of all workflow tracers, to be able to update them in case the + # global tracer provider is replaced. + _workflow_tracers.append(self) + + def _swap_provider(self, provider: TracerProvider) -> None: + """ + A callback function that get's invoked in case a new tracer provider is configured, to make sure + existing workflow tracers are updated to use the new provider. + """ self._tracer = provider.get_tracer(_INSTRUMENTATION_MODULE_NAME) # functools.wraps is a bit buggy with class methods, so we are not using it here @@ -62,16 +89,11 @@ def get_trace_parent_of_current_span() -> str: return carrier["traceparent"] -def _otel_tracer_provider( - service: str | Resource | None = None, +def _otel_span_exporter( endpoint: str | None = None, headers: dict[str, str] | None = None, export_interval: timedelta | None = None, -) -> TracerProvider: - resource = _get_default_resource(service) - - provider = TracerProvider(resource=resource) - +) -> SpanProcessor: if endpoint is None: endpoint = os.environ.get(_OTEL_TRACES_ENDPOINT_ENV_VAR, None) if endpoint is None: @@ -94,11 +116,7 @@ def _otel_tracer_provider( headers=headers, ) schedule_delay = int(export_interval.total_seconds() * 1000) if export_interval is not None else None - batch_processor = BatchSpanProcessor(exporter, schedule_delay_millis=schedule_delay) # type: ignore[arg-type] - - provider.add_span_processor(batch_processor) - - return provider + return BatchSpanProcessor(exporter, schedule_delay_millis=schedule_delay) # type: ignore[arg-type] class SpanEventLoggingHandler(logging.Handler): @@ -156,8 +174,20 @@ def configure_otel_tracing( Raises: ValueError: If no endpoint is provided and no OTEL_TRACES_ENDPOINT environment variable is set. """ - provider = _otel_tracer_provider(service, endpoint, headers, export_interval) - set_tracer_provider(provider) + provider = _get_tilebox_tracer_provider() + resource = _get_default_resource(service) + if provider.resource.attributes != resource.attributes: + # It's either the first time we configure tracing, or we are trying to reconfigure it with a different resource. + # That means we need to create a new provider. + provider = TracerProvider( + resource=resource, + # keep the existing span processor, so that all previously configured exports are still used as well + active_span_processor=provider._active_span_processor, # noqa: SLF001 + ) + _set_tilebox_tracer_provider(provider) + + exporter = _otel_span_exporter(endpoint, headers, export_interval) + provider.add_span_processor(exporter) # if we configure tracing, we also want to add log messages to active spans, which is a mixture of a logging # tracing feature. But configure this here, because we anyways don't need to do this if tracing is not configured. From 9d69ed173662f88786956f8dbe86ca0d7982badd Mon Sep 17 00:00:00 2001 From: Lukas Bindreiter Date: Fri, 22 Aug 2025 16:26:28 +0200 Subject: [PATCH 2/2] Prepare release v42 --- CHANGELOG.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9ab098..52b3799 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.42.0] - 2025-08-22 + ### Added - `tilebox-storage`: Added `USGSLandsatStorageClient` to download landsat data from the USGS Landsat S3 bucket. @@ -21,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 overwriting the existing task. - `tilebox-workflows`: Fixed a bug where the `deserialize_task` function would fail to deserialize nested dataclasses or protobuf messages that are wrapped in an `Optional` or `Annotated` type hint. +- `tilebox-workflows`: Calling `configure_otel_tracing` and `configure_otel_logging` multiple times correctly configures + multiple exporters instead of overwriting the existing ones. ## [0.41.0] - 2025-08-01 @@ -248,7 +252,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Released packages: `tilebox-datasets`, `tilebox-workflows`, `tilebox-storage`, `tilebox-grpc` -[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.41.0...HEAD +[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.42.0...HEAD +[0.42.0]: https://github.com/tilebox/tilebox-python/compare/v0.41.0...v0.42.0 [0.41.0]: https://github.com/tilebox/tilebox-python/compare/v0.40.0...v0.41.0 [0.40.0]: https://github.com/tilebox/tilebox-python/compare/v0.39.0...v0.40.0 [0.39.0]: https://github.com/tilebox/tilebox-python/compare/v0.38.0...v0.39.0