From 81ca3f7783d51d1e663af63d45ccf9f91afb5672 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 8 May 2026 18:03:16 -0600 Subject: [PATCH 1/3] [kafka_actions] Add plugin architecture for formats and compression codecs Introduces a pluggable interface for message format handlers and app-level payload compression codecs, discovered via the 'datadog_kafka_actions.formats' and 'datadog_kafka_actions.compressions' entry-point groups. Existing built-in formats (json, string, raw, bson, avro, protobuf) are now first-class plugins registered by the wheel itself. New 'value_compression' and 'key_compression' config keys decompress payloads before deserialization. No compression codecs ship in core; install a plugin wheel (e.g. datadog-kafka-deserializers from integrations-extras) to add them. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../changelog.d/plugin-architecture.added | 1 + .../datadog_checks/kafka_actions/__about__.py | 2 +- .../datadog_checks/kafka_actions/check.py | 2 + .../kafka_actions/compression/__init__.py | 20 + .../kafka_actions/compression/base.py | 19 + .../kafka_actions/compression/registry.py | 64 ++ .../kafka_actions/formats/__init__.py | 18 + .../kafka_actions/formats/base.py | 42 ++ .../kafka_actions/formats/builtins.py | 263 ++++++++ .../kafka_actions/formats/registry.py | 69 ++ .../kafka_actions/message_deserializer.py | 637 ++++-------------- kafka_actions/pyproject.toml | 8 + 12 files changed, 634 insertions(+), 511 deletions(-) create mode 100644 kafka_actions/changelog.d/plugin-architecture.added create mode 100644 kafka_actions/datadog_checks/kafka_actions/compression/__init__.py create mode 100644 kafka_actions/datadog_checks/kafka_actions/compression/base.py create mode 100644 kafka_actions/datadog_checks/kafka_actions/compression/registry.py create mode 100644 kafka_actions/datadog_checks/kafka_actions/formats/__init__.py create mode 100644 kafka_actions/datadog_checks/kafka_actions/formats/base.py create mode 100644 kafka_actions/datadog_checks/kafka_actions/formats/builtins.py create mode 100644 kafka_actions/datadog_checks/kafka_actions/formats/registry.py diff --git a/kafka_actions/changelog.d/plugin-architecture.added b/kafka_actions/changelog.d/plugin-architecture.added new file mode 100644 index 0000000000000..796f29374cd36 --- /dev/null +++ b/kafka_actions/changelog.d/plugin-architecture.added @@ -0,0 +1 @@ +Add a plugin architecture for message format handlers and payload-compression codecs. Format handlers can be registered via the `datadog_kafka_actions.formats` entry-point group, and compression codecs via `datadog_kafka_actions.compressions`. Built-in formats (json, string, raw, bson, avro, protobuf) are now first-class plugins. New `value_compression` and `key_compression` config keys decompress payloads before deserialization. No compression codecs ship in core — install a plugin wheel to add them. diff --git a/kafka_actions/datadog_checks/kafka_actions/__about__.py b/kafka_actions/datadog_checks/kafka_actions/__about__.py index 0657a9824a298..d72f3df63c920 100644 --- a/kafka_actions/datadog_checks/kafka_actions/__about__.py +++ b/kafka_actions/datadog_checks/kafka_actions/__about__.py @@ -1,4 +1,4 @@ # (C) Datadog, Inc. 
2025-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) -__version__ = '2.6.0' +__version__ = '2.7.0' diff --git a/kafka_actions/datadog_checks/kafka_actions/check.py b/kafka_actions/datadog_checks/kafka_actions/check.py index 36749930be796..f60a68a3753eb 100644 --- a/kafka_actions/datadog_checks/kafka_actions/check.py +++ b/kafka_actions/datadog_checks/kafka_actions/check.py @@ -280,6 +280,8 @@ def _action_read_messages(self): 'key_schema': config.get('key_schema'), 'key_uses_schema_registry': config.get('key_uses_schema_registry', False), 'key_skip_bytes': config.get('key_skip_bytes', 0), + 'value_compression': config.get('value_compression'), + 'key_compression': config.get('key_compression'), } self.log.debug( diff --git a/kafka_actions/datadog_checks/kafka_actions/compression/__init__.py b/kafka_actions/datadog_checks/kafka_actions/compression/__init__.py new file mode 100644 index 0000000000000..be9f92f79aa05 --- /dev/null +++ b/kafka_actions/datadog_checks/kafka_actions/compression/__init__.py @@ -0,0 +1,20 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Compression codec registry for kafka_actions. + +Some producers compress message payloads at the application layer (before +handing bytes to the Kafka producer) using a variety of algorithms, separate +from the broker-negotiated ``compression.type`` setting. This module exposes +a pluggable codec interface so consumers can decompress those payloads +before deserialization. + +No codecs ship in the core wheel — install a plugin wheel that registers +codecs on the ``datadog_kafka_actions.compressions`` entry-point group, or +register them directly via :func:`register_codec` in tests. +""" + +from .base import CompressionCodec +from .registry import get_codec, list_codecs, register_codec + +__all__ = ['CompressionCodec', 'get_codec', 'list_codecs', 'register_codec'] diff --git a/kafka_actions/datadog_checks/kafka_actions/compression/base.py b/kafka_actions/datadog_checks/kafka_actions/compression/base.py new file mode 100644 index 0000000000000..23a580fc5cca6 --- /dev/null +++ b/kafka_actions/datadog_checks/kafka_actions/compression/base.py @@ -0,0 +1,19 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Base class for app-level payload compression codecs.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod + + +class CompressionCodec(ABC): + """Plug-in interface for app-level payload decompression.""" + + name: str = '' + + @abstractmethod + def decompress(self, data: bytes) -> bytes: + """Return the uncompressed payload bytes.""" + raise NotImplementedError diff --git a/kafka_actions/datadog_checks/kafka_actions/compression/registry.py b/kafka_actions/datadog_checks/kafka_actions/compression/registry.py new file mode 100644 index 0000000000000..13ca013f10c22 --- /dev/null +++ b/kafka_actions/datadog_checks/kafka_actions/compression/registry.py @@ -0,0 +1,64 @@ +# (C) Datadog, Inc. 
2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Lazy registry of compression codecs.""" + +from __future__ import annotations + +import logging +from importlib.metadata import entry_points +from threading import Lock + +from .base import CompressionCodec + +_LOG = logging.getLogger(__name__) +_ENTRY_POINT_GROUP = 'datadog_kafka_actions.compressions' + +_lock = Lock() +_codecs: dict[str, CompressionCodec] = {} +_loaded = False + + +def register_codec(codec: CompressionCodec) -> None: + if not codec.name: + raise ValueError(f"CompressionCodec {type(codec).__name__} has no name set") + with _lock: + _codecs[codec.name] = codec + + +def _load_entry_points() -> None: + global _loaded + if _loaded: + return + with _lock: + if _loaded: + return + try: + eps = entry_points(group=_ENTRY_POINT_GROUP) + except TypeError: # pragma: no cover + eps = entry_points().get(_ENTRY_POINT_GROUP, []) + for ep in eps: + if ep.name in _codecs: + continue + try: + cls = ep.load() + instance = cls() if isinstance(cls, type) else cls + if not isinstance(instance, CompressionCodec): + _LOG.warning("Entry point %s did not produce a CompressionCodec", ep.name) + continue + if not instance.name: + instance.name = ep.name + _codecs[instance.name] = instance + except Exception as e: + _LOG.warning("Failed to load compression codec '%s': %s", ep.name, e) + _loaded = True + + +def get_codec(name: str) -> CompressionCodec | None: + _load_entry_points() + return _codecs.get(name) + + +def list_codecs() -> list[str]: + _load_entry_points() + return sorted(_codecs) diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/__init__.py b/kafka_actions/datadog_checks/kafka_actions/formats/__init__.py new file mode 100644 index 0000000000000..49fc302239f69 --- /dev/null +++ b/kafka_actions/datadog_checks/kafka_actions/formats/__init__.py @@ -0,0 +1,18 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Format handler registry for kafka_actions. + +External wheels can register additional handlers by exposing them on the +``datadog_kafka_actions.formats`` entry-point group: + + [project.entry-points."datadog_kafka_actions.formats"] + myformat = "my_pkg.handler:MyHandler" + +Handlers must subclass :class:`FormatHandler` from ``base``. +""" + +from .base import FormatHandler +from .registry import get_handler, list_handlers, register_handler + +__all__ = ['FormatHandler', 'get_handler', 'list_handlers', 'register_handler'] diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/base.py b/kafka_actions/datadog_checks/kafka_actions/formats/base.py new file mode 100644 index 0000000000000..1e7f99e2f7c2b --- /dev/null +++ b/kafka_actions/datadog_checks/kafka_actions/formats/base.py @@ -0,0 +1,42 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Base class for kafka_actions format handlers.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + + +class FormatHandler(ABC): + """Plug-in interface for message-body deserialization. + + Subclasses are instantiated once and reused across messages, so they + should be stateless or maintain only thread-safe caches. + """ + + name: str = '' + + def build_schema(self, schema_str: str) -> Any: + """Build a schema object from an inline (config-supplied) schema string. + + Override for formats that need a parsed schema (e.g. Avro, Protobuf). 
+ Schemaless formats (json, msgpack, raw) can leave the default. + """ + return None + + def build_schema_from_registry(self, schema_str: str, dep_schemas: list) -> Any: + """Build a schema object from registry-supplied bytes. + + ``dep_schemas`` is a list of ``(name, base64_bytes)`` tuples for + dependencies (e.g. imported .proto files). + + Defaults to :meth:`build_schema` for formats that don't distinguish. + """ + return self.build_schema(schema_str) + + @abstractmethod + def deserialize(self, message: bytes, schema: Any, *, log, uses_schema_registry: bool) -> str | None: + """Decode ``message`` and return a JSON string (or None for empty messages).""" + raise NotImplementedError diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/builtins.py b/kafka_actions/datadog_checks/kafka_actions/formats/builtins.py new file mode 100644 index 0000000000000..b27c86d982a66 --- /dev/null +++ b/kafka_actions/datadog_checks/kafka_actions/formats/builtins.py @@ -0,0 +1,263 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Built-in format handlers, registered via entry points in pyproject.toml.""" + +from __future__ import annotations + +import base64 +import datetime +import decimal +import json +import uuid +from io import BytesIO + +from .base import FormatHandler + + +class JsonHandler(FormatHandler): + name = 'json' + + def deserialize(self, message, schema, *, log, uses_schema_registry): + if not message: + return None + decoded = message.decode('utf-8').strip() + if not decoded: + return None + json.loads(decoded) + return decoded + + +class StringHandler(FormatHandler): + name = 'string' + + def deserialize(self, message, schema, *, log, uses_schema_registry): + if not message: + return None + decoded = message.decode('utf-8') + if not decoded: + return None + return json.dumps(decoded) + + +class RawHandler(FormatHandler): + name = 'raw' + + def deserialize(self, message, schema, *, log, uses_schema_registry): + if not message: + return None + return json.dumps(base64.b64encode(message).decode('ascii')) + + +class BsonHandler(FormatHandler): + name = 'bson' + + def deserialize(self, message, schema, *, log, uses_schema_registry): + if not message: + return None + from bson import decode as bson_decode + from bson.json_util import dumps as bson_dumps + + try: + return bson_dumps(bson_decode(message)) + except Exception as e: + raise ValueError(f"Failed to deserialize BSON message: {e}") + + +class _AvroJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, decimal.Decimal): + return str(obj) + if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): + return obj.isoformat() + if isinstance(obj, uuid.UUID): + return str(obj) + if isinstance(obj, bytes): + return base64.b64encode(obj).decode('ascii') + return super().default(obj) + + +class AvroHandler(FormatHandler): + name = 'avro' + + def build_schema(self, schema_str): + schema = json.loads(schema_str) + if schema is None: + raise ValueError("Avro schema cannot be None") + return schema + + def build_schema_from_registry(self, schema_str, dep_schemas): + return self.build_schema(schema_str) + + def deserialize(self, message, schema, *, log, uses_schema_registry): + if not message: + return None + if schema is None: + raise ValueError("Avro schema is required") + from fastavro import schemaless_reader + + try: + bio = BytesIO(message) + initial = bio.tell() + data = schemaless_reader(bio, schema) + bytes_read = bio.tell() 
- initial + if bytes_read != len(message): + raise ValueError( + f"Not all bytes were consumed during Avro decoding! " + f"Read {bytes_read} bytes, but message has {len(message)} bytes." + ) + return json.dumps(data, cls=_AvroJSONEncoder) + except Exception as e: + raise ValueError(f"Failed to deserialize Avro message: {e}") + + +def _read_varint(data): + shift = 0 + result = 0 + bytes_read = 0 + for byte in data: + bytes_read += 1 + result |= (byte & 0x7F) << shift + if (byte & 0x80) == 0: + return result, bytes_read + shift += 7 + raise ValueError("Incomplete varint") + + +def _read_protobuf_message_indices(payload): + array_len, bytes_read = _read_varint(payload) + payload = payload[bytes_read:] + indices = [] + for _ in range(array_len): + index, bytes_read = _read_varint(payload) + indices.append(index) + payload = payload[bytes_read:] + return indices, payload + + +def _preload_well_known_types(pool): + from google.protobuf import ( + any_pb2, + api_pb2, + descriptor_pb2, + duration_pb2, + empty_pb2, + field_mask_pb2, + source_context_pb2, + struct_pb2, + timestamp_pb2, + type_pb2, + wrappers_pb2, + ) + + modules = ( + any_pb2, + duration_pb2, + empty_pb2, + field_mask_pb2, + source_context_pb2, + struct_pb2, + timestamp_pb2, + wrappers_pb2, + type_pb2, + api_pb2, + ) + for module in modules: + file_name = module.DESCRIPTOR.name + try: + pool.FindFileByName(file_name) + continue + except KeyError: + pass + fd_proto = descriptor_pb2.FileDescriptorProto() + module.DESCRIPTOR.CopyToProto(fd_proto) + pool.Add(fd_proto) + + +def _get_protobuf_message_class(schema_info, message_indices): + from google.protobuf import message_factory + + pool, descriptor_set = schema_info + file_descriptor = descriptor_set.file[0] + message_descriptor_proto = file_descriptor.message_type[message_indices[0]] + package = file_descriptor.package + name_parts = [message_descriptor_proto.name] + + current_proto = message_descriptor_proto + for idx in message_indices[1:]: + current_proto = current_proto.nested_type[idx] + name_parts.append(current_proto.name) + + full_name = f"{package}.{'.'.join(name_parts)}" if package else '.'.join(name_parts) + message_descriptor = pool.FindMessageTypeByName(full_name) + return message_factory.GetMessageClass(message_descriptor) + + +class ProtobufHandler(FormatHandler): + name = 'protobuf' + + def build_schema(self, schema_str): + from google.protobuf import descriptor_pb2, descriptor_pool + + schema_bytes = base64.b64decode(schema_str) + descriptor_set = descriptor_pb2.FileDescriptorSet() + descriptor_set.ParseFromString(schema_bytes) + + pool = descriptor_pool.DescriptorPool() + _preload_well_known_types(pool) + for fd_proto in descriptor_set.file: + pool.Add(fd_proto) + return (pool, descriptor_set) + + def build_schema_from_registry(self, schema_str, dep_schemas): + from google.protobuf import descriptor_pb2, descriptor_pool + + pool = descriptor_pool.DescriptorPool() + _preload_well_known_types(pool) + descriptor_set = descriptor_pb2.FileDescriptorSet() + + for dep_name, dep_b64 in dep_schemas: + try: + pool.FindFileByName(dep_name) + continue + except KeyError: + pass + dep_bytes = base64.b64decode(dep_b64) + dep_proto = descriptor_pb2.FileDescriptorProto() + dep_proto.ParseFromString(dep_bytes) + dep_proto.name = dep_name + pool.Add(dep_proto) + + schema_bytes = base64.b64decode(schema_str) + fd_proto = descriptor_pb2.FileDescriptorProto() + fd_proto.ParseFromString(schema_bytes) + descriptor_set.file.append(fd_proto) + pool.Add(fd_proto) + return (pool, 
descriptor_set) + + def deserialize(self, message, schema, *, log, uses_schema_registry): + from google.protobuf.json_format import MessageToJson + + if not message: + return None + if schema is None: + raise ValueError("Protobuf schema is required") + try: + if uses_schema_registry: + message_indices, message = _read_protobuf_message_indices(message) + if not message_indices: + message_indices = [0] + else: + message_indices = [0] + + message_class = _get_protobuf_message_class(schema, message_indices) + instance = message_class() + consumed = instance.ParseFromString(message) + if consumed != len(message): + raise ValueError( + f"Not all bytes were consumed during Protobuf decoding! " + f"Read {consumed} bytes, but message has {len(message)} bytes." + ) + return MessageToJson(instance) + except Exception as e: + raise ValueError(f"Failed to deserialize Protobuf message: {e}") diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/registry.py b/kafka_actions/datadog_checks/kafka_actions/formats/registry.py new file mode 100644 index 0000000000000..18a8f59ad023c --- /dev/null +++ b/kafka_actions/datadog_checks/kafka_actions/formats/registry.py @@ -0,0 +1,69 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Lazy registry of format handlers, populated from entry points + direct registration.""" + +from __future__ import annotations + +import logging +from importlib.metadata import entry_points +from threading import Lock + +from .base import FormatHandler + +_LOG = logging.getLogger(__name__) +_ENTRY_POINT_GROUP = 'datadog_kafka_actions.formats' + +_lock = Lock() +_handlers: dict[str, FormatHandler] = {} +_loaded = False + + +def register_handler(handler: FormatHandler) -> None: + """Register a handler instance directly (bypasses entry points). + + Useful for tests and for environments where the wheel was not installed + via pip (e.g. running from a source checkout). + """ + if not handler.name: + raise ValueError(f"FormatHandler {type(handler).__name__} has no name set") + with _lock: + _handlers[handler.name] = handler + + +def _load_entry_points() -> None: + global _loaded + if _loaded: + return + with _lock: + if _loaded: + return + try: + eps = entry_points(group=_ENTRY_POINT_GROUP) + except TypeError: # pragma: no cover — older importlib.metadata + eps = entry_points().get(_ENTRY_POINT_GROUP, []) + for ep in eps: + if ep.name in _handlers: + continue + try: + cls = ep.load() + instance = cls() if isinstance(cls, type) else cls + if not isinstance(instance, FormatHandler): + _LOG.warning("Entry point %s did not produce a FormatHandler", ep.name) + continue + if not instance.name: + instance.name = ep.name + _handlers[instance.name] = instance + except Exception as e: + _LOG.warning("Failed to load format handler '%s': %s", ep.name, e) + _loaded = True + + +def get_handler(name: str) -> FormatHandler | None: + _load_entry_points() + return _handlers.get(name) + + +def list_handlers() -> list[str]: + _load_entry_points() + return sorted(_handlers) diff --git a/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py b/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py index fe09e8035745f..087bfb402cd4d 100644 --- a/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py +++ b/kafka_actions/datadog_checks/kafka_actions/message_deserializer.py @@ -1,181 +1,61 @@ # (C) Datadog, Inc. 
2025-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) -"""Message deserialization for Kafka messages.""" +"""Message deserialization for Kafka messages. + +Format handlers and compression codecs are pluggable via the +``datadog_kafka_actions.formats`` and ``datadog_kafka_actions.compressions`` +entry-point groups. This wheel ships built-in handlers for +json/string/raw/bson/avro/protobuf; compression codecs are provided by +plugin wheels (none ship in core). +""" + +from __future__ import annotations import base64 -import datetime -import decimal import hashlib import json -import uuid -from io import BytesIO - -from bson import decode as bson_decode -from bson.json_util import dumps as bson_dumps -from fastavro import schemaless_reader -from google.protobuf import ( - any_pb2, - api_pb2, - descriptor_pb2, - descriptor_pool, - duration_pb2, - empty_pb2, - field_mask_pb2, - message_factory, - source_context_pb2, - struct_pb2, - timestamp_pb2, - type_pb2, - wrappers_pb2, -) -from google.protobuf.json_format import MessageToJson - -SCHEMA_REGISTRY_MAGIC_BYTE = 0x00 - -_WELL_KNOWN_TYPE_MODULES = ( - any_pb2, - duration_pb2, - empty_pb2, - field_mask_pb2, - source_context_pb2, - struct_pb2, - timestamp_pb2, - wrappers_pb2, - type_pb2, - api_pb2, -) - - -def _preload_well_known_types(pool): - """Add google/protobuf/*.proto well-known types to a fresh DescriptorPool. - - Registry-provided FileDescriptorProtos may depend on well-known types - (e.g. google/protobuf/timestamp.proto) without listing them as references. - A custom DescriptorPool doesn't have them by default, so we copy them from - the generated modules before adding user schemas. - """ - for module in _WELL_KNOWN_TYPE_MODULES: - file_name = module.DESCRIPTOR.name - try: - pool.FindFileByName(file_name) - continue - except KeyError: - pass - fd_proto = descriptor_pb2.FileDescriptorProto() - module.DESCRIPTOR.CopyToProto(fd_proto) - pool.Add(fd_proto) - - -class _AvroJSONEncoder(json.JSONEncoder): - """JSON encoder that handles types returned by fastavro for Avro logical types.""" - - def default(self, obj): - if isinstance(obj, decimal.Decimal): - return str(obj) - if isinstance(obj, datetime.datetime): - return obj.isoformat() - if isinstance(obj, datetime.date): - return obj.isoformat() - if isinstance(obj, datetime.time): - return obj.isoformat() - if isinstance(obj, uuid.UUID): - return str(obj) - if isinstance(obj, bytes): - return base64.b64encode(obj).decode('ascii') - return super().default(obj) - - -def _read_varint(data): - shift = 0 - result = 0 - bytes_read = 0 - - for byte in data: - bytes_read += 1 - result |= (byte & 0x7F) << shift - if (byte & 0x80) == 0: - return result, bytes_read - shift += 7 - - raise ValueError("Incomplete varint") - - -def _read_protobuf_message_indices(payload): - """ - Read the Confluent Protobuf message indices array. - - The Confluent Protobuf wire format includes message indices after the schema ID: - [message_indices_length:varint][message_indices:varint...] - The indices indicate which message type to use from the .proto schema. - For example, [0] = first message, [1] = second message, [0, 0] = nested message. 
- - Args: - payload: bytes after the schema ID - - Returns: - tuple: (message_indices list, remaining payload bytes) - """ - array_len, bytes_read = _read_varint(payload) - payload = payload[bytes_read:] - - indices = [] - for _ in range(array_len): - index, bytes_read = _read_varint(payload) - indices.append(index) - payload = payload[bytes_read:] - - return indices, payload +from .compression import get_codec as _get_compression_codec +from .formats import get_handler as _get_format_handler +from .formats.registry import register_handler as _register_handler +SCHEMA_REGISTRY_MAGIC_BYTE = 0x00 -def _get_protobuf_message_class(schema_info, message_indices): - """Get the protobuf message class based on schema info and message indices. - Args: - schema_info: Tuple of (descriptor_pool, file_descriptor_set) - message_indices: List of indices (e.g., [0], [1], [2, 0] for nested) +def _bootstrap_format_handlers(): + """Direct-register bundled handlers. - Returns: - Message class for the specified type + Entry points only resolve once the wheel has been ``pip install``ed. + For source-mode tests and ``ddev test`` runs we register the builtins + directly so the check is functional without an install step. Idempotent + once the entry-point loader has populated the registry with the same + names. """ - pool, descriptor_set = schema_info - - # First index is the message type in the file - file_descriptor = descriptor_set.file[0] - message_descriptor_proto = file_descriptor.message_type[message_indices[0]] - - package = file_descriptor.package - name_parts = [message_descriptor_proto.name] + from .formats.builtins import ( + AvroHandler, + BsonHandler, + JsonHandler, + ProtobufHandler, + RawHandler, + StringHandler, + ) - # Handle nested messages if there are more indices - current_proto = message_descriptor_proto - for idx in message_indices[1:]: - current_proto = current_proto.nested_type[idx] - name_parts.append(current_proto.name) + for h in (JsonHandler(), StringHandler(), RawHandler(), BsonHandler(), AvroHandler(), ProtobufHandler()): + _register_handler(h) - if package: - full_name = f"{package}.{'.'.join(name_parts)}" - else: - full_name = '.'.join(name_parts) - message_descriptor = pool.FindMessageTypeByName(full_name) - return message_factory.GetMessageClass(message_descriptor) +_bootstrap_format_handlers() class MessageDeserializer: - """Handles deserialization of Kafka messages with support for JSON, BSON, Protobuf, and Avro.""" + """Deserialize Kafka messages with pluggable format + compression support.""" def __init__(self, log, schema_registry=None): self.log = log self.schema_registry = schema_registry - - # Cache for built schemas to avoid rebuilding for every message - # Key: (format_type, schema_str_hash) - self._schema_cache = {} - - # Cache for schemas fetched from registry, keyed by (format_type, schema_id) - self._registry_schema_cache: dict[tuple[str, int], object] = {} + self._schema_cache: dict[tuple[str, str], object] = {} + self._registry_schema_cache: dict[tuple[str, int], tuple[object, str]] = {} def deserialize_message( self, @@ -184,26 +64,29 @@ def deserialize_message( schema_str: str | None = None, uses_schema_registry: bool = False, skip_bytes: int = 0, + compression: str | None = None, ) -> tuple[str | None, int | None]: """Deserialize a message (key or value). 
Args: - raw_bytes: Raw message bytes - format_type: 'json', 'bson', 'protobuf', 'avro', or 'raw' - schema_str: Schema definition (for protobuf/avro) - uses_schema_registry: Whether to expect Schema Registry format - skip_bytes: Number of bytes to drop from the start of raw_bytes - before any further processing. Useful for stripping a custom - producer-side prefix (e.g. a 1-byte version flag, a 4-byte - tenant id, a non-Confluent schema registry envelope) so the - remaining bytes can be fed to one of the standard format - paths. Applied before raw/json/bson/protobuf/avro and before - Schema Registry magic-byte detection. + raw_bytes: Raw message bytes from Kafka. + format_type: Name of a registered format handler. Built-in: + 'json', 'string', 'raw', 'bson', 'avro', 'protobuf'. + Third-party handlers may be installed via the + ``datadog_kafka_actions.formats`` entry-point group. + schema_str: Schema definition (for protobuf/avro), or arbitrary + schema material for custom handlers. + uses_schema_registry: If True, expect Confluent Schema Registry + wire format (magic byte 0x00 + 4-byte schema id). + skip_bytes: Drop this many bytes from the start of raw_bytes + before any other processing (custom producer prefixes). + compression: Name of a registered compression codec to apply + BEFORE the format handler runs. No codecs ship in core; + install a plugin wheel that registers them on the + ``datadog_kafka_actions.compressions`` entry-point group. Returns: - Tuple of (deserialized_string, schema_id) - - deserialized_string: JSON string representation of the message, or base64 for raw format - - schema_id: Schema ID from Schema Registry (if used), or None + Tuple of (deserialized_string, schema_id). """ if not raw_bytes: return None, None @@ -222,26 +105,36 @@ def deserialize_message( if not raw_bytes: return None, None - if format_type == 'raw': - return json.dumps(base64.b64encode(raw_bytes).decode('ascii')), None + if compression: + codec = _get_compression_codec(compression) + if codec is None: + return f"", None + try: + raw_bytes = codec.decompress(raw_bytes) + except Exception as e: + self.log.warning("Failed to decompress %s payload: %s", compression, e) + return f"", None + if not raw_bytes: + return None, None + + handler = _get_format_handler(format_type) + if handler is None: + return f"", None try: schema = None - # When uses_schema_registry is True and a registry client is available, - # always fetch the schema from the registry — ignore any inline schema. 
- if format_type in ('protobuf', 'avro') and schema_str: - if not (uses_schema_registry and self.schema_registry is not None): - schema = self._get_or_build_schema(format_type, schema_str) - - return self._deserialize_bytes_maybe_schema_registry(raw_bytes, format_type, schema, uses_schema_registry) + if schema_str and not (uses_schema_registry and self.schema_registry is not None): + schema = self._get_or_build_schema(handler, format_type, schema_str) + return self._deserialize_bytes_maybe_schema_registry( + raw_bytes, handler, format_type, schema, uses_schema_registry + ) except Exception as e: self.log.warning("Failed to deserialize message: %s", e) return f"", None def _deserialize_bytes_maybe_schema_registry( - self, message: bytes, message_format: str, schema, uses_schema_registry: bool + self, message: bytes, handler, format_type: str, schema, uses_schema_registry: bool ) -> tuple[str | None, int | None]: - """Deserialize message, handling Schema Registry format if present.""" if uses_schema_registry: if len(message) < 5 or message[0] != SCHEMA_REGISTRY_MAGIC_BYTE: msg_hex = message[:5].hex() if len(message) >= 5 else message.hex() @@ -250,329 +143,78 @@ def _deserialize_bytes_maybe_schema_registry( f"but message is too short or has wrong magic byte: {msg_hex}" ) schema_id = int.from_bytes(message[1:5], 'big') - message = message[5:] # Skip the magic byte and schema ID bytes + message = message[5:] - actual_format = message_format + actual_handler = handler if self.schema_registry is not None: - schema, actual_format = self._fetch_and_build_schema(schema_id, message_format) + schema, actual_format = self._fetch_and_build_schema(schema_id, format_type) + if actual_format != format_type: + actual_handler = _get_format_handler(actual_format) or handler - return self._deserialize_bytes(message, actual_format, schema, uses_schema_registry=True), schema_id - else: - # Fallback behavior: try without schema registry format first, then with it - try: - return self._deserialize_bytes(message, message_format, schema, uses_schema_registry=False), None - except (UnicodeDecodeError, json.JSONDecodeError, ValueError) as e: - # If the message is not valid, it might be a schema registry message - if len(message) < 5 or message[0] != SCHEMA_REGISTRY_MAGIC_BYTE: - raise e - schema_id = int.from_bytes(message[1:5], 'big') - message = message[5:] # Skip the magic byte and schema ID bytes - return self._deserialize_bytes(message, message_format, schema, uses_schema_registry=True), schema_id - - def _deserialize_bytes( - self, message: bytes, message_format: str, schema, uses_schema_registry: bool = False - ) -> str | None: - """Deserialize message bytes to JSON string. 
- - Args: - message: Raw message bytes - message_format: 'json', 'bson', 'protobuf', 'avro', or 'string' - schema: Schema object (for protobuf/avro) - uses_schema_registry: Whether to extract Confluent message indices from the message - - Returns: - JSON string representation, or None if message is empty - """ - if not message: - return None - - if message_format == 'protobuf': - return self._deserialize_protobuf(message, schema, uses_schema_registry) - elif message_format == 'avro': - return self._deserialize_avro(message, schema) - elif message_format == 'bson': - return self._deserialize_bson(message) - elif message_format == 'string': - return self._deserialize_string(message) - else: # Default to json - return self._deserialize_json(message) - - def _deserialize_json(self, message: bytes) -> str | None: - """Deserialize JSON message.""" - if not message: - return None - - decoded = message.decode('utf-8').strip() - if not decoded: - return None - - json.loads(decoded) - return decoded - - def _deserialize_string(self, message: bytes) -> str | None: - """Deserialize plain string message (not JSON, just UTF-8 text). - - Returns JSON-encoded string for consistency with other deserializers. - """ - if not message: - return None - - decoded = message.decode('utf-8') - if not decoded: - return None - - # Return as JSON string (quoted string) - return json.dumps(decoded) - - def _deserialize_bson(self, message: bytes) -> str | None: - """Deserialize BSON message to JSON string. - - BSON (Binary JSON) is commonly used with MongoDB and some Kafka producers. - This method decodes BSON bytes and converts to JSON representation. - """ - if not message: - return None - - try: - bson_doc = bson_decode(message) - return bson_dumps(bson_doc) - except Exception as e: - raise ValueError(f"Failed to deserialize BSON message: {e}") - - def _deserialize_protobuf(self, message: bytes, schema_info, uses_schema_registry: bool) -> str: - """Deserialize Protobuf message using google.protobuf with strict validation. - - Args: - message: Raw protobuf bytes - schema_info: Tuple of (descriptor_pool, file_descriptor_set) from _build_protobuf_schema - uses_schema_registry: Whether to extract Confluent message indices from the message - """ - if schema_info is None: - raise ValueError("Protobuf schema is required") - - try: - if uses_schema_registry: - message_indices, message = _read_protobuf_message_indices(message) - # Empty indices array means use the first message type (index 0) - if not message_indices: - message_indices = [0] - else: - message_indices = [0] - - message_class = _get_protobuf_message_class(schema_info, message_indices) - schema_instance = message_class() - - bytes_consumed = schema_instance.ParseFromString(message) - - # Strict validation: ensure all bytes consumed - if bytes_consumed != len(message): - raise ValueError( - f"Not all bytes were consumed during Protobuf decoding! " - f"Read {bytes_consumed} bytes, but message has {len(message)} bytes." 
- ) - - return MessageToJson(schema_instance) - except Exception as e: - raise ValueError(f"Failed to deserialize Protobuf message: {e}") - - def _deserialize_avro(self, message: bytes, schema) -> str: - """Deserialize Avro message.""" - if schema is None: - raise ValueError("Avro schema is required") + return ( + actual_handler.deserialize(message, schema, log=self.log, uses_schema_registry=True), + schema_id, + ) try: - bio = BytesIO(message) - initial_position = bio.tell() - data = schemaless_reader(bio, schema) - final_position = bio.tell() - - # Strict validation: ensure all bytes consumed - bytes_read = final_position - initial_position - total_bytes = len(message) - - if bytes_read != total_bytes: - raise ValueError( - f"Not all bytes were consumed during Avro decoding! " - f"Read {bytes_read} bytes, but message has {total_bytes} bytes." - ) - - return json.dumps(data, cls=_AvroJSONEncoder) - except Exception as e: - raise ValueError(f"Failed to deserialize Avro message: {e}") - - def _fetch_and_build_schema(self, schema_id: int, message_format: str): - """Fetch schema from the registry by ID and build it. + return ( + handler.deserialize(message, schema, log=self.log, uses_schema_registry=False), + None, + ) + except (UnicodeDecodeError, json.JSONDecodeError, ValueError) as e: + if len(message) < 5 or message[0] != SCHEMA_REGISTRY_MAGIC_BYTE: + raise e + schema_id = int.from_bytes(message[1:5], 'big') + message = message[5:] + return ( + handler.deserialize(message, schema, log=self.log, uses_schema_registry=True), + schema_id, + ) - Returns: - Tuple of (schema_object, actual_format) where actual_format is the - format reported by the registry (may differ from message_format). - """ - cache_key = (message_format, schema_id) + def _fetch_and_build_schema(self, schema_id: int, format_type: str): + cache_key = (format_type, schema_id) cached = self._registry_schema_cache.get(cache_key) if cached is not None: return cached schema_str, schema_type, dep_schemas = self.schema_registry.get_schema(schema_id) + registry_type_map = {'AVRO': 'avro', 'PROTOBUF': 'protobuf', 'JSON': 'json'} + actual_format = registry_type_map.get(schema_type, format_type) - # Map Schema Registry type names to our format names - registry_type_map = { - 'AVRO': 'avro', - 'PROTOBUF': 'protobuf', - 'JSON': 'json', - } - actual_format = registry_type_map.get(schema_type, message_format) + handler = _get_format_handler(actual_format) + if handler is None: + raise ValueError(f"No format handler registered for registry type '{actual_format}'") + + schema_hash = hashlib.sha256(schema_str.encode('utf-8')).hexdigest() + schema_cache_key = (actual_format, schema_hash + ':registry') + schema = self._schema_cache.get(schema_cache_key) + if schema is None: + schema = handler.build_schema_from_registry(schema_str, dep_schemas or []) + self._schema_cache[schema_cache_key] = schema - schema = self._get_or_build_schema(actual_format, schema_str, from_registry=True, dep_schemas=dep_schemas) result = (schema, actual_format) self._registry_schema_cache[cache_key] = result return result - def _get_or_build_schema( - self, message_format: str, schema_str: str, from_registry: bool = False, dep_schemas: list[str] | None = None - ): - """Get cached schema or build it if not in cache. - - Args: - message_format: 'protobuf' or 'avro' - schema_str: Schema definition string - from_registry: If True, schema comes from Schema Registry (FileDescriptorProto for protobuf). - If False, schema is inline config (FileDescriptorSet for protobuf). 
- dep_schemas: For protobuf from registry, base64-encoded FileDescriptorProtos - for dependencies (in dependency order). - - Returns: - Schema object (cached or newly built) - """ + def _get_or_build_schema(self, handler, format_type: str, schema_str: str): schema_hash = hashlib.sha256(schema_str.encode('utf-8')).hexdigest() - cache_key = (message_format, schema_hash) - - if cache_key in self._schema_cache: - self.log.debug("Using cached schema for %s (hash: %s...)", message_format, schema_hash[:8]) - return self._schema_cache[cache_key] - - self.log.debug("Building new schema for %s (hash: %s...)", message_format, schema_hash[:8]) - schema = self._build_schema(message_format, schema_str, from_registry, dep_schemas) - + cache_key = (format_type, schema_hash) + cached = self._schema_cache.get(cache_key) + if cached is not None: + return cached + schema = handler.build_schema(schema_str) self._schema_cache[cache_key] = schema return schema - def _build_schema( - self, message_format: str, schema_str: str, from_registry: bool = False, dep_schemas: list[str] | None = None - ): - """Build schema object from schema string. - - Args: - message_format: 'protobuf' or 'avro' - schema_str: Schema definition string - from_registry: If True, use registry format (FileDescriptorProto for protobuf) - dep_schemas: For protobuf from registry, dependency FileDescriptorProtos. - - Returns: - Schema object - """ - if message_format == 'protobuf': - if from_registry: - return self._build_protobuf_schema_from_registry(schema_str, dep_schemas or []) - return self._build_protobuf_schema(schema_str) - elif message_format == 'avro': - return self._build_avro_schema(schema_str) - return None - - def _build_avro_schema(self, schema_str: str): - """Build an Avro schema from a JSON string.""" - schema = json.loads(schema_str) - - if schema is None: - raise ValueError("Avro schema cannot be None") - - return schema - - def _build_protobuf_schema(self, schema_str: str): - """Build a Protobuf schema from a base64-encoded FileDescriptorSet. - - Used for inline schemas provided via configuration (value_schema/key_schema). - - Returns: - Tuple of (descriptor_pool, file_descriptor_set) for use with - _get_protobuf_message_class to select the correct message type. - """ - schema_bytes = base64.b64decode(schema_str) - descriptor_set = descriptor_pb2.FileDescriptorSet() - descriptor_set.ParseFromString(schema_bytes) - - pool = descriptor_pool.DescriptorPool() - _preload_well_known_types(pool) - for fd_proto in descriptor_set.file: - pool.Add(fd_proto) - - return (pool, descriptor_set) - - def _build_protobuf_schema_from_registry(self, schema_str: str, dep_schemas: list): - """Build a Protobuf schema from base64-encoded FileDescriptorProtos. - - The Confluent Schema Registry's ?format=serialized endpoint returns a - base64-encoded FileDescriptorProto (single file). Schemas with imports - (e.g., google/protobuf/timestamp.proto) have references that must be - added to the descriptor pool before the main schema. - - The registry sets all FileDescriptorProto names to 'default', so we - fix dependency names to match their import paths (e.g., - 'google/protobuf/timestamp.proto') before adding to the pool. - - Args: - schema_str: Base64-encoded FileDescriptorProto for the main schema. - dep_schemas: List of (name, base64_schema) tuples for dependencies, - in dependency order (deps of deps come first). The name - is the import path used to fix the descriptor name. 
- - Returns: - Tuple of (descriptor_pool, file_descriptor_set) for use with - _get_protobuf_message_class to select the correct message type. - """ - pool = descriptor_pool.DescriptorPool() - _preload_well_known_types(pool) - descriptor_set = descriptor_pb2.FileDescriptorSet() - - # Add dependencies first (in dependency order), fixing names - for dep_name, dep_b64 in dep_schemas: - try: - pool.FindFileByName(dep_name) - continue - except KeyError: - pass - dep_bytes = base64.b64decode(dep_b64) - dep_proto = descriptor_pb2.FileDescriptorProto() - dep_proto.ParseFromString(dep_bytes) - # Fix the name from 'default' to the actual import path - dep_proto.name = dep_name - pool.Add(dep_proto) - - # Add the main schema - schema_bytes = base64.b64decode(schema_str) - fd_proto = descriptor_pb2.FileDescriptorProto() - fd_proto.ParseFromString(schema_bytes) - descriptor_set.file.append(fd_proto) - pool.Add(fd_proto) - - return (pool, descriptor_set) - class DeserializedMessage: """Represents a deserialized Kafka message with metadata.""" def __init__(self, kafka_msg, deserializer: MessageDeserializer, config: dict): - """Initialize deserialized message. - - Args: - kafka_msg: Raw confluent_kafka.Message object - deserializer: MessageDeserializer instance - config: Deserialization configuration (value_format, key_format, etc.) - """ self.kafka_msg = kafka_msg self.deserializer = deserializer self.config = config - # Lazy deserialization - only deserialize when accessed self._key_deserialized = None self._value_deserialized = None self._key_schema_id = None @@ -580,28 +222,23 @@ def __init__(self, kafka_msg, deserializer: MessageDeserializer, config: dict): @property def offset(self) -> int: - """Message offset.""" return self.kafka_msg.offset() @property def partition(self) -> int: - """Partition number.""" return self.kafka_msg.partition() @property def timestamp(self) -> int: - """Message timestamp.""" ts_type, ts_value = self.kafka_msg.timestamp() return ts_value if ts_value else 0 @property def topic(self) -> str: - """Topic name.""" return self.kafka_msg.topic() @property def headers(self) -> dict: - """Message headers as dict.""" headers = {} if self.kafka_msg.headers(): for key, value in self.kafka_msg.headers(): @@ -613,15 +250,6 @@ def headers(self) -> dict: @staticmethod def _parse_deserialized(deserialized: str | None): - """Parse a deserialized string into a Python object. - - deserialize_message returns either: - - A valid JSON string (for successfully deserialized messages) - - An error string like '' (on failure) - - None (for empty messages) - - Error strings are returned as-is (not parsed as JSON). 
- """ if not deserialized: return None try: @@ -631,56 +259,45 @@ def _parse_deserialized(deserialized: str | None): @property def key(self) -> dict | str | None: - """Deserialized key (lazy).""" if self._key_deserialized is None and self.kafka_msg.key(): - key_format = self.config.get('key_format', 'json') - key_schema = self.config.get('key_schema') - key_uses_sr = self.config.get('key_uses_schema_registry', False) - key_skip_bytes = self.config.get('key_skip_bytes', 0) - deserialized, schema_id = self.deserializer.deserialize_message( - self.kafka_msg.key(), key_format, key_schema, key_uses_sr, skip_bytes=key_skip_bytes + self.kafka_msg.key(), + self.config.get('key_format', 'json'), + self.config.get('key_schema'), + self.config.get('key_uses_schema_registry', False), + skip_bytes=self.config.get('key_skip_bytes', 0), + compression=self.config.get('key_compression'), ) - self._key_deserialized = self._parse_deserialized(deserialized) self._key_schema_id = schema_id - return self._key_deserialized @property def value(self) -> dict | str | None: - """Deserialized value (lazy).""" if self._value_deserialized is None and self.kafka_msg.value(): - value_format = self.config.get('value_format', 'json') - value_schema = self.config.get('value_schema') - value_uses_sr = self.config.get('value_uses_schema_registry', False) - value_skip_bytes = self.config.get('value_skip_bytes', 0) - deserialized, schema_id = self.deserializer.deserialize_message( - self.kafka_msg.value(), value_format, value_schema, value_uses_sr, skip_bytes=value_skip_bytes + self.kafka_msg.value(), + self.config.get('value_format', 'json'), + self.config.get('value_schema'), + self.config.get('value_uses_schema_registry', False), + skip_bytes=self.config.get('value_skip_bytes', 0), + compression=self.config.get('value_compression'), ) - self._value_deserialized = self._parse_deserialized(deserialized) self._value_schema_id = schema_id - return self._value_deserialized @property def key_schema_id(self) -> int | None: - """Schema Registry schema ID for key (if any).""" - # Trigger lazy deserialization if needed _ = self.key return self._key_schema_id @property def value_schema_id(self) -> int | None: - """Schema Registry schema ID for value (if any).""" - # Trigger lazy deserialization if needed _ = self.value return self._value_schema_id def to_dict(self) -> dict: - """Convert message to dict for filtering/output.""" return { 'offset': self.offset, 'partition': self.partition, diff --git a/kafka_actions/pyproject.toml b/kafka_actions/pyproject.toml index 378786a01b4ed..899dd6bbf1c3d 100644 --- a/kafka_actions/pyproject.toml +++ b/kafka_actions/pyproject.toml @@ -43,6 +43,14 @@ deps = [ "pymongo[srv]==4.8.0; python_version >= '3.9'", ] +[project.entry-points."datadog_kafka_actions.formats"] +json = "datadog_checks.kafka_actions.formats.builtins:JsonHandler" +string = "datadog_checks.kafka_actions.formats.builtins:StringHandler" +raw = "datadog_checks.kafka_actions.formats.builtins:RawHandler" +bson = "datadog_checks.kafka_actions.formats.builtins:BsonHandler" +avro = "datadog_checks.kafka_actions.formats.builtins:AvroHandler" +protobuf = "datadog_checks.kafka_actions.formats.builtins:ProtobufHandler" + [project.urls] Source = "https://github.com/DataDog/integrations-core" From 49fe382d473ad9ad1d69e0d7d9f218b582bef0ac Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 8 May 2026 18:13:36 -0600 Subject: [PATCH 2/3] [kafka_actions] fixup: license headers and revert version bump - Use 2026-present in license headers to match 
repo standard - Revert __about__.py to 2.6.0; version bump is handled at release time by towncrier consuming changelog.d/ entries Co-Authored-By: Claude Opus 4.7 (1M context) --- kafka_actions/datadog_checks/kafka_actions/__about__.py | 2 +- .../datadog_checks/kafka_actions/compression/__init__.py | 2 +- kafka_actions/datadog_checks/kafka_actions/compression/base.py | 2 +- .../datadog_checks/kafka_actions/compression/registry.py | 2 +- kafka_actions/datadog_checks/kafka_actions/formats/__init__.py | 2 +- kafka_actions/datadog_checks/kafka_actions/formats/base.py | 2 +- kafka_actions/datadog_checks/kafka_actions/formats/builtins.py | 2 +- kafka_actions/datadog_checks/kafka_actions/formats/registry.py | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/kafka_actions/datadog_checks/kafka_actions/__about__.py b/kafka_actions/datadog_checks/kafka_actions/__about__.py index d72f3df63c920..0657a9824a298 100644 --- a/kafka_actions/datadog_checks/kafka_actions/__about__.py +++ b/kafka_actions/datadog_checks/kafka_actions/__about__.py @@ -1,4 +1,4 @@ # (C) Datadog, Inc. 2025-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) -__version__ = '2.7.0' +__version__ = '2.6.0' diff --git a/kafka_actions/datadog_checks/kafka_actions/compression/__init__.py b/kafka_actions/datadog_checks/kafka_actions/compression/__init__.py index be9f92f79aa05..f959df4c3bf19 100644 --- a/kafka_actions/datadog_checks/kafka_actions/compression/__init__.py +++ b/kafka_actions/datadog_checks/kafka_actions/compression/__init__.py @@ -1,4 +1,4 @@ -# (C) Datadog, Inc. 2025-present +# (C) Datadog, Inc. 2026-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) """Compression codec registry for kafka_actions. diff --git a/kafka_actions/datadog_checks/kafka_actions/compression/base.py b/kafka_actions/datadog_checks/kafka_actions/compression/base.py index 23a580fc5cca6..d1a615a770266 100644 --- a/kafka_actions/datadog_checks/kafka_actions/compression/base.py +++ b/kafka_actions/datadog_checks/kafka_actions/compression/base.py @@ -1,4 +1,4 @@ -# (C) Datadog, Inc. 2025-present +# (C) Datadog, Inc. 2026-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) """Base class for app-level payload compression codecs.""" diff --git a/kafka_actions/datadog_checks/kafka_actions/compression/registry.py b/kafka_actions/datadog_checks/kafka_actions/compression/registry.py index 13ca013f10c22..198a6abede51e 100644 --- a/kafka_actions/datadog_checks/kafka_actions/compression/registry.py +++ b/kafka_actions/datadog_checks/kafka_actions/compression/registry.py @@ -1,4 +1,4 @@ -# (C) Datadog, Inc. 2025-present +# (C) Datadog, Inc. 2026-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) """Lazy registry of compression codecs.""" diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/__init__.py b/kafka_actions/datadog_checks/kafka_actions/formats/__init__.py index 49fc302239f69..1d15b2b2478b4 100644 --- a/kafka_actions/datadog_checks/kafka_actions/formats/__init__.py +++ b/kafka_actions/datadog_checks/kafka_actions/formats/__init__.py @@ -1,4 +1,4 @@ -# (C) Datadog, Inc. 2025-present +# (C) Datadog, Inc. 2026-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) """Format handler registry for kafka_actions. 
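(For illustration only, not part of this patch series: a minimal sketch of a third-party format handler wheel. The package name "my-kafka-msgpack", its module path, and the msgpack dependency are assumptions; only the `datadog_kafka_actions.formats` entry-point group and the `FormatHandler` interface come from this change.)

    # Hypothetical plugin wheel "my-kafka-msgpack" -- names are placeholders.
    # Its pyproject.toml would expose the handler on the entry-point group:
    #
    #   [project.entry-points."datadog_kafka_actions.formats"]
    #   msgpack = "my_kafka_msgpack.handler:MsgpackHandler"
    import json

    import msgpack  # assumed third-party dependency of the plugin wheel

    from datadog_checks.kafka_actions.formats import FormatHandler


    class MsgpackHandler(FormatHandler):
        """Decode a MessagePack payload into a JSON string, mirroring the built-ins."""

        name = 'msgpack'

        def deserialize(self, message, schema, *, log, uses_schema_registry):
            if not message:
                return None
            try:
                # raw=False decodes MessagePack string values as Python str
                decoded = msgpack.unpackb(message, raw=False)
            except Exception as e:
                raise ValueError(f"Failed to deserialize MessagePack message: {e}")
            return json.dumps(decoded)

In a source checkout or in tests, the same instance can be registered without an install step via `register_handler(MsgpackHandler())` from `datadog_checks.kafka_actions.formats`.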
diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/base.py b/kafka_actions/datadog_checks/kafka_actions/formats/base.py index 1e7f99e2f7c2b..03a3e3165dfc4 100644 --- a/kafka_actions/datadog_checks/kafka_actions/formats/base.py +++ b/kafka_actions/datadog_checks/kafka_actions/formats/base.py @@ -1,4 +1,4 @@ -# (C) Datadog, Inc. 2025-present +# (C) Datadog, Inc. 2026-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) """Base class for kafka_actions format handlers.""" diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/builtins.py b/kafka_actions/datadog_checks/kafka_actions/formats/builtins.py index b27c86d982a66..d0a08dcf1599d 100644 --- a/kafka_actions/datadog_checks/kafka_actions/formats/builtins.py +++ b/kafka_actions/datadog_checks/kafka_actions/formats/builtins.py @@ -1,4 +1,4 @@ -# (C) Datadog, Inc. 2025-present +# (C) Datadog, Inc. 2026-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) """Built-in format handlers, registered via entry points in pyproject.toml.""" diff --git a/kafka_actions/datadog_checks/kafka_actions/formats/registry.py b/kafka_actions/datadog_checks/kafka_actions/formats/registry.py index 18a8f59ad023c..80ba2d8298d6c 100644 --- a/kafka_actions/datadog_checks/kafka_actions/formats/registry.py +++ b/kafka_actions/datadog_checks/kafka_actions/formats/registry.py @@ -1,4 +1,4 @@ -# (C) Datadog, Inc. 2025-present +# (C) Datadog, Inc. 2026-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) """Lazy registry of format handlers, populated from entry points + direct registration.""" From 371cd7998660ec0bb9cb72e310cae6718054c111 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 8 May 2026 18:28:49 -0600 Subject: [PATCH 3/3] [kafka_actions] fixup: rename changelog entry to PR number Co-Authored-By: Claude Opus 4.7 (1M context) --- .../changelog.d/{plugin-architecture.added => 23650.added} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename kafka_actions/changelog.d/{plugin-architecture.added => 23650.added} (100%) diff --git a/kafka_actions/changelog.d/plugin-architecture.added b/kafka_actions/changelog.d/23650.added similarity index 100% rename from kafka_actions/changelog.d/plugin-architecture.added rename to kafka_actions/changelog.d/23650.added
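
For completeness, a minimal sketch of what a compression-codec plugin could look like, since no codecs ship in core. The `gzip` codec name and package layout are assumptions for illustration (the plugin might live in a wheel such as datadog-kafka-deserializers); only the `CompressionCodec` interface, the `datadog_kafka_actions.compressions` entry-point group, and the `value_compression`/`key_compression` config keys come from this change.

    # Hypothetical codec shipped by a plugin wheel; core registers none.
    # The plugin's pyproject.toml would declare:
    #
    #   [project.entry-points."datadog_kafka_actions.compressions"]
    #   gzip = "my_plugin.codecs:GzipCodec"
    import gzip

    from datadog_checks.kafka_actions.compression import CompressionCodec


    class GzipCodec(CompressionCodec):
        """Inflate a gzip-compressed payload before the format handler runs."""

        name = 'gzip'

        def decompress(self, data: bytes) -> bytes:
            # Raises on truncated or corrupt input; deserialize_message catches
            # this, logs a warning, and returns an error string for the message.
            return gzip.decompress(data)

With such a wheel installed, an action config would set `value_compression: gzip` (or `key_compression: gzip`) alongside the existing `value_format` key; the payload is then decompressed after `skip_bytes` handling and before the format handler and Schema Registry magic-byte detection run. In tests, a codec instance can also be registered directly with `register_codec(GzipCodec())` from `datadog_checks.kafka_actions.compression`.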