-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path__init__.py
More file actions
65 lines (54 loc) · 2.4 KB
/
__init__.py
File metadata and controls
65 lines (54 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import functools
import os
from datetime import datetime

from pyformance import MetricsRegistry
from wavefront_pyformance import delta
from wavefront_pyformance.wavefront_reporter import WavefrontDirectReporter
# Module-level metrics registry. It stays None until a wrapped function is
# invoked; each invocation replaces it with a fresh MetricsRegistry, and
# get_registry() returns whatever is currently stored here.
reg = None
def wrapper(func):
    """
    Return the Wavefront Dispatch wrapper for ``func``.

    The wrapper collects the Dispatch function's standard metrics
    (invocations, errors and duration in milliseconds) and reports them
    directly to Wavefront after every call. The connection settings are read
    from the ``secrets`` mapping of the Dispatch context, i.e. the first
    positional argument passed to the wrapped function:
    1. wavefront_server_url : https://<INSTANCE>.wavefront.com
    2. wavefront_auth_token : Wavefront API token with Direct Data Ingestion
       permission

    A missing secret defaults to an empty string. The wrapped function's
    return value is passed back to the caller, and any exception it raises is
    counted as an error and re-raised unchanged.
    """

    def call_dispatch_function(wf_reporter, *args, **kwargs):
        """Call ``func``, record its metrics and report them via ``wf_reporter``."""
        METRICS_PREFIX = "dispatch.function.wf."
        # Register duration metrics
        duration_gauge = reg.gauge(METRICS_PREFIX + "duration")
        # Register invocations metrics
        invocations_counter = delta.delta_counter(
            reg, METRICS_PREFIX + "invocations"
        )
        invocations_counter.inc()
        # Register errors metrics
        errors_counter = delta.delta_counter(reg, METRICS_PREFIX + "errors")
        time_start = datetime.now()
        try:
            return func(*args, **kwargs)
        except BaseException:
            # Count every failure, then let the exception propagate unchanged.
            errors_counter.inc()
            raise
        finally:
            # Runs on success and on failure: record the elapsed time in
            # milliseconds and flush the registry to Wavefront.
            time_taken = datetime.now() - time_start
            duration_gauge.set_value(time_taken.total_seconds() * 1000)
            wf_reporter.report_now(registry=reg)

    @functools.wraps(func)
    def wavefront_wrapper(*args, **kwargs):
        print("Func has been decorated.")
        # Initialize a fresh module-level registry for this invocation
        global reg
        reg = MetricsRegistry()
        # Get wavefront secrets from the Dispatch context (first positional arg)
        context = args[0]
        server = context["secrets"].get("wavefront_server_url", "")
        auth_token = context["secrets"].get("wavefront_auth_token", "")
        # Initialize the wavefront direct reporter
        wf_direct_reporter = WavefrontDirectReporter(
            server=server,
            token=auth_token,
            registry=reg,
            prefix="",
        )
        # Hand the function's result back to the caller instead of dropping it.
        return call_dispatch_function(wf_direct_reporter, *args, **kwargs)

    return wavefront_wrapper
def get_registry():
    """
    Return the current module-level metrics registry.

    This is ``None`` until a function decorated with ``wrapper`` has been
    invoked; each invocation stores a new ``MetricsRegistry`` in its place.
    """
    return reg