Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions ddtrace/profiling/collector/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
from asyncio.locks import Lock
import typing
import asyncio

from .. import collector
from . import _lock


class _ProfiledAsyncioLock(_lock._ProfiledLock):
pass


class _ProfiledAsyncioSemaphore(_lock._ProfiledLock):
pass


class _ProfiledAsyncioBoundedSemaphore(_lock._ProfiledLock):
pass


class AsyncioLockCollector(_lock.LockCollector):
"""Record asyncio.Lock usage."""

PROFILED_LOCK_CLASS = _ProfiledAsyncioLock
MODULE = asyncio
PATCHED_LOCK_NAME = "Lock"


class AsyncioSemaphoreCollector(_lock.LockCollector):
"""Record asyncio.Semaphore usage."""

PROFILED_LOCK_CLASS = _ProfiledAsyncioSemaphore
MODULE = asyncio
PATCHED_LOCK_NAME = "Semaphore"


class AsyncioBoundedSemaphoreCollector(_lock.LockCollector):
"""Record asyncio.BoundedSemaphore usage."""

def _start_service(self) -> None:
"""Start collecting lock usage."""
try:
import asyncio
except ImportError as e:
raise collector.CollectorUnavailable(e)
self._asyncio_module = asyncio
return super(AsyncioLockCollector, self)._start_service()

def _get_patch_target(self) -> typing.Type[Lock]:
return self._asyncio_module.Lock

def _set_patch_target(
self,
value: typing.Any,
) -> None:
self._asyncio_module.Lock = value # type: ignore[misc]
PROFILED_LOCK_CLASS = _ProfiledAsyncioBoundedSemaphore
MODULE = asyncio
PATCHED_LOCK_NAME = "BoundedSemaphore"
2 changes: 2 additions & 0 deletions ddtrace/profiling/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ def start_collector(collector_class: Type[collector.Collector]) -> None:
("threading", lambda _: start_collector(threading.ThreadingSemaphoreCollector)),
("threading", lambda _: start_collector(threading.ThreadingBoundedSemaphoreCollector)),
("asyncio", lambda _: start_collector(asyncio.AsyncioLockCollector)),
("asyncio", lambda _: start_collector(asyncio.AsyncioSemaphoreCollector)),
("asyncio", lambda _: start_collector(asyncio.AsyncioBoundedSemaphoreCollector)),
]

for module, hook in self._collectors_on_import:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
profiling: Add support for ``asyncio.BoundedSemaphore`` locking type profiling in Python.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
profiling: Add support for ``asyncio.BoundedSemaphore`` locking type profiling in Python.
profiling: Add support for ``asyncio.BoundedSemaphore`` lock type profiling in Python.

Unlike ``threading.BoundedSemaphore``, the asyncio version does not use internal locks,
so no internal lock detection is needed.
Comment on lines +5 to +6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this in the changelog – as far as I know it's customer facing so we don't need to mention this which is an implementation detail

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
profiling: Add support for ``asyncio.Semaphore`` locking type profiling in Python.
Also refactored asyncio collectors to use base class attributes (``MODULE`` and ``PATCHED_LOCK_NAME``)
for consistency with the threading module collectors.
Comment on lines +5 to +6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here regarding mentioning implementation details – I don't think we should leak those to the customer-facing changelog

148 changes: 122 additions & 26 deletions tests/profiling/collector/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import glob
import os
import sys
from typing import Type
from typing import Union
import uuid

import pytest

from ddtrace import ext
from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling.collector import asyncio as collector_asyncio
from ddtrace.profiling.collector.asyncio import AsyncioBoundedSemaphoreCollector
from ddtrace.profiling.collector.asyncio import AsyncioLockCollector
from ddtrace.profiling.collector.asyncio import AsyncioSemaphoreCollector
from tests.profiling.collector import pprof_utils
from tests.profiling.collector import test_collector
from tests.profiling.collector.lock_utils import get_lock_linenos
Expand All @@ -20,16 +24,56 @@

PY_311_OR_ABOVE = sys.version_info[:2] >= (3, 11)

# Type aliases for supported classes
LockTypeClass = Union[Type[asyncio.Lock], Type[asyncio.Semaphore], Type[asyncio.BoundedSemaphore]]
LockTypeInst = Union[asyncio.Lock, asyncio.Semaphore, asyncio.BoundedSemaphore]
Comment on lines +28 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of this instead?

Suggested change
LockTypeClass = Union[Type[asyncio.Lock], Type[asyncio.Semaphore], Type[asyncio.BoundedSemaphore]]
LockTypeInst = Union[asyncio.Lock, asyncio.Semaphore, asyncio.BoundedSemaphore]
LockTypeInst = Union[asyncio.Lock, asyncio.Semaphore, asyncio.BoundedSemaphore]
LockTypeClass = Type[LockTypeInst]


def test_repr():
test_collector._test_repr(
collector_asyncio.AsyncioLockCollector,
"AsyncioLockCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501
)
CollectorTypeClass = Union[
Type[AsyncioLockCollector],
Type[AsyncioSemaphoreCollector],
Type[AsyncioBoundedSemaphoreCollector],
]
CollectorTypeInst = Union[AsyncioLockCollector, AsyncioSemaphoreCollector, AsyncioBoundedSemaphoreCollector]


@pytest.mark.parametrize(
"collector_class,expected_repr",
[
(
AsyncioLockCollector,
"AsyncioLockCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)",
),
(
AsyncioSemaphoreCollector,
"AsyncioSemaphoreCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501
),
(
AsyncioBoundedSemaphoreCollector,
"AsyncioBoundedSemaphoreCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501
),
],
)
def test_collector_repr(collector_class: CollectorTypeClass, expected_repr: str) -> None:
test_collector._test_repr(collector_class, expected_repr)


@pytest.mark.asyncio
class TestAsyncioLockCollector:
class BaseAsyncioLockCollectorTest:
"""Base test class for asyncio lock collectors.

Child classes must implement:
- collector_class: The collector class to test
- lock_class: The asyncio lock class to test
"""

@property
def collector_class(self) -> CollectorTypeClass:
raise NotImplementedError("Child classes must implement collector_class")

@property
def lock_class(self) -> LockTypeClass:
raise NotImplementedError("Child classes must implement lock_class")

def setup_method(self, method):
self.test_name = method.__qualname__ if PY_311_OR_ABOVE else method.__name__
self.output_prefix = "/tmp" + os.sep + self.test_name
Expand All @@ -51,16 +95,17 @@ def teardown_method(self):
except Exception as e:
print("Error while deleting file: ", e)

async def test_asyncio_lock_events(self):
with collector_asyncio.AsyncioLockCollector(capture_pct=100):
lock = asyncio.Lock() # !CREATE! test_asyncio_lock_events
await lock.acquire() # !ACQUIRE! test_asyncio_lock_events
async def test_lock_events(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async def test_lock_events(self):
async def test_lock_events(self) -> None:

I think we need this in order to type-check the body of our tests. By default, mypy doesn't type-check functions whose signature is untyped. (I may be wrong though.)

"""Test basic acquire/release event profiling."""
with self.collector_class(capture_pct=100):
lock = self.lock_class() # !CREATE! asyncio_test_lock_events
await lock.acquire() # !ACQUIRE! asyncio_test_lock_events
assert lock.locked()
lock.release() # !RELEASE! test_asyncio_lock_events
lock.release() # !RELEASE! asyncio_test_lock_events

ddup.upload()

linenos = get_lock_linenos("test_asyncio_lock_events")
linenos = get_lock_linenos("asyncio_test_lock_events")
profile = pprof_utils.parse_newest_profile(self.output_filename)
expected_thread_id = _thread.get_ident()
pprof_utils.assert_lock_events(
Expand All @@ -85,29 +130,30 @@ async def test_asyncio_lock_events(self):
],
)

async def test_asyncio_lock_events_tracer(self, tracer):
async def test_lock_events_tracer(self, tracer):
"""Test event profiling with tracer integration."""
tracer._endpoint_call_counter_span_processor.enable()
resource = str(uuid.uuid4())
span_type = ext.SpanTypes.WEB

with collector_asyncio.AsyncioLockCollector(capture_pct=100, tracer=tracer):
lock = asyncio.Lock() # !CREATE! test_asyncio_lock_events_tracer_1
await lock.acquire() # !ACQUIRE! test_asyncio_lock_events_tracer_1
with self.collector_class(capture_pct=100, tracer=tracer):
lock = self.lock_class() # !CREATE! asyncio_test_lock_events_tracer_1
await lock.acquire() # !ACQUIRE! asyncio_test_lock_events_tracer_1
with tracer.trace("test", resource=resource, span_type=span_type) as t:
lock2 = asyncio.Lock() # !CREATE! test_asyncio_lock_events_tracer_2
await lock2.acquire() # !ACQUIRE! test_asyncio_lock_events_tracer_2
lock.release() # !RELEASE! test_asyncio_lock_events_tracer_1
lock2 = self.lock_class() # !CREATE! asyncio_test_lock_events_tracer_2
await lock2.acquire() # !ACQUIRE! asyncio_test_lock_events_tracer_2
lock.release() # !RELEASE! asyncio_test_lock_events_tracer_1
span_id = t.span_id
lock2.release() # !RELEASE! test_asyncio_lock_events_tracer_2
lock2.release() # !RELEASE! asyncio_test_lock_events_tracer_2

lock_ctx = asyncio.Lock() # !CREATE! test_asyncio_lock_events_tracer_3
async with lock_ctx: # !ACQUIRE! !RELEASE! test_asyncio_lock_events_tracer_3
lock_ctx = self.lock_class() # !CREATE! asyncio_test_lock_events_tracer_3
async with lock_ctx: # !ACQUIRE! !RELEASE! asyncio_test_lock_events_tracer_3
pass
ddup.upload(tracer=tracer)

linenos_1 = get_lock_linenos("test_asyncio_lock_events_tracer_1")
linenos_2 = get_lock_linenos("test_asyncio_lock_events_tracer_2")
linenos_3 = get_lock_linenos("test_asyncio_lock_events_tracer_3", with_stmt=True)
linenos_1 = get_lock_linenos("asyncio_test_lock_events_tracer_1")
linenos_2 = get_lock_linenos("asyncio_test_lock_events_tracer_2")
linenos_3 = get_lock_linenos("asyncio_test_lock_events_tracer_3", with_stmt=True)

profile = pprof_utils.parse_newest_profile(self.output_filename)
expected_thread_id = _thread.get_ident()
Expand Down Expand Up @@ -167,3 +213,53 @@ async def test_asyncio_lock_events_tracer(self, tracer):
),
],
)


class TestAsyncioLockCollector(BaseAsyncioLockCollectorTest):
"""Test asyncio.Lock profiling."""

@property
def collector_class(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have type hints here?

return AsyncioLockCollector

@property
def lock_class(self):
return asyncio.Lock


class TestAsyncioSemaphoreCollector(BaseAsyncioLockCollectorTest):
"""Test asyncio.Semaphore profiling."""

@property
def collector_class(self):
return AsyncioSemaphoreCollector

@property
def lock_class(self):
return asyncio.Semaphore


class TestAsyncioBoundedSemaphoreCollector(BaseAsyncioLockCollectorTest):
"""Test asyncio.BoundedSemaphore profiling."""

@property
def collector_class(self):
return AsyncioBoundedSemaphoreCollector

@property
def lock_class(self):
return asyncio.BoundedSemaphore

async def test_bounded_behavior_preserved(self):
"""Test that profiling wrapper preserves BoundedSemaphore's bounded behavior.

This verifies the wrapper doesn't interfere with BoundedSemaphore's unique characteristic:
raising ValueError when releasing beyond the initial value.
"""
with self.collector_class(capture_pct=100):
bs = asyncio.BoundedSemaphore(1)
await bs.acquire()
bs.release()
# BoundedSemaphore should raise ValueError when releasing more than initial value
with pytest.raises(ValueError, match="BoundedSemaphore released too many times"):
bs.release()
2 changes: 2 additions & 0 deletions tests/profiling/test_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ def test_default_collectors():
pass
else:
assert any(isinstance(c, asyncio.AsyncioLockCollector) for c in p._profiler._collectors)
assert any(isinstance(c, asyncio.AsyncioSemaphoreCollector) for c in p._profiler._collectors)
assert any(isinstance(c, asyncio.AsyncioBoundedSemaphoreCollector) for c in p._profiler._collectors)
p.stop(flush=False)


Expand Down
Loading