diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index faa0959fce..3c5deebfd5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -580,3 +580,16 @@ def get_meter( self._measurement_consumer, ) return self._meters[info] + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader" + ) -> None: + with self._lock: + self._measurement_consumer.add_metric_reader(metric_reader) + + def remove_metric_reader( + self, + metric_reader: "opentelemetry.sdk.metrics.export.MetricReader", + ) -> None: + with self._lock: + self._measurement_consumer.remove_metric_reader(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051..658a4245c0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -15,6 +15,7 @@ # pylint: disable=unused-import from abc import ABC, abstractmethod +from logging import getLogger from threading import Lock from time import time_ns from typing import Iterable, List, Mapping, Optional @@ -31,6 +32,8 @@ ) from opentelemetry.sdk.metrics._internal.point import Metric +_logger = getLogger(__name__) + class MeasurementConsumer(ABC): @abstractmethod @@ -143,3 +146,39 @@ def collect( result = self._reader_storages[metric_reader].collect() return result + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Registers a new metric reader.""" + with self._lock: + if metric_reader in self._reader_storages: + _logger.warning("'%s' already registered!", metric_reader) + self._sdk_config.metric_readers += type( + self._sdk_config.metric_readers + )((metric_reader,)) + self._reader_storages[metric_reader] = MetricReaderStorage( + self._sdk_config, + metric_reader._instrument_class_temporality, + metric_reader._instrument_class_aggregation, + ) + metric_reader._set_collect_callback(self.collect) + + def remove_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Unregisters the given metric reader.""" + with self._lock: + if metric_reader not in self._reader_storages: + _logger.warning("'%s' has not been registered!", metric_reader) + self._reader_storages.pop(metric_reader, None) + metric_reader._set_collect_callback(None) + self._sdk_config.metric_readers = type( + self._sdk_config.metric_readers + )( + ( + reader + for reader in self._sdk_config.metric_readers + if reader is not metric_reader + ) + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 3991fd6e15..4cedcf407f 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -13,9 +13,8 @@ # limitations under the License. # pylint: disable=protected-access,no-self-use - import weakref -from logging import WARNING +from logging import DEBUG, WARNING from time import sleep from typing import Iterable, Sequence from unittest.mock import MagicMock, Mock, patch @@ -36,6 +35,7 @@ ) from opentelemetry.sdk.metrics._internal import SynchronousMeasurementConsumer from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, Metric, MetricExporter, MetricExportResult, @@ -426,6 +426,40 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer): sync_consumer_instance.consume_measurement.assert_called() + def test_addition_of_metric_reader(self): + # Suppress warnings for calling collect on an unregistered metric reader + with self.assertLogs( + "opentelemetry.sdk.metrics._internal.export", DEBUG + ): + reader = InMemoryMetricReader() + meter_provider = MeterProvider() + meter = meter_provider.get_meter(__name__) + counter = meter.create_counter("counter") + counter.add(1) + self.assertIsNone(reader.get_metrics_data()) + + meter_provider.add_metric_reader(reader) + counter.add(1) + self.assertIsNotNone(reader.get_metrics_data()) + + with self.assertLogs( + "opentelemetry.sdk.metrics._internal.measurement_consumer", + WARNING, + ) as logger: + meter_provider.add_metric_reader(reader) + self.assertIn("already registered!", logger.output[0]) + + meter_provider.remove_metric_reader(reader) + counter.add(1) + self.assertIsNone(reader.get_metrics_data()) + + with self.assertLogs( + "opentelemetry.sdk.metrics._internal.measurement_consumer", + WARNING, + ) as logger: + meter_provider.remove_metric_reader(reader) + self.assertIn("has not been registered!", logger.output[0]) + class TestMeter(TestCase): def setUp(self):