Skip to content

Commit 8bf420d

Browse files
feat: add worker queue and active task OTel metrics
- Add worker_active_tasks UpDownCounter driven by pre/post_execute hooks - Add worker_prefetched_tasks UpDownCounter via on_prefetch_queue_add/remove hooks in receiver - Add worker_cpu_utilization and worker_memory_utilization observable gauges (worker process only) - Add tests for all new metrics
1 parent c6270e8 commit 8bf420d

3 files changed

Lines changed: 103 additions & 4 deletions

File tree

taskiq/middlewares/opentelemetry_middleware.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import logging
2+
from collections.abc import Generator
23
from contextlib import AbstractContextManager
34
from datetime import datetime, timezone
45
from importlib.metadata import version
5-
from typing import Any, Generator, TypeVar
6-
import psutil
6+
from typing import Any, TypeVar
77

8+
import psutil
89
from packaging.version import Version, parse
910

1011
try:
@@ -223,11 +224,24 @@ def __init__(
223224
description="Worker memory utilization in bytes. Only for worker processes",
224225
)
225226

226-
def _observe_memory(self, _options: Any) -> Generator[Observation, None, None]:
227+
# 8- Number of tasks executing
228+
self.number_of_broker_active_tasks = self._meter.create_up_down_counter(
229+
"worker_active_tasks",
230+
unit="1",
231+
description="Number of tasks currently executing in the worker.",
232+
)
233+
# 9- Number of tasks executing
234+
self.number_of_broker_prefetched_tasks = self._meter.create_up_down_counter(
235+
"worker_prefetched_tasks",
236+
unit="1",
237+
description="Number of tasks currently prefetched in the worker.",
238+
)
239+
240+
def _observe_memory(self, options: Any) -> Generator[Observation, None, None]:
227241
if self.broker and self.broker.is_worker_process:
228242
yield Observation(self._process.memory_info().rss)
229243

230-
def _observe_cpu(self, _options: Any) -> Generator[Observation, None, None]:
244+
def _observe_cpu(self, options: Any) -> Generator[Observation, None, None]:
231245
if self.broker and self.broker.is_worker_process:
232246
yield Observation(self._process.cpu_percent())
233247

@@ -298,6 +312,10 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
298312
activation.__enter__() # pylint: disable=E1101
299313
attach_context(message, span, activation, token)
300314
message.labels[_TASK_RECEIVED_TIME_KEY] = datetime.now(timezone.utc).timestamp()
315+
self.number_of_broker_active_tasks.add(
316+
1,
317+
attributes={"task_name": message.task_name},
318+
)
301319
return message
302320

303321
def post_save( # pylint: disable=R6301
@@ -424,3 +442,16 @@ def post_execute(
424442
amount=task_receive_time - task_send_time,
425443
attributes={"task_name": message.task_name},
426444
)
445+
446+
self.number_of_broker_active_tasks.add(
447+
-1,
448+
attributes={"task_name": message.task_name},
449+
)
450+
451+
def on_prefetch_queue_add(self) -> None:
452+
"""This hook is called after task is added to the worker prefetch queue."""
453+
self.number_of_broker_prefetched_tasks.add(1)
454+
455+
def on_prefetch_queue_remove(self) -> None:
456+
"""This hook is called after task is removed from the worker prefetch queue."""
457+
self.number_of_broker_prefetched_tasks.add(-1)

taskiq/receiver/receiver.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,12 @@ async def prefetcher(
383383
current_message = asyncio.create_task(iterator.__anext__()) # type: ignore
384384
fetched_tasks += 1
385385
await queue.put(message)
386+
# Custom hooks for OTel and any future instrumentations
387+
for middleware in reversed(self.broker.middlewares):
388+
if hasattr(middleware, "on_prefetch_queue_add"):
389+
await maybe_awaitable(
390+
middleware.on_prefetch_queue_add(), # type: ignore
391+
)
386392
except (asyncio.CancelledError, StopAsyncIteration):
387393
break
388394
# We don't want to fetch new messages if we are shutting down.
@@ -434,6 +440,13 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
434440
logger.info("No more tasks to wait for. Shutting down.")
435441
break
436442

443+
# Custom hooks for OTel and any future instrumentations
444+
for middleware in reversed(self.broker.middlewares):
445+
if hasattr(middleware, "on_prefetch_queue_remove"):
446+
await maybe_awaitable(
447+
middleware.on_prefetch_queue_remove(), # type: ignore
448+
)
449+
437450
task = asyncio.create_task(
438451
self.callback(message=message, raise_err=False),
439452
)

tests/opentelemetry/test_metrics.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from opentelemetry.test.test_base import TestBase
77

88
from taskiq.instrumentation import TaskiqInstrumentor
9+
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware
910

1011
from .taskiq_test_tasks import (
1112
broker,
@@ -58,6 +59,7 @@ async def test() -> None:
5859
"task_success",
5960
"task_execution_time",
6061
"task_wait_time",
62+
"worker_active_tasks",
6163
}
6264
found = {
6365
metric.name
@@ -167,3 +169,56 @@ async def test() -> None:
167169
points = self._get_data_points("tasks_sent")
168170
self.assertEqual(len(points), 1)
169171
self.assertEqual(points[0].value, 3)
172+
173+
def test_active_tasks_counter(self) -> None:
174+
async def test() -> None:
175+
for _ in range(3):
176+
await task_add.kiq(1, 2)
177+
await broker.wait_all()
178+
179+
asyncio.run(test())
180+
181+
points = self._get_data_points("worker_active_tasks")
182+
# all 3 tasks share the same task_name so they aggregate into one data point
183+
self.assertEqual(len(points), 1)
184+
# net zero: pre_execute incremented, post_execute decremented for each task
185+
self.assertEqual(points[0].value, 0)
186+
self.assertIn("task_name", points[0].attributes)
187+
self.assertEqual(
188+
points[0].attributes.get("task_name"),
189+
"tests.opentelemetry.taskiq_test_tasks:task_add",
190+
)
191+
192+
def test_prefetch_queue_counter(self) -> None:
193+
middleware = next(
194+
m for m in broker.middlewares if isinstance(m, OpenTelemetryMiddleware)
195+
)
196+
middleware.on_prefetch_queue_add()
197+
middleware.on_prefetch_queue_add()
198+
middleware.on_prefetch_queue_add()
199+
middleware.on_prefetch_queue_remove()
200+
201+
points = self._get_data_points("worker_prefetched_tasks")
202+
self.assertEqual(len(points), 1)
203+
self.assertEqual(points[0].value, 2)
204+
205+
def test_worker_resource_metrics_when_worker_process(self) -> None:
206+
middleware = next(
207+
m for m in broker.middlewares if isinstance(m, OpenTelemetryMiddleware)
208+
)
209+
middleware.set_broker(broker)
210+
broker.is_worker_process = True
211+
try:
212+
metrics_data = self.reader.get_metrics_data()
213+
self.assertIsNotNone(metrics_data)
214+
found = {
215+
metric.name
216+
for rm in metrics_data.resource_metrics # type: ignore[union-attr]
217+
for sm in rm.scope_metrics
218+
for metric in sm.metrics
219+
}
220+
self.assertIn("worker_cpu_utilization", found)
221+
self.assertIn("worker_memory_utilization", found)
222+
finally:
223+
broker.is_worker_process = False
224+
middleware.set_broker(None) # type: ignore[arg-type]

0 commit comments

Comments
 (0)