CHANGELOG.md (7 changes: 6 additions & 1 deletion)
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [0.42.0] - 2025-08-22
+
 ### Added
 
 - `tilebox-storage`: Added `USGSLandsatStorageClient` to download landsat data from the USGS Landsat S3 bucket.
@@ -21,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   overwriting the existing task.
 - `tilebox-workflows`: Fixed a bug where the `deserialize_task` function would fail to deserialize nested dataclasses or
   protobuf messages that are wrapped in an `Optional` or `Annotated` type hint.
+- `tilebox-workflows`: Calling `configure_otel_tracing` and `configure_otel_logging` multiple times correctly configures
+  multiple exporters instead of overwriting the existing ones.
 
 ## [0.41.0] - 2025-08-01
 
@@ -248,7 +252,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Released packages: `tilebox-datasets`, `tilebox-workflows`, `tilebox-storage`, `tilebox-grpc`
 
 
-[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.41.0...HEAD
+[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.42.0...HEAD
+[0.42.0]: https://github.com/tilebox/tilebox-python/compare/v0.41.0...v0.42.0
 [0.41.0]: https://github.com/tilebox/tilebox-python/compare/v0.40.0...v0.41.0
 [0.40.0]: https://github.com/tilebox/tilebox-python/compare/v0.39.0...v0.40.0
 [0.39.0]: https://github.com/tilebox/tilebox-python/compare/v0.38.0...v0.39.0
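The fix noted under `[0.42.0]` (multiple `configure_otel_tracing` / `configure_otel_logging` calls now add exporters instead of replacing them) is easiest to see from the caller's side. Below is a minimal sketch of the new behaviour, assuming the functions are imported from the `tilebox.workflows.observability` modules changed in this PR; the service name and endpoint URLs are placeholders:

```python
from tilebox.workflows.observability.logging import configure_otel_logging
from tilebox.workflows.observability.tracing import configure_otel_tracing

# First call: ship logs and traces to a primary OTLP-compatible endpoint (placeholder URLs).
configure_otel_logging(service="my-runner", endpoint="https://collector-a.example.com/v1/logs")
configure_otel_tracing(service="my-runner", endpoint="https://collector-a.example.com/v1/traces")

# Second call: before 0.42.0 this overwrote the first configuration; as of this release the
# additional exporters are attached alongside the existing ones, so both endpoints receive data.
configure_otel_logging(service="my-runner", endpoint="https://collector-b.example.com/v1/logs")
configure_otel_tracing(service="my-runner", endpoint="https://collector-b.example.com/v1/traces")
```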
tilebox-workflows/tilebox/workflows/observability/logging.py (39 changes: 12 additions & 27 deletions)
@@ -83,16 +83,11 @@ def _get_attributes(record: logging.LogRecord) -> _ExtendedAttributes:
     return attributes
 
 
-def _otel_handler(
-    level: int = logging.NOTSET,
-    service: str | Resource | None = None,
+def _otel_log_exporter(
     endpoint: str | None = None,
     headers: dict[str, str] | None = None,
     export_interval: timedelta | None = None,
-) -> LoggingHandler:
-    resource = _get_default_resource(service)
-    logger_provider = LoggerProvider(resource)
-
+) -> BatchLogRecordProcessor:
     if endpoint is None:
         endpoint = os.environ.get(_OTEL_LOGS_ENDPOINT_ENV_VAR, None)
     if endpoint is None:
@@ -115,19 +110,15 @@ def _otel_handler(
         headers=headers,
     )
     schedule_delay = int(export_interval.total_seconds() * 1000) if export_interval is not None else None
-    batch_exporter = BatchLogRecordProcessor(exporter, schedule_delay_millis=schedule_delay)  # type: ignore[arg-type]
+    return BatchLogRecordProcessor(exporter, schedule_delay_millis=schedule_delay)  # type: ignore[arg-type]
 
-    logger_provider.add_log_record_processor(batch_exporter)
-    return OTELLoggingHandler(level=level, logger_provider=logger_provider)
-
 
-def configure_otel_logging(  # noqa: PLR0913
+def configure_otel_logging(
     service: str | Resource | None = None,
     level: int = logging.DEBUG,
     endpoint: str | None = None,
     headers: dict[str, str] | None = None,
     export_interval: timedelta | None = None,
-    reconfigure: bool = True,
 ) -> None:
     """
     Configure logging to an OTLP compatible endpoint.
@@ -154,22 +145,21 @@ def configure_otel_logging(  # noqa: PLR0913
         export_interval: The interval at which to export logs to the endpoint. If not provided, the
             environment variable OTEL_EXPORT_INTERVAL will be used. If that is not set either, the default open
             telemetry export interval of 5s will be used.
-        reconfigure: Only relevant if configure_otel_logging is called multiple times. If True, any previously
-            configured OTEL logging handlers will be removed. If False, the existing handlers will be kept. Useful
-            if you want to log to multiple OTEL endpoints.
 
     Raises:
         ValueError: If no endpoint is provided and no OTEL_LOGS_ENDPOINT environment variable is set.
     """
-    handler = _otel_handler(level, service, endpoint, headers, export_interval)
+    provider = LoggerProvider(resource=_get_default_resource(service))
+
+    batch_exporter = _otel_log_exporter(endpoint, headers, export_interval)
+    provider.add_log_record_processor(batch_exporter)
+    handler = OTELLoggingHandler(level=level, logger_provider=provider)
+
     root_logger = _root_logger()
 
-    # clean up previous handlers:
-    # remove the default handler if it exists, and all other OtelHandlers if reconfigure is True
+    # clean up the default handler if it exists
     handlers_to_remove_indices = [
-        i
-        for i, handler in enumerate(root_logger.handlers)
-        if hasattr(handler, "_is_default") or (reconfigure and isinstance(handler, OTELLoggingHandler))
+        i for i, handler in enumerate(root_logger.handlers) if hasattr(handler, "_is_default")
     ]
     for i in reversed(handlers_to_remove_indices):  # reversed to avoid index shifting after deletion
         root_logger.handlers.pop(i)
@@ -182,7 +172,6 @@ def configure_otel_logging_axiom(
     level: int = logging.DEBUG,
     dataset: str | None = None,
     api_key: str | None = None,
-    reconfigure: bool = True,
 ) -> None:
     """
     Configure opentelemetry logging to Axiom.
@@ -205,9 +194,6 @@ def configure_otel_logging_axiom(
             AXIOM_LOGS_DATASET will be used. If that is not set either, an error will be raised.
         api_key: The API key to use for authentication. If not provided, the environment variable AXIOM_API_KEY will be
             used. If that is not set either, an error will be raised.
-        reconfigure: Only relevant if configure_otel_logging_axiom is called multiple times. If True, any previously
-            configured OTEL logging handlers will be removed. If False, the existing handlers will be kept. Useful
-            if you want to log to multiple OTEL endpoints.
 
     Raises:
         ValueError: If no dataset is provided and no AXIOM_LOGS_DATASET environment variable is set
@@ -234,7 +220,6 @@ def configure_otel_logging_axiom(
         level,
         endpoint=_AXIOM_ENDPOINT,
         headers={"Authorization": f"Bearer {api_key}", "X-Axiom-Dataset": dataset},
-        reconfigure=reconfigure,
     )
 
 
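The refactor above splits the old `_otel_handler` helper: `_otel_log_exporter` now returns only a `BatchLogRecordProcessor`, while `configure_otel_logging` builds the `LoggerProvider`, attaches the processor and registers an `OTELLoggingHandler` on the root logger, removing only the default handler rather than every previously configured OTEL handler (which is why the `reconfigure` flag could be dropped). A sketch of the underlying OpenTelemetry pattern with the plain SDK, using a hypothetical `attach_otlp_handler` helper and placeholder endpoints (the real code additionally wires in resources, headers and export intervals):

```python
import logging

from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor


def attach_otlp_handler(endpoint: str) -> None:
    # Each call builds its own provider -> batch processor -> stdlib handler chain,
    # mirroring what configure_otel_logging now does per invocation.
    provider = LoggerProvider()
    provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter(endpoint=endpoint)))
    logging.getLogger().addHandler(LoggingHandler(level=logging.DEBUG, logger_provider=provider))


# Two calls, two independent handlers on the root logger; neither removes the other.
attach_otlp_handler("https://collector-a.example.com/v1/logs")
attach_otlp_handler("https://collector-b.example.com/v1/logs")
```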
tilebox-workflows/tilebox/workflows/observability/tracing.py (76 changes: 53 additions & 23 deletions)
@@ -12,9 +12,9 @@
 )
 from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import TracerProvider
-from opentelemetry.sdk.trace.export import BatchSpanProcessor
-from opentelemetry.trace import ProxyTracerProvider, get_current_span, get_tracer_provider, set_tracer_provider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanProcessor
 from opentelemetry.trace import Span as OTSpan
+from opentelemetry.trace import get_current_span
 from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
 
 from tilebox.workflows.data import Job
@@ -30,17 +30,44 @@
 _OTEL_TRACES_ENDPOINT_ENV_VAR = "OTEL_TRACES_ENDPOINT"
 _OTEL_EXPORT_INTERVAL_ENV_VAR = "OTEL_EXPORT_INTERVAL"
 
+# the globally configured tilebox opentelemetry tracer provider.
+# we explicitly avoid using the global opentelemetry tracer provider, because other libraries might also configure
+# that (e.g. pytorch does sometimes), and we don't want to interfere with that.
+# as default we don't use a proxy tracer provider, that only returns no-op tracers, because we still want to be able
+# to extract trace_ids and spans, in case other runners / workflow clients have tracing configured.
+# So instead we use a tracer provider without any exporters, which will still create traces and spans,
+# but will not send them anywhere.
+_tilebox_tracer_provider = TracerProvider()
+_workflow_tracers = []
+
+
+def _get_tilebox_tracer_provider() -> TracerProvider:
+    return _tilebox_tracer_provider
+
+
+def _set_tilebox_tracer_provider(provider: TracerProvider) -> None:
+    global _tilebox_tracer_provider  # noqa: PLW0603
+    _tilebox_tracer_provider = provider
+
+    for tracer in _workflow_tracers:
+        tracer._swap_provider(provider)  # noqa: SLF001
+
 
 class WorkflowTracer:
     def __init__(self) -> None:
         """Instantiate a tracer from the currently configured global tracer provider."""
-        provider = get_tracer_provider()
-        if isinstance(provider, ProxyTracerProvider):
-            # if no tracer provider is configured, we still don't want to use the no-op tracer, because we
-            # want to be able to extract trace_ids and spans, in case other runners / workflow clients have tracing
-            # configured. So instead we use a tracer provider without any exporters, which will still create traces
-            # and spans, but will not send them anywhere.
-            provider = TracerProvider()
+        provider = _get_tilebox_tracer_provider()
         self._tracer = provider.get_tracer(_INSTRUMENTATION_MODULE_NAME)
 
+        # keep track of all workflow tracers, to be able to update them in case the
+        # global tracer provider is replaced.
+        _workflow_tracers.append(self)
+
+    def _swap_provider(self, provider: TracerProvider) -> None:
+        """
+        A callback function that gets invoked in case a new tracer provider is configured, to make sure
+        existing workflow tracers are updated to use the new provider.
+        """
+        self._tracer = provider.get_tracer(_INSTRUMENTATION_MODULE_NAME)
+
     # functools.wraps is a bit buggy with class methods, so we are not using it here
@@ -62,16 +89,11 @@ def get_trace_parent_of_current_span() -> str:
     return carrier["traceparent"]
 
 
-def _otel_tracer_provider(
-    service: str | Resource | None = None,
+def _otel_span_exporter(
     endpoint: str | None = None,
     headers: dict[str, str] | None = None,
     export_interval: timedelta | None = None,
-) -> TracerProvider:
-    resource = _get_default_resource(service)
-
-    provider = TracerProvider(resource=resource)
-
+) -> SpanProcessor:
     if endpoint is None:
         endpoint = os.environ.get(_OTEL_TRACES_ENDPOINT_ENV_VAR, None)
     if endpoint is None:
@@ -94,11 +116,7 @@ def _otel_tracer_provider(
         headers=headers,
     )
     schedule_delay = int(export_interval.total_seconds() * 1000) if export_interval is not None else None
-    batch_processor = BatchSpanProcessor(exporter, schedule_delay_millis=schedule_delay)  # type: ignore[arg-type]
-
-    provider.add_span_processor(batch_processor)
-
-    return provider
+    return BatchSpanProcessor(exporter, schedule_delay_millis=schedule_delay)  # type: ignore[arg-type]
 
 
 class SpanEventLoggingHandler(logging.Handler):
@@ -156,8 +174,20 @@ def configure_otel_tracing(
     Raises:
         ValueError: If no endpoint is provided and no OTEL_TRACES_ENDPOINT environment variable is set.
     """
-    provider = _otel_tracer_provider(service, endpoint, headers, export_interval)
-    set_tracer_provider(provider)
+    provider = _get_tilebox_tracer_provider()
+    resource = _get_default_resource(service)
+    if provider.resource.attributes != resource.attributes:
+        # It's either the first time we configure tracing, or we are trying to reconfigure it with a different resource.
+        # That means we need to create a new provider.
+        provider = TracerProvider(
+            resource=resource,
+            # keep the existing span processor, so that all previously configured exports are still used as well
+            active_span_processor=provider._active_span_processor,  # noqa: SLF001
+        )
+        _set_tilebox_tracer_provider(provider)
+
+    exporter = _otel_span_exporter(endpoint, headers, export_interval)
+    provider.add_span_processor(exporter)
 
     # if we configure tracing, we also want to add log messages to active spans, which is a mixture of a logging
     # tracing feature. But configure this here, because we anyways don't need to do this if tracing is not configured.
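The tracing side takes a different route: instead of registering a global tracer provider, the module keeps its own `TracerProvider` (so other libraries that set the global one, e.g. pytorch, are left alone) plus a registry of `WorkflowTracer` instances that are refreshed whenever the provider is swapped. `configure_otel_tracing` only replaces the provider when the resource actually changes, carrying over the active span processor so earlier exporters keep working, and otherwise just adds another span processor. A standalone sketch of the swap-and-refresh idea, with hypothetical `RegisteredTracer` / `swap_provider` names and none of the exporter plumbing:

```python
from opentelemetry.sdk.trace import TracerProvider

_provider = TracerProvider()  # creates real (non no-op) spans even before any exporter is added
_registered: list["RegisteredTracer"] = []  # tracers to refresh when the provider is swapped


class RegisteredTracer:
    def __init__(self, name: str) -> None:
        self._name = name
        self._tracer = _provider.get_tracer(name)
        _registered.append(self)

    def refresh(self, provider: TracerProvider) -> None:
        # re-fetch the underlying tracer from the newly configured provider
        self._tracer = provider.get_tracer(self._name)


def swap_provider(new_provider: TracerProvider) -> None:
    global _provider
    _provider = new_provider
    for tracer in _registered:  # existing tracers pick up the new provider immediately
        tracer.refresh(new_provider)
```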