From d1eb00c3dc1c54ce19563b178270037f3fd83beb Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Mon, 10 Jun 2024 11:00:30 -0700 Subject: [PATCH 1/9] initial work, simple file templating --- app/cwl_arbitrary_dag.py | 141 +++++++++++++++++++++++++++++++++++++++ app/main.py | 24 ++++--- 2 files changed, 155 insertions(+), 10 deletions(-) create mode 100644 app/cwl_arbitrary_dag.py diff --git a/app/cwl_arbitrary_dag.py b/app/cwl_arbitrary_dag.py new file mode 100644 index 0000000..198a2da --- /dev/null +++ b/app/cwl_arbitrary_dag.py @@ -0,0 +1,141 @@ +import os + + +def write_dag( + catalog_filepath, dag_id, cwl_workflow, cwl_args, description="DAG to execute a generic CWL workflow" +): + with open(catalog_filepath, "w") as file: + file.writelines( + f""" +# DAG to execute a generic CWL workflow. +# The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries. +# The "cwl-runner" tool is invoked to execute the CWL workflow. +# Parameter cwl_workflow: the URL of the CWL workflow to execute. +# Parameter args_as_json: JSON string contained the specific values for the workflow specific inputs. 
+import json +import os +import shutil +import uuid +from datetime import datetime + +from airflow.models.param import Param +from airflow.operators.python import PythonOperator, get_current_context +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from kubernetes.client import models as k8s + +from airflow import DAG + +# The Kubernetes Pod that executes the CWL-Docker container +# Must use elevated privileges to start/stop the Docker engine +POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml" + +# The Kubernetes namespace within which the Pod is run (it must already exist) +POD_NAMESPACE = "airflow" + +# The path of the working directory where the CWL workflow is executed +# (aka the starting directory for cwl-runner) +WORKING_DIR = "/scratch" + +# Default DAG configuration +dag_default_args = {{"owner": "unity-sps", "depends_on_past": False, "start_date": datetime(2024, 1, 1, 0, 0)}} + +# The DAG +dag = DAG( + dag_id="{dag_id}", + description="{description}", + tags=["cwl", "unity-sps", "docker"], + is_paused_upon_creation=False, + catchup=False, + schedule=None, + max_active_runs=100, + default_args=dag_default_args, + params={{ + "cwl_workflow": Param( + "{cwl_workflow}", type="string", title="CWL workflow", description="The CWL workflow URL" + ), + "cwl_args": Param( + json.dumps({cwl_args}), + type="string", + title="CWL workflow parameters", + description="The job parameters encodes as a JSON string, or the URL of a JSON or YAML file", + ), + }}, +) + +# Environment variables +default_env_vars = {{}} + + +# This task that creates the working directory on the shared volume +def setup(ti=None, **context): + context = get_current_context() + dag_run_id = context["dag_run"].run_id + local_dir = os.path.dirname(f"/shared-task-data/{{dag_run_id}}") + os.makedirs(local_dir, exist_ok=True) + + +setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) + +# This section defines KubernetesPodOperator 
+cwl_task = KubernetesPodOperator( + namespace=POD_NAMESPACE, + name="cwl-task", + on_finish_action="delete_succeeded_pod", + hostnetwork=False, + startup_timeout_seconds=1000, + get_logs=True, + task_id="docker-cwl-task", + full_pod_spec=k8s.V1Pod( + metadata=k8s.V1ObjectMeta(name="docker-cwl-pod-" + uuid.uuid4().hex), + ), + pod_template_file=POD_TEMPLATE_FILE, + arguments=["{{{{ params.cwl_workflow }}}}", "{{{{ params.cwl_args }}}}"], + dag=dag, + volume_mounts=[ + k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{{{ dag_run.run_id }}}}") + ], + volumes=[ + k8s.V1Volume( + name="workers-volume", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"), + ) + ], +) + + +def cleanup(**context): + dag_run_id = context["dag_run"].run_id + local_dir = f"/shared-task-data/{{dag_run_id}}" + if os.path.exists(local_dir): + shutil.rmtree(local_dir) + print(f"Deleted directory: {{local_dir}}") + else: + print(f"Directory does not exist, no need to delete: {{local_dir}}") + + +cleanup_task = PythonOperator( + task_id="Cleanup", + python_callable=cleanup, + dag=dag, +) + +setup_task >> cwl_task >> cleanup_task +""" + ) + + +if __name__ == "__main__": + write_dag( + ".", + "test_cwl_dag", + "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl", + { + "input_processing_labels": ["label1", "label2"], + "input_cmr_stac": "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z", + "input_unity_dapa_client": "40c2s0ulbhp9i0fmaph3su9jch", + "input_unity_dapa_api": "https://d3vc8w9zcq658.cloudfront.net", + "input_crid": "001", + "output_collection_id": "urn:nasa:unity:unity:dev:SBG-L1B_PRE___1", + "output_data_bucket": "sps-dev-ds-storage", + }, + ) diff --git a/app/main.py b/app/main.py index 7005533..4dde4ea 100644 --- a/app/main.py +++ b/app/main.py @@ -19,7 +19,7 @@ 
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound from typing_extensions import Annotated -from . import config +from . import config, cwl_arbitrary_dag from .database import SessionLocal, crud, engine, models from .redis import RedisLock from .schemas.ogc_processes import ( @@ -297,15 +297,19 @@ def deploy_process( dag_filename = process.id + ".py" dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename) if not os.path.isfile(dag_catalog_filepath): - # If the file doesn't exist, list other files in the same directory - existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY) - existing_files_str = "\n".join(existing_files) # Create a string from the list of files - - # Raise an exception with details about what files are actually there - raise HTTPException( - status_code=fastapi_status.HTTP_409_CONFLICT, - detail=f"The process ID '{process.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}", - ) + if process.executionunit: + cwl_arbitrary_dag.write_dag(dag_catalog_filepath,process.id,process.executionunit.reference.href,dict(),process.processdescription) + else: + # If the file doesn't exist and the executionunit wasn't provided, + # list other files in the same directory + existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY) + existing_files_str = "\n".join(existing_files) # Create a string from the list of files + + # Raise an exception with details about what files are actually there + raise HTTPException( + status_code=fastapi_status.HTTP_409_CONFLICT, + detail=f"The process ID '{process.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}", + ) if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)): # Log warning that file already exists in the deployed dags directory From 
4ca4a5f9e400b9a55a21aa061bab09e8b6801ec0 Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Mon, 17 Jun 2024 23:21:36 -0700 Subject: [PATCH 2/9] updating dag template --- app/cwl_arbitrary_dag.py | 139 +++++++++++++++++++++++---------------- app/main.py | 4 +- 2 files changed, 85 insertions(+), 58 deletions(-) diff --git a/app/cwl_arbitrary_dag.py b/app/cwl_arbitrary_dag.py index 198a2da..4229bd8 100644 --- a/app/cwl_arbitrary_dag.py +++ b/app/cwl_arbitrary_dag.py @@ -1,96 +1,120 @@ -import os - - def write_dag( catalog_filepath, dag_id, cwl_workflow, cwl_args, description="DAG to execute a generic CWL workflow" ): with open(catalog_filepath, "w") as file: file.writelines( - f""" -# DAG to execute a generic CWL workflow. -# The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries. -# The "cwl-runner" tool is invoked to execute the CWL workflow. -# Parameter cwl_workflow: the URL of the CWL workflow to execute. -# Parameter args_as_json: JSON string contained the specific values for the workflow specific inputs. + f''' +""" +DAG to execute a generic CWL workflow. + +The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries. +The "cwl-runner" tool is invoked to execute the CWL workflow. +Parameter cwl_workflow: the URL of the CWL workflow to execute. +Parameter args_as_json: JSON string contained the specific values for the workflow specific inputs. 
+""" + import json +import logging import os import shutil -import uuid from datetime import datetime +from airflow.models.baseoperator import chain from airflow.models.param import Param from airflow.operators.python import PythonOperator, get_current_context from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.utils.trigger_rule import TriggerRule from kubernetes.client import models as k8s +from unity_sps_utils import get_affinity from airflow import DAG -# The Kubernetes Pod that executes the CWL-Docker container -# Must use elevated privileges to start/stop the Docker engine -POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml" - # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "airflow" +POD_LABEL = "cwl_task" # The path of the working directory where the CWL workflow is executed -# (aka the starting directory for cwl-runner) +# (aka the starting directory for cwl-runner). +# This is fixed to the EFS /scratch directory in this DAG. 
WORKING_DIR = "/scratch" +# default parameters +DEFAULT_CWL_WORKFLOW = ( + "{cwl_workflow}" +) +DEFAULT_CWL_ARGUMENTS = json.dumps({cwl_args}) + +# Alternative arguments to execute SBG Pre-Process +# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" +# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" + +# Alternative arguments to execute SBG end-to-end +# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl" +# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml" + + # Default DAG configuration -dag_default_args = {{"owner": "unity-sps", "depends_on_past": False, "start_date": datetime(2024, 1, 1, 0, 0)}} +dag_default_args = {{ + "owner": "unity-sps", + "depends_on_past": False, + "start_date": datetime.utcfromtimestamp(0), +}} + +# common parameters +INPUT_PROCESSING_LABELS = ["label1", "label2"] -# The DAG dag = DAG( dag_id="{dag_id}", description="{description}", - tags=["cwl", "unity-sps", "docker"], + tags=["CWL"], is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=100, + max_active_runs=10, + max_active_tasks=30, default_args=dag_default_args, params={{ "cwl_workflow": Param( - "{cwl_workflow}", type="string", title="CWL workflow", description="The CWL workflow URL" + DEFAULT_CWL_WORKFLOW, type="string", title="CWL workflow", description="The CWL workflow URL" ), "cwl_args": Param( - json.dumps({cwl_args}), + DEFAULT_CWL_ARGUMENTS, type="string", title="CWL workflow parameters", - description="The job parameters encodes as a JSON string, or the URL of a JSON or YAML file", + description="The job parameters encoded as a JSON string, or the URL of a JSON or YAML file", ), }}, ) -# Environment variables -default_env_vars = {{}} - -# This task that creates the working 
directory on the shared volume def setup(ti=None, **context): + """ + Task that creates the working directory on the shared volume. + """ context = get_current_context() dag_run_id = context["dag_run"].run_id - local_dir = os.path.dirname(f"/shared-task-data/{{dag_run_id}}") + local_dir = f"/shared-task-data/{{dag_run_id}}" + logging.info(f"Creating directory: {{local_dir}}") os.makedirs(local_dir, exist_ok=True) + logging.info(f"Created directory: {{local_dir}}") setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) -# This section defines KubernetesPodOperator cwl_task = KubernetesPodOperator( + retries=0, + task_id="cwl_task", namespace=POD_NAMESPACE, - name="cwl-task", - on_finish_action="delete_succeeded_pod", - hostnetwork=False, - startup_timeout_seconds=1000, + name="cwl-task-pod", + image="ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0", + service_account_name="airflow-worker", + in_cluster=True, get_logs=True, - task_id="docker-cwl-task", - full_pod_spec=k8s.V1Pod( - metadata=k8s.V1ObjectMeta(name="docker-cwl-pod-" + uuid.uuid4().hex), - ), - pod_template_file=POD_TEMPLATE_FILE, + startup_timeout_seconds=1200, arguments=["{{{{ params.cwl_workflow }}}}", "{{{{ params.cwl_args }}}}"], - dag=dag, + container_security_context={{"privileged": True}}, + # container_resources=CONTAINER_RESOURCES, + container_logs=True, volume_mounts=[ k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{{{ dag_run.run_id }}}}") ], @@ -100,42 +124,45 @@ def setup(ti=None, **context): persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"), ) ], + dag=dag, + node_selector={{"karpenter.sh/nodepool": "airflow-kubernetes-pod-operator"}}, + labels={{"app": POD_LABEL}}, + annotations={{"karpenter.sh/do-not-disrupt": "true"}}, + affinity=get_affinity( + capacity_type=["spot"], + instance_type=["r7i.xlarge"], + anti_affinity_label=POD_LABEL, + ), ) def cleanup(**context): + """ + Tasks that deletes all 
data shared between Tasks + from the Kubernetes PersistentVolume + """ dag_run_id = context["dag_run"].run_id local_dir = f"/shared-task-data/{{dag_run_id}}" if os.path.exists(local_dir): shutil.rmtree(local_dir) - print(f"Deleted directory: {{local_dir}}") + logging.info(f"Deleted directory: {{local_dir}}") else: - print(f"Directory does not exist, no need to delete: {{local_dir}}") + logging.info(f"Directory does not exist, no need to delete: {{local_dir}}") cleanup_task = PythonOperator( - task_id="Cleanup", - python_callable=cleanup, - dag=dag, + task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE ) -setup_task >> cwl_task >> cleanup_task -""" +chain(setup_task, cwl_task, cleanup_task) +''' ) if __name__ == "__main__": write_dag( - ".", + "./test_cwl_dag.py", "test_cwl_dag", - "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl", - { - "input_processing_labels": ["label1", "label2"], - "input_cmr_stac": "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z", - "input_unity_dapa_client": "40c2s0ulbhp9i0fmaph3su9jch", - "input_unity_dapa_api": "https://d3vc8w9zcq658.cloudfront.net", - "input_crid": "001", - "output_collection_id": "urn:nasa:unity:unity:dev:SBG-L1B_PRE___1", - "output_data_bucket": "sps-dev-ds-storage", - }, + "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/main/demos/echo_message.cwl", + {"message": "Hello Unity"}, ) diff --git a/app/main.py b/app/main.py index 4dde4ea..ee98712 100644 --- a/app/main.py +++ b/app/main.py @@ -297,8 +297,8 @@ def deploy_process( dag_filename = process.id + ".py" dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename) if not os.path.isfile(dag_catalog_filepath): - if process.executionunit: - 
cwl_arbitrary_dag.write_dag(dag_catalog_filepath,process.id,process.executionunit.reference.href,dict(),process.processdescription) + if process.executionunit.type == "application/cwl": + cwl_arbitrary_dag.write_dag(dag_catalog_filepath,process.id,process.executionunit.href,dict(),process.processdescription) else: # If the file doesn't exist and the executionunit wasn't provided, # list other files in the same directory From 23291efd4a7e4a0c3e60ae0662cb514e73de764c Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Mon, 17 Jun 2024 23:24:47 -0700 Subject: [PATCH 3/9] precommit formatting --- app/main.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index ee98712..eb7f154 100644 --- a/app/main.py +++ b/app/main.py @@ -298,7 +298,13 @@ def deploy_process( dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename) if not os.path.isfile(dag_catalog_filepath): if process.executionunit.type == "application/cwl": - cwl_arbitrary_dag.write_dag(dag_catalog_filepath,process.id,process.executionunit.href,dict(),process.processdescription) + cwl_arbitrary_dag.write_dag( + dag_catalog_filepath, + process.id, + process.executionunit.href, + dict(), + process.processdescription, + ) else: # If the file doesn't exist and the executionunit wasn't provided, # list other files in the same directory From a6fdf86043706f904875f6e9694201188b4ec384 Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Sat, 17 Aug 2024 21:25:37 -0700 Subject: [PATCH 4/9] alterations to properly reuse model-generated classes --- app/main.py | 68 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/app/main.py b/app/main.py index eb7f154..3b1e5b2 100644 --- a/app/main.py +++ b/app/main.py @@ -28,6 +28,7 @@ JobList, LandingPage, Link, + Ogcapppkg, Process, ProcessList, ProcessSummary, @@ -269,16 +270,16 @@ def deploy_process( settings: Annotated[config.Settings, 
Depends(get_settings)], redis_locking_client: Annotated[RedisLock, Depends(get_redis_locking_client)], db: Session = Depends(get_db), - process: Process = Body(...), + app: Ogcapppkg = Body(...), ): """ Deploy a new process. **Note:** This is not an officially supported endpoint in the OGC Processes specification. """ - check_process_integrity(db, process.id, new_process=True) + check_process_integrity(db, app.processDescription.id, new_process=True) - with redis_locking_client.lock("deploy_process_" + process.id): # as lock: + with redis_locking_client.lock("deploy_process_" + app.processDescription.id): # as lock: pass # Acquire lock @@ -293,19 +294,28 @@ def deploy_process( # Update process in DB w/ deployment_status field "deployed" # Release lock - # Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog - dag_filename = process.id + ".py" - dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename) - if not os.path.isfile(dag_catalog_filepath): - if process.executionunit.type == "application/cwl": - cwl_arbitrary_dag.write_dag( - dag_catalog_filepath, - process.id, - process.executionunit.href, - dict(), - process.processdescription, - ) - else: + dag_filename = app.processDescription.id + ".py" + + if app.executionUnit.type == "application/cwl": + cwl_arbitrary_dag.write_dag( + os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename), + app.processDescription.id, + app.executionUnit.href, + dict(), + app.processDescription.description, + ) + elif app.executionUnit.mediaType == "application/cwl+json": + cwl_arbitrary_dag.write_dag( + os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename), + app.processDescription.id, + app.executionUnit.value, + dict(), + app.processDescription.description, + ) + else: + # Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog + dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename) + if not 
os.path.isfile(dag_catalog_filepath): # If the file doesn't exist and the executionunit wasn't provided, # list other files in the same directory existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY) @@ -314,18 +324,18 @@ def deploy_process( # Raise an exception with details about what files are actually there raise HTTPException( status_code=fastapi_status.HTTP_409_CONFLICT, - detail=f"The process ID '{process.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}", + detail=f"The process ID '{app.processDescription.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}", ) - if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)): - # Log warning that file already exists in the deployed dags directory - pass + if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)): + # Log warning that file already exists in the deployed dags directory + pass - # Copy DAG from the DAG catalog PVC to deployed PVC - shutil.copy2( - dag_catalog_filepath, - settings.DEPLOYED_DAGS_DIRECTORY, - ) + # Copy DAG from the DAG catalog PVC to deployed PVC + shutil.copy2( + dag_catalog_filepath, + settings.DEPLOYED_DAGS_DIRECTORY, + ) if not os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)): raise HTTPException( @@ -340,22 +350,22 @@ def deploy_process( timeout = 20 start_time = time.time() while time.time() - start_time < timeout: - response = requests.get(f"{settings.EMS_API_URL}/dags/{process.id}", auth=ems_api_auth) + response = requests.get(f"{settings.EMS_API_URL}/dags/{app.processDescription.id}", auth=ems_api_auth) data = response.json() if response.status_code == 404: pass elif data["is_paused"]: - pause_dag(settings.EMS_API_URL, process.id, ems_api_auth, pause=False) + pause_dag(settings.EMS_API_URL, 
app.processDescription.id, ems_api_auth, pause=False) elif data["is_active"]: break time.sleep(0.5) else: raise HTTPException( status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT, - detail=f"Timeout waiting for DAG '{process.id}' to be available in Airflow.", + detail=f"Timeout waiting for DAG '{app.processDescription.id}' to be available in Airflow.", ) - return crud.create_process(db, process) + return crud.create_process(db, app.processDescription) @app.delete( From c4649cdddcdff7fcaad4852ea6d7277c62ef92cd Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Sun, 18 Aug 2024 22:21:37 -0700 Subject: [PATCH 5/9] update arbitrary dag template --- app/cwl_arbitrary_dag.py | 95 +++++++++++++++++++++++++++++----------- app/main.py | 9 ++-- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/app/cwl_arbitrary_dag.py b/app/cwl_arbitrary_dag.py index 4229bd8..59ef0d1 100644 --- a/app/cwl_arbitrary_dag.py +++ b/app/cwl_arbitrary_dag.py @@ -22,16 +22,19 @@ def write_dag( from airflow.models.baseoperator import chain from airflow.models.param import Param from airflow.operators.python import PythonOperator, get_current_context -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.utils.trigger_rule import TriggerRule from kubernetes.client import models as k8s -from unity_sps_utils import get_affinity +from unity_sps_utils import SpsKubernetesPodOperator, get_affinity from airflow import DAG # The Kubernetes namespace within which the Pod is run (it must already exist) -POD_NAMESPACE = "airflow" +POD_NAMESPACE = "sps" POD_LABEL = "cwl_task" +SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0" + +NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator" +NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload" # The path of the working directory where the CWL workflow is executed # (aka the starting directory for cwl-runner). 
@@ -44,14 +47,21 @@ def write_dag( ) DEFAULT_CWL_ARGUMENTS = json.dumps({cwl_args}) -# Alternative arguments to execute SBG Pre-Process -# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" -# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" - -# Alternative arguments to execute SBG end-to-end -# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl" -# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml" - +# unity_sps_sbg_debug.txt +CONTAINER_RESOURCES = k8s.V1ResourceRequirements( + requests={{ + # "cpu": "2660m", # 2.67 vCPUs, specified in milliCPUs + # "memory": "22Gi", # Rounded to 22 GiB for easier specification + "memory": "{{{{ params.request_memory }}}}", + "cpu": "{{{{ params.request_cpu }}}} ", + "ephemeral-storage": "{{{{ params.request_storage }}}} ", + }}, + # limits={{ + # # "cpu": "2660m", # Optional: set the same as requests if you want a fixed allocation + # # "memory": "22Gi", + # "ephemeral-storage": "30Gi" + # }}, +) # Default DAG configuration dag_default_args = {{ @@ -60,12 +70,9 @@ def write_dag( "start_date": datetime.utcfromtimestamp(0), }} -# common parameters -INPUT_PROCESSING_LABELS = ["label1", "label2"] - dag = DAG( - dag_id="{dag_id}", - description="{description}", + dag_id="cwl_dag", + description="CWL DAG", tags=["CWL"], is_paused_upon_creation=False, catchup=False, @@ -81,7 +88,25 @@ def write_dag( DEFAULT_CWL_ARGUMENTS, type="string", title="CWL workflow parameters", - description="The job parameters encoded as a JSON string, or the URL of a JSON or YAML file", + description=("The job parameters encoded as a JSON string, " "or the URL of a JSON or YAML file"), ), + "request_memory": Param( + "4Gi", + type="string", + enum=["4Gi", "8Gi", "16Gi", "32Gi", "64Gi", "128Gi", 
"256Gi"], + title="Docker container memory", + ), + "request_cpu": Param( + "4", + type="string", + enum=["2", "4", "8", "16", "32"], + title="Docker container CPU", + ), + "request_storage": Param( + "10Gi", + type="string", + enum=["10Gi", "50Gi", "100Gi", "200Gi", "300Gi"], + title="Docker container storage", ), }}, ) @@ -89,7 +114,8 @@ def write_dag( def setup(ti=None, **context): """ - Task that creates the working directory on the shared volume. + Task that creates the working directory on the shared volume + and parses the input parameter values. """ context = get_current_context() dag_run_id = context["dag_run"].run_id @@ -98,25 +124,39 @@ def setup(ti=None, **context): os.makedirs(local_dir, exist_ok=True) logging.info(f"Created directory: {{local_dir}}") + # select the node pool based on what resources were requested + node_pool = NODE_POOL_DEFAULT + storage = context["params"]["request_storage"] # 100Gi + storage = int(storage[0:-2]) # 100 + memory = context["params"]["request_memory"] # 32Gi + memory = int(memory[0:-2]) # 32 + cpu = int(context["params"]["request_cpu"]) # 8 + + logging.info(f"Requesting storage={{storage}}Gi memory={{memory}}Gi CPU={{cpu}}") + if (storage > 30) or (memory > 32) or (cpu > 8): + node_pool = NODE_POOL_HIGH_WORKLOAD + logging.info(f"Selecting node pool={{node_pool}}") + ti.xcom_push(key="node_pool", value=node_pool) + setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) -cwl_task = KubernetesPodOperator( +cwl_task = SpsKubernetesPodOperator( retries=0, task_id="cwl_task", namespace=POD_NAMESPACE, name="cwl-task-pod", - image="ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0", + image=SPS_DOCKER_CWL_IMAGE, service_account_name="airflow-worker", in_cluster=True, get_logs=True, - startup_timeout_seconds=1200, - arguments=["{{{{ params.cwl_workflow }}}}", "{{{{ params.cwl_args }}}}"], + startup_timeout_seconds=1800, + arguments=["{{{{params.cwl_workflow}}}}", "{{{{params.cwl_args}}}}"], 
container_security_context={{"privileged": True}}, - # container_resources=CONTAINER_RESOURCES, + container_resources=CONTAINER_RESOURCES, container_logs=True, volume_mounts=[ - k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{{{ dag_run.run_id }}}}") + k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{{{dag_run.run_id}}}}") ], volumes=[ k8s.V1Volume( @@ -125,14 +165,17 @@ def setup(ti=None, **context): ) ], dag=dag, - node_selector={{"karpenter.sh/nodepool": "airflow-kubernetes-pod-operator"}}, + node_selector={{"karpenter.sh/nodepool": "{{{{ti.xcom_pull(task_ids='Setup', key='node_pool')}}}}"}}, labels={{"app": POD_LABEL}}, annotations={{"karpenter.sh/do-not-disrupt": "true"}}, + # note: 'affinity' cannot yet be templated affinity=get_affinity( capacity_type=["spot"], - instance_type=["r7i.xlarge"], + # instance_type=["t3.2xlarge"], anti_affinity_label=POD_LABEL, ), + on_finish_action="keep_pod", + is_delete_operator_pod=False, ) diff --git a/app/main.py b/app/main.py index 3b1e5b2..df9fb7e 100644 --- a/app/main.py +++ b/app/main.py @@ -284,6 +284,7 @@ def deploy_process( # Acquire lock # Create process in DB w/ deployment_status field "deploying" + # Check if request is a CWL DAG, template # Check if DAG exists in Airflow # Check if file exists in DAG folder # Check if file exists in DAG catalog @@ -296,6 +297,10 @@ def deploy_process( dag_filename = app.processDescription.id + ".py" + if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)): + # Log warning that file already exists in the deployed dags directory + pass + if app.executionUnit.type == "application/cwl": cwl_arbitrary_dag.write_dag( os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename), @@ -327,10 +332,6 @@ def deploy_process( detail=f"The process ID '{app.processDescription.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following 
files:\n{existing_files_str}", ) - if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)): - # Log warning that file already exists in the deployed dags directory - pass - # Copy DAG from the DAG catalog PVC to deployed PVC shutil.copy2( dag_catalog_filepath, From 17c09af4a20abefd88b20c8d6d0b96bf405d9dcc Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Mon, 19 Aug 2024 13:49:22 -0700 Subject: [PATCH 6/9] forgot to update dag_id and description configuration --- app/cwl_arbitrary_dag.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/cwl_arbitrary_dag.py b/app/cwl_arbitrary_dag.py index 59ef0d1..3e2e7b9 100644 --- a/app/cwl_arbitrary_dag.py +++ b/app/cwl_arbitrary_dag.py @@ -71,8 +71,8 @@ def write_dag( }} dag = DAG( - dag_id="cwl_dag", - description="CWL DAG", + dag_id={dag_id!r}, + description={description!r}, tags=["CWL"], is_paused_upon_creation=False, catchup=False, From 125d9206eb99ad1f010d454db871ebc2b06162b8 Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Mon, 26 Aug 2024 09:55:22 -0700 Subject: [PATCH 7/9] Adding exception to prevent clobbering --- app/main.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index df9fb7e..25d5427 100644 --- a/app/main.py +++ b/app/main.py @@ -299,7 +299,10 @@ def deploy_process( if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)): # Log warning that file already exists in the deployed dags directory - pass + raise HTTPException( + status_code=fastapi_status.HTTP_409_CONFLICT, + detail=f"The process ID '{app.processDescription.id}' already has a deployed DAG.", + ) if app.executionUnit.type == "application/cwl": cwl_arbitrary_dag.write_dag( From 541949be23497c344cab6acaf2e48cf427d23390 Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Mon, 26 Aug 2024 11:30:56 -0700 Subject: [PATCH 8/9] trying to add additional type to executionUnit validation --- app/schemas/ogc_processes.py | 1 + 1 
file changed, 1 insertion(+) diff --git a/app/schemas/ogc_processes.py b/app/schemas/ogc_processes.py index 99b104c..5d1c6eb 100644 --- a/app/schemas/ogc_processes.py +++ b/app/schemas/ogc_processes.py @@ -323,6 +323,7 @@ class Results(RootModel): class Type3(Enum): docker = "docker" oci = "oci" + catalog = "catalog" class Deployment(Enum): From f2fd4874e3ba8f07209877eb67ae41c4847dd7a4 Mon Sep 17 00:00:00 2001 From: Bradley Lunsford Date: Sun, 8 Sep 2024 21:50:20 -0700 Subject: [PATCH 9/9] pulling inputs from request also dropping cwl_workflow from the arguments, given this is provided by the request --- app/cwl_arbitrary_dag.py | 5 +---- app/main.py | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/app/cwl_arbitrary_dag.py b/app/cwl_arbitrary_dag.py index 3e2e7b9..ee688c5 100644 --- a/app/cwl_arbitrary_dag.py +++ b/app/cwl_arbitrary_dag.py @@ -81,9 +81,6 @@ def write_dag( max_active_tasks=30, default_args=dag_default_args, params={{ - "cwl_workflow": Param( - DEFAULT_CWL_WORKFLOW, type="string", title="CWL workflow", description="The CWL workflow URL" - ), "cwl_args": Param( DEFAULT_CWL_ARGUMENTS, type="string", @@ -151,7 +148,7 @@ def setup(ti=None, **context): in_cluster=True, get_logs=True, startup_timeout_seconds=1800, - arguments=["{{{{params.cwl_workflow}}}}", "{{{{params.cwl_args}}}}"], + arguments=[DEFAULT_CWL_WORKFLOW, "{{{{params.cwl_args}}}}"], container_security_context={{"privileged": True}}, container_resources=CONTAINER_RESOURCES, container_logs=True, diff --git a/app/main.py b/app/main.py index 25d5427..21d2abd 100644 --- a/app/main.py +++ b/app/main.py @@ -309,7 +309,7 @@ def deploy_process( os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename), app.processDescription.id, app.executionUnit.href, - dict(), + app.processDescription.inputs, app.processDescription.description, ) elif app.executionUnit.mediaType == "application/cwl+json": @@ -317,7 +317,7 @@ def deploy_process( 
os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename), app.processDescription.id, app.executionUnit.value, - dict(), + app.processDescription.inputs, app.processDescription.description, ) else: