diff --git a/CHANGELOG.md b/CHANGELOG.md index e3091794e0..1e96198049 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add `minimum_severity_level` and `trace_based_sampling` logger parameters to filter logs + ([#4765](https://github.com/open-telemetry/opentelemetry-python/pull/4765)) - `opentelemetry-sdk`: Fix the type hint of the `_metrics_data` property to allow `None` ([#4837](https://github.com/open-telemetry/opentelemetry-python/pull/4837) - Regenerate opentelemetry-proto code with v1.9.0 release diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py index ec0b3dfb23..777eae1467 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. from opentelemetry.sdk._logs._internal import ( + FilteringLogRecordProcessor, LogDroppedAttributesWarning, Logger, LoggerProvider, @@ -32,6 +33,7 @@ "LogLimits", "LogRecordLimits", "LogRecordProcessor", + "FilteringLogRecordProcessor", "LogDroppedAttributesWarning", "LogRecordDroppedAttributesWarning", "ReadableLogRecord", diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py index d775dd4455..1b98e5d771 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py @@ -55,6 +55,7 @@ from opentelemetry.trace import ( format_span_id, format_trace_id, + get_current_span, ) from opentelemetry.util.types import AnyValue, _ExtendedAttributes @@ -317,6 +318,39 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: """ +class FilteringLogRecordProcessor(LogRecordProcessor): + """A processor that drops records based on minimum severity and/or trace based sampling parameters provided by the user.""" + + def __init__( + self, + log_record_processor: LogRecordProcessor, + *, + minimum_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED, + enable_trace_based_sampling: bool = False, + ): + self._log_record_processor = log_record_processor + self._minimum_severity_level = minimum_severity_level + self._enable_trace_based_sampling = enable_trace_based_sampling + + def on_emit(self, log_record: ReadWriteLogRecord): + record = log_record.log_record + if is_less_than_minimum_severity_level( + record, self._minimum_severity_level + ): + return + if should_drop_logs_for_trace_based_sampling( + record, self._enable_trace_based_sampling + ): + return + self._log_record_processor.on_emit(log_record) + + def shutdown(self): + self._log_record_processor.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return self._log_record_processor.force_flush(timeout_millis) + + # Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved # pylint:disable=no-member class SynchronousMultiLogRecordProcessor(LogRecordProcessor): @@ -835,3 +869,41 @@ def std_to_otel(levelno: int) -> SeverityNumber: if levelno > 53: return SeverityNumber.FATAL4 return _STD_TO_OTEL[levelno] + + +def is_less_than_minimum_severity_level( + record: LogRecord, minimum_severity_level: SeverityNumber +) -> bool: + """Checks if the log record's severity number is less than the minimum severity level. + + :return: True if the log record's severity number is less than the minimum + severity level, False otherwise. Log records with an unspecified severity (i.e. `0`) + are not affected by this parameter and therefore bypass minimum severity filtering. + """ + if record.severity_number is not None: + if ( + minimum_severity_level is not None + and minimum_severity_level != SeverityNumber.UNSPECIFIED + and record.severity_number.value < minimum_severity_level.value + ): + return True + return False + + +def should_drop_logs_for_trace_based_sampling( + record: LogRecord, enable_trace_based_sampling: bool +) -> bool: + """Determines whether the logger should drop log records associated with unsampled traces. + + If `enable_trace_based_sampling` is `true`, log records associated with unsampled traces are dropped by the `Logger`. + A log record is considered associated with an unsampled trace if it has a valid `SpanId` and its + `TraceFlags` indicate that the trace is unsampled. A log record that isn't associated with a trace + context is not affected by this parameter and therefore bypasses trace-based filtering. + """ + if enable_trace_based_sampling: + if record.context is not None: + span = get_current_span(record.context) + span_context = span.get_span_context() + if span_context.is_valid and not span_context.trace_flags.sampled: + return True + return False diff --git a/opentelemetry-sdk/tests/logs/test_logs.py b/opentelemetry-sdk/tests/logs/test_logs.py index 70811260ae..56491873db 100644 --- a/opentelemetry-sdk/tests/logs/test_logs.py +++ b/opentelemetry-sdk/tests/logs/test_logs.py @@ -25,6 +25,7 @@ ReadableLogRecord, ) from opentelemetry.sdk._logs._internal import ( + FilteringLogRecordProcessor, NoOpLogger, SynchronousMultiLogRecordProcessor, ) @@ -214,3 +215,248 @@ def test_can_emit_with_keywords_arguments(self): self.assertEqual(result_log_record.attributes, {"some": "attributes"}) self.assertEqual(result_log_record.event_name, "event_name") self.assertEqual(log_data.resource, logger.resource) + + +class TestFilteringLogRecordProcessor(unittest.TestCase): + @staticmethod + def _build_logger( + minimum_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED, + enable_trace_based_sampling: bool = False, + ): + processor_mock = Mock() + filtering_processor = FilteringLogRecordProcessor( + processor_mock, + minimum_severity_level=minimum_severity_level, + enable_trace_based_sampling=enable_trace_based_sampling, + ) + provider = LoggerProvider(resource=Resource.create({})) + provider.add_log_record_processor(filtering_processor) + logger = provider.get_logger( + "name", + version="version", + schema_url="schema_url", + attributes={"an": "attribute"}, + ) + return logger, processor_mock + + def test_emit_logrecord_with_minimum_severity_level_filtering(self): + logger, processor_mock = self._build_logger( + minimum_severity_level=SeverityNumber.DEBUG4 + ) + + log_record_info = LogRecord( + observed_timestamp=0, + body="info log line", + severity_number=SeverityNumber.DEBUG, + severity_text="DEBUG", + ) + + logger.emit(log_record_info) + processor_mock.on_emit.assert_not_called() + + processor_mock.reset_mock() + + log_record_error = LogRecord( + observed_timestamp=0, + body="error log line", + severity_number=SeverityNumber.ERROR, + severity_text="ERROR", + ) + + logger.emit(log_record_error) + + processor_mock.on_emit.assert_called_once() + log_data = processor_mock.on_emit.call_args.args[0] + self.assertTrue(isinstance(log_data.log_record, LogRecord)) + self.assertEqual( + log_data.log_record.severity_number, SeverityNumber.ERROR + ) + + def test_emit_logrecord_with_minimum_severity_level_unspecified(self): + logger, processor_mock = self._build_logger() + log_record = LogRecord( + observed_timestamp=0, + body="debug log line", + severity_number=SeverityNumber.DEBUG, + severity_text="DEBUG", + ) + logger.emit(log_record) + processor_mock.on_emit.assert_called_once() + + def test_emit_logrecord_with_trace_based_filtering(self): + logger, processor_mock = self._build_logger( + enable_trace_based_sampling=True + ) + + mock_span_context = Mock() + mock_span_context.is_valid = True + mock_span_context.trace_flags.sampled = False + + mock_span = Mock() + mock_span.get_span_context.return_value = mock_span_context + + mock_context = Mock() + + with patch( + "opentelemetry.sdk._logs._internal.get_current_span", + return_value=mock_span, + ): + log_record = LogRecord( + observed_timestamp=0, + body="should be dropped", + severity_number=SeverityNumber.INFO, + severity_text="INFO", + context=mock_context, + ) + + logger.emit(log_record) + processor_mock.on_emit.assert_not_called() + + processor_mock.reset_mock() + + mock_span_context = Mock() + mock_span_context.is_valid = True + mock_span_context.trace_flags.sampled = True + + mock_span = Mock() + mock_span.get_span_context.return_value = mock_span_context + + with patch( + "opentelemetry.sdk._logs._internal.get_current_span", + return_value=mock_span, + ): + log_record = LogRecord( + observed_timestamp=0, + body="should pass", + severity_number=SeverityNumber.INFO, + severity_text="INFO", + context=mock_context, + ) + + logger.emit(log_record) + processor_mock.on_emit.assert_called_once() + + def test_emit_logrecord_trace_based_filtering_disabled(self): + logger, processor_mock = self._build_logger( + enable_trace_based_sampling=False + ) + + mock_span_context = Mock() + mock_span_context.is_valid = False + mock_span_context.trace_flags.sampled = False + + mock_span = Mock() + mock_span.get_span_context.return_value = mock_span_context + + mock_context = Mock() + + with patch( + "opentelemetry.sdk._logs._internal.get_current_span", + return_value=mock_span, + ): + log_record = LogRecord( + observed_timestamp=0, + body="should be emitted when filtering disabled", + severity_number=SeverityNumber.INFO, + severity_text="INFO", + context=mock_context, + ) + + logger.emit(log_record) + processor_mock.on_emit.assert_called_once() + + def test_emit_logrecord_trace_based_filtering_edge_cases(self): + logger, processor_mock = self._build_logger( + enable_trace_based_sampling=True + ) + + mock_span_context = Mock() + mock_span_context.is_valid = False + mock_span_context.trace_flags.sampled = True + + mock_span = Mock() + mock_span.get_span_context.return_value = mock_span_context + + mock_context = Mock() + + with patch( + "opentelemetry.sdk._logs._internal.get_current_span", + return_value=mock_span, + ): + log_record = LogRecord( + observed_timestamp=0, + body="invalid but sampled", + severity_number=SeverityNumber.INFO, + severity_text="INFO", + context=mock_context, + ) + + logger.emit(log_record) + processor_mock.on_emit.assert_called_once() + + processor_mock.reset_mock() + + mock_span_context = Mock() + mock_span_context.is_valid = True + mock_span_context.trace_flags.sampled = False + + mock_span = Mock() + mock_span.get_span_context.return_value = mock_span_context + + with patch( + "opentelemetry.sdk._logs._internal.get_current_span", + return_value=mock_span, + ): + log_record = LogRecord( + observed_timestamp=0, + body="valid but not sampled", + severity_number=SeverityNumber.INFO, + severity_text="INFO", + context=mock_context, + ) + + logger.emit(log_record) + processor_mock.on_emit.assert_not_called() + + def test_emit_both_minimum_severity_level_and_trace_based_filtering(self): + logger, processor_mock = self._build_logger( + minimum_severity_level=SeverityNumber.WARN, + enable_trace_based_sampling=True, + ) + + mock_span_context = Mock() + mock_span_context.is_valid = True + mock_span_context.trace_flags.sampled = True + + mock_span = Mock() + mock_span.get_span_context.return_value = mock_span_context + + mock_context = Mock() + + with patch( + "opentelemetry.sdk._logs._internal.get_current_span", + return_value=mock_span, + ): + log_record_info = LogRecord( + observed_timestamp=0, + body="info log line", + severity_number=SeverityNumber.INFO, + severity_text="INFO", + context=mock_context, + ) + + logger.emit(log_record_info) + processor_mock.on_emit.assert_not_called() + + processor_mock.reset_mock() + + log_record_error = LogRecord( + observed_timestamp=0, + body="error log line", + severity_number=SeverityNumber.ERROR, + severity_text="ERROR", + context=mock_context, + ) + + logger.emit(log_record_error) + processor_mock.on_emit.assert_called_once()