Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions app/cwl_arbitrary_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
def write_dag(
    catalog_filepath, dag_id, cwl_workflow, cwl_args, description="DAG to execute a generic CWL workflow"
):
    """
    Generate an Airflow DAG file that runs an arbitrary CWL workflow.

    The generated DAG starts a KubernetesPodOperator pod (Docker engine +
    CWL libraries image) that invokes "cwl-runner" on the given workflow.

    :param catalog_filepath: path of the Python DAG file to write (overwritten if present)
    :param dag_id: Airflow DAG identifier embedded in the generated file
    :param cwl_workflow: URL of the CWL workflow the generated DAG executes
    :param cwl_args: dict of default workflow inputs; its Python repr is embedded
        verbatim in the generated file, so keys/values must round-trip through
        repr() (plain strings/numbers/bools)
    :param description: DAG description; must not contain double quotes because
        it is interpolated into a quoted string literal
    """
    with open(catalog_filepath, "w") as file:
        # The template is one single string: use write(), not writelines()
        # (writelines() would iterate the string character by character).
        file.write(
            f'''
"""
DAG to execute a generic CWL workflow.

The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries.
The "cwl-runner" tool is invoked to execute the CWL workflow.
Parameter cwl_workflow: the URL of the CWL workflow to execute.
Parameter args_as_json: JSON string containing the specific values for the workflow specific inputs.
"""

import json
import logging
import os
import shutil
from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.python import PythonOperator, get_current_context
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import SpsKubernetesPodOperator, get_affinity

from airflow import DAG

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "sps"
POD_LABEL = "cwl_task"
SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0"

NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator"
NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload"

# The path of the working directory where the CWL workflow is executed
# (aka the starting directory for cwl-runner).
# This is fixed to the EFS /scratch directory in this DAG.
WORKING_DIR = "/scratch"

# default parameters
DEFAULT_CWL_WORKFLOW = "{cwl_workflow}"
DEFAULT_CWL_ARGUMENTS = json.dumps({cwl_args})

# unity_sps_sbg_debug.txt
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
    requests={{
        "memory": "{{{{ params.request_memory }}}}",
        # Kubernetes resource quantities must not contain whitespace,
        # so the templated values are emitted with no trailing space.
        "cpu": "{{{{ params.request_cpu }}}}",
        "ephemeral-storage": "{{{{ params.request_storage }}}}",
    }},
)

# Default DAG configuration
dag_default_args = {{
    "owner": "unity-sps",
    "depends_on_past": False,
    "start_date": datetime.utcfromtimestamp(0),
}}

dag = DAG(
    dag_id="{dag_id}",
    description="{description}",
    tags=["CWL"],
    is_paused_upon_creation=False,
    catchup=False,
    schedule=None,
    max_active_runs=10,
    max_active_tasks=30,
    default_args=dag_default_args,
    params={{
        "cwl_args": Param(
            DEFAULT_CWL_ARGUMENTS,
            type="string",
            title="CWL workflow parameters",
            description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"),
        ),
        "request_memory": Param(
            "4Gi",
            type="string",
            # "4Gi" must be listed so the default value passes enum validation
            enum=["4Gi", "8Gi", "16Gi", "32Gi", "64Gi", "128Gi", "256Gi"],
            title="Docker container memory",
        ),
        "request_cpu": Param(
            "4",
            type="string",
            enum=["2", "4", "8", "16", "32"],
            title="Docker container CPU",
        ),
        "request_storage": Param(
            "10Gi",
            type="string",
            enum=["10Gi", "50Gi", "100Gi", "200Gi", "300Gi"],
            title="Docker container storage",
        ),
    }},
)


def setup(ti=None, **context):
    """
    Task that creates the working directory on the shared volume
    and parses the input parameter values.
    """
    context = get_current_context()
    dag_run_id = context["dag_run"].run_id
    local_dir = f"/shared-task-data/{{dag_run_id}}"
    logging.info(f"Creating directory: {{local_dir}}")
    os.makedirs(local_dir, exist_ok=True)
    logging.info(f"Created directory: {{local_dir}}")

    # select the node pool based on what resources were requested
    node_pool = NODE_POOL_DEFAULT
    storage = context["params"]["request_storage"]  # e.g. "100Gi"
    storage = int(storage[0:-2])  # strip the "Gi" suffix -> e.g. 100
    memory = context["params"]["request_memory"]  # e.g. "32Gi"
    memory = int(memory[0:-2])  # e.g. 32
    cpu = int(context["params"]["request_cpu"])  # e.g. 8

    logging.info(f"Requesting storage={{storage}}Gi memory={{memory}}Gi CPU={{cpu}}")
    if (storage > 30) or (memory > 32) or (cpu > 8):
        node_pool = NODE_POOL_HIGH_WORKLOAD
    logging.info(f"Selecting node pool={{node_pool}}")
    ti.xcom_push(key="node_pool", value=node_pool)


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)

cwl_task = SpsKubernetesPodOperator(
    retries=0,
    task_id="cwl_task",
    namespace=POD_NAMESPACE,
    name="cwl-task-pod",
    image=SPS_DOCKER_CWL_IMAGE,
    service_account_name="airflow-worker",
    in_cluster=True,
    get_logs=True,
    startup_timeout_seconds=1800,
    # DEFAULT_CWL_WORKFLOW is a module-level Python constant, not a Jinja
    # variable: it must be referenced directly (a Jinja template of an
    # undefined name would render as an empty string).
    arguments=[DEFAULT_CWL_WORKFLOW, "{{{{params.cwl_args}}}}"],
    container_security_context={{"privileged": True}},
    container_resources=CONTAINER_RESOURCES,
    container_logs=True,
    volume_mounts=[
        k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{{{dag_run.run_id}}}}")
    ],
    volumes=[
        k8s.V1Volume(
            name="workers-volume",
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
        )
    ],
    dag=dag,
    node_selector={{"karpenter.sh/nodepool": "{{{{ti.xcom_pull(task_ids='Setup', key='node_pool')}}}}"}},
    labels={{"app": POD_LABEL}},
    annotations={{"karpenter.sh/do-not-disrupt": "true"}},
    # note: 'affinity' cannot yet be templated
    affinity=get_affinity(
        capacity_type=["spot"],
        # instance_type=["t3.2xlarge"],
        anti_affinity_label=POD_LABEL,
    ),
    on_finish_action="keep_pod",
    is_delete_operator_pod=False,
)


def cleanup(**context):
    """
    Task that deletes all data shared between Tasks
    from the Kubernetes PersistentVolume
    """
    dag_run_id = context["dag_run"].run_id
    local_dir = f"/shared-task-data/{{dag_run_id}}"
    if os.path.exists(local_dir):
        shutil.rmtree(local_dir)
        logging.info(f"Deleted directory: {{local_dir}}")
    else:
        logging.info(f"Directory does not exist, no need to delete: {{local_dir}}")


cleanup_task = PythonOperator(
    task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE
)

chain(setup_task, cwl_task, cleanup_task)
'''
        )


if __name__ == "__main__":
    # Manual smoke test: generate a sample DAG file for the echo-message demo workflow.
    demo_workflow_url = (
        "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/main/demos/echo_message.cwl"
    )
    write_dag(
        catalog_filepath="./test_cwl_dag.py",
        dag_id="test_cwl_dag",
        cwl_workflow=demo_workflow_url,
        cwl_args={"message": "Hello Unity"},
    )
74 changes: 49 additions & 25 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from typing_extensions import Annotated

from . import config
from . import config, cwl_arbitrary_dag
from .database import SessionLocal, crud, engine, models
from .redis import RedisLock
from .schemas.ogc_processes import (
Expand All @@ -28,6 +28,7 @@
JobList,
LandingPage,
Link,
Ogcapppkg,
Process,
ProcessList,
ProcessSummary,
Expand Down Expand Up @@ -269,20 +270,21 @@ def deploy_process(
settings: Annotated[config.Settings, Depends(get_settings)],
redis_locking_client: Annotated[RedisLock, Depends(get_redis_locking_client)],
db: Session = Depends(get_db),
process: Process = Body(...),
app: Ogcapppkg = Body(...),
):
"""
Deploy a new process.

**Note:** This is not an officially supported endpoint in the OGC Processes specification.
"""
check_process_integrity(db, process.id, new_process=True)
check_process_integrity(db, app.processDescription.id, new_process=True)

with redis_locking_client.lock("deploy_process_" + process.id): # as lock:
with redis_locking_client.lock("deploy_process_" + app.processDescription.id): # as lock:
pass

# Acquire lock
# Create process in DB w/ deployment_status field "deploying"
# Check if request is a CWL DAG, template
# Check if DAG exists in Airflow
# Check if file exists in DAG folder
# Check if file exists in DAG catalog
Expand All @@ -293,29 +295,51 @@ def deploy_process(
# Update process in DB w/ deployment_status field "deployed"
# Release lock

# Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
dag_filename = process.id + ".py"
dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename)
if not os.path.isfile(dag_catalog_filepath):
# If the file doesn't exist, list other files in the same directory
existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY)
existing_files_str = "\n".join(existing_files) # Create a string from the list of files
dag_filename = app.processDescription.id + ".py"

# Raise an exception with details about what files are actually there
if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
# Log warning that file already exists in the deployed dags directory
raise HTTPException(
status_code=fastapi_status.HTTP_409_CONFLICT,
detail=f"The process ID '{process.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}",
detail=f"The process ID '{app.processDescription.id}' already has a deployed DAG.",
)

if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
# Log warning that file already exists in the deployed dags directory
pass
if app.executionUnit.type == "application/cwl":
cwl_arbitrary_dag.write_dag(
os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename),
app.processDescription.id,
app.executionUnit.href,
app.processDescription.inputs,
app.processDescription.description,
)
elif app.executionUnit.mediaType == "application/cwl+json":
cwl_arbitrary_dag.write_dag(
os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename),
app.processDescription.id,
app.executionUnit.value,
app.processDescription.inputs,
app.processDescription.description,
)
else:
# Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename)
if not os.path.isfile(dag_catalog_filepath):
# If the file doesn't exist and the executionunit wasn't provided,
# list other files in the same directory
existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY)
existing_files_str = "\n".join(existing_files) # Create a string from the list of files

# Raise an exception with details about what files are actually there
raise HTTPException(
status_code=fastapi_status.HTTP_409_CONFLICT,
detail=f"The process ID '{app.processDescription.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}",
)

# Copy DAG from the DAG catalog PVC to deployed PVC
shutil.copy2(
dag_catalog_filepath,
settings.DEPLOYED_DAGS_DIRECTORY,
)
# Copy DAG from the DAG catalog PVC to deployed PVC
shutil.copy2(
dag_catalog_filepath,
settings.DEPLOYED_DAGS_DIRECTORY,
)

if not os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
raise HTTPException(
Expand All @@ -330,22 +354,22 @@ def deploy_process(
timeout = 20
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.get(f"{settings.EMS_API_URL}/dags/{process.id}", auth=ems_api_auth)
response = requests.get(f"{settings.EMS_API_URL}/dags/{app.processDescription.id}", auth=ems_api_auth)
data = response.json()
if response.status_code == 404:
pass
elif data["is_paused"]:
pause_dag(settings.EMS_API_URL, process.id, ems_api_auth, pause=False)
pause_dag(settings.EMS_API_URL, app.processDescription.id, ems_api_auth, pause=False)
elif data["is_active"]:
break
time.sleep(0.5)
else:
raise HTTPException(
status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT,
detail=f"Timeout waiting for DAG '{process.id}' to be available in Airflow.",
detail=f"Timeout waiting for DAG '{app.processDescription.id}' to be available in Airflow.",
)

return crud.create_process(db, process)
return crud.create_process(db, app.processDescription)


@app.delete(
Expand Down
1 change: 1 addition & 0 deletions app/schemas/ogc_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ class Results(RootModel):
class Type3(Enum):
    """Allowed execution-unit artifact types for a deployed application package."""

    # NOTE(review): "catalog" was added alongside the container image types --
    # presumably it marks a process backed by a pre-registered DAG-catalog
    # entry rather than a container artifact; confirm against the deploy flow.
    docker = "docker"
    oci = "oci"
    catalog = "catalog"


class Deployment(Enum):
Expand Down