diff --git a/app/cwl_arbitrary_dag.py b/app/cwl_arbitrary_dag.py
new file mode 100644
index 0000000..ee688c5
--- /dev/null
+++ b/app/cwl_arbitrary_dag.py
@@ -0,0 +1,208 @@
+def write_dag(
+    catalog_filepath, dag_id, cwl_workflow, cwl_args, description="DAG to execute a generic CWL workflow"
+):
+    with open(catalog_filepath, "w") as file:
+        file.write(
+            f'''
+"""
+DAG to execute a generic CWL workflow.
+
+The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries.
+The "cwl-runner" tool is invoked to execute the CWL workflow.
+Parameter cwl_workflow: the URL of the CWL workflow to execute.
+Parameter args_as_json: JSON string contained the specific values for the workflow specific inputs.
+"""
+
+import json
+import logging
+import os
+import shutil
+from datetime import datetime
+
+from airflow.models.baseoperator import chain
+from airflow.models.param import Param
+from airflow.operators.python import PythonOperator, get_current_context
+from airflow.utils.trigger_rule import TriggerRule
+from kubernetes.client import models as k8s
+from unity_sps_utils import SpsKubernetesPodOperator, get_affinity
+
+from airflow import DAG
+
+# The Kubernetes namespace within which the Pod is run (it must already exist)
+POD_NAMESPACE = "sps"
+POD_LABEL = "cwl_task"
+SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0"
+
+NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator"
+NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload"
+
+# The path of the working directory where the CWL workflow is executed
+# (aka the starting directory for cwl-runner).
+# This is fixed to the EFS /scratch directory in this DAG.
+WORKING_DIR = "/scratch"
+
+# default parameters
+DEFAULT_CWL_WORKFLOW = (
+    "{cwl_workflow}"
+)
+DEFAULT_CWL_ARGUMENTS = json.dumps({cwl_args})
+
+# unity_sps_sbg_debug.txt
+CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
+    requests={{
+        # "cpu": "2660m",  # 2.67 vCPUs, specified in milliCPUs
+        # "memory": "22Gi",  # Rounded to 22 GiB for easier specification
+        "memory": "{{{{ params.request_memory }}}}",
+        "cpu": "{{{{ params.request_cpu }}}}",
+        "ephemeral-storage": "{{{{ params.request_storage }}}}",
+    }},
+    # limits={{
+    #     # "cpu": "2660m",  # Optional: set the same as requests if you want a fixed allocation
+    #     # "memory": "22Gi",
+    #     "ephemeral-storage": "30Gi"
+    # }},
+)
+
+# Default DAG configuration
+dag_default_args = {{
+    "owner": "unity-sps",
+    "depends_on_past": False,
+    "start_date": datetime.utcfromtimestamp(0),
+}}
+
+dag = DAG(
+    dag_id="{dag_id}",
+    description="{description}",
+    tags=["CWL"],
+    is_paused_upon_creation=False,
+    catchup=False,
+    schedule=None,
+    max_active_runs=10,
+    max_active_tasks=30,
+    default_args=dag_default_args,
+    params={{
+        "cwl_args": Param(
+            DEFAULT_CWL_ARGUMENTS,
+            type="string",
+            title="CWL workflow parameters",
+            description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"),
+        ),
+        "request_memory": Param(
+            "4Gi",
+            type="string",
+            enum=["4Gi", "8Gi", "16Gi", "32Gi", "64Gi", "128Gi", "256Gi"],
+            title="Docker container memory",
+        ),
+        "request_cpu": Param(
+            "4",
+            type="string",
+            enum=["2", "4", "8", "16", "32"],
+            title="Docker container CPU",
+        ),
+        "request_storage": Param(
+            "10Gi",
+            type="string",
+            enum=["10Gi", "50Gi", "100Gi", "200Gi", "300Gi"],
+            title="Docker container storage",
+        ),
+    }},
+)
+
+
+def setup(ti=None, **context):
+    """
+    Task that creates the working directory on the shared volume
+    and parses the input parameter values.
+    """
+    context = get_current_context()
+    dag_run_id = context["dag_run"].run_id
+    local_dir = f"/shared-task-data/{{dag_run_id}}"
+    logging.info(f"Creating directory: {{local_dir}}")
+    os.makedirs(local_dir, exist_ok=True)
+    logging.info(f"Created directory: {{local_dir}}")
+
+    # select the node pool based on what resources were requested
+    node_pool = NODE_POOL_DEFAULT
+    storage = context["params"]["request_storage"]  # 100Gi
+    storage = int(storage[0:-2])  # 100
+    memory = context["params"]["request_memory"]  # 32Gi
+    memory = int(memory[0:-2])  # 32
+    cpu = int(context["params"]["request_cpu"])  # 8
+
+    logging.info(f"Requesting storage={{storage}}Gi memory={{memory}}Gi CPU={{cpu}}")
+    if (storage > 30) or (memory > 32) or (cpu > 8):
+        node_pool = NODE_POOL_HIGH_WORKLOAD
+    logging.info(f"Selecting node pool={{node_pool}}")
+    ti.xcom_push(key="node_pool", value=node_pool)
+
+
+setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)
+
+cwl_task = SpsKubernetesPodOperator(
+    retries=0,
+    task_id="cwl_task",
+    namespace=POD_NAMESPACE,
+    name="cwl-task-pod",
+    image=SPS_DOCKER_CWL_IMAGE,
+    service_account_name="airflow-worker",
+    in_cluster=True,
+    get_logs=True,
+    startup_timeout_seconds=1800,
+    arguments=[DEFAULT_CWL_WORKFLOW, "{{{{params.cwl_args}}}}"],
+    container_security_context={{"privileged": True}},
+    container_resources=CONTAINER_RESOURCES,
+    container_logs=True,
+    volume_mounts=[
+        k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{{{dag_run.run_id}}}}")
+    ],
+    volumes=[
+        k8s.V1Volume(
+            name="workers-volume",
+            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
+        )
+    ],
+    dag=dag,
+    node_selector={{"karpenter.sh/nodepool": "{{{{ti.xcom_pull(task_ids='Setup', key='node_pool')}}}}"}},
+    labels={{"app": POD_LABEL}},
+    annotations={{"karpenter.sh/do-not-disrupt": "true"}},
+    # note: 'affinity' cannot yet be templated
+    affinity=get_affinity(
+        capacity_type=["spot"],
+        # instance_type=["t3.2xlarge"],
+        anti_affinity_label=POD_LABEL,
+    ),
+    on_finish_action="keep_pod",
+    is_delete_operator_pod=False,
+)
+
+
+def cleanup(**context):
+    """
+    Tasks that deletes all data shared between Tasks
+    from the Kubernetes PersistentVolume
+    """
+    dag_run_id = context["dag_run"].run_id
+    local_dir = f"/shared-task-data/{{dag_run_id}}"
+    if os.path.exists(local_dir):
+        shutil.rmtree(local_dir)
+        logging.info(f"Deleted directory: {{local_dir}}")
+    else:
+        logging.info(f"Directory does not exist, no need to delete: {{local_dir}}")
+
+
+cleanup_task = PythonOperator(
+    task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE
+)
+
+chain(setup_task, cwl_task, cleanup_task)
+'''
+        )
+
+
+if __name__ == "__main__":
+    write_dag(
+        "./test_cwl_dag.py",
+        "test_cwl_dag",
+        "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/main/demos/echo_message.cwl",
+        {"message": "Hello Unity"},
+    )
diff --git a/app/main.py b/app/main.py
index 7005533..21d2abd 100644
--- a/app/main.py
+++ b/app/main.py
@@ -19,7 +19,7 @@
 from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
 from typing_extensions import Annotated
 
-from . import config
+from . import config, cwl_arbitrary_dag
 from .database import SessionLocal, crud, engine, models
 from .redis import RedisLock
 from .schemas.ogc_processes import (
@@ -28,6 +28,7 @@
     JobList,
     LandingPage,
     Link,
+    Ogcapppkg,
     Process,
     ProcessList,
     ProcessSummary,
@@ -269,20 +270,21 @@ def deploy_process(
     settings: Annotated[config.Settings, Depends(get_settings)],
     redis_locking_client: Annotated[RedisLock, Depends(get_redis_locking_client)],
     db: Session = Depends(get_db),
-    process: Process = Body(...),
+    app: Ogcapppkg = Body(...),
 ):
     """
     Deploy a new process.
 
     **Note:** This is not an officially supported endpoint in the OGC Processes specification.
     """
-    check_process_integrity(db, process.id, new_process=True)
+    check_process_integrity(db, app.processDescription.id, new_process=True)
 
-    with redis_locking_client.lock("deploy_process_" + process.id):  # as lock:
+    with redis_locking_client.lock("deploy_process_" + app.processDescription.id):  # as lock:
         pass
 
     # Acquire lock
     # Create process in DB w/ deployment_status field "deploying"
+    # Check if request is a CWL DAG, template
     # Check if DAG exists in Airflow
     # Check if file exists in DAG folder
     # Check if file exists in DAG catalog
@@ -293,29 +295,51 @@ def deploy_process(
     # Update process in DB w/ deployment_status field "deployed"
     # Release lock
 
-    # Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
-    dag_filename = process.id + ".py"
-    dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename)
-    if not os.path.isfile(dag_catalog_filepath):
-        # If the file doesn't exist, list other files in the same directory
-        existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY)
-        existing_files_str = "\n".join(existing_files)  # Create a string from the list of files
+    dag_filename = app.processDescription.id + ".py"
 
-        # Raise an exception with details about what files are actually there
+    if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
+        # Log warning that file already exists in the deployed dags directory
         raise HTTPException(
             status_code=fastapi_status.HTTP_409_CONFLICT,
-            detail=f"The process ID '{process.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}",
+            detail=f"The process ID '{app.processDescription.id}' already has a deployed DAG.",
         )
 
-    if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
-        # Log warning that file already exists in the deployed dags directory
-        pass
+    if app.executionUnit.type == "application/cwl":
+        cwl_arbitrary_dag.write_dag(
+            os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename),
+            app.processDescription.id,
+            app.executionUnit.href,
+            app.processDescription.inputs,
+            app.processDescription.description,
+        )
+    elif app.executionUnit.mediaType == "application/cwl+json":
+        cwl_arbitrary_dag.write_dag(
+            os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename),
+            app.processDescription.id,
+            app.executionUnit.value,
+            app.processDescription.inputs,
+            app.processDescription.description,
+        )
+    else:
+        # Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
+        dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename)
+        if not os.path.isfile(dag_catalog_filepath):
+            # If the file doesn't exist and the executionunit wasn't provided,
+            # list other files in the same directory
+            existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY)
+            existing_files_str = "\n".join(existing_files)  # Create a string from the list of files
+
+            # Raise an exception with details about what files are actually there
+            raise HTTPException(
+                status_code=fastapi_status.HTTP_409_CONFLICT,
+                detail=f"The process ID '{app.processDescription.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}",
+            )
 
-    # Copy DAG from the DAG catalog PVC to deployed PVC
-    shutil.copy2(
-        dag_catalog_filepath,
-        settings.DEPLOYED_DAGS_DIRECTORY,
-    )
+        # Copy DAG from the DAG catalog PVC to deployed PVC
+        shutil.copy2(
+            dag_catalog_filepath,
+            settings.DEPLOYED_DAGS_DIRECTORY,
+        )
 
     if not os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
         raise HTTPException(
@@ -330,22 +354,22 @@ def deploy_process(
     timeout = 20
     start_time = time.time()
     while time.time() - start_time < timeout:
-        response = requests.get(f"{settings.EMS_API_URL}/dags/{process.id}", auth=ems_api_auth)
+        response = requests.get(f"{settings.EMS_API_URL}/dags/{app.processDescription.id}", auth=ems_api_auth)
         data = response.json()
         if response.status_code == 404:
             pass
         elif data["is_paused"]:
-            pause_dag(settings.EMS_API_URL, process.id, ems_api_auth, pause=False)
+            pause_dag(settings.EMS_API_URL, app.processDescription.id, ems_api_auth, pause=False)
         elif data["is_active"]:
             break
         time.sleep(0.5)
     else:
         raise HTTPException(
            status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT,
-            detail=f"Timeout waiting for DAG '{process.id}' to be available in Airflow.",
+            detail=f"Timeout waiting for DAG '{app.processDescription.id}' to be available in Airflow.",
         )
 
-    return crud.create_process(db, process)
+    return crud.create_process(db, app.processDescription)
 
 
 @app.delete(
diff --git a/app/schemas/ogc_processes.py b/app/schemas/ogc_processes.py
index 99b104c..5d1c6eb 100644
--- a/app/schemas/ogc_processes.py
+++ b/app/schemas/ogc_processes.py
@@ -323,6 +323,7 @@ class Results(RootModel):
 class Type3(Enum):
     docker = "docker"
     oci = "oci"
+    catalog = "catalog"
 
 
 class Deployment(Enum):