-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathend_of_run_workflow.py
More file actions
53 lines (46 loc) · 1.75 KB
/
end_of_run_workflow.py
File metadata and controls
53 lines (46 loc) · 1.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import getpass
import os
from prefect import task, flow, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from data_validation import read_all_streams
from linker import get_symlink_pairs
from export import export_amptek, has_amptek_keys
from dotenv import load_dotenv
def get_api_key_from_env(api_key=None):
    """Return the Tiled API key, reading the container secrets file if needed.

    Parameters
    ----------
    api_key : str, optional
        A key that is already known. When a non-empty value is supplied it is
        returned unchanged and the secrets file is not touched.

    Returns
    -------
    str
        The supplied ``api_key``, or the value of the ``TILED_API_KEY``
        environment variable after ``/srv/container.secret`` has been loaded.

    Raises
    ------
    OSError
        If ``/srv/container.secret`` cannot be opened.
    KeyError
        If ``TILED_API_KEY`` is not set after loading the secrets file.
    """
    # Same truthiness test the workflow uses (`if not api_key`), so an empty
    # string falls through to the secrets file just like ``None`` does.
    if api_key:
        return api_key

    with open("/srv/container.secret", "r") as secrets:
        # NOTE(review): load_dotenv is called without ``override``, so an
        # already-exported TILED_API_KEY presumably takes precedence over the
        # value in the file -- confirm this is the intended behaviour.
        load_dotenv(stream=secrets)
    return os.environ["TILED_API_KEY"]
@task
def log_completion():
    """Write a single "Complete" message to the Prefect run logger."""
    get_run_logger().info("Complete")
@flow(task_runner=ConcurrentTaskRunner())
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
    """Run the end-of-run tasks for the run referenced by ``stop_doc``.

    Submits the symlink-pair (linker) task and the stream validation task,
    optionally submits the amptek export task, waits for every submitted task
    to finish, and then logs completion.

    Parameters
    ----------
    stop_doc : mapping
        Document whose ``"run_start"`` entry holds the uid of the run.
    api_key : str, optional
        Tiled API key. When falsy, it is obtained via
        ``get_api_key_from_env``.
    dry_run : bool, optional
        Forwarded to the linker task; when true the amptek export is skipped
        without checking for amptek keys.

    Raises
    ------
    KeyError
        If ``stop_doc`` has no ``"run_start"`` entry.
    """
    logger = get_run_logger()
    uid = stop_doc["run_start"]

    if not api_key:
        api_key = get_api_key_from_env(api_key=None)

    logger.info(f"effective user: {getpass.getuser()}")

    # Launch validation and linker concurrently.
    det_map = {"900KW": "WAXS", "1M": "SAXS", "2M": "SAXS2M"}
    linker_task = get_symlink_pairs.submit(
        uid, det_map=det_map, api_key=api_key, dry_run=dry_run
    )
    logger.info("Launched linker task")
    validation_task = read_all_streams.submit(uid, api_key=api_key)
    logger.info("Launched validation task")

    # The export is optional; ``export_task`` stays None when it is skipped.
    export_task = None
    if dry_run:
        logger.info("Dry run: skipping amptek export")
    elif has_amptek_keys(uid, api_key=api_key):
        export_task = export_amptek.submit(uid)
        logger.info("Launched amptek export task")
    else:
        logger.info("Skipping export, amptek keys not present")

    # Wait for completion.
    logger.info("Waiting for tasks to complete")
    validation_task.result()
    linker_task.result()
    # Compare against None explicitly rather than relying on the truthiness
    # of the future object returned by ``.submit``.
    if export_task is not None:
        export_task.result()

    log_completion()