Skip to content

Commit e5363c4

Browse files
committed
init: influx + udp
0 parents  commit e5363c4

File tree

17 files changed

+470
-0
lines changed

17 files changed

+470
-0
lines changed

.gitignore

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
*.swo
2+
*.swp
3+
*.pyc
4+
*.pyo
5+
*.log
6+
*.lock
7+
.env*
8+
.venv*
9+
.pypyenv*
10+
.pytest_*
11+
.mypy_cache
12+
*.egg-info
13+
__pycache__
14+
.vscode*
15+
16+
!.gitkeep
17+
!/.gitignore

prometheus_push_client/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from .clients.bg import ThreadBGClient, AsyncioBGClient
2+
# TODO: clients.fg
3+
from .decorators import (
4+
influx_udp_async,
5+
graphite_udp_async,
6+
influx_udp_thread,
7+
graphite_udp_thread,
8+
)
9+
from .formats.influx import InfluxFormat
10+
from .formats.graphite import GraphiteFormat
11+
from .formats.prometheus import PrometheusFormat
12+
from .metrics import Counter, Gauge, Summary, Histogram, Info, Enum
13+
from .registry import PUSH_REGISTRY
14+
from .transports.udp import SyncUdpTransport, AioUdpTransport
15+
# TODO: transports.http

prometheus_push_client/clients/__init__.py

Whitespace-only changes.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import asyncio
2+
import threading
3+
import time
4+
5+
6+
class BackgroundClient:
7+
8+
def __init__(self, format, transport, period=15.0, *args, **kwargs):
9+
self.format = format
10+
self.transport = transport
11+
self.period = float(period)
12+
self.stop_event = None
13+
14+
self._period_step = 0.5 # check event every 0.5 seconds
15+
16+
super().__init__(*args, **kwargs)
17+
18+
def sleep_steps_iter(self, period):
19+
n_full_steps, last_step = divmod(period, self._period_step)
20+
for _ in range(int(n_full_steps)):
21+
yield self._period_step
22+
if last_step > 0:
23+
yield last_step
24+
25+
26+
class ThreadBGClient(BackgroundClient, threading.Thread):
27+
def start(self):
28+
self.stop_event = threading.Event()
29+
self.transport.start()
30+
super().start()
31+
32+
def stop(self):
33+
self.stop_event.set()
34+
self.join()
35+
self.transport.stop()
36+
37+
def run(self):
38+
period = self.period
39+
40+
while not self.stop_event.is_set():
41+
for step in self.sleep_steps_iter(period):
42+
time.sleep(step)
43+
if self.stop_event.is_set():
44+
break
45+
46+
ts_start = time.time()
47+
samples_iter = self.format.iter_samples()
48+
try:
49+
self.transport.push_all(samples_iter)
50+
except Exception:
51+
pass # TODO log?
52+
period = self.period - (time.time() - ts_start)
53+
54+
55+
class AsyncioBGClient(BackgroundClient):
56+
async def start(self):
57+
self.stop_event = asyncio.Event()
58+
await self.transport.start()
59+
self._runner = asyncio.create_task(self.run())
60+
61+
async def stop(self):
62+
self.stop_event.set()
63+
await asyncio.wait([self._runner]) # TODO timeout, catch errors
64+
self.transport.stop()
65+
66+
async def run(self):
67+
period = self.period
68+
69+
while not self.stop_event.is_set():
70+
for step in self.sleep_steps_iter(period):
71+
try:
72+
await asyncio.sleep(step)
73+
except asyncio.CancelledError:
74+
self.stop_event.set()
75+
if self.stop_event.is_set():
76+
break
77+
78+
ts_start = time.time()
79+
samples_iter = self.format.iter_samples()
80+
try:
81+
await self.transport.push_all(samples_iter)
82+
except Exception:
83+
pass # TODO: log?
84+
period = self.period - (time.time() - ts_start)

prometheus_push_client/compat.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import asyncio
2+
3+
if hasattr(asyncio, "get_running_loop"):
4+
get_running_loop = asyncio.get_running_loop
5+
else: # < 3.7, may be unsafe outside coroutines
6+
get_running_loop = asyncio.get_event_loop
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import functools
2+
3+
import prometheus_push_client as ppc
4+
5+
6+
def _thread_bg(client):
7+
def _wrap_func(func):
8+
@functools.wraps(func)
9+
def _wrap_call(*args, **kwargs):
10+
client.start()
11+
try:
12+
return func(*args, **kwargs)
13+
finally:
14+
client.stop()
15+
return _wrap_call
16+
return _wrap_func
17+
18+
19+
def _async_bg(client):
20+
def _wrap_func(func):
21+
@functools.wraps(func)
22+
async def _wrap_call(*args, **kwargs):
23+
await client.start()
24+
try:
25+
return await func(*args, **kwargs)
26+
finally:
27+
await client.stop()
28+
return _wrap_call
29+
return _wrap_func
30+
31+
32+
def influx_udp_async(host, port, period=15.0):
33+
client = ppc.AsyncioBGClient(
34+
format=ppc.InfluxFormat(),
35+
transport=ppc.AioUdpTransport(host, port),
36+
period=period,
37+
)
38+
return _async_bg(client)
39+
40+
41+
def influx_udp_thread(host, port, period=15.0):
42+
client = ppc.ThreadBGClient(
43+
format=ppc.InfluxFormat(),
44+
transport=ppc.SyncUdpTransport(host, port),
45+
period=period,
46+
)
47+
return _thread_bg(client)
48+
49+
50+
def graphite_udp_async(host, port, period=15.0):
51+
client = ppc.AsyncioBGClient(
52+
format=ppc.GraphiteFormat(),
53+
transport=ppc.AioUdpTransport(host, port),
54+
period=period,
55+
)
56+
return _async_bg(client)
57+
58+
59+
def graphite_udp_thread(host, port, period=15.0):
60+
client = ppc.ThreadBGClient(
61+
format=ppc.GraphiteFormat(),
62+
transport=ppc.SyncUdpTransport(host, port),
63+
period=period,
64+
)
65+
return _thread_bg(client)

prometheus_push_client/formats/__init__.py

Whitespace-only changes.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import time
2+
3+
from prometheus_push_client.registry import PUSH_REGISTRY
4+
5+
6+
class BaseFormat:
7+
def __init__(self, registry=PUSH_REGISTRY):
8+
self.registry = registry
9+
10+
def generate_latest(self):
11+
samples = self.iter_samples()
12+
return b"\n".join(samples)
13+
14+
def iter_samples(self):
15+
for metric in self.registry.collect():
16+
for sample in metric.samples:
17+
line = self.format_sample(sample, metric)
18+
if isinstance(line, str):
19+
line = line.encode("utf-8")
20+
yield line
21+
22+
def format_sample(self, metric, sample):
23+
raise NotImplementedError()
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from prometheus_push_client.formats.base import BaseFormat
2+
3+
4+
class GraphiteFormat(BaseFormat):
5+
6+
FMT_DATAPOINT = "{measurement}:{value}|{dtype}" # no sampling?
7+
8+
DTYPES = {
9+
"counter": "c",
10+
"gauge": "g",
11+
"info": "s",
12+
"enum": "s", # TODO?
13+
# histogram and summary are always counters?
14+
"histogram": "c",
15+
"summary": "c",
16+
}
17+
18+
def format_sample(self, sample, metric):
19+
measurement_name = sample.name
20+
if sample.labels:
21+
tags = " ".join(f"{k}={v}" for k,v in sample.labels.items())
22+
measurement_name = f"{measurement_name},{tags}"
23+
24+
dtype = self.DTYPES.get(metric.type)
25+
value = sample.value
26+
27+
# https://prometheus.io/docs/practices/naming/#base-units
28+
if metric.type == "counter": # TODO: other types
29+
if metric.unit == "seconds":
30+
dtype = "ms"
31+
value *= 1000
32+
33+
if dtype is None:
34+
raise NotImplementedError(f"cant convert type:{metric.type} unit:{metric.unit}")
35+
36+
if isinstance(value, float):
37+
value = int(round(value)) # TODO
38+
39+
return self.FMT_DATAPOINT.format(
40+
measurement=measurement_name,
41+
value=value,
42+
dtype=dtype,
43+
)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from prometheus_push_client.formats.base import BaseFormat
2+
3+
4+
class InfluxFormat(BaseFormat):
5+
6+
FMT_SAMPLE = "{sample_name}{tag_set} {measurement_name}={value}{timestamp}"
7+
8+
def __init__(self, *args, **kwargs):
9+
super().__init__(*args, **kwargs)
10+
self.validate_metric_names(self.registry)
11+
12+
def validate_metric_names(self, registry):
13+
names_not_supported = []
14+
for metric_name in self.registry._names_to_collectors.keys():
15+
if "_" not in metric_name:
16+
names_not_supported.append(metric_name)
17+
18+
if names_not_supported:
19+
raise ValueError(
20+
"Following metrics can't be exported to Influx. Add `namespace`, "
21+
"`subsystem`, `unit`, or just come up with 'two_words' name: "
22+
f"{names_not_supported}"
23+
)
24+
25+
def format_sample(self, sample, *_):
26+
sample_name, measurement_name = sample.name.rsplit("_", 1)
27+
28+
tags = ""
29+
if sample.labels:
30+
tags = ",".join(f"{k}={v}" for k,v in sample.labels.items())
31+
tags = f",{tags}"
32+
33+
ts = ""
34+
if sample.timestamp:
35+
ts = f" {int(sample.timestamp * 1e9)}" # to nanoseconds
36+
37+
return self.FMT_SAMPLE.format(
38+
sample_name=sample_name,
39+
tag_set=tags,
40+
measurement_name=measurement_name,
41+
value=sample.value,
42+
timestamp=ts,
43+
)

0 commit comments

Comments
 (0)