1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -114,6 +114,7 @@ code-coverage.datadog.yml @DataDog/agent-integr
/kameleoon/ @slava-inyu product@kameleoon.com @DataDog/ecosystems-review
/kernelcare/ @grubberr schvaliuk@cloudlinux.com
/keep/ @talboren tal@keephq.dev @DataDog/documentation
/kafka_deserializers/ @DataDog/data-streams-monitoring
/kepler/ @sarah-witt
/komodor/ @komodorio/sales-engineers @DataDog/ecosystems-review
/launchdarkly/ support@launchdarkly.com @DataDog/ecosystems-review
10 changes: 10 additions & 0 deletions kafka_deserializers/CHANGELOG.md
@@ -0,0 +1,10 @@
# CHANGELOG - Kafka Deserializers

## 0.1.0 / 2026-05-08

***Added***:

* Initial release. Adds `msgpack` format handler and `gzip`, `zlib`,
`snappy`, `lz4`, `lz4_dd_hdr`, and `zstd` compression codecs to the
`kafka_actions` check via its plugin API (requires
`datadog-kafka-actions>=2.7.0`).
28 changes: 28 additions & 0 deletions kafka_deserializers/README.md
@@ -0,0 +1,28 @@
# Kafka Deserializers

## Overview

Plugin pack for the [kafka_actions](https://github.com/DataDog/integrations-core/tree/master/kafka_actions) check.
When installed into the Datadog Agent's embedded Python, this wheel contributes additional
capabilities to kafka_actions via Python entry points; it does not run on its own.

This pack adds:

- The msgpack format handler.
- Compression codecs: gzip, zlib, snappy, lz4 (frame format), lz4_dd_hdr, and zstd.

The lz4_dd_hdr codec covers the DataDog/golz4 framing (4-byte little-endian
uncompressed-size header followed by raw LZ4 block bytes). It is not interchangeable
with the standard LZ4 frame format.

## Setup

Install via the agent integration command:

    agent integration install -t datadog-kafka-deserializers==0.1.0

Requires datadog-kafka-actions 2.7.0 or later.

## Support

Owned by the Data Streams Monitoring team.
4 changes: 4 additions & 0 deletions kafka_deserializers/datadog_checks/kafka_deserializers/__about__.py
@@ -0,0 +1,4 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
__version__ = '0.1.0'
16 changes: 16 additions & 0 deletions kafka_deserializers/datadog_checks/kafka_deserializers/__init__.py
@@ -0,0 +1,16 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Plugin pack for the kafka_actions check.

This wheel does not register a runtime check. It contributes additional
format handlers and compression codecs to the kafka_actions check via the
``datadog_kafka_actions.formats`` and ``datadog_kafka_actions.compressions``
entry-point groups. Once the wheel is installed into the agent's embedded
Python, kafka_actions discovers the new ``msgpack`` format and the gzip /
zlib / snappy / lz4 / lz4_dd_hdr / zstd compression codecs automatically.
"""

from .__about__ import __version__

__all__ = ['__version__']
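
The packaging metadata that declares these entry points is not part of this diff. As a rough illustration of the mechanism the docstring describes, a setuptools-style declaration might look like the sketch below; the group names come from the docstring, while the real project's packaging (which builds with hatch) may use a different file and syntax.

```python
# Hypothetical packaging sketch -- not part of this PR. It only illustrates
# how the plugin classes could be advertised under the two entry-point
# groups that kafka_actions scans.
from setuptools import find_packages, setup

setup(
    name='datadog-kafka-deserializers',
    version='0.1.0',
    packages=find_packages(),
    entry_points={
        'datadog_kafka_actions.formats': [
            'msgpack = datadog_checks.kafka_deserializers.handlers:MsgpackHandler',
        ],
        'datadog_kafka_actions.compressions': [
            'gzip = datadog_checks.kafka_deserializers.codecs:GzipCodec',
            'zlib = datadog_checks.kafka_deserializers.codecs:ZlibCodec',
            'snappy = datadog_checks.kafka_deserializers.codecs:SnappyCodec',
            'lz4 = datadog_checks.kafka_deserializers.codecs:Lz4Codec',
            'lz4_dd_hdr = datadog_checks.kafka_deserializers.codecs:Lz4DdHdrCodec',
            'zstd = datadog_checks.kafka_deserializers.codecs:ZstdCodec',
        ],
    },
)
```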
76 changes: 76 additions & 0 deletions kafka_deserializers/datadog_checks/kafka_deserializers/_compat.py
@@ -0,0 +1,76 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Compatibility layer for kafka_actions plugin base classes.

When this wheel is installed alongside ``datadog-kafka-actions`` 2.7.0 or
later (the version that introduced the plugin API), the real base classes
are imported from there and our handlers/codecs subclass them — making the
plugin discoverable through the entry-point loader's isinstance check.

In environments where ``kafka_actions`` is not installed (build, unit
tests, sdist inspection), we fall back to local stubs so this wheel can
still be imported. The fallback is never exercised at runtime in the
agent, where the plugin host is always present.

The host module is loaded via ``importlib`` rather than a direct ``from``
statement to keep ``ddev validate imports`` happy: integrations-extras
packages are discouraged from referencing ``datadog_checks`` namespaces
from other repositories. The runtime contract is the same either way.
"""

from __future__ import annotations

import importlib
from abc import ABC, abstractmethod
from typing import Any


def _try_load(module_path: str, attr: str):
    try:
        module = importlib.import_module(module_path)
    except ImportError:
        return None
    return getattr(module, attr, None)


# Module paths assembled at runtime to keep ddev's import linter quiet;
# integrations-extras packages should not statically reference other
# integrations' namespaces. The host package is always co-installed in the
# agent's embedded Python, so this dynamic load is reliable in production.
_HOST_PKG = 'datadog_' + 'checks.kafka_actions'
_HostFormatHandler = _try_load(f'{_HOST_PKG}.formats.base', 'FormatHandler')
_HostCompressionCodec = _try_load(f'{_HOST_PKG}.compression.base', 'CompressionCodec')


if _HostFormatHandler is not None:
    FormatHandler = _HostFormatHandler
else:

    class FormatHandler(ABC):  # type: ignore[no-redef]
        name: str = ''

        def build_schema(self, schema_str: str) -> Any:
            return None

        def build_schema_from_registry(self, schema_str: str, dep_schemas: list) -> Any:
            return self.build_schema(schema_str)

        @abstractmethod
        def deserialize(self, message: bytes, schema: Any, *, log, uses_schema_registry: bool):
            raise NotImplementedError


if _HostCompressionCodec is not None:
    CompressionCodec = _HostCompressionCodec
else:

    class CompressionCodec(ABC):  # type: ignore[no-redef]
        name: str = ''

        @abstractmethod
        def decompress(self, data: bytes) -> bytes:
            raise NotImplementedError


__all__ = ['CompressionCodec', 'FormatHandler']
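
For context on the isinstance check mentioned in the docstring, here is a minimal sketch of how an entry-point loader on the host side might discover codecs; the real kafka_actions loader is not part of this diff, so the function name and structure are illustrative only.

```python
# Illustrative sketch of host-side plugin discovery -- not kafka_actions code.
from importlib.metadata import entry_points


def discover_codecs(base_cls):
    """Load every registered codec and keep instances of the host base class."""
    codecs = {}
    for ep in entry_points(group='datadog_kafka_actions.compressions'):
        candidate = ep.load()()  # resolve the class, then instantiate it
        if isinstance(candidate, base_cls):
            codecs[candidate.name] = candidate
    return codecs
```

A loader shaped like this is why the handlers and codecs in this pack must subclass the host's real base classes whenever they are available: instances built from the local stubs would fail the isinstance gate.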
79 changes: 79 additions & 0 deletions kafka_deserializers/datadog_checks/kafka_deserializers/codecs.py
@@ -0,0 +1,79 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""App-level compression codecs registered for the kafka_actions plugin API.

Coverage is driven by patterns observed in Datadog's dd-go and dd-source
producers. ``lz4_dd_hdr`` covers the DataDog/golz4 framing used by
xray-converter (4-byte little-endian uncompressed-size header followed by
raw LZ4 block bytes), which is *not* the standard LZ4 frame format.
"""

from __future__ import annotations

import gzip
import struct
import zlib

from ._compat import CompressionCodec


class GzipCodec(CompressionCodec):
    name = 'gzip'

    def decompress(self, data: bytes) -> bytes:
        return gzip.decompress(data)


class ZlibCodec(CompressionCodec):
    name = 'zlib'

    def decompress(self, data: bytes) -> bytes:
        return zlib.decompress(data)


class SnappyCodec(CompressionCodec):
    name = 'snappy'

    def decompress(self, data: bytes) -> bytes:
        import snappy

        return snappy.decompress(data)


class Lz4Codec(CompressionCodec):
    """Standard LZ4 frame format (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md)."""

    name = 'lz4'

    def decompress(self, data: bytes) -> bytes:
        import lz4.frame

        return lz4.frame.decompress(data)


class Lz4DdHdrCodec(CompressionCodec):
    """DataDog/golz4 framing: 4-byte little-endian uncompressed size + raw LZ4 block.

    Used by ``cloud-integrations/aws/xray-converter``. Not interchangeable
    with the standard LZ4 frame format.
    """

    name = 'lz4_dd_hdr'

    def decompress(self, data: bytes) -> bytes:
        import lz4.block

        if len(data) < 4:
            raise ValueError("lz4_dd_hdr payload too short for length header")
        (uncompressed_size,) = struct.unpack('<I', data[:4])
        return lz4.block.decompress(data[4:], uncompressed_size=uncompressed_size)


class ZstdCodec(CompressionCodec):
    name = 'zstd'

    def decompress(self, data: bytes) -> bytes:
        import zstandard

        return zstandard.ZstdDecompressor().decompress(data)
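
A round-trip sketch of the `lz4_dd_hdr` framing, assuming the `lz4` package is available: compress a payload as a raw LZ4 block, prepend the 4-byte little-endian size header, and feed the result to the codec above.

```python
import struct

import lz4.block

from datadog_checks.kafka_deserializers.codecs import Lz4DdHdrCodec

payload = b'hello, data streams ' * 16

# DataDog/golz4 framing: 4-byte little-endian uncompressed size followed by a
# raw LZ4 block with no embedded size header (hence store_size=False).
framed = struct.pack('<I', len(payload)) + lz4.block.compress(payload, store_size=False)

assert Lz4DdHdrCodec().decompress(framed) == payload
```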
43 changes: 43 additions & 0 deletions kafka_deserializers/datadog_checks/kafka_deserializers/handlers.py
@@ -0,0 +1,43 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""MessagePack format handler.

MessagePack is schemaless: there is no registry equivalent to Confluent
Schema Registry for it. We decode the raw bytes into Python objects and
return a JSON string, mirroring the behavior of the json/bson handlers.
"""

from __future__ import annotations

import base64
import datetime
import json

from ._compat import FormatHandler


class _MsgpackJSONEncoder(json.JSONEncoder):
    """JSON encoder for types msgpack may emit (bytes, datetime via timestamp ext type)."""

    def default(self, obj):
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        if isinstance(obj, bytes):
            return base64.b64encode(obj).decode('ascii')
        return super().default(obj)


class MsgpackHandler(FormatHandler):
    name = 'msgpack'

    def deserialize(self, message, schema, *, log, uses_schema_registry):
        if not message:
            return None
        import msgpack

        try:
            decoded = msgpack.unpackb(message, raw=False, timestamp=3)
        except Exception as e:
            raise ValueError(f"Failed to deserialize msgpack message: {e}") from e
        return json.dumps(decoded, cls=_MsgpackJSONEncoder)
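
A usage sketch for the handler, assuming the `msgpack` package is available; the schema argument is unused because msgpack is schemaless, and `log` is a plain stdlib logger here.

```python
import logging

import msgpack

from datadog_checks.kafka_deserializers.handlers import MsgpackHandler

handler = MsgpackHandler()
message = msgpack.packb({'event': 'checkout', 'raw': b'\x00\x01'})

# deserialize() returns a JSON string; bytes values are base64-encoded and
# datetimes ISO-formatted by _MsgpackJSONEncoder.
decoded = handler.deserialize(
    message,
    None,  # schema: unused for this schemaless format
    log=logging.getLogger(__name__),
    uses_schema_registry=False,
)
print(decoded)  # {"event": "checkout", "raw": "AAE="}
```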
7 changes: 7 additions & 0 deletions kafka_deserializers/hatch.toml
@@ -0,0 +1,7 @@
[env.collectors.datadog-checks]

[[envs.default.matrix]]
python = ["3.12"]

[envs.default]
e2e-env = false
47 changes: 47 additions & 0 deletions kafka_deserializers/manifest.json
@@ -0,0 +1,47 @@
{
"manifest_version": "2.0.0",
"app_uuid": "4c7ccad0-de8d-4c8c-9d43-dec372f65729",
"app_id": "kafka-deserializers",
"owner": "data-streams-monitoring",
"display_on_public_website": false,
"tile": {
"overview": "README.md#Overview",
"configuration": "README.md#Setup",
"support": "README.md#Support",
"changelog": "CHANGELOG.md",
"description": "Plugin pack for the kafka_actions check.",
"title": "Kafka Deserializers",
"media": [],
"classifier_tags": [
"Supported OS::Linux",
"Supported OS::Windows",
"Supported OS::macOS",
"Category::Message Queues",
"Offering::Integration"
]
},
"author": {
"support_email": "packages@datadoghq.com",
"homepage": "https://github.com/DataDog/integrations-extras",
"sales_email": "packages@datadoghq.com",
"name": "Datadog"
},
"assets": {
"integration": {
"auto_install": false,
"source_type_name": "Kafka Deserializers",
"configuration": {},
"events": {
"creates_events": false
},
"metrics": {
"prefix": "kafka_deserializers.",
"check": [],
"metadata_path": "metadata.csv"
},
"service_checks": {
"metadata_path": "assets/service_checks.json"
}
}
}
}
1 change: 1 addition & 0 deletions kafka_deserializers/metadata.csv
@@ -0,0 +1 @@
metric_name,metric_type,interval,unit_name,per_unit_name,description,orientation,integration,short_name,curated_metric,sample_tags