diff --git a/.ci/.matrix_exclude.yml b/.ci/.matrix_exclude.yml index 93d0fa4df..e49af465d 100644 --- a/.ci/.matrix_exclude.yml +++ b/.ci/.matrix_exclude.yml @@ -357,3 +357,6 @@ exclude: FRAMEWORK: httpx-0.14 - VERSION: python-3.13 FRAMEWORK: httpx-0.21 + # aiokafka + - VERSION: python-3.6 + FRAMEWORK: aiokafka-newest diff --git a/.ci/.matrix_framework.yml b/.ci/.matrix_framework.yml index 64d1dc8ef..eb595b188 100644 --- a/.ci/.matrix_framework.yml +++ b/.ci/.matrix_framework.yml @@ -58,3 +58,4 @@ FRAMEWORK: - grpc-newest - azurefunctions-newest - azure-newest + - aiokafka-newest diff --git a/.ci/.matrix_framework_full.yml b/.ci/.matrix_framework_full.yml index 6b3a6ea08..699fadf09 100644 --- a/.ci/.matrix_framework_full.yml +++ b/.ci/.matrix_framework_full.yml @@ -92,3 +92,4 @@ FRAMEWORK: #- grpc-1.24 # This appears to have problems with python>3.6? - azurefunctions-newest - azure-newest + - aiokafka-newest diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5a6bed978..92dfe8502 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -29,6 +29,19 @@ endif::[] //===== Bug fixes // +=== Unreleased + +// Unreleased changes go here +// When the next release happens, nest these changes under the "Python Agent version 6.x" heading +[float] +===== Features + +* Add instrumentation for https://aiokafka.readthedocs.io/en/stable/[`aiokafka`] {pull}2521[#2521] + +[float] +===== Bug fixes + + [[release-notes-6.x]] === Python Agent version 6.x diff --git a/docs/reference/supported-technologies.md b/docs/reference/supported-technologies.md index 47ee5c12b..93a8b03f8 100644 --- a/docs/reference/supported-technologies.md +++ b/docs/reference/supported-technologies.md @@ -490,6 +490,24 @@ Collected trace data: * topic (if applicable) +#### aiokafka [automatic-instrumentation-db-aiokafka] + +Library: `aiokafka` (`>=0.12.0,<1`) + +Instrumented methods: + +* `aiokafka.AIOKafkaConsumer.getone`, +* `aiokafka.AIOKafkaConsumer.getmany`, +* `aiokafka.AIOKafkaProducer.send`, +* `aiokafka.AIOKafkaProducer.send_batch`, +* `aiokafka.AIOKafkaConsumer.__anext__` + +Collected trace data: + +* Destination (address and port) +* topic (if applicable) + + ### External HTTP requests [automatic-instrumentation-http] diff --git a/elasticapm/instrumentation/packages/asyncio/aiokafka.py b/elasticapm/instrumentation/packages/asyncio/aiokafka.py new file mode 100644 index 000000000..2e19bddc7 --- /dev/null +++ b/elasticapm/instrumentation/packages/asyncio/aiokafka.py @@ -0,0 +1,382 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import time
+from enum import IntEnum
+from typing import (
+    TYPE_CHECKING,
+    Awaitable,
+    Callable,
+    Container,
+    Dict,
+    Iterable,
+    List,
+    MutableSequence,
+    Optional,
+    TypeVar,
+    cast,
+)
+
+from elasticapm import Client, get_client
+from elasticapm.conf.constants import OUTCOME, TRACEPARENT_BINARY_HEADER_NAME
+from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
+from elasticapm.traces import DroppedSpan, Span, Transaction, capture_span, execution_context
+from elasticapm.utils.disttracing import TraceParent
+
+if TYPE_CHECKING:
+    from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, ConsumerRecord, TopicPartition  # pragma: no cover
+
+
+class _KafkaTimestampType(IntEnum):
+    NO_TIMESTAMP_TYPE = -1
+    CREATE_TIME = 0
+    LOG_APPEND_TIME = 1
+
+
+class AIOKafkaInstrumentation(AsyncAbstractInstrumentedModule):
+    """Instrument aiokafka's consumer and producer.
+
+    Features:
+    - Like KafkaInstrumentation, if no transaction is active, it begins a new
+      transaction on each asynchronous iteration over the consumer.
+    - Unlike KafkaInstrumentation, when an active transaction exists, it records
+      asynchronous iteration over the consumer as a span even when the iteration
+      ends in StopAsyncIteration.
+    - It does not support automatic trace context propagation for messages
+      sent via send_batch().
+    """
+
+    instrument_list = [
+        ("aiokafka", "AIOKafkaConsumer.getone"),
+        ("aiokafka", "AIOKafkaConsumer.getmany"),
+        ("aiokafka", "AIOKafkaProducer.send"),
+        ("aiokafka", "AIOKafkaProducer.send_batch"),
+        ("aiokafka", "AIOKafkaConsumer.__anext__"),
+    ]
+    name = "aiokafka"
+    creates_transactions = True
+
+    SPAN_TYPE = SERVICE_TYPE = TRANSACTION_TYPE = "messaging"
+    SPAN_SUBTYPE = SERVICE_NAME = "kafka"
+
+    T_Result = TypeVar("T_Result")
+
+    async def call(
+        self,
+        module: str,
+        method: str,
+        wrapped: Callable[..., Awaitable[T_Result]],
+        instance: Optional[object],
+        args: tuple,
+        kwargs: dict,
+    ) -> T_Result:
+
+        client = get_client()
+        if not client:
+            return await wrapped(*args, **kwargs)
+
+        transaction = execution_context.get_transaction()
+
+        if method == "AIOKafkaConsumer.__anext__":
+            # If no transaction exists, we create and start new ones implicitly,
+            # like we do in KafkaInstrumentation.
+
+            if transaction and transaction.transaction_type != self.TRANSACTION_TYPE:
+                # Somebody started a transaction outside of the consumer, so we
+                # only capture the underlying getone() call as a span.
+                return await wrapped(*args, **kwargs)
+
+            # Either no transaction is running, or we started the current one,
+            # so end it and begin the next one. If the iterator raises
+            # StopAsyncIteration, we leave everything as it is.
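+            #
+            # Minimal sketch of the consumer loop this is designed for
+            # (illustrative only; handle() is a hypothetical callback):
+            #
+            #     async for message in consumer:  # each __anext__ ends up here
+            #         handle(message)  # runs inside "Kafka RECEIVE from <topic>"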
+            if transaction:
+                client.end_transaction(result=OUTCOME.SUCCESS)
+
+            result = await wrapped(*args, **kwargs)  # May raise StopAsyncIteration
+            message = cast("ConsumerRecord", result)
+
+            if client.should_ignore_topic(message.topic):
+                return result
+
+            trace_parent = _extract_trace_parent_from_message_headers(message.headers)
+            transaction = client.begin_transaction(self.TRANSACTION_TYPE, trace_parent=trace_parent)
+
+            if not transaction:
+                return result
+
+            transaction.name = f"Kafka RECEIVE from {message.topic}"
+            self._enrich_transaction_context(
+                transaction, message.topic, timestamp_type=message.timestamp_type, timestamp=message.timestamp
+            )
+
+            return result
+
+        elif not transaction:
+            return await wrapped(*args, **kwargs)
+
+        elif method.startswith("AIOKafkaConsumer.get"):
+            return await self._trace_get(
+                wrapped,
+                cast(Optional["AIOKafkaConsumer"], instance),
+                args,
+                kwargs,
+                client=client,
+            )
+
+        else:
+            return await self._trace_send(
+                wrapped,
+                cast(Optional["AIOKafkaProducer"], instance),
+                args,
+                kwargs,
+                client=client,
+                trace_parent=transaction.trace_parent,
+            )
+
+    @classmethod
+    async def _trace_get(
+        cls,
+        wrapped: Callable[..., Awaitable[T_Result]],
+        instance: Optional["AIOKafkaConsumer"],
+        args: tuple,
+        kwargs: dict,
+        *,
+        client: Client,
+    ) -> T_Result:
+        """Trace the consumer's getone() and getmany() by capturing a span"""
+
+        with capture_span(
+            name="Kafka RECEIVE",
+            leaf=True,
+            span_type=cls.SPAN_TYPE,
+            span_subtype=cls.SPAN_SUBTYPE,
+            span_action="receive",
+        ) as span:
+
+            result = await wrapped(*args, **kwargs)
+
+            if not span or isinstance(span, DroppedSpan):
+                return result
+
+            trace_topics = [
+                topic for topic in _extract_topics_from_get_result(result) if not client.should_ignore_topic(topic)
+            ]
+
+            if not trace_topics:
+                span.cancel()
+                return result
+
+            span.name += f" from {', '.join(trace_topics)}"
+            cls._enrich_span_context(span, *trace_topics, consumer=instance)
+
+            for message in _extract_messages_from_get_result(result, include_topics=trace_topics):
+                trace_parent = _extract_trace_parent_from_message_headers(message.headers)
+                if trace_parent:
+                    span.add_link(trace_parent)
+
+            return result
+
+    @classmethod
+    async def _trace_send(
+        cls,
+        wrapped: Callable[..., Awaitable[T_Result]],
+        instance: Optional["AIOKafkaProducer"],
+        args: tuple,
+        kwargs: dict,
+        *,
+        client: Client,
+        trace_parent: TraceParent,
+    ) -> T_Result:
+        """Trace the producer's send() and send_batch() by capturing a span"""
+
+        topic = _extract_topic_from_send_arguments(args, kwargs)
+        if client.should_ignore_topic(topic):
+            return await wrapped(*args, **kwargs)
+
+        with capture_span(
+            name=f"Kafka SEND to {topic}",
+            leaf=True,
+            span_type=cls.SPAN_TYPE,
+            span_subtype=cls.SPAN_SUBTYPE,
+            span_action="send",
+        ) as span:
+
+            if span and not isinstance(span, DroppedSpan):
+                trace_parent = trace_parent.copy_from(span_id=span.id)
+
+            mutable_args = list(args)
+            _inject_trace_parent_into_send_arguments(mutable_args, kwargs, trace_parent)
+
+            result = await wrapped(*mutable_args, **kwargs)
+
+            if span and not isinstance(span, DroppedSpan):
+                cls._enrich_span_context(span, topic, producer=instance)
+
+            return result
+
+    @classmethod
+    def _enrich_span_context(
+        cls,
+        span: Span,
+        topic: str,
+        *topics: str,
+        producer: Optional["AIOKafkaProducer"] = None,
+        consumer: Optional["AIOKafkaConsumer"] = None,
+    ):
+
+        destination_service = {"type": cls.SERVICE_TYPE, "name": cls.SERVICE_NAME}
+        service_framework = {"name": "Kafka"}
+
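+        # Merged into span.context below, giving roughly this shape (sketch):
+        #     {"destination": {"service": {"type": "messaging", "name": "kafka"}},
+        #      "service": {"framework": {"name": "Kafka"}}}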
span.context.setdefault("destination", {}).setdefault("service", {}).update(destination_service)
+        span.context.setdefault("service", {}).setdefault("framework", {}).update(service_framework)
+
+        if not topics:
+            span.context["destination"]["service"]["resource"] = f"{cls.SERVICE_NAME}/{topic}"
+            span.context.setdefault("message", {}).setdefault("queue", {}).update({"name": topic})
+
+        if producer and producer.client.cluster.controller:
+            span.context["destination"]["address"] = producer.client.cluster.controller.host
+            span.context["destination"]["port"] = producer.client.cluster.controller.port
+
+    @classmethod
+    def _enrich_transaction_context(
+        cls,
+        transaction: Transaction,
+        topic: str,
+        *,
+        timestamp_type: int,
+        timestamp: int,
+    ):
+
+        destination_service = {
+            "type": cls.SERVICE_TYPE,
+            "name": cls.SERVICE_NAME,
+            "resource": f"{cls.SERVICE_NAME}/{topic}",
+        }
+        message_queue = {"name": topic}
+        service_framework = {"name": "Kafka"}
+
+        transaction.context.setdefault("destination", {}).setdefault("service", {}).update(destination_service)
+        transaction.context.setdefault("message", {}).setdefault("queue", {}).update(message_queue)
+        transaction.context.setdefault("service", {}).setdefault("framework", {}).update(service_framework)
+
+        if timestamp_type == _KafkaTimestampType.CREATE_TIME:
+            current_time_millis = int(round(time.time() * 1000))
+            age = current_time_millis - timestamp
+            transaction.context["message"].setdefault("age", {}).update({"ms": age})
+
+
+def _extract_trace_parent_from_message_headers(headers: Optional[Iterable]) -> Optional[TraceParent]:
+
+    for key, value in headers or []:
+        if key == TRACEPARENT_BINARY_HEADER_NAME:
+            return TraceParent.from_binary(value)
+
+    return None
+
+
+def _extract_topics_from_get_result(result) -> Iterable[str]:
+
+    if hasattr(result, "topic"):
+        message = cast("ConsumerRecord", result)  # from getone()
+        yield message.topic
+
+    else:
+        messages = cast(Dict["TopicPartition", List["ConsumerRecord"]], result)  # from getmany()
+        for topic_partition in messages:
+            yield topic_partition.topic
+
+
+def _extract_messages_from_get_result(result, *, include_topics: Container[str] = ()) -> Iterable["ConsumerRecord"]:
+
+    if hasattr(result, "topic"):
+        message = cast("ConsumerRecord", result)  # from getone()
+        if message.topic in include_topics:
+            yield message
+
+    else:
+        messages = cast(Dict["TopicPartition", List["ConsumerRecord"]], result)  # from getmany()
+        for topic_partition in messages:
+            if topic_partition.topic not in include_topics:
+                continue
+            yield from messages[topic_partition]
+
+
+def _has_append_method(obj: object) -> bool:
+
+    return hasattr(obj, "append") and callable(getattr(obj, "append"))
+
+
+def _extract_topic_from_send_arguments(args: tuple, kwargs: dict) -> str:
+
+    if "topic" in kwargs:
+        return kwargs["topic"]
+
+    elif _has_append_method(args[0]):
+        # The first argument of the producer's send_batch() may be a 'BatchBuilder',
+        # which has an 'append' method. In that case, the second argument is the topic.
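+        # (send_batch signature, as exercised in the tests: send_batch(batch, topic, *, partition))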
+        return args[1]
+
+    return args[0]
+
+
+def _inject_trace_parent_into_send_arguments(args: list, kwargs: dict, trace_parent: TraceParent):
+
+    if "batch" in kwargs or (args and _has_append_method(args[0])):
+        return  # Injection is not practical as messages are already encoded in the batch
+
+    if "headers" in kwargs:
+        headers = kwargs["headers"]
+        if headers is None:
+            headers = kwargs["headers"] = []
+
+    else:
+        # headers is the 6th parameter of send(), i.e. index 5 when 0-indexed
+        headers_position_in_args = 5
+        for preceding_parameter in ["topic", "value", "key", "partition", "timestamp_ms"]:
+            if preceding_parameter in kwargs:
+                headers_position_in_args -= 1
+
+        try:
+            headers = args[headers_position_in_args]
+        except IndexError:
+            headers = kwargs["headers"] = []
+        else:
+            if headers is None:
+                headers = args[headers_position_in_args] = []
+
+    if not isinstance(headers, MutableSequence):
+        # headers may also be a tuple, for example
+        raise TypeError(f"'headers' is not a MutableSequence, got {type(headers).__name__}")
+
+    # Inject our trace parent after removing any existing one. Empty (malformed)
+    # headers are retained, as it is not our place to remove them.
+    headers[:] = [header for header in headers if not header or header[0] != TRACEPARENT_BINARY_HEADER_NAME]
+    headers.append((TRACEPARENT_BINARY_HEADER_NAME, trace_parent.to_binary()))
diff --git a/elasticapm/instrumentation/register.py b/elasticapm/instrumentation/register.py
index b37aff1e9..b8196c74f 100644
--- a/elasticapm/instrumentation/register.py
+++ b/elasticapm/instrumentation/register.py
@@ -96,6 +96,7 @@
         "elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisPipelineInstrumentation",
         "elasticapm.instrumentation.packages.asyncio.psycopg_async.AsyncPsycopgInstrumentation",
         "elasticapm.instrumentation.packages.grpc.GRPCAsyncServerInstrumentation",
+        "elasticapm.instrumentation.packages.asyncio.aiokafka.AIOKafkaInstrumentation",
     ]
 )
diff --git a/setup.cfg b/setup.cfg
index 0a56a9b01..5515ccda9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -113,6 +113,7 @@ markers =
     kafka
     grpc
     azurestorage
+    aiokafka
 addopts=--random-order
diff --git a/tests/instrumentation/asyncio_tests/aiokafka_tests.py b/tests/instrumentation/asyncio_tests/aiokafka_tests.py
new file mode 100644
index 000000000..a17c793b7
--- /dev/null
+++ b/tests/instrumentation/asyncio_tests/aiokafka_tests.py
@@ -0,0 +1,429 @@
+# BSD 3-Clause License
+#
+# Copyright (c) 2022, Elasticsearch BV
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+#   list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+#
+# * Neither the name of the copyright holder nor the names of its
+#   contributors may be used to endorse or promote products derived from
+#   this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import asyncio +import os +from unittest.mock import MagicMock + +import pytest +import pytest_asyncio + +import elasticapm +from elasticapm.conf.constants import OUTCOME, SPAN, TRACEPARENT_BINARY_HEADER_NAME, TRANSACTION +from elasticapm.instrumentation.packages.asyncio.aiokafka import _inject_trace_parent_into_send_arguments +from elasticapm.utils.disttracing import TraceParent + +aiokafka = pytest.importorskip("aiokafka") +aiokafka_admin = pytest.importorskip("aiokafka.admin") + +pytestmark = [pytest.mark.aiokafka] + +KAFKA_HOST = os.environ.get("KAFKA_HOST") +if not KAFKA_HOST: + pytestmark.append(pytest.mark.skip("Skipping aiokafka tests, no KAFKA_HOST environment variable set")) + + +@pytest_asyncio.fixture(scope="function") +async def topics(): + topics = ["test", "foo", "bar"] + admin_client = aiokafka_admin.AIOKafkaAdminClient(bootstrap_servers=[f"{KAFKA_HOST}:9092"]) + + await admin_client.start() + await admin_client.create_topics( + [aiokafka_admin.NewTopic(name, num_partitions=1, replication_factor=1) for name in topics] + ) + try: + yield topics + finally: + await admin_client.delete_topics(topics) + + +@pytest_asyncio.fixture() +async def producer(): + producer = aiokafka.AIOKafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:9092") + await producer.start() + await producer.client.bootstrap() + try: + yield producer + finally: + await producer.stop() + + +@pytest_asyncio.fixture() +async def consumer(topics): + consumer = aiokafka.AIOKafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092") + consumer.subscribe(topics=topics) + await consumer.start() + + async def consumer_close_later(): + await asyncio.sleep(1.0) + await consumer.stop() + + task = asyncio.create_task(consumer_close_later()) + try: + yield consumer + finally: + await task + + +@pytest.mark.asyncio +async def test_aiokafka_produce(instrument, elasticapm_client, producer, topics): + elasticapm_client.begin_transaction("test") + await producer.send("test", key=b"foo", value=b"bar") + elasticapm_client.end_transaction("test", "success") + transactions = elasticapm_client.events[TRANSACTION] + span = elasticapm_client.events[SPAN][0] + assert span["name"] == "Kafka SEND to test" + assert span["context"]["message"]["queue"]["name"] == "test" + assert span["context"]["destination"]["port"] == 9092 + assert span["context"]["destination"]["service"]["name"] == "kafka" + assert span["context"]["destination"]["service"]["resource"] == "kafka/test" + assert span["context"]["destination"]["service"]["type"] == "messaging" + + +@pytest.mark.asyncio +async def test_aiokafka_produce_ignore_topic(instrument, elasticapm_client, producer, topics): + elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") + elasticapm_client.begin_transaction("test") + await producer.send(topic="foo", key=b"foo", value=b"bar") + await producer.send("bar", key=b"foo", value=b"bar") + await producer.send("test", key=b"foo", value=b"bar") + elasticapm_client.end_transaction("test", "success") + spans = 
elasticapm_client.events[SPAN]
+    assert len(spans) == 1
+    assert spans[0]["name"] == "Kafka SEND to test"
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_consume(instrument, elasticapm_client, producer, consumer, topics):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        elasticapm_client.begin_transaction("foo")
+        await producer.send("test", key=b"foo", value=b"bar")
+        await producer.send("test", key=b"baz", value=b"bazzinga")
+        elasticapm_client.end_transaction("foo")
+
+    task = asyncio.create_task(delayed_send())
+    async for item in consumer:
+        with elasticapm.capture_span("foo"):
+            pass
+    await task
+    transactions = elasticapm_client.events[TRANSACTION]
+    spans = elasticapm_client.events[SPAN]
+    # the consumer transactions should have the same trace id as the transaction that triggered the messages
+    assert transactions[0]["trace_id"] == transactions[1]["trace_id"] == transactions[2]["trace_id"]
+    assert transactions[1]["name"] == "Kafka RECEIVE from test"
+    assert transactions[1]["type"] == "messaging"
+    assert transactions[1]["context"]["message"]["queue"]["name"] == "test"
+
+    assert spans[2]["transaction_id"] == transactions[1]["id"]
+    assert spans[3]["transaction_id"] == transactions[2]["id"]
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        elasticapm_client.begin_transaction("foo")
+        await producer.send("test", key=b"foo", value=b"bar")
+        await producer.send("test", key=b"baz", value=b"bazzinga")
+        elasticapm_client.end_transaction("foo")
+
+    task = asyncio.create_task(delayed_send())
+    transaction = elasticapm_client.begin_transaction("foo")
+    async for item in consumer:
+        pass
+    await task
+    elasticapm_client.end_transaction("foo")
+    transactions = elasticapm_client.events[TRANSACTION]
+    assert len(transactions) == 2
+    external_spans = elasticapm_client.spans_for_transaction(transactions[0])
+    spans = elasticapm_client.spans_for_transaction(transactions[1])
+    assert len(external_spans) == 2
+    assert len(spans) == 3
+    assert spans[0]["links"][0]["trace_id"] == external_spans[0]["trace_id"]
+    assert spans[1]["links"][0]["trace_id"] == external_spans[1]["trace_id"]
+    # The receive that was still pending when the consumer stopped is recorded
+    # as a failed span.
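+    # (The consumer fixture stops the consumer after ~1 s, so the span awaiting
+    # the next message ends with OUTCOME.FAILURE and carries no links.)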
+    assert spans[2]["outcome"] == OUTCOME.FAILURE
+    assert "links" not in spans[2]
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_consumer_ignore_topic(instrument, elasticapm_client, producer, consumer, topics):
+    elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar")
+
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        await producer.send(topic="foo", key=b"foo", value=b"bar")
+        await producer.send("bar", key=b"foo", value=b"bar")
+        await producer.send("test", key=b"foo", value=b"bar")
+
+    task = asyncio.create_task(delayed_send())
+    async for item in consumer:
+        with elasticapm.capture_span("test"):
+            assert item
+    await task
+    transactions = elasticapm_client.events[TRANSACTION]
+    assert len(transactions) == 1
+    assert transactions[0]["name"] == "Kafka RECEIVE from test"
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_consumer_ignore_topic_ongoing_transaction(
+    instrument, elasticapm_client, producer, consumer, topics
+):
+    elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar")
+
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        await producer.send(topic="foo", key=b"foo", value=b"bar")
+        await producer.send("bar", key=b"foo", value=b"bar")
+        await producer.send("test", key=b"foo", value=b"bar")
+
+    task = asyncio.create_task(delayed_send())
+    transaction = elasticapm_client.begin_transaction("foo")
+    async for item in consumer:
+        pass
+    await task
+    elasticapm_client.end_transaction("foo")
+    transactions = elasticapm_client.events[TRANSACTION]
+    spans = elasticapm_client.spans_for_transaction(transactions[0])
+    assert len(spans) == 2
+    assert spans[0]["name"] == "Kafka RECEIVE from test"
+    # The receive that was still pending when the consumer stopped is recorded
+    # as a failed span.
+    assert spans[1]["outcome"] == OUTCOME.FAILURE
+    assert spans[1]["name"] == "Kafka RECEIVE"
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_getmany_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        await producer.send("test", key=b"foo", value=b"bar")
+        await producer.send("test", key=b"baz", value=b"bazzinga")
+
+    task = asyncio.create_task(delayed_send())
+    transaction = elasticapm_client.begin_transaction("foo")
+    results = await consumer.getmany(timeout_ms=1000)
+    await task
+    elasticapm_client.end_transaction("foo")
+    transactions = elasticapm_client.events[TRANSACTION]
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 1
+    assert spans[0]["name"] == "Kafka RECEIVE from test"
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_no_client(instrument, producer, consumer, topics):
+    assert elasticapm.get_client() is None
+    # the following code shouldn't trigger any errors
+    await producer.send_and_wait("test", key=b"foo", value=b"bar")
+    async for item in consumer:
+        pass
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics):
+    transaction_object = elasticapm_client.begin_transaction("transaction")
+    transaction_object.is_sampled = False
+    await producer.send("test", key=b"foo", value=b"bar")
+    elasticapm_client.end_transaction("foo")
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 0
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_getmany_unsampled_transaction(instrument, elasticapm_client, consumer, topics):
+    transaction_object = elasticapm_client.begin_transaction("transaction")
+    transaction_object.is_sampled = False
+    await consumer.getmany(timeout_ms=50)
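+    # The transaction is unsampled, so the getmany() wrapper drops its span.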
elasticapm_client.end_transaction("foo")
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 0
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_consumer_unsampled_transaction_handles_stop_iteration(
+    instrument, elasticapm_client, producer, consumer, topics
+):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        await producer.send("test", key=b"foo", value=b"bar")
+
+    task = asyncio.create_task(delayed_send())
+    transaction = elasticapm_client.begin_transaction("foo")
+    transaction.is_sampled = False
+    async for item in consumer:
+        pass
+    await task
+    elasticapm_client.end_transaction("foo")
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 0
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_getmany_multiple_topics(instrument, elasticapm_client, producer, consumer, topics):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+        await producer.send("foo", key=b"foo", value=b"bar")
+        await producer.send("bar", key=b"foo", value=b"bar")
+        await producer.send("test", key=b"baz", value=b"bazzinga")
+
+    task = asyncio.create_task(delayed_send())
+    await asyncio.sleep(0.5)  # Wait a while so it can get all messages at once.
+
+    elasticapm_client.config.update("1", ignore_message_queues="bar")
+    elasticapm_client.begin_transaction("foo")
+    results = await consumer.getmany()
+    await task
+    elasticapm_client.end_transaction()
+
+    assert len(results) == 3
+    spans = elasticapm_client.events[SPAN]
+    assert len(spans) == 1
+    assert spans[0]["name"] == "Kafka RECEIVE from foo, test"
+
+
+@pytest.mark.asyncio
+async def test_aiokafka_send_batch(instrument, elasticapm_client, producer, consumer, topics):
+    async def delayed_send():
+        await asyncio.sleep(0.2)
+
+        batch = producer.create_batch()
+        batch.append(key=b"foo", value=b"bar", timestamp=None)
+        batch.append(key=b"baz", value=b"bazzinga", timestamp=None)
+        partitions = await producer.partitions_for("test")
+
+        elasticapm_client.begin_transaction("send")
+        await producer.send_batch(batch, "test", partition=partitions.pop())
+        elasticapm_client.end_transaction()
+
+    task = asyncio.create_task(delayed_send())
+    elasticapm_client.begin_transaction("recv")
+    async for _ in consumer:
+        pass
+    await task
+    elasticapm_client.end_transaction()
+
+    tran_send, tran_recv = elasticapm_client.events[TRANSACTION]
+    spans_send = elasticapm_client.spans_for_transaction(tran_send)
+    spans_recv = elasticapm_client.spans_for_transaction(tran_recv)
+
+    assert len(spans_send) == 1
+    assert len(spans_recv) == 3
+    assert "links" not in spans_recv[0]
+    assert "links" not in spans_recv[1]
+    # The receive that was still pending when the consumer stopped is recorded
+    # as a failed span.
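+    # (Same consumer-shutdown behavior as in test_aiokafka_consume_ongoing_transaction.)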
+ assert spans_recv[2]["outcome"] == OUTCOME.FAILURE + assert "links" not in spans_recv[2] + assert all(spans_send[0]["trace_id"] != span_recv["trace_id"] for span_recv in spans_recv) + + +@pytest.mark.asyncio +async def test_aiokafka_consumer_non_recording_transaction(instrument, elasticapm_client, producer, consumer, topics): + elasticapm_client.config.update("1", recording=False) + + async def delayed_send(): + await asyncio.sleep(0.2) + await producer.send("test", key=b"foo", value=b"bar") + + task = asyncio.create_task(delayed_send()) + async for item in consumer: + pass + await task + transactions = elasticapm_client.events[TRANSACTION] + assert len(transactions) == 0 + + +def test_aiokafka_inject_trace_parent_into_send_arguments(): + trace_parent = TraceParent.from_string("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + assert trace_parent + traceparent_header = (TRACEPARENT_BINARY_HEADER_NAME, trace_parent.to_binary()) + headers_list = [("header_name", "header_value")] + headers_list_but_malformed = [tuple()] + headers_list_with_other_traceparent = [(TRACEPARENT_BINARY_HEADER_NAME, ...)] + headers_none = None + batch = aiokafka.AIOKafkaProducer.create_batch(self=MagicMock()) + + def func(*args, **kwargs): + mutable_args = list(args) + _inject_trace_parent_into_send_arguments(mutable_args, kwargs, trace_parent) + return mutable_args, kwargs + + # send(topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None) + args, kwargs = func("topic", key=..., value=...) + assert traceparent_header in kwargs["headers"] + + args, kwargs = func("topic", key=..., value=..., headers=headers_list) + assert traceparent_header in kwargs["headers"] + assert len(kwargs["headers"]) == 2 # The traceparent header should be appended + + args, kwargs = func("topic", key=..., value=..., headers=headers_list_but_malformed) + assert traceparent_header in kwargs["headers"] + assert len(kwargs["headers"]) == 2 # Injection should succeed despite malformed headers + + args, kwargs = func("topic", key=..., value=..., headers=headers_list_with_other_traceparent) + assert traceparent_header in kwargs["headers"] + assert len(kwargs["headers"]) == 1 # The traceparent header should be overwritten + + args, kwargs = func("topic", key=..., value=..., headers=headers_none) + assert traceparent_header in kwargs["headers"] + + args, kwargs = func("topic", "value", "key", "partition", "timestamp_ms") + assert traceparent_header in kwargs["headers"] + + args, kwargs = func("topic", "partition", headers_none, key=..., value=..., timestamp_ms=...) + assert traceparent_header in args[2] + + args, kwargs = func("topic", "value", "key", "partition", "timestamp_ms", headers_none) + assert traceparent_header in args[5] + + args, kwargs = func("topic", "value", "key", "partition", "timestamp_ms", headers_list) + assert traceparent_header in args[5] + + with pytest.raises(TypeError, match=r"\bMutableSequence\b"): + args, kwargs = func("topic", "value", headers=tuple()) + + with pytest.raises(TypeError, match=r"\bMutableSequence\b"): + args, kwargs = func("topic", "value", headers=dict()) + + # send_batch(batch, topic, *, partition) + args, kwargs = func(batch, "topic", partition=...) + assert "headers" not in kwargs + assert len(args) == 2 + + args, kwargs = func("topic", partition=..., batch=...) + assert "headers" not in kwargs + assert len(args) == 1 + + args, kwargs = func(topic=..., partition=..., batch=...) 
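+    # 'batch' passed as a keyword: injection short-circuits and nothing changes.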
+ assert "headers" not in kwargs + assert len(args) == 0 diff --git a/tests/requirements/reqs-aiokafka-newest.txt b/tests/requirements/reqs-aiokafka-newest.txt new file mode 100644 index 000000000..b80dd3847 --- /dev/null +++ b/tests/requirements/reqs-aiokafka-newest.txt @@ -0,0 +1,2 @@ +aiokafka +-r reqs-base.txt diff --git a/tests/scripts/envs/aiokafka.sh b/tests/scripts/envs/aiokafka.sh new file mode 100644 index 000000000..e55debde2 --- /dev/null +++ b/tests/scripts/envs/aiokafka.sh @@ -0,0 +1,5 @@ +export PYTEST_MARKER="-m aiokafka" +export DOCKER_DEPS="kafka" +export KAFKA_HOST="kafka" +export WAIT_FOR_HOST="kafka" +export WAIT_FOR_PORT=9092