Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/content/multiprocess/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ it's common to have processes rather than threads to handle large workloads.
To handle this the client library can be put in multiprocess mode.
This comes with a number of limitations:

- Registries can not be used as normal, all instantiated metrics are exported
- Registries can not be used as normal
- Registering metrics to a registry later used by a `MultiProcessCollector`
may cause duplicate metrics to be exported
- Filtering on metrics works but is inefficient
- Custom collectors do not work (e.g. cpu and memory metrics)
- Gauges cannot use `set_function`
- Info and Enum metrics do not work
Expand Down
5 changes: 4 additions & 1 deletion prometheus_client/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str,
self._lock = Lock()
self._target_info: Optional[Dict[str, str]] = {}
self.set_target_info(target_info)
self._collectors_with_no_names: List[Collector] = []

def register(self, collector: Collector) -> None:
"""Add a collector to the registry."""
Expand All @@ -46,6 +47,8 @@ def register(self, collector: Collector) -> None:
for name in names:
self._names_to_collectors[name] = collector
self._collector_to_names[collector] = names
if not names:
self._collectors_with_no_names.append(collector)

def unregister(self, collector: Collector) -> None:
"""Remove a collector from the registry."""
Expand Down Expand Up @@ -148,7 +151,7 @@ def __init__(self, names: Iterable[str], registry: CollectorRegistry):
self._registry = registry

def collect(self) -> Iterable[Metric]:
collectors = set()
collectors = set(self._registry._collectors_with_no_names)
target_info_metric = None
with self._registry._lock:
if 'target_info' in self._name_set and self._registry._target_info:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,32 @@ def test_qs_parsing(self):
asyncio.new_event_loop().run_until_complete(
self.communicator.wait()
)

def test_qs_parsing_multi(self):
"""Only metrics that match the 'name[]' query string param appear"""

app = make_asgi_app(self.registry)
metrics = [
("asdf", "first test metric", 1),
("bsdf", "second test metric", 2),
("csdf", "third test metric", 3)
]

for m in metrics:
self.increment_metrics(*m)

self.seed_app(app)
self.scope['query_string'] = "&".join([f"name[]={m[0]}_total" for m in metrics[0:2]]).encode("utf-8")
self.send_default_request()

outputs = self.get_all_output()
response_body = outputs[1]
output = response_body['body'].decode('utf8')

self.assert_metrics(output, *metrics[0])
self.assert_metrics(output, *metrics[1])
self.assert_not_metrics(output, *metrics[2])

asyncio.new_event_loop().run_until_complete(
self.communicator.wait()
)
9 changes: 9 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,15 @@ def test_restricted_registry_does_not_call_extra(self):
self.assertEqual([m], list(registry.restricted_registry(['s_sum']).collect()))
mock_collector.collect.assert_not_called()

def test_restricted_registry_collects_no_names_collectors(self):
from unittest.mock import MagicMock
registry = CollectorRegistry()
mock_collector = MagicMock()
mock_collector.describe.return_value = []
registry.register(mock_collector)
self.assertEqual(list(registry.restricted_registry(['metric']).collect()), [])
mock_collector.collect.assert_called()

def test_restricted_registry_does_not_yield_while_locked(self):
registry = CollectorRegistry(target_info={'foo': 'bar'})
Summary('s', 'help', registry=registry).observe(7)
Expand Down
29 changes: 29 additions & 0 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,35 @@ def add_label(key, value):

self.assertEqual(metrics['h'].samples, expected_histogram)

def test_restrict(self):
pid = 0
values.ValueClass = MultiProcessValue(lambda: pid)
labels = {i: i for i in 'abcd'}

def add_label(key, value):
l = labels.copy()
l[key] = value
return l

c = Counter('c', 'help', labelnames=labels.keys(), registry=None)
g = Gauge('g', 'help', labelnames=labels.keys(), registry=None)

c.labels(**labels).inc(1)
g.labels(**labels).set(1)

pid = 1

c.labels(**labels).inc(1)
g.labels(**labels).set(1)

metrics = {m.name: m for m in self.registry.restricted_registry(['c_total']).collect()}

self.assertEqual(metrics.keys(), {'c'})

self.assertEqual(
metrics['c'].samples, [Sample('c_total', labels, 2.0)]
)

def test_collect_preserves_help(self):
pid = 0
values.ValueClass = MultiProcessValue(lambda: pid)
Expand Down