-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathend_of_run_workflow.py
More file actions
93 lines (70 loc) · 3.13 KB
/
end_of_run_workflow.py
File metadata and controls
93 lines (70 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import traceback
from prefect import task, flow, get_run_logger
from prefect.blocks.notifications import SlackWebhook
from prefect.context import FlowRunContext
from xanes_exporter import xanes_exporter
from xrf_hdf5_exporter import xrf_hdf5_exporter
from logscan import logscan
from dotenv import load_dotenv
import os
from data_validation import get_run
CATALOG_NAME = "srx"
def get_api_key_from_env():
with open("/srv/container.secret", "r") as secrets:
load_dotenv(stream=secrets)
api_key = os.environ["TILED_API_KEY"]
return api_key
def slack(func):
"""
Send a message to mon-prefect slack channel about the flow-run status.
Send a message to mon-bluesky slack channel if the bluesky-run failed.
NOTE: the name of this inner function is the same as the real end_of_workflow() function because
when the decorator is used, Prefect sees the name of this inner function as the name of
the flow. To keep the naming of workflows consistent, the name of this inner function had to match the expected name.
"""
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
flow_run_name = FlowRunContext.get().flow_run.dict().get("name")
# Load slack credentials that are saved in Prefect.
mon_prefect = SlackWebhook.load("mon-prefect")
mon_bluesky = SlackWebhook.load("mon-bluesky")
# Get the uid.
uid = stop_doc["run_start"]
# Get Tiled API key, if not set already
if not api_key:
api_key = get_api_key_from_env()
# Get the scan_id.
run = get_run(uid, api_key=api_key)
scan_id = run.start["scan_id"]
# Send a message to mon-bluesky if bluesky-run failed.
if stop_doc.get("exit_status") == "fail":
mon_bluesky.notify(
f":bangbang: {CATALOG_NAME} bluesky-run failed. (*{flow_run_name}*)\n ```run_start: {uid}\nscan_id: {scan_id}``` ```reason: {stop_doc.get('reason', 'none')}```"
)
try:
result = func(stop_doc, api_key=api_key, dry_run=dry_run)
# Send a message to mon-prefect if flow-run is successful.
mon_prefect.notify(
f":white_check_mark: {CATALOG_NAME} flow-run successful. (*{flow_run_name}*)\n ```run_start: {uid}\nscan_id: {scan_id}```"
)
return result
except Exception as e:
tb = traceback.format_exception_only(e)
# Send a message to mon-prefect if flow-run failed.
mon_prefect.notify(
f":bangbang: {CATALOG_NAME} flow-run failed. (*{flow_run_name}*)\n ```run_start: {uid}\nscan_id: {scan_id}``` ```{tb[-1]}```"
)
raise
return end_of_run_workflow
@task
def log_completion():
logger = get_run_logger()
logger.info("Complete")
@flow
@slack
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
uid = stop_doc["run_start"]
# data_validation(uid, return_state=True, api_key=api)
xanes_exporter(uid, api_key=api_key, dry_run=dry_run)
xrf_hdf5_exporter(uid, api_key=api_key, dry_run=dry_run)
logscan(uid, api_key=api_key, dry_run=dry_run)
log_completion()