From cda82b7daab5d93e390bcc6a90446aaedcd000d7 Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Fri, 2 Jan 2026 16:29:57 +0900 Subject: [PATCH 01/10] Add aiokafka support (AIOKafkaInstrumentation) --- .../packages/asyncio/aiokafka.py | 372 +++++++++++++++ elasticapm/instrumentation/register.py | 1 + setup.cfg | 1 + .../asyncio_tests/aiokafka_tests.py | 429 ++++++++++++++++++ tests/requirements/reqs-aiokafka-newest.txt | 2 + tests/scripts/envs/aiokafka.sh | 5 + 6 files changed, 810 insertions(+) create mode 100644 elasticapm/instrumentation/packages/asyncio/aiokafka.py create mode 100644 tests/instrumentation/asyncio_tests/aiokafka_tests.py create mode 100644 tests/requirements/reqs-aiokafka-newest.txt create mode 100644 tests/scripts/envs/aiokafka.sh diff --git a/elasticapm/instrumentation/packages/asyncio/aiokafka.py b/elasticapm/instrumentation/packages/asyncio/aiokafka.py new file mode 100644 index 000000000..df8e974f8 --- /dev/null +++ b/elasticapm/instrumentation/packages/asyncio/aiokafka.py @@ -0,0 +1,372 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
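+
+# Usage sketch (illustrative only; the broker address and topic name are
+# placeholders, and an Elastic APM client must already be configured):
+#
+#     import elasticapm
+#     elasticapm.instrument()
+#
+#     producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
+#     await producer.start()
+#     # Within an active transaction, send() is captured as a "Kafka SEND" span.
+#     await producer.send("my-topic", key=b"k", value=b"v")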
+ +import time +from collections.abc import Awaitable, Callable, Container, Iterable, MutableSequence +from enum import IntEnum +from typing import TYPE_CHECKING, Dict, List, Optional, TypeVar, cast + +from elasticapm import Client, get_client +from elasticapm.conf.constants import OUTCOME, TRACEPARENT_BINARY_HEADER_NAME +from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule +from elasticapm.traces import DroppedSpan, Span, Transaction, capture_span, execution_context +from elasticapm.utils.disttracing import TraceParent + +if TYPE_CHECKING: + from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, ConsumerRecord, TopicPartition # pragma: no cover + + +class _KafkaTimestampType(IntEnum): + NO_TIMESTAMP_TYPE = -1 + CREATE_TIME = 0 + LOG_APPEND_TIME = 1 + + +class AIOKafkaInstrumentation(AsyncAbstractInstrumentedModule): + """Instrument the aiokafka's consumer and producer + + Features: + - Like KafkaInstrumentation, it begins a new transaction on asynchronous + iteration over the consumer if no transaction is active. + - Unlike KafkaInstrumentation, when an active transaction exists, it also + records the last span that awaits a message before StopAsyncIteration + arises. + - Automatic trace context propagation is not supported for messages being + sent via send_batch(). + """ + + instrument_list = [ + ("aiokafka", "AIOKafkaConsumer.getone"), + ("aiokafka", "AIOKafkaConsumer.getmany"), + ("aiokafka", "AIOKafkaProducer.send"), + ("aiokafka", "AIOKafkaProducer.send_batch"), + ("aiokafka", "AIOKafkaConsumer.__anext__"), + ] + name = "aiokafka" + creates_transactions = True + + SPAN_TYPE = SERVICE_TYPE = TRANSACTION_TYPE = "messaging" + SPAN_SUBTYPE = SERVICE_NAME = "kafka" + + T_Result = TypeVar("T_Result") + + async def call( + self, + module: str, + method: str, + wrapped: Callable[..., Awaitable[T_Result]], + instance: Optional[object], + args: tuple, + kwargs: dict, + ) -> T_Result: + + client = get_client() + if not client: + return await wrapped(*args, **kwargs) + + transaction = execution_context.get_transaction() + + if method == "AIOKafkaConsumer.__anext__": + # If no transaction exists, we create and start new ones implicitly + # like we do in KafkaInstrumentation. + + if transaction and transaction.transaction_type != self.TRANSACTION_TYPE: + # Somebody started a transaction outside of the consumer, + # so we will only capture subsequent getone() as a span. + return await wrapped(*args, **kwargs) + + # No transaction running, or this is a transaction started by us, + # so let's end it and start the next, + # unless a StopAsyncIteration is raised, at which point we do nothing. 
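+            # In other words, each message consumed via `async for` gets its
+            # own transaction; the previous one is closed as successful right
+            # before we await the next message.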
+ if transaction: + client.end_transaction(result=OUTCOME.SUCCESS) + + # May raise StopAsyncIteration + result = await wrapped(*args, **kwargs) + message = cast("ConsumerRecord", result) + + if client.should_ignore_topic(message.topic): + return result + + trace_parent = _extract_trace_parent_from_message_headers(message.headers) + transaction = client.begin_transaction(self.TRANSACTION_TYPE, trace_parent=trace_parent) + + if not transaction: + return result + + transaction.name = f"Kafka RECEIVE from {message.topic}" + self._enrich_transaction_context( + transaction, message.topic, timestamp_type=message.timestamp_type, timestamp=message.timestamp + ) + + return result + + elif not transaction: + return await wrapped(*args, **kwargs) + + elif method.startswith("AIOKafkaConsumer.get"): + return await self._trace_get( + wrapped, + cast(Optional["AIOKafkaConsumer"], instance), + args, + kwargs, + client=client, + ) + + else: + return await self._trace_send( + wrapped, + cast(Optional["AIOKafkaProducer"], instance), + args, + kwargs, + client=client, + trace_parent=transaction.trace_parent, + ) + + @classmethod + async def _trace_get( + cls, + wrapped: Callable[..., Awaitable[T_Result]], + instance: Optional["AIOKafkaConsumer"], + args: tuple, + kwargs: dict, + *, + client: Client, + ) -> T_Result: + """Trace the consumer's get() and getmany() by capturing a span""" + + with capture_span( + name="Kafka RECEIVE", + leaf=True, + span_type=cls.SPAN_TYPE, + span_subtype=cls.SPAN_SUBTYPE, + span_action="receive", + ) as span: + + result = await wrapped(*args, **kwargs) + + if not span or isinstance(span, DroppedSpan): + return result + + trace_topics = [ + topic for topic in _extract_topics_from_get_result(result) if not client.should_ignore_topic(topic) + ] + + if not trace_topics: + span.cancel() + return result + + span.name += f" from {', '.join(trace_topics)}" + cls._enrich_span_context(span, *trace_topics, consumer=instance) + + for message in _extract_messages_from_get_result(result, include_topics=trace_topics): + trace_parent = _extract_trace_parent_from_message_headers(message.headers) + if trace_parent: + span.add_link(trace_parent) + + return result + + @classmethod + async def _trace_send( + cls, + wrapped: Callable[..., Awaitable[T_Result]], + instance: Optional["AIOKafkaProducer"], + args: tuple, + kwargs: dict, + *, + client: Client, + trace_parent: TraceParent, + ) -> T_Result: + """Trace the producer's send() and send_batch() by capturing a span""" + + topic = _extract_topic_from_send_arguments(args, kwargs) + if client.should_ignore_topic(topic): + return await wrapped(*args, **kwargs) + + with capture_span( + name=f"Kafka SEND to {topic}", + leaf=True, + span_type=cls.SPAN_TYPE, + span_subtype=cls.SPAN_SUBTYPE, + span_action="send", + ) as span: + + if span and not isinstance(span, DroppedSpan): + trace_parent = trace_parent.copy_from(span_id=span.id) + + mutable_args = list(args) + _inject_trace_parent_into_send_arguments(mutable_args, kwargs, trace_parent) + + result = await wrapped(*mutable_args, **kwargs) + + if span and not isinstance(span, DroppedSpan): + cls._enrich_span_context(span, topic, producer=instance) + + return result + + @classmethod + def _enrich_span_context( + cls, + span: Span, + topic: str, + *topics: str, + producer: Optional["AIOKafkaProducer"] = None, + consumer: Optional["AIOKafkaConsumer"] = None, + ): + + destination_service = {"type": cls.SERVICE_TYPE, "name": cls.SERVICE_NAME} + service_framework = {"name": "Kafka"} + + 
span.context.setdefault("destination", {}).setdefault("service", {}).update(destination_service) + span.context.setdefault("service", {}).setdefault("framework", {}).update(service_framework) + + if not topics: + span.context["destination"]["service"]["resource"] = f"{cls.SERVICE_NAME}/{topic}" + span.context.setdefault("message", {}).setdefault("queue", {}).update({"name": topic}) + + if producer and producer.client.cluster.controller: + span.context["destination"]["address"] = producer.client.cluster.controller.host + span.context["destination"]["port"] = producer.client.cluster.controller.port + + @classmethod + def _enrich_transaction_context( + cls, + transaction: Transaction, + topic: str, + *, + timestamp_type: int, + timestamp: int, + ): + + destination_service = { + "type": cls.SERVICE_TYPE, + "name": cls.SERVICE_NAME, + "resource": f"{cls.SERVICE_NAME}/{topic}", + } + message_queue = {"name": topic} + service_framework = {"name": "Kafka"} + + transaction.context.setdefault("destination", {}).setdefault("service", {}).update(destination_service) + transaction.context.setdefault("message", {}).setdefault("queue", {}).update(message_queue) + transaction.context.setdefault("service", {}).setdefault("framework", {}).update(service_framework) + + if timestamp_type == _KafkaTimestampType.CREATE_TIME: + current_time_millis = int(round(time.time() * 1000)) + age = current_time_millis - timestamp + transaction.context["message"].setdefault("age", {}).update({"ms": age}) + + +def _extract_trace_parent_from_message_headers(headers: Optional[Iterable]) -> Optional[TraceParent]: + + for key, value in headers or []: + if key == TRACEPARENT_BINARY_HEADER_NAME: + return TraceParent.from_binary(value) + + return None + + +def _extract_topics_from_get_result(result) -> Iterable[str]: + + if hasattr(result, "topic"): + message = cast("ConsumerRecord", result) # from getone() + yield message.topic + + else: + messages = cast(Dict["TopicPartition", List["ConsumerRecord"]], result) # from getmany() + for topic_partition in messages: + yield topic_partition.topic + + +def _extract_messages_from_get_result(result, *, include_topics: Container[str] = ()) -> Iterable["ConsumerRecord"]: + + if hasattr(result, "topic"): + message = cast("ConsumerRecord", result) # from getone() + if message.topic in include_topics: + yield message + + else: + messages = cast(Dict["TopicPartition", List["ConsumerRecord"]], result) # from getmany() + for topic_partition in messages: + if topic_partition.topic not in include_topics: + continue + yield from messages[topic_partition] + + +def _has_append_method(obj: object) -> bool: + + return hasattr(obj, "append") and callable(getattr(obj, "append")) + + +def _extract_topic_from_send_arguments(args: tuple, kwargs: dict) -> str: + + if "topic" in kwargs: + return kwargs["topic"] + + elif _has_append_method(args[0]): + # The first argument of the producer's send_batch() may be 'BatchBuilder' + # which has 'append' method. If that's the case, the second one is 'topic'. 
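+        # i.e. the call shape is send_batch(batch, topic, *, partition).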
+ return args[1] + + return args[0] + + +def _inject_trace_parent_into_send_arguments(args: list, kwargs: dict, trace_parent: TraceParent): + + if "batch" in kwargs or args and _has_append_method(args[0]): + return # Injection is not practical as messages are already encoded in the batch + + if "headers" in kwargs: + headers = kwargs["headers"] + if headers is None: + headers = kwargs["headers"] = [] + + else: + # headers is the 6th parameter in send() + headers_position_in_args = 5 # 6th parameter, 0-indexed + for preceding_parameter in ["topic", "value", "key", "partition", "timestamp_ms"]: + if preceding_parameter in kwargs: + headers_position_in_args -= 1 + + try: + headers = args[headers_position_in_args] + except IndexError: + headers = kwargs["headers"] = [] + else: + if headers is None: + headers = args[headers_position_in_args] = [] + + if not isinstance(headers, MutableSequence): + # headers may also be a tuple, for example + raise TypeError(f"'headers' is not a MutableSequence, got {type(headers).__name__}") + + # Injecting trace parent after removing any existing one; Here, we retain + # even a header with zero elements as we are not in a position to remove it. + headers[:] = [header for header in headers if not header or header[0] != TRACEPARENT_BINARY_HEADER_NAME] + headers.append((TRACEPARENT_BINARY_HEADER_NAME, trace_parent.to_binary())) diff --git a/elasticapm/instrumentation/register.py b/elasticapm/instrumentation/register.py index b37aff1e9..b8196c74f 100644 --- a/elasticapm/instrumentation/register.py +++ b/elasticapm/instrumentation/register.py @@ -96,6 +96,7 @@ "elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisPipelineInstrumentation", "elasticapm.instrumentation.packages.asyncio.psycopg_async.AsyncPsycopgInstrumentation", "elasticapm.instrumentation.packages.grpc.GRPCAsyncServerInstrumentation", + "elasticapm.instrumentation.packages.asyncio.aiokafka.AIOKafkaInstrumentation", ] ) diff --git a/setup.cfg b/setup.cfg index 0a56a9b01..5515ccda9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -113,6 +113,7 @@ markers = kafka grpc azurestorage + aiokafka addopts=--random-order [isort] diff --git a/tests/instrumentation/asyncio_tests/aiokafka_tests.py b/tests/instrumentation/asyncio_tests/aiokafka_tests.py new file mode 100644 index 000000000..c28f74d1c --- /dev/null +++ b/tests/instrumentation/asyncio_tests/aiokafka_tests.py @@ -0,0 +1,429 @@ +# BSD 3-Clause License +# +# Copyright (c) 2022, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import asyncio +import os +from unittest.mock import MagicMock + +import pytest +import pytest_asyncio + +import elasticapm +from elasticapm.conf.constants import OUTCOME, SPAN, TRACEPARENT_BINARY_HEADER_NAME, TRANSACTION +from elasticapm.instrumentation.packages.asyncio.aiokafka import _inject_trace_parent_into_send_arguments +from elasticapm.utils.disttracing import TraceParent + +kafka = pytest.importorskip("aiokafka") + +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer +from aiokafka.admin import AIOKafkaAdminClient, NewTopic + +pytestmark = [pytest.mark.aiokafka] + +KAFKA_HOST = os.environ.get("KAFKA_HOST") +if not KAFKA_HOST: + pytestmark.append(pytest.mark.skip("Skipping aiokafka tests, no KAFKA_HOST environment variable set")) + + +@pytest_asyncio.fixture(scope="function") +async def topics(): + topics = ["test", "foo", "bar"] + admin_client = AIOKafkaAdminClient(bootstrap_servers=[f"{KAFKA_HOST}:9092"]) + + await admin_client.start() + await admin_client.create_topics([NewTopic(name, num_partitions=1, replication_factor=1) for name in topics]) + try: + yield topics + finally: + await admin_client.delete_topics(topics) + + +@pytest_asyncio.fixture() +async def producer(): + producer = AIOKafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:9092") + await producer.start() + await producer.client.bootstrap() + try: + yield producer + finally: + await producer.stop() + + +@pytest_asyncio.fixture() +async def consumer(topics): + consumer = AIOKafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092") + consumer.subscribe(topics=topics) + await consumer.start() + + async def consumer_close_later(): + await asyncio.sleep(1.0) + await consumer.stop() + + task = asyncio.create_task(consumer_close_later()) + try: + yield consumer + finally: + await task + + +@pytest.mark.asyncio +async def test_aiokafka_produce(instrument, elasticapm_client, producer, topics): + elasticapm_client.begin_transaction("test") + await producer.send("test", key=b"foo", value=b"bar") + elasticapm_client.end_transaction("test", "success") + transactions = elasticapm_client.events[TRANSACTION] + span = elasticapm_client.events[SPAN][0] + assert span["name"] == "Kafka SEND to test" + assert span["context"]["message"]["queue"]["name"] == "test" + assert span["context"]["destination"]["port"] == 9092 + assert span["context"]["destination"]["service"]["name"] == "kafka" + assert span["context"]["destination"]["service"]["resource"] == "kafka/test" + assert span["context"]["destination"]["service"]["type"] == "messaging" + + +@pytest.mark.asyncio +async def test_aiokafka_produce_ignore_topic(instrument, elasticapm_client, producer, topics): + elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") + elasticapm_client.begin_transaction("test") + await producer.send(topic="foo", key=b"foo", value=b"bar") + await producer.send("bar", key=b"foo", value=b"bar") + await producer.send("test", key=b"foo", value=b"bar") + elasticapm_client.end_transaction("test", "success") + spans = 
elasticapm_client.events[SPAN] + assert len(spans) == 1 + assert spans[0]["name"] == "Kafka SEND to test" + + +@pytest.mark.asyncio +async def test_aiokafka_consume(instrument, elasticapm_client, producer, consumer, topics): + async def delayed_send(): + await asyncio.sleep(0.2) + elasticapm_client.begin_transaction("foo") + await producer.send("test", key=b"foo", value=b"bar") + await producer.send("test", key=b"baz", value=b"bazzinga") + elasticapm_client.end_transaction("foo") + + task = asyncio.create_task(delayed_send()) + async for item in consumer: + with elasticapm.capture_span("foo"): + pass + await task + transactions = elasticapm_client.events[TRANSACTION] + spans = elasticapm_client.events[SPAN] + # the consumer transactions should have the same trace id as the transaction that triggered the messages + assert transactions[0]["trace_id"] == transactions[1]["trace_id"] == transactions[2]["trace_id"] + assert transactions[1]["name"] == "Kafka RECEIVE from test" + assert transactions[1]["type"] == "messaging" + assert transactions[1]["context"]["message"]["queue"]["name"] == "test" + + assert spans[2]["transaction_id"] == transactions[1]["id"] + assert spans[3]["transaction_id"] == transactions[2]["id"] + + +@pytest.mark.asyncio +async def test_aiokafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + async def delayed_send(): + await asyncio.sleep(0.2) + elasticapm_client.begin_transaction("foo") + await producer.send("test", key=b"foo", value=b"bar") + await producer.send("test", key=b"baz", value=b"bazzinga") + elasticapm_client.end_transaction("foo") + + task = asyncio.create_task(delayed_send()) + transaction = elasticapm_client.begin_transaction("foo") + async for item in consumer: + pass + await task + elasticapm_client.end_transaction("foo") + transactions = elasticapm_client.events[TRANSACTION] + assert len(transactions) == 2 + external_spans = elasticapm_client.spans_for_transaction(transactions[0]) + spans = elasticapm_client.spans_for_transaction(transactions[1]) + assert len(external_spans) == 2 + assert len(spans) == 3 + assert spans[0]["links"][0]["trace_id"] == external_spans[0]["trace_id"] + assert spans[1]["links"][0]["trace_id"] == external_spans[1]["trace_id"] + # It records the last span that was awaiting when stopping the consumer as a failure. 
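+    # (Stopping the consumer interrupts the pending receive, which is why its
+    # span ends with a failure outcome and carries no links.)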
+ assert spans[2]["outcome"] == OUTCOME.FAILURE + assert "links" not in spans[2] + + +@pytest.mark.asyncio +async def test_aiokafka_consumer_ignore_topic(instrument, elasticapm_client, producer, consumer, topics): + elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") + + async def delayed_send(): + await asyncio.sleep(0.2) + await producer.send(topic="foo", key=b"foo", value=b"bar") + await producer.send("bar", key=b"foo", value=b"bar") + await producer.send("test", key=b"foo", value=b"bar") + + task = asyncio.create_task(delayed_send()) + async for item in consumer: + with elasticapm.capture_span("test"): + assert item + await task + transactions = elasticapm_client.events[TRANSACTION] + assert len(transactions) == 1 + assert transactions[0]["name"] == "Kafka RECEIVE from test" + + +@pytest.mark.asyncio +async def test_aiokafka_consumer_ignore_topic_ongoing_transaction( + instrument, elasticapm_client, producer, consumer, topics +): + elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") + + async def delayed_send(): + await asyncio.sleep(0.2) + await producer.send(topic="foo", key=b"foo", value=b"bar") + await producer.send("bar", key=b"foo", value=b"bar") + await producer.send("test", key=b"foo", value=b"bar") + + task = asyncio.create_task(delayed_send()) + transaction = elasticapm_client.begin_transaction("foo") + async for item in consumer: + pass + await task + elasticapm_client.end_transaction("foo") + transactions = elasticapm_client.events[TRANSACTION] + spans = elasticapm_client.spans_for_transaction(transactions[0]) + assert len(spans) == 2 + assert spans[0]["name"] == "Kafka RECEIVE from test" + # It records the last span that was awaiting when stopping the consumer as a failure. + assert spans[1]["outcome"] == OUTCOME.FAILURE + assert spans[1]["name"] == "Kafka RECEIVE" + + +@pytest.mark.asyncio +async def test_aiokafka_getmany_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + async def delayed_send(): + await asyncio.sleep(0.2) + await producer.send("test", key=b"foo", value=b"bar") + await producer.send("test", key=b"baz", value=b"bazzinga") + + task = asyncio.create_task(delayed_send()) + transaction = elasticapm_client.begin_transaction("foo") + results = await consumer.getmany(timeout_ms=1000) + await task + elasticapm_client.end_transaction("foo") + transactions = elasticapm_client.events[TRANSACTION] + spans = elasticapm_client.events[SPAN] + assert len(spans) == 1 + assert spans[0]["name"] == "Kafka RECEIVE from test" + + +@pytest.mark.asyncio +async def test_aiokafka_no_client(instrument, producer, consumer, topics): + assert elasticapm.get_client() is None + # the following code shouldn't trigger any errors + await producer.send_and_wait("test", key=b"foo", value=b"bar") + async for item in consumer: + pass + + +@pytest.mark.asyncio +async def test_aiokafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics): + transaction_object = elasticapm_client.begin_transaction("transaction") + transaction_object.is_sampled = False + await producer.send("test", key=b"foo", value=b"bar") + elasticapm_client.end_transaction("foo") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 0 + + +@pytest.mark.asyncio +async def test_aiokafka_getmany_unsampled_transaction(instrument, elasticapm_client, consumer, topics): + transaction_object = elasticapm_client.begin_transaction("transaction") + transaction_object.is_sampled = False + await consumer.getmany(timeout_ms=50) + 
elasticapm_client.end_transaction("foo")
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 0
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_consumer_unsampled_transaction_handles_stop_iteration(
+    instrument, elasticapm_client, producer, consumer, topics
+):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        await producer.send("test", key=b"foo", value=b"bar")
+
+    task = asyncio.create_task(delayed_send())
+    transaction = elasticapm_client.begin_transaction("foo")
+    transaction.is_sampled = False
+    async for item in consumer:
+        pass
+    await task
+    elasticapm_client.end_transaction("foo")
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 0
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_getmany_multiple_topics(instrument, elasticapm_client, producer, consumer, topics):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        await producer.send("foo", key=b"foo", value=b"bar")
+        await producer.send("bar", key=b"foo", value=b"bar")
+        await producer.send("test", key=b"baz", value=b"bazzinga")
+
+    task = asyncio.create_task(delayed_send())
+    await asyncio.sleep(0.5)  # Wait a while so it can get all messages at once.
+
+    elasticapm_client.config.update("1", ignore_message_queues="bar")
+    elasticapm_client.begin_transaction("foo")
+    results = await consumer.getmany()
+    await task
+    elasticapm_client.end_transaction()
+
+    assert len(results) == 3
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 1
+    assert spans[0]["name"] == "Kafka RECEIVE from foo, test"
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_send_batch(instrument, elasticapm_client, producer, consumer, topics):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+
+        batch = producer.create_batch()
+        batch.append(key=b"foo", value=b"bar", timestamp=None)
+        batch.append(key=b"baz", value=b"bazzinga", timestamp=None)
+        partitions = await producer.partitions_for("test")
+
+        elasticapm_client.begin_transaction("send")
+        await producer.send_batch(batch, "test", partition=partitions.pop())
+        elasticapm_client.end_transaction()
+
+    task = asyncio.create_task(delayed_send())
+    elasticapm_client.begin_transaction("recv")
+    async for _ in consumer:
+        pass
+    await task
+    elasticapm_client.end_transaction()
+
+    tran_send, tran_recv = elasticapm_client.events[TRANSACTION]
+    spans_send = elasticapm_client.spans_for_transaction(tran_send)
+    spans_recv = elasticapm_client.spans_for_transaction(tran_recv)
+
+    assert len(spans_send) == 1
+    assert len(spans_recv) == 3
+    assert "links" not in spans_recv[0]
+    assert "links" not in spans_recv[1]
+    # It records the last span that was awaiting when stopping the consumer as a failure.
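+    # Also, send_batch() cannot carry a traceparent header, so the receive
+    # side is expected to stay on a separate trace (checked via trace_id below).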
+ assert spans_recv[2]["outcome"] == OUTCOME.FAILURE + assert "links" not in spans_recv[2] + assert all(spans_send[0]["trace_id"] != span_recv["trace_id"] for span_recv in spans_recv) + + +@pytest.mark.asyncio +async def test_aiokafka_consumer_non_recording_transaction(instrument, elasticapm_client, producer, consumer, topics): + elasticapm_client.config.update("1", recording=False) + + async def delayed_send(): + await asyncio.sleep(0.2) + await producer.send("test", key=b"foo", value=b"bar") + + task = asyncio.create_task(delayed_send()) + async for item in consumer: + pass + await task + transactions = elasticapm_client.events[TRANSACTION] + assert len(transactions) == 0 + + +def test_aiokafka_inject_trace_parent_into_send_arguments(): + trace_parent = TraceParent.from_string("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + assert trace_parent + traceparent_header = (TRACEPARENT_BINARY_HEADER_NAME, trace_parent.to_binary()) + headers_list = [("header_name", "header_value")] + headers_list_but_malformed = [tuple()] + headers_list_with_other_traceparent = [(TRACEPARENT_BINARY_HEADER_NAME, ...)] + headers_none = None + batch = AIOKafkaProducer.create_batch(self=MagicMock()) + + def func(*args, **kwargs): + mutable_args = list(args) + _inject_trace_parent_into_send_arguments(mutable_args, kwargs, trace_parent) + return mutable_args, kwargs + + # send(topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None) + args, kwargs = func("topic", key=..., value=...) + assert traceparent_header in kwargs["headers"] + + args, kwargs = func("topic", key=..., value=..., headers=headers_list) + assert traceparent_header in kwargs["headers"] + assert len(kwargs["headers"]) == 2 # The traceparent header should be appended + + args, kwargs = func("topic", key=..., value=..., headers=headers_list_but_malformed) + assert traceparent_header in kwargs["headers"] + assert len(kwargs["headers"]) == 2 # Injection should succeed despite malformed headers + + args, kwargs = func("topic", key=..., value=..., headers=headers_list_with_other_traceparent) + assert traceparent_header in kwargs["headers"] + assert len(kwargs["headers"]) == 1 # The traceparent header should be overwritten + + args, kwargs = func("topic", key=..., value=..., headers=headers_none) + assert traceparent_header in kwargs["headers"] + + args, kwargs = func("topic", "value", "key", "partition", "timestamp_ms") + assert traceparent_header in kwargs["headers"] + + args, kwargs = func("topic", "partition", headers_none, key=..., value=..., timestamp_ms=...) + assert traceparent_header in args[2] + + args, kwargs = func("topic", "value", "key", "partition", "timestamp_ms", headers_none) + assert traceparent_header in args[5] + + args, kwargs = func("topic", "value", "key", "partition", "timestamp_ms", headers_list) + assert traceparent_header in args[5] + + with pytest.raises(TypeError, match=r"\bMutableSequence\b"): + args, kwargs = func("topic", "value", headers=tuple()) + + with pytest.raises(TypeError, match=r"\bMutableSequence\b"): + args, kwargs = func("topic", "value", headers=dict()) + + # send_batch(batch, topic, *, partition) + args, kwargs = func(batch, "topic", partition=...) + assert "headers" not in kwargs + assert len(args) == 2 + + args, kwargs = func("topic", partition=..., batch=...) + assert "headers" not in kwargs + assert len(args) == 1 + + args, kwargs = func(topic=..., partition=..., batch=...) 
+    assert "headers" not in kwargs
+    assert len(args) == 0
diff --git a/tests/requirements/reqs-aiokafka-newest.txt b/tests/requirements/reqs-aiokafka-newest.txt
new file mode 100644
index 000000000..b80dd3847
--- /dev/null
+++ b/tests/requirements/reqs-aiokafka-newest.txt
@@ -0,0 +1,2 @@
+aiokafka
+-r reqs-base.txt
diff --git a/tests/scripts/envs/aiokafka.sh b/tests/scripts/envs/aiokafka.sh
new file mode 100644
index 000000000..e55debde2
--- /dev/null
+++ b/tests/scripts/envs/aiokafka.sh
@@ -0,0 +1,5 @@
+export PYTEST_MARKER="-m aiokafka"
+export DOCKER_DEPS="kafka"
+export KAFKA_HOST="kafka"
+export WAIT_FOR_HOST="kafka"
+export WAIT_FOR_PORT=9092

From 45549b29e57701e9cc8a15c23193c3baa46e8ee2 Mon Sep 17 00:00:00 2001
From: Youhei Sakurai
Date: Fri, 2 Jan 2026 18:12:24 +0900
Subject: [PATCH 02/10] Refine docstring

---
 .../instrumentation/packages/asyncio/aiokafka.py   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/elasticapm/instrumentation/packages/asyncio/aiokafka.py b/elasticapm/instrumentation/packages/asyncio/aiokafka.py
index df8e974f8..dcf43aaff 100644
--- a/elasticapm/instrumentation/packages/asyncio/aiokafka.py
+++ b/elasticapm/instrumentation/packages/asyncio/aiokafka.py
@@ -53,13 +53,13 @@ class AIOKafkaInstrumentation(AsyncAbstractInstrumentedModule):
     """Instrument the aiokafka's consumer and producer
 
     Features:
-    - Like KafkaInstrumentation, it begins a new transaction on asynchronous
-      iteration over the consumer if no transaction is active.
-    - Unlike KafkaInstrumentation, when an active transaction exists, it also
-      records the last span that awaits a message before StopAsyncIteration
-      arises.
-    - Automatic trace context propagation is not supported for messages being
-      sent via send_batch().
+    - Like KafkaInstrumentation, if no transaction is active, it begins a new
+      transaction on asynchronous iteration over the consumer.
+    - Unlike KafkaInstrumentation, when an active transaction exists, it also
+      records an asynchronous iteration that ends in StopAsyncIteration as a
+      span.
+    - It does not support automatic trace context propagation for messages
+      being sent via send_batch().
     """
 
     instrument_list = [

From 5799e9abda02e849120baf2c896ba87c7efa62cd Mon Sep 17 00:00:00 2001
From: Youhei Sakurai
Date: Sat, 3 Jan 2026 11:57:33 +0900
Subject: [PATCH 03/10] Return to typing from collections.abc

---
 .../instrumentation/packages/asyncio/aiokafka.py   | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/elasticapm/instrumentation/packages/asyncio/aiokafka.py b/elasticapm/instrumentation/packages/asyncio/aiokafka.py
index dcf43aaff..44c95dc3a 100644
--- a/elasticapm/instrumentation/packages/asyncio/aiokafka.py
+++ b/elasticapm/instrumentation/packages/asyncio/aiokafka.py
@@ -29,9 +29,20 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import time -from collections.abc import Awaitable, Callable, Container, Iterable, MutableSequence from enum import IntEnum -from typing import TYPE_CHECKING, Dict, List, Optional, TypeVar, cast +from typing import ( + TYPE_CHECKING, + Awaitable, + Callable, + Container, + Dict, + Iterable, + List, + MutableSequence, + Optional, + TypeVar, + cast, +) from elasticapm import Client, get_client from elasticapm.conf.constants import OUTCOME, TRACEPARENT_BINARY_HEADER_NAME From 9c197dd53a875777e46174bfdff65bec0bd2fc9f Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Sun, 4 Jan 2026 13:52:26 +0900 Subject: [PATCH 04/10] Update doc and changelog for aiokafka --- CHANGELOG.asciidoc | 13 +++++++++++++ docs/reference/supported-technologies.md | 18 ++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5a6bed978..92dfe8502 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -29,6 +29,19 @@ endif::[] //===== Bug fixes // +=== Unreleased + +// Unreleased changes go here +// When the next release happens, nest these changes under the "Python Agent version 6.x" heading +[float] +===== Features + +* Add instrumentation for https://aiokafka.readthedocs.io/en/stable/[`aiokafka`] {pull}2521[#2521] + +[float] +===== Bug fixes + + [[release-notes-6.x]] === Python Agent version 6.x diff --git a/docs/reference/supported-technologies.md b/docs/reference/supported-technologies.md index 47ee5c12b..93a8b03f8 100644 --- a/docs/reference/supported-technologies.md +++ b/docs/reference/supported-technologies.md @@ -490,6 +490,24 @@ Collected trace data: * topic (if applicable) +#### aiokafka [automatic-instrumentation-db-aiokafka] + +Library: `aiokafka` (`>=0.12.0,<1`) + +Instrumented methods: + +* `aiokafka.AIOKafkaConsumer.getone`, +* `aiokafka.AIOKafkaConsumer.getmany`, +* `aiokafka.AIOKafkaProducer.send`, +* `aiokafka.AIOKafkaProducer.send_batch`, +* `aiokafka.AIOKafkaConsumer.__anext__` + +Collected trace data: + +* Destination (address and port) +* topic (if applicable) + + ### External HTTP requests [automatic-instrumentation-http] From 6bfbbf89f5ed9d00a8f118e48768f7bcab92a66d Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Sun, 4 Jan 2026 13:52:58 +0900 Subject: [PATCH 05/10] Add aiokafka-newest to CI files --- .ci/.matrix_framework.yml | 1 + .ci/.matrix_framework_full.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.ci/.matrix_framework.yml b/.ci/.matrix_framework.yml index 64d1dc8ef..eb595b188 100644 --- a/.ci/.matrix_framework.yml +++ b/.ci/.matrix_framework.yml @@ -58,3 +58,4 @@ FRAMEWORK: - grpc-newest - azurefunctions-newest - azure-newest + - aiokafka-newest diff --git a/.ci/.matrix_framework_full.yml b/.ci/.matrix_framework_full.yml index 6b3a6ea08..699fadf09 100644 --- a/.ci/.matrix_framework_full.yml +++ b/.ci/.matrix_framework_full.yml @@ -92,3 +92,4 @@ FRAMEWORK: #- grpc-1.24 # This appears to have problems with python>3.6? 
- azurefunctions-newest - azure-newest + - aiokafka-newest From a0c44418bf66ecade5111be14d0f5af276ce698e Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Sun, 4 Jan 2026 14:44:49 +0900 Subject: [PATCH 06/10] Modify the test code to use pytest.importorskip() --- .../asyncio_tests/aiokafka_tests.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/instrumentation/asyncio_tests/aiokafka_tests.py b/tests/instrumentation/asyncio_tests/aiokafka_tests.py index c28f74d1c..a17c793b7 100644 --- a/tests/instrumentation/asyncio_tests/aiokafka_tests.py +++ b/tests/instrumentation/asyncio_tests/aiokafka_tests.py @@ -40,10 +40,8 @@ from elasticapm.instrumentation.packages.asyncio.aiokafka import _inject_trace_parent_into_send_arguments from elasticapm.utils.disttracing import TraceParent -kafka = pytest.importorskip("aiokafka") - -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer -from aiokafka.admin import AIOKafkaAdminClient, NewTopic +aiokafka = pytest.importorskip("aiokafka") +aiokafka_admin = pytest.importorskip("aiokafka.admin") pytestmark = [pytest.mark.aiokafka] @@ -55,10 +53,12 @@ @pytest_asyncio.fixture(scope="function") async def topics(): topics = ["test", "foo", "bar"] - admin_client = AIOKafkaAdminClient(bootstrap_servers=[f"{KAFKA_HOST}:9092"]) + admin_client = aiokafka_admin.AIOKafkaAdminClient(bootstrap_servers=[f"{KAFKA_HOST}:9092"]) await admin_client.start() - await admin_client.create_topics([NewTopic(name, num_partitions=1, replication_factor=1) for name in topics]) + await admin_client.create_topics( + [aiokafka_admin.NewTopic(name, num_partitions=1, replication_factor=1) for name in topics] + ) try: yield topics finally: @@ -67,7 +67,7 @@ async def topics(): @pytest_asyncio.fixture() async def producer(): - producer = AIOKafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:9092") + producer = aiokafka.AIOKafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:9092") await producer.start() await producer.client.bootstrap() try: @@ -78,7 +78,7 @@ async def producer(): @pytest_asyncio.fixture() async def consumer(topics): - consumer = AIOKafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092") + consumer = aiokafka.AIOKafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092") consumer.subscribe(topics=topics) await consumer.start() @@ -371,7 +371,7 @@ def test_aiokafka_inject_trace_parent_into_send_arguments(): headers_list_but_malformed = [tuple()] headers_list_with_other_traceparent = [(TRACEPARENT_BINARY_HEADER_NAME, ...)] headers_none = None - batch = AIOKafkaProducer.create_batch(self=MagicMock()) + batch = aiokafka.AIOKafkaProducer.create_batch(self=MagicMock()) def func(*args, **kwargs): mutable_args = list(args) From ddff8dc4f14d31b7ad0782daa4a23299ce14cbbf Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Sun, 4 Jan 2026 14:47:32 +0900 Subject: [PATCH 07/10] Exclude Py 3.6 for aiokafka-newest --- .ci/.matrix_exclude.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.ci/.matrix_exclude.yml b/.ci/.matrix_exclude.yml index 93d0fa4df..d3a71a29a 100644 --- a/.ci/.matrix_exclude.yml +++ b/.ci/.matrix_exclude.yml @@ -357,3 +357,6 @@ exclude: FRAMEWORK: httpx-0.14 - VERSION: python-3.13 FRAMEWORK: httpx-0.21 + # aiokafka + - PYTHON_VERSION: python-3.6 + FRAMEWORK: aiokafka-newest From afef6f052e7d170e8829daee1b8838bc1b168760 Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Sun, 4 Jan 2026 14:49:39 +0900 Subject: [PATCH 08/10] Correct PYTHON_VERSION to VERSION --- .ci/.matrix_exclude.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/.ci/.matrix_exclude.yml b/.ci/.matrix_exclude.yml index d3a71a29a..e49af465d 100644 --- a/.ci/.matrix_exclude.yml +++ b/.ci/.matrix_exclude.yml @@ -358,5 +358,5 @@ exclude: - VERSION: python-3.13 FRAMEWORK: httpx-0.21 # aiokafka - - PYTHON_VERSION: python-3.6 + - VERSION: python-3.6 FRAMEWORK: aiokafka-newest From 08b4889b19b51c49323a87e5fbedd8677e54297f Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Sun, 4 Jan 2026 15:21:23 +0900 Subject: [PATCH 09/10] Change to do the minimum in capture_span() context --- .../packages/asyncio/aiokafka.py | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/elasticapm/instrumentation/packages/asyncio/aiokafka.py b/elasticapm/instrumentation/packages/asyncio/aiokafka.py index 44c95dc3a..2e19bddc7 100644 --- a/elasticapm/instrumentation/packages/asyncio/aiokafka.py +++ b/elasticapm/instrumentation/packages/asyncio/aiokafka.py @@ -119,8 +119,7 @@ async def call( if transaction: client.end_transaction(result=OUTCOME.SUCCESS) - # May raise StopAsyncIteration - result = await wrapped(*args, **kwargs) + result = await wrapped(*args, **kwargs) # May raise StopAsyncIteration message = cast("ConsumerRecord", result) if client.should_ignore_topic(message.topic): @@ -183,26 +182,26 @@ async def _trace_get( result = await wrapped(*args, **kwargs) - if not span or isinstance(span, DroppedSpan): - return result + if not span or isinstance(span, DroppedSpan): + return result - trace_topics = [ - topic for topic in _extract_topics_from_get_result(result) if not client.should_ignore_topic(topic) - ] + trace_topics = [ + topic for topic in _extract_topics_from_get_result(result) if not client.should_ignore_topic(topic) + ] - if not trace_topics: - span.cancel() - return result + if not trace_topics: + span.cancel() + return result - span.name += f" from {', '.join(trace_topics)}" - cls._enrich_span_context(span, *trace_topics, consumer=instance) + span.name += f" from {', '.join(trace_topics)}" + cls._enrich_span_context(span, *trace_topics, consumer=instance) - for message in _extract_messages_from_get_result(result, include_topics=trace_topics): - trace_parent = _extract_trace_parent_from_message_headers(message.headers) - if trace_parent: - span.add_link(trace_parent) + for message in _extract_messages_from_get_result(result, include_topics=trace_topics): + trace_parent = _extract_trace_parent_from_message_headers(message.headers) + if trace_parent: + span.add_link(trace_parent) - return result + return result @classmethod async def _trace_send( @@ -237,10 +236,10 @@ async def _trace_send( result = await wrapped(*mutable_args, **kwargs) - if span and not isinstance(span, DroppedSpan): - cls._enrich_span_context(span, topic, producer=instance) + if span and not isinstance(span, DroppedSpan): + cls._enrich_span_context(span, topic, producer=instance) - return result + return result @classmethod def _enrich_span_context( From 8bb18a5860ce04db230befa454eb59d1d997ff9c Mon Sep 17 00:00:00 2001 From: Youhei Sakurai Date: Sun, 4 Jan 2026 15:33:21 +0900 Subject: [PATCH 10/10] Empty commit to rerun GH actions