Skip to content

Commit 7eab50b

Browse files
committed
Add filters for minimum severity level and trace based sampling
1 parent 8101abd commit 7eab50b

File tree

3 files changed

+299
-0
lines changed

3 files changed

+299
-0
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
LogRecordDroppedAttributesWarning,
2222
LogRecordLimits,
2323
LogRecordProcessor,
24+
FilteringLogRecordProcessor,
2425
ReadableLogRecord,
2526
ReadWriteLogRecord,
2627
)
@@ -32,6 +33,7 @@
3233
"LogLimits",
3334
"LogRecordLimits",
3435
"LogRecordProcessor",
36+
"FilteringLogRecordProcessor",
3537
"LogDroppedAttributesWarning",
3638
"LogRecordDroppedAttributesWarning",
3739
"ReadableLogRecord",

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from opentelemetry.trace import (
5656
format_span_id,
5757
format_trace_id,
58+
get_current_span,
5859
)
5960
from opentelemetry.util.types import AnyValue, _ExtendedAttributes
6061

@@ -317,6 +318,35 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
317318
"""
318319

319320

321+
class FilteringLogRecordProcessor(LogRecordProcessor):
322+
"""A processor that drops records based on minimum severity and/or trace based sampling parameters provided by the user."""
323+
324+
def __init__(
325+
self,
326+
log_record_processor: LogRecordProcessor,
327+
*,
328+
minimum_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED,
329+
enable_trace_based_sampling: bool = False,
330+
):
331+
self._log_record_processor = log_record_processor
332+
self._minimum_severity_level = minimum_severity_level
333+
self._enable_trace_based_sampling = enable_trace_based_sampling
334+
335+
def on_emit(self, log_record: ReadWriteLogRecord):
336+
record = log_record.log_record
337+
if is_less_than_min_severity(record, self._minimum_severity_level):
338+
return
339+
if should_drop_logs_for_trace_based(record, self._enable_trace_based_sampling):
340+
return
341+
self._log_record_processor.on_emit(log_record)
342+
343+
def shutdown(self):
344+
self._log_record_processor.shutdown()
345+
346+
def force_flush(self, timeout_millis: int = 30000) -> bool:
347+
return self._log_record_processor.force_flush(timeout_millis)
348+
349+
320350
# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
321351
# pylint:disable=no-member
322352
class SynchronousMultiLogRecordProcessor(LogRecordProcessor):
@@ -835,3 +865,29 @@ def std_to_otel(levelno: int) -> SeverityNumber:
835865
if levelno > 53:
836866
return SeverityNumber.FATAL4
837867
return _STD_TO_OTEL[levelno]
868+
869+
870+
def is_less_than_minimum_severity_level(
871+
record: LogRecord, minimum_severity_level: SeverityNumber
872+
) -> bool:
873+
if record.severity_number is not None:
874+
if (
875+
minimum_severity_level is not None
876+
and minimum_severity_level != SeverityNumber.UNSPECIFIED
877+
and record.severity_number.value < minimum_severity_level.value
878+
):
879+
return True
880+
return False
881+
882+
883+
def should_drop_logs_for_trace_based_sampling(
884+
record: LogRecord, enable_trace_based_sampling: bool
885+
) -> bool:
886+
if enable_trace_based_sampling:
887+
if record.context is not None:
888+
span = get_current_span(record.context)
889+
span_context = span.get_span_context()
890+
if span_context.is_valid and not span_context.trace_flags.sampled:
891+
return True
892+
return False
893+

opentelemetry-sdk/tests/logs/test_logs.py

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from opentelemetry.sdk._logs._internal import (
2828
NoOpLogger,
2929
SynchronousMultiLogRecordProcessor,
30+
FilteringLogRecordProcessor,
3031
)
3132
from opentelemetry.sdk.environment_variables import OTEL_SDK_DISABLED
3233
from opentelemetry.sdk.resources import Resource
@@ -214,3 +215,243 @@ def test_can_emit_with_keywords_arguments(self):
214215
self.assertEqual(result_log_record.attributes, {"some": "attributes"})
215216
self.assertEqual(result_log_record.event_name, "event_name")
216217
self.assertEqual(log_data.resource, logger.resource)
218+
219+
def test_emit_logrecord_with_min_severity_filtering(self):
220+
logger_processor_mock = Mock()
221+
filtering_processor = FilteringLogRecordProcessor(
222+
logger_processor_mock,
223+
minimum_severity_level=SeverityNumber.DEBUG4,
224+
)
225+
logger, _ = self._get_logger(filtering_processor)
226+
227+
log_record_info = LogRecord(
228+
observed_timestamp=0,
229+
body="info log line",
230+
severity_number=SeverityNumber.DEBUG,
231+
severity_text="DEBUG",
232+
)
233+
234+
logger.emit(log_record_info)
235+
logger_processor_mock.on_emit.assert_not_called()
236+
237+
logger_processor_mock.reset_mock()
238+
239+
log_record_error = LogRecord(
240+
observed_timestamp=0,
241+
body="error log line",
242+
severity_number=SeverityNumber.ERROR,
243+
severity_text="ERROR",
244+
)
245+
246+
logger.emit(log_record_error)
247+
248+
logger_processor_mock.on_emit.assert_called_once()
249+
log_data = logger_processor_mock.on_emit.call_args.args[0]
250+
self.assertTrue(isinstance(log_data.log_record, LogRecord))
251+
self.assertEqual(log_data.log_record.severity_number, SeverityNumber.ERROR)
252+
253+
def test_emit_logrecord_with_min_severity_unspecified(self):
254+
logger_processor_mock = Mock()
255+
filtering_processor = FilteringLogRecordProcessor(
256+
logger_processor_mock,
257+
minimum_severity_level=SeverityNumber.UNSPECIFIED,
258+
)
259+
logger, _ = self._get_logger(filtering_processor)
260+
log_record = LogRecord(
261+
observed_timestamp=0,
262+
body="debug log line",
263+
severity_number=SeverityNumber.DEBUG,
264+
severity_text="DEBUG",
265+
)
266+
logger.emit(log_record)
267+
logger_processor_mock.on_emit.assert_called_once()
268+
269+
def test_emit_logrecord_with_trace_based_filtering(self):
270+
logger_processor_mock = Mock()
271+
filtering_processor = FilteringLogRecordProcessor(
272+
logger_processor_mock,
273+
enable_trace_based_sampling=True,
274+
)
275+
logger, _ = self._get_logger(filtering_processor)
276+
277+
mock_span_context = Mock()
278+
mock_span_context.is_valid = True
279+
mock_span_context.trace_flags.sampled = False
280+
281+
mock_span = Mock()
282+
mock_span.get_span_context.return_value = mock_span_context
283+
284+
mock_context = Mock()
285+
286+
with patch(
287+
"opentelemetry.sdk._logs._internal.get_current_span",
288+
return_value=mock_span,
289+
):
290+
log_record = LogRecord(
291+
observed_timestamp=0,
292+
body="should be dropped",
293+
severity_number=SeverityNumber.INFO,
294+
severity_text="INFO",
295+
context=mock_context,
296+
)
297+
298+
logger.emit(log_record)
299+
logger_processor_mock.on_emit.assert_not_called()
300+
301+
logger_processor_mock.reset_mock()
302+
303+
mock_span_context = Mock()
304+
mock_span_context.is_valid = True
305+
mock_span_context.trace_flags.sampled = True
306+
307+
mock_span = Mock()
308+
mock_span.get_span_context.return_value = mock_span_context
309+
310+
with patch(
311+
"opentelemetry.sdk._logs._internal.get_current_span",
312+
return_value=mock_span,
313+
):
314+
log_record = LogRecord(
315+
observed_timestamp=0,
316+
body="should pass",
317+
severity_number=SeverityNumber.INFO,
318+
severity_text="INFO",
319+
context=mock_context,
320+
)
321+
322+
logger.emit(log_record)
323+
logger_processor_mock.on_emit.assert_called_once()
324+
325+
def test_emit_logrecord_trace_filtering_disabled(self):
326+
logger_processor_mock = Mock()
327+
filtering_processor = FilteringLogRecordProcessor(
328+
logger_processor_mock,
329+
enable_trace_based_sampling=False,
330+
)
331+
logger, _ = self._get_logger(filtering_processor)
332+
333+
mock_span_context = Mock()
334+
mock_span_context.is_valid = False
335+
mock_span_context.trace_flags.sampled = False
336+
337+
mock_span = Mock()
338+
mock_span.get_span_context.return_value = mock_span_context
339+
340+
mock_context = Mock()
341+
342+
with patch(
343+
"opentelemetry.sdk._logs._internal.get_current_span",
344+
return_value=mock_span,
345+
):
346+
log_record = LogRecord(
347+
observed_timestamp=0,
348+
body="should be emitted when filtering disabled",
349+
severity_number=SeverityNumber.INFO,
350+
severity_text="INFO",
351+
context=mock_context,
352+
)
353+
354+
logger.emit(log_record)
355+
logger_processor_mock.on_emit.assert_called_once()
356+
357+
def test_emit_logrecord_trace_filtering_edge_cases(self):
358+
logger_processor_mock = Mock()
359+
filtering_processor = FilteringLogRecordProcessor(
360+
logger_processor_mock,
361+
enable_trace_based_sampling=True,
362+
)
363+
logger, _ = self._get_logger(filtering_processor)
364+
365+
mock_span_context = Mock()
366+
mock_span_context.is_valid = False
367+
mock_span_context.trace_flags.sampled = True
368+
369+
mock_span = Mock()
370+
mock_span.get_span_context.return_value = mock_span_context
371+
372+
mock_context = Mock()
373+
374+
with patch(
375+
"opentelemetry.sdk._logs._internal.get_current_span",
376+
return_value=mock_span,
377+
):
378+
log_record = LogRecord(
379+
observed_timestamp=0,
380+
body="invalid but sampled",
381+
severity_number=SeverityNumber.INFO,
382+
severity_text="INFO",
383+
context=mock_context,
384+
)
385+
386+
logger.emit(log_record)
387+
logger_processor_mock.on_emit.assert_called_once()
388+
389+
logger_processor_mock.reset_mock()
390+
391+
mock_span_context = Mock()
392+
mock_span_context.is_valid = True
393+
mock_span_context.trace_flags.sampled = False
394+
395+
mock_span = Mock()
396+
mock_span.get_span_context.return_value = mock_span_context
397+
398+
with patch(
399+
"opentelemetry.sdk._logs._internal.get_current_span",
400+
return_value=mock_span,
401+
):
402+
log_record = LogRecord(
403+
observed_timestamp=0,
404+
body="valid but not sampled",
405+
severity_number=SeverityNumber.INFO,
406+
severity_text="INFO",
407+
context=mock_context,
408+
)
409+
410+
logger.emit(log_record)
411+
logger_processor_mock.on_emit.assert_not_called()
412+
413+
def test_emit_both_min_severity_and_trace_based_filtering(self):
414+
logger_processor_mock = Mock()
415+
filtering_processor = FilteringLogRecordProcessor(
416+
logger_processor_mock,
417+
minimum_severity_level=SeverityNumber.WARN,
418+
enable_trace_based_sampling=True,
419+
)
420+
logger, _ = self._get_logger(filtering_processor)
421+
422+
mock_span_context = Mock()
423+
mock_span_context.is_valid = True
424+
mock_span_context.trace_flags.sampled = True
425+
426+
mock_span = Mock()
427+
mock_span.get_span_context.return_value = mock_span_context
428+
429+
mock_context = Mock()
430+
431+
with patch(
432+
"opentelemetry.sdk._logs._internal.get_current_span",
433+
return_value=mock_span,
434+
):
435+
log_record_info = LogRecord(
436+
observed_timestamp=0,
437+
body="info log line",
438+
severity_number=SeverityNumber.INFO,
439+
severity_text="INFO",
440+
context=mock_context,
441+
)
442+
443+
logger.emit(log_record_info)
444+
logger_processor_mock.on_emit.assert_not_called()
445+
446+
logger_processor_mock.reset_mock()
447+
448+
log_record_error = LogRecord(
449+
observed_timestamp=0,
450+
body="error log line",
451+
severity_number=SeverityNumber.ERROR,
452+
severity_text="ERROR",
453+
context=mock_context,
454+
)
455+
456+
logger.emit(log_record_error)
457+
logger_processor_mock.on_emit.assert_called_once()

0 commit comments

Comments
 (0)