
[Design]: CWL Job Submission in DiracX #858

@ryuwd


This task/design documents the first steps towards integrating CWL into DiracX.

cc @aldbr

Status: Draft for Discussion

Context: The Three-Level CWL Model

DIRAC CWL workflows operate at three levels, determined by which hints are present (not by CWL class type):

| Level | DIRAC Concept | CWL Hint | Determines |
|---|---|---|---|
| Production | Production/Request | dirac:Production | Input dataset sourcing, orchestrates transformations |
| Transformation | Transformation | dirac:Transformation | Job template: grouping, input queries |
| Job | Job | dirac:Job (new, replaces dirac:Scheduling + dirac:ExecutionHooks) | Single execution: scheduling, I/O, hooks |

A CWL Workflow with dirac:Production creates Transformations from its steps. Each Transformation is a job template that creates many Jobs. The dirac:Job hint lives at the Job level — it tells DIRAC how to schedule and manage a single execution.

This design focuses on the Job level: the dirac:Job hint and the diracX submission endpoint.

Problem Statement

Currently, CWL jobs are submitted to DIRAC by:

  1. Setting Executable = "dirac-cwl-exec" in the JDL
  2. Shipping the CWL definition (job.json) as an InputSandbox file
  3. At runtime on the worker node, __createCWLJobWrapper clones the dirac-cwl repo, installs pixi, downloads the sandbox, and runs the CWL

This has several problems:

  • The CWL workflow definition is opaque to DIRAC — it's just a sandbox blob, not queryable or inspectable
  • dirac:Scheduling and dirac:ExecutionHooks are separate hints that duplicate concepts already present in JDL (Site, Priority, OutputSandbox, etc.)
  • The existing convert_to_jdl() in submission_clients.py only maps ~40% of available JDL fields
  • There is no native diracX API for CWL submission — everything goes through the JDL path
  • The worker-node shim runs git clone on the dirac-cwl repository and installs pixi for every job

Goals

  1. Unified dirac:Job hint — replace dirac:Scheduling and dirac:ExecutionHooks with a single versioned hint
  2. Dedicated workflows table — store CWL definitions once, content-addressed by SHA-256; jobs reference a workflow, not embed it
  3. Per-job workflow_params column: per-job parameters stored as lightweight, immutable JSON on each Jobs row; 10k parametric jobs = 1 workflow row + 10k job rows, each carrying its own params
  4. New diracX endpoint: POST /api/jobs/ accepts CWL + input YAML(s) directly
  5. Models in diracX: JobSubmissionModel, JobHint, and related types live in diracX (migrated from dirac-cwl)
  6. No git clone on worker nodes — dirac-cwl is installed in the diracX environment; job wrapper is accessed via importlib.resources
  7. Fail fast — strict validation of all CWL ID references, types, and hint fields at submission time

Design

1. The dirac:Job Hint

Design principles

  1. Use standard CWL where possible — don't duplicate what CWL already provides natively via requirements
  2. Execution hooks are not user-configured — they are determined automatically by type; the submitter doesn't choose them
  3. Derive what you can: job_name from CWL label/id, processors from ResourceRequirement, etc.
  4. Reference CWL I/O by source ID — instead of duplicating file lists, use source: to point to CWL input/output IDs
  5. Versioned schema — the hint carries a schema_version to enable forward-compatible evolution

What CWL already provides (via requirements)

These standard CWL constructs map directly to JDL fields without needing a dirac:Job field:

| CWL Requirement | CWL Field | JDL Equivalent | Notes |
|---|---|---|---|
| ResourceRequirement | coresMin | MinNumberOfProcessors | |
| ResourceRequirement | coresMax | MaxNumberOfProcessors | |
| ResourceRequirement | ramMin | MinRAM | |
| ResourceRequirement | ramMax | MaxRAM | |
| ToolTimeLimit | timelimit | (none) | Wall-clock seconds; see CPUTime |
| DockerRequirement | dockerPull | Container support TBD | Unrelated to Platform |
| CUDARequirement | (presence) | Tags: ["GPU"] | Implies GPU tag |
| MPIRequirement | (presence) | Not supported | Raises NotImplementedError |
| (CWL task) | label or id | JobName | Derived automatically |

CPUTime and CPU work

DIRAC's CPUTime is normalized CPU work in HS06-seconds (wall_time * CPUNormalizationFactor), not wall-clock time. CWL's ToolTimeLimit is wall-clock seconds. These are fundamentally different units.

Approach: The dirac:Job hint provides an explicit cpu_work field representing normalized HS06-seconds — the same unit DIRAC uses internally. This avoids ambiguity:

  • cpu_work in dirac:Job → maps directly to JDL CPUTime (HS06-seconds)
  • ToolTimeLimit (if present) → used by cwltool for local execution; not translated to CPUTime

The normalization factor itself can be calculated by DB12 on the worker node. Users who think in wall-clock terms can compute cpu_work = wall_seconds * estimated_HS06_factor.
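Here is a worked version of that conversion, written as a sketch. The helper names are hypothetical, and 10 HS06 per core is an illustrative figure chosen so that the result matches the cpu_work: 864000 used in the example hint that follows.

```python
import math


def cpu_work_from_wall_time(wall_seconds: float, hs06_per_core: float) -> int:
    """Convert an expected wall-clock duration into cpu_work (HS06-seconds).

    Mirrors CPUTime = wall_time * CPUNormalizationFactor and rounds up,
    so the request never undershoots the estimate.
    """
    if wall_seconds <= 0 or hs06_per_core <= 0:
        raise ValueError("wall_seconds and hs06_per_core must be positive")
    return math.ceil(wall_seconds * hs06_per_core)


def expected_wall_seconds(cpu_work: int, normalization_factor: float) -> float:
    """Wall-clock budget that cpu_work corresponds to on a node whose
    normalization factor (e.g. as measured by DB12) is known."""
    if normalization_factor <= 0:
        raise ValueError("normalization_factor must be positive")
    return cpu_work / normalization_factor


# 24 hours on a core rated at 10 HS06 gives the value used in the example hint
print(cpu_work_from_wall_time(24 * 3600, 10.0))  # 864000
# The same budget lasts 20 hours on a faster node with factor 12
print(expected_wall_seconds(864000, 12.0) / 3600)  # 20.0
```

Because the unit is normalized, the same cpu_work buys less wall time on faster nodes, and the matcher does not need to know the submitter's wall-clock assumptions.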

What dirac:Job adds (DIRAC-specific, no CWL equivalent)

cwlVersion: v1.2
class: CommandLineTool

requirements:
  - class: ResourceRequirement
    coresMin: 1
    coresMax: 4
    ramMin: 2048        # MiB (CWL unit)

hints:
  - class: dirac:Job
    schema_version: "1.0"

    # --- Scheduling ---
    priority: 5
    cpu_work: 864000              # HS06-seconds (= CPUTime in JDL)
    platform: "x86_64-el9"
    sites:
      - LCG.CERN.cern
      - LCG.IN2P3.fr
    banned_sites:
      - LCG.RAL.uk
    tags: ["GPU"]                 # additional tags beyond auto-derived ones

    # --- Job metadata ---
    type: "User"                  # determines execution hooks automatically
    group: "lhcb_analysis"
    log_level: "INFO"

    # --- I/O: reference CWL inputs/outputs by source ID ---
    input_sandbox:
      - source: helper_script               # CWL input ID (type: File) → job root
      - source: config_files                # CWL input ID (type: File[])
        path: "conf/"                       # relative to job working directory
    input_data:
      - source: input_lfns                  # CWL input ID (type: File[])
    output_sandbox:
      - source: stderr_log                  # CWL output ID
    output_data:
      - source: result_file                 # CWL output ID
        output_path: "/lhcb/user/r/roneil/output/"
        output_se: ["SE-USER"]
      - source: histogram                   # CWL output ID
        output_path: "/lhcb/user/r/roneil/histos/"
        output_se: ["SE-AUXILIARY"]

label: "my-analysis-job"   # → becomes JobName

inputs:
  - id: helper_script
    type: File
  - id: config_files
    type: File[]
  - id: input_lfns
    type: File[]
  - id: config_param
    type: string

outputs:
  - id: result_file
    type: File
    outputBinding:
      glob: "result.root"
  - id: histogram
    type: File
    outputBinding:
      glob: "histos.root"
  - id: stderr_log
    type: File
    outputBinding:
      glob: "std.err"

$namespaces:
  dirac: "schemas/dirac-metadata.json#/$defs/"

Note: no baseCommand — the executor is always the dirac-cwl runner. The CWL task defines what to run; DIRAC handles how to run it.

Key design decisions

type instead of job_type: Shorter, cleaner, and consistent with CWL's own class: convention. Maps to JDL JobType. Determines execution hooks automatically (see Execution hooks).

Sites: sites + banned_sites as flat lists:

# Run only at these sites:
sites:
  - LCG.CERN.cern
  - LCG.IN2P3.fr

# Exclude specific sites:
banned_sites:
  - LCG.RAL.uk

DIRAC computes the effective set as Sites - BannedSites. If sites is omitted or empty, the job can run anywhere (equivalent to Site = ANY). Both fields are optional flat lists — simple to read and write, no nesting required. The semantics mirror DIRAC's native model directly.

I/O by CWL source ID using CWL-idiomatic source: syntax:

Each I/O entry uses source: to reference a CWL input or output by its id, mirroring CWL's own outputSource convention:

  • input_sandbox: [{source: helper_script}] — the CWL input with id: helper_script (must be type: File or File[]) will be uploaded to the DIRAC sandbox store and delivered to the worker node. An optional path: specifies a relative directory within the job working directory (e.g., path: "conf/" places the file(s) in <job_root>/conf/). If omitted, files land in the job root
  • input_data: [{source: input_lfns}] — the CWL input with id: input_lfns will be resolved as LFN paths and registered as InputData in the JDL for data-driven scheduling
  • output_sandbox: [{source: stderr_log}] — the CWL output with id: stderr_log will be uploaded to the sandbox store after execution
  • output_data: [{source: result_file, output_path: "/lhcb/...", output_se: ["SE-USER"]}] — the CWL output with id: result_file will be registered in the file catalog at the given LFN path, on the specified storage element(s)

The source: syntax is:

  • CWL-idiomatic — consistent with how CWL references inputs/outputs elsewhere
  • Extensible — per-entry metadata (like output_se, output_path, path) lives alongside the source reference
  • Per-output SE — each output_data entry specifies its own output_se, allowing different outputs to go to different storage elements (e.g., large data to tape, small histograms to disk)

All referenced IDs are strictly validated at submission time — the translation layer verifies that each source ID exists in the CWL task's inputs/outputs and has a compatible type (File or File[]). Invalid references fail the submission immediately.
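The submission-time check can be sketched under two simplifying assumptions. First, CWLPort is a hypothetical stand-in for cwl_utils' parsed inputs and outputs. Second, types are assumed to be already normalized to strings. cwl_utils may represent File[] as an array schema object and optional types such as File? as unions, which a real implementation would need to unwrap. Unlike the per-reference _validate_cwl_id call in the translation code, this version collects every bad reference, so the user sees all of them in one error.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CWLPort:
    """Simplified stand-in for a parsed CWL input or output (hypothetical)."""
    id: str
    type: str  # normalized type string, e.g. "File", "File[]", "string"


FILE_TYPES = frozenset({"File", "File[]"})


def validate_sources(sources: list[str], ports: list[CWLPort], direction: str) -> None:
    """Check that each source ID names a declared File/File[] port; report all problems at once."""
    declared = {port.id: port for port in ports}
    errors = []
    for source in sources:
        port = declared.get(source)
        if port is None:
            errors.append(f"{direction} {source!r} is not declared by the CWL task")
        elif port.type not in FILE_TYPES:
            errors.append(f"{direction} {source!r} has type {port.type!r}, expected File or File[]")
    if errors:
        raise ValueError("; ".join(errors))


inputs = [CWLPort("helper_script", "File"), CWLPort("input_lfns", "File[]"), CWLPort("config_param", "string")]
validate_sources(["helper_script", "input_lfns"], inputs, "input")  # valid: no exception
try:
    validate_sources(["config_param", "input_lfn"], inputs, "input")  # wrong type + typo
except ValueError as exc:
    print(exc)
```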

Schema versioning: Every dirac:Job hint must carry a schema_version field. This enables the system to:

  • Reject hints with unsupported versions
  • Evolve the schema without breaking existing workflows
  • Provide clear error messages when a workflow targets a newer schema version
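The schema_version check in the translation code below only tests set membership. A slightly richer check can distinguish a version that is newer than supported from one that is malformed or no longer supported, which is what the last bullet asks for. This sketch assumes versions are MAJOR.MINOR strings like "1.0", and check_schema_version is a hypothetical helper.

```python
SUPPORTED_SCHEMA_VERSIONS = ("1.0",)


def _parse_version(version: str) -> tuple[int, int]:
    # Raises ValueError unless the string is exactly "<int>.<int>"
    major, minor = version.split(".")
    return int(major), int(minor)


def check_schema_version(version: str) -> None:
    """Reject unsupported dirac:Job schema versions with a specific message."""
    if version in SUPPORTED_SCHEMA_VERSIONS:
        return
    try:
        requested = _parse_version(version)
    except ValueError:
        raise ValueError(f"Malformed schema_version {version!r}; expected 'MAJOR.MINOR'") from None
    newest = max(_parse_version(v) for v in SUPPORTED_SCHEMA_VERSIONS)
    if requested > newest:
        raise ValueError(
            f"schema_version {version!r} is newer than the newest supported "
            f"version {'.'.join(map(str, newest))}"
        )
    raise ValueError(
        f"schema_version {version!r} is not supported; supported: {list(SUPPORTED_SCHEMA_VERSIONS)}"
    )


for candidate in ("1.0", "2.0", "1.x"):
    try:
        check_schema_version(candidate)
        print(candidate, "ok")
    except ValueError as exc:
        print(exc)
```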

Field mapping summary

| Source | Field | JDL Equivalent | In dirac:Job? |
|---|---|---|---|
| CWL label/id | (auto) | JobName | No (derived) |
| ResourceRequirement.coresMin | (auto) | MinNumberOfProcessors | No (CWL native) |
| ResourceRequirement.coresMax | (auto) | MaxNumberOfProcessors | No (CWL native) |
| ResourceRequirement.ramMin | (auto) | MinRAM | No (CWL native) |
| ResourceRequirement.ramMax | (auto) | MaxRAM | No (CWL native) |
| CUDARequirement | (auto) | Tags += ["GPU"] | No (CWL native) |
| MPIRequirement | | Not supported: raises NotImplementedError | |
| dirac:Job | schema_version | | Yes (required) |
| dirac:Job | cpu_work | CPUTime | Yes (HS06-seconds) |
| dirac:Job | priority | Priority | Yes |
| dirac:Job | platform | Platform | Yes |
| dirac:Job | sites | Site | Yes |
| dirac:Job | banned_sites | BannedSites | Yes |
| dirac:Job | tags | Tags (merged with auto) | Yes |
| dirac:Job | type | JobType | Yes |
| dirac:Job | group | JobGroup | Yes |
| dirac:Job | log_level | LogLevel | Yes |
| dirac:Job | input_sandbox[].source | InputSandbox | Yes (CWL input IDs) |
| dirac:Job | input_sandbox[].path | (worker-side) | Yes (relative directory) |
| dirac:Job | input_data[].source | InputData | Yes (CWL input IDs) |
| dirac:Job | output_sandbox[].source | OutputSandbox | Yes (CWL output IDs) |
| dirac:Job | output_data[].source | OutputData | Yes (CWL output ID) |
| dirac:Job | output_data[].output_path | OutputPath | Yes (per-output LFN path) |
| dirac:Job | output_data[].output_se | OutputSE | Yes (per-output SE list) |
| (system) | | Executable = dirac-cwl-exec | No (always set) |
| (system) | (from auth) | Owner, OwnerGroup, VO | No (injected) |
| (system) | (managed) | Status, MinorStatus | No |
| (system) | (auto) | JobID | No |

Execution hooks: automatic, not user-configured

Execution hooks are derived from type:

  • type: "User" → QueryBasedPlugin (default)
  • type: "MCSimulation" → VO-specific simulation plugin
  • etc.

The hook plugin registry (currently in dirac-cwl, eventually migrated to diracX as entrypoints) handles discovery by VO and type. The dirac:Job hint does not expose hook_plugin or hook_config — these are internal to the system.

output_data (with per-output output_se and output_path) and output_sandbox remain in dirac:Job because they are user-specified data management choices, not hook configuration.

2. Storage Model: workflows + workflow_params

Instead of embedding CWL into each JDL row (which would duplicate the CWL blob across thousands of parametric jobs), CWL definitions are stored once in a dedicated table, and jobs reference them.

Schema

CREATE TABLE workflows (
    workflow_id  CHAR(64) PRIMARY KEY,  -- SHA-256 of the CWL content
    cwl          MEDIUMTEXT NOT NULL,    -- CWL YAML (original, uncompressed)
    persistent   BOOL NOT NULL DEFAULT FALSE,
    created_at   DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- New columns on the existing Jobs table:
ALTER TABLE Jobs
    ADD COLUMN workflow_id      CHAR(64) DEFAULT NULL,     -- FK → workflows.workflow_id
    ADD COLUMN workflow_params  JSON DEFAULT NULL,          -- immutable per-job input parameters
    ADD FOREIGN KEY (workflow_id) REFERENCES workflows(workflow_id);

workflow_params is an immutable JSON column — once set at job creation, it is never updated. It holds the per-job CWL input parameters (the content of an input YAML). Non-CWL jobs leave both columns NULL.

How it works

  1. Workflow insertion — on submission, the CWL content is SHA-256 hashed. If the hash already exists in workflows, the insert is skipped (content-addressed, immutable). Workflows are never edited — a changed CWL produces a new hash.

  2. Job creation — each job row in Jobs gets a workflow_id reference and its own workflow_params JSON. This is where per-job variation lives — co-located with the job, no extra join needed.

  3. Parametric jobs — submitting 10k jobs with the same CWL but different parameters produces:

    • 1 row in workflows (insert-if-not-exists)
    • 10k rows in Jobs with the same workflow_id but different workflow_params (lightweight JSON)
  4. persistent flag — controls cleanup behavior:

    • persistent = FALSE (default): ad-hoc user jobs; workflow row can be cleaned up when no jobs reference it
    • persistent = TRUE: production/transformation workflows; retained indefinitely
  5. Worker-side retrieval — the job wrapper fetches the CWL via diracX API using the workflow_id from its Jobs row, and reads input parameters from workflow_params. No sandbox involved for the workflow definition.
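The insert-if-not-exists flow and the cleanup rule for non-persistent workflows can be exercised end to end against an in-memory database. This sketch uses SQLite as a stand-in for MySQL: INSERT OR IGNORE plays the role of MySQL's INSERT IGNORE, and the column types are simplified. submit and collect_orphan_workflows are hypothetical helpers. The orphan query illustrates only the periodic-GC option among the cleanup policies that are still open.

```python
import hashlib
import json
import sqlite3

# SQLite stand-in for the MySQL schema above (CHAR(64)/MEDIUMTEXT/JSON become TEXT)
SCHEMA = """
CREATE TABLE workflows (
    workflow_id TEXT PRIMARY KEY,          -- SHA-256 of the CWL content
    cwl         TEXT NOT NULL,
    persistent  INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE Jobs (
    JobID           INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT REFERENCES workflows(workflow_id),
    workflow_params TEXT                   -- immutable JSON, one per job
);
"""


def submit(conn: sqlite3.Connection, cwl: str, params_list: list[dict],
           persistent: bool = False) -> tuple[str, list[int]]:
    """Store the CWL once (content-addressed) and create one job per params dict."""
    workflow_id = hashlib.sha256(cwl.encode()).hexdigest()
    # A repeated CWL hashes to the same key, so this insert is a no-op
    conn.execute(
        "INSERT OR IGNORE INTO workflows (workflow_id, cwl, persistent) VALUES (?, ?, ?)",
        (workflow_id, cwl, int(persistent)),
    )
    job_ids = []
    for params in params_list:
        cur = conn.execute(
            "INSERT INTO Jobs (workflow_id, workflow_params) VALUES (?, ?)",
            (workflow_id, json.dumps(params)),
        )
        job_ids.append(cur.lastrowid)
    conn.commit()
    return workflow_id, job_ids


def collect_orphan_workflows(conn: sqlite3.Connection) -> int:
    """Delete non-persistent workflows that no job references; return how many."""
    cur = conn.execute(
        "DELETE FROM workflows WHERE persistent = 0 AND NOT EXISTS "
        "(SELECT 1 FROM Jobs WHERE Jobs.workflow_id = workflows.workflow_id)"
    )
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
cwl = "cwlVersion: v1.2\nclass: CommandLineTool\n"
wf_a, jobs_a = submit(conn, cwl, [{"config_param": f"run{i}"} for i in range(3)])
wf_b, jobs_b = submit(conn, cwl, [{"config_param": "run3"}])
print(wf_a == wf_b)  # True: same CWL, same workflow row
print(conn.execute("SELECT COUNT(*) FROM workflows").fetchone()[0])  # 1
print(conn.execute("SELECT COUNT(*) FROM Jobs").fetchone()[0])       # 4
```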

Parameter mapping via dirac:Job hint

The dirac:Job hint tells DIRAC which CWL inputs should be promoted to job-level parameters visible to the scheduler:

hints:
  - class: dirac:Job
    schema_version: "1.0"
    type: User
    # ... scheduling fields ...

    # Which CWL inputs become job-visible parameters
    input_data:
      - source: input_lfns      # CWL input ID → resolved to InputData for scheduling
    input_sandbox:
      - source: helper_script   # CWL input ID → files uploaded to sandbox

At submission time, the translation layer reads these mappings to populate JDL fields (for the transition period) or job attributes (post-JDL) from the per-job parameters.
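The promotion step can be sketched as a pure function over the per-job workflow_params. It is one possible shape for the _extract_file_paths / _extract_lfn_paths helpers that the translation code below calls but does not define. It assumes File values follow the CWL input-object convention ({"class": "File", "location": ...} or "path"). The "LFN:" prefix marking grid files is also an assumption, and the file names in the example are illustrative.

```python
from typing import Any


def _file_locations(value: Any) -> list[str]:
    """Collect locations from a CWL File object or a list of them."""
    if value is None:
        return []  # parameter not provided for this job
    if isinstance(value, list):
        return [loc for item in value for loc in _file_locations(item)]
    if isinstance(value, dict) and value.get("class") == "File":
        location = value.get("location") or value.get("path")
        if not location:
            raise ValueError(f"File object without location or path: {value!r}")
        return [location]
    raise TypeError(f"expected a CWL File or list of Files, got {value!r}")


def promote_parameters(
    workflow_params: dict[str, Any],
    input_data_sources: list[str],
    input_sandbox_sources: list[str],
) -> dict[str, list[str]]:
    """Map per-job CWL parameters onto job-level InputData / InputSandbox."""
    promoted: dict[str, list[str]] = {"InputData": [], "InputSandbox": []}
    for field, sources in (("InputData", input_data_sources),
                           ("InputSandbox", input_sandbox_sources)):
        for source in sources:
            promoted[field].extend(_file_locations(workflow_params.get(source)))
    return promoted


params = {
    "input_lfns": [
        {"class": "File", "location": "LFN:/lhcb/user/r/roneil/data_1.root"},
        {"class": "File", "location": "LFN:/lhcb/user/r/roneil/data_2.root"},
    ],
    "helper_script": {"class": "File", "path": "helper.sh"},
    "config_param": "fast",  # not a File: never promoted
}
print(promote_parameters(params, ["input_lfns"], ["helper_script"]))
```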

Why not CWL-in-JDL?

| Concern | CWL-in-JDL (previous) | workflows table (current) |
|---|---|---|
| 10k parametric jobs | 10k copies of compressed CWL | 1 workflow row + 10k param rows |
| Storage | ~16MB CWL blob per JDL row | CWL stored once |
| Queryability | Opaque base64 blob | CWL stored as readable YAML |
| Immutability | Mutable (JDL can be updated) | Content-addressed, immutable |
| Cleanup | Tied to JDL lifecycle | Independent lifecycle via persistent flag |

3. New diracX Endpoint: POST /api/jobs/

Request format

The endpoint accepts a CWL workflow file plus one or more input YAML files. Each input YAML produces a separate job:

POST /api/jobs/
Content-Type: multipart/form-data

workflow: <wf.cwl>           # CWL workflow/tool definition (YAML)
inputs[]: <input1.yaml>      # Input parameters for job 1
inputs[]: <input2.yaml>      # Input parameters for job 2

This produces 2 jobs:

  • Job 1: run wf.cwl with input1.yaml
  • Job 2: run wf.cwl with input2.yaml

If no input files are provided, a single job is created with no inputs (suitable for tools with no required inputs or all defaults).
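The request shape above can be built with the standard library alone. This sketch only constructs the request and does not send it. The host name and bearer-token authentication are assumptions. The field names follow the example above and may change, since multipart naming is still an open question. In practice, a client library such as httpx would assemble the multipart body for you.

```python
import uuid
import urllib.request


def build_cwl_submission(
    url: str,
    workflow_cwl: str,
    input_yamls: list[str],
    token: str,
) -> urllib.request.Request:
    """Build (but do not send) a multipart POST carrying one CWL and N input YAMLs."""
    boundary = uuid.uuid4().hex
    parts = [("workflow", "wf.cwl", workflow_cwl)]
    # One inputs[] part per job; zero parts means a single job with defaults
    parts += [("inputs[]", f"input{i}.yaml", text) for i, text in enumerate(input_yamls, start=1)]

    chunks = []
    for name, filename, text in parts:
        chunks.append(
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="{name}"; filename="{filename}"\r\n'
            "Content-Type: application/yaml\r\n\r\n"
            f"{text}\r\n"
        )
    chunks.append(f"--{boundary}--\r\n")

    return urllib.request.Request(
        url,
        data="".join(chunks).encode("utf-8"),
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


req = build_cwl_submission(
    "https://diracx.example.org/api/jobs/",  # hypothetical host
    "cwlVersion: v1.2\nclass: CommandLineTool\n",
    ["config_param: run1\n", "config_param: run2\n"],
    token="<access token>",
)
print(req.get_method(), req.full_url, len(req.data), "bytes")
```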

Translation flow

POST /api/jobs/
  │
  ▼
Router (diracx-routers)
  │  Parses multipart: CWL YAML + input YAML(s)
  │  Validates CWL via JobSubmissionModel (pydantic)
  │  Validates schema_version
  ▼
Logic (diracx-logic)
  │  SHA-256 hash CWL → INSERT INTO workflows IF NOT EXISTS
  │  For each input YAML:
  │    Extracts dirac:Job hint from CWL task
  │    Extracts ResourceRequirement, CUDARequirement, etc.
  │    Derives JobName from CWL label/id
  │    Resolves I/O: source IDs → file paths/LFNs (strict validation)
  │    Maps all → JDL fields (transition period)
  │    Calls existing submit_jdl_jobs() with generated JDL
  │    Sets workflow_id + workflow_params (JSON) on Jobs row
  ▼
DB
  │  CWL stored once in workflows table
  │  JDL stored in JobJDLs (transition period)
  │  Job attrs + workflow_id + workflow_params in Jobs table
  ▼
Returns list[InsertedJob]

During the transition period, JDL is still generated for compatibility with existing DIRAC infrastructure (matcher, optimizer, etc.). The workflows table + workflow_params column are the source of truth for the CWL definition and per-job parameters. Once JDL is fully retired, the JDL generation step is removed.

Translation logic (new functions in diracx-logic)

import hashlib

import yaml
from cwl_utils.parser.cwl_v1_2 import (
    CommandLineTool,
    CUDARequirement,
    ExpressionTool,
    MPIRequirement,
    ResourceRequirement,
    Workflow,
)

# JobDB, InsertedJob, JobHint, JobInputModel, parse_cwl, parse_inputs,
# submit_jdl_jobs, format_as_jdl and the _validate/_extract helpers are
# defined elsewhere in diracX (or still to be written).

SUPPORTED_SCHEMA_VERSIONS = {"1.0"}


def compute_workflow_id(cwl_yaml: str) -> str:
    """Content-address a CWL workflow by its SHA-256 hash."""
    return hashlib.sha256(cwl_yaml.encode()).hexdigest()


async def submit_cwl_jobs(
    cwl_yaml: str,
    input_yamls: list[str],
    db: JobDB,
) -> list[InsertedJob]:
    """Submit CWL jobs: store workflow once, create one job per input YAML.

    With no input YAMLs, a single job is created using the CWL defaults.
    """
    task = parse_cwl(cwl_yaml)
    job_hint = JobHint.from_cwl(task)

    if job_hint.schema_version not in SUPPORTED_SCHEMA_VERSIONS:
        raise ValueError(
            f"Unsupported dirac:Job schema_version '{job_hint.schema_version}'. "
            f"Supported: {sorted(SUPPORTED_SCHEMA_VERSIONS)}"
        )

    # Translate (and strictly validate) every job before writing anything,
    # so a bad input aborts the whole submission without partial inserts
    prepared = []
    for input_yaml in input_yamls or [None]:
        inputs = parse_inputs(input_yaml) if input_yaml else None
        # Input files are YAML, so parse them with yaml rather than json
        workflow_params = yaml.safe_load(input_yaml) if input_yaml else None
        # Generate JDL for transition period
        prepared.append((cwl_to_jdl(task, job_hint, inputs), workflow_params))

    workflow_id = compute_workflow_id(cwl_yaml)

    # INSERT IF NOT EXISTS: idempotent, content-addressed
    await db.insert_workflow(workflow_id, cwl_yaml, persistent=False)

    inserted = []
    for jdl, workflow_params in prepared:
        # Submit via existing pipeline
        jobs = await submit_jdl_jobs([jdl])

        # Set workflow reference + immutable params on job row
        for job in jobs:
            await db.set_workflow_ref(
                job.job_id,
                workflow_id=workflow_id,
                workflow_params=workflow_params,
            )
        inserted.extend(jobs)

    return inserted


def cwl_to_jdl(
    task: CommandLineTool | Workflow | ExpressionTool,
    job_hint: JobHint,
    inputs: JobInputModel | None,
) -> str:
    """Convert a CWL task with dirac:Job hint into a JDL string.

    This is a transition-period function — once JDL is retired,
    job attributes are populated directly from the hint + CWL.
    """
    jdl_fields = {
        "Executable": "dirac-cwl-exec",
        "JobType": job_hint.type,
        "Priority": job_hint.priority,
        "LogLevel": job_hint.log_level,
    }

    if job_hint.cpu_work:
        jdl_fields["CPUTime"] = job_hint.cpu_work
    if job_hint.platform:
        jdl_fields["Platform"] = job_hint.platform

    # Derive JobName from CWL label/id
    task_label = getattr(task, "label", None)
    task_id = getattr(task, "id", None)
    if task_label:
        jdl_fields["JobName"] = task_label
    elif task_id and task_id != ".":
        jdl_fields["JobName"] = _extract_id(task_id)

    # Extract from CWL requirements (standard CWL, not dirac:Job)
    tags = set(job_hint.tags or [])
    for req in (getattr(task, "requirements", None) or []):
        if isinstance(req, ResourceRequirement):
            if req.coresMin:
                jdl_fields["MinNumberOfProcessors"] = int(req.coresMin)
            if req.coresMax:
                jdl_fields["MaxNumberOfProcessors"] = int(req.coresMax)
            if req.ramMin:
                jdl_fields["MinRAM"] = int(req.ramMin)
            if req.ramMax:
                jdl_fields["MaxRAM"] = int(req.ramMax)
        elif isinstance(req, CUDARequirement):
            tags.add("GPU")
        elif isinstance(req, MPIRequirement):
            raise NotImplementedError(
                "MPIRequirement is not yet supported for DIRAC CWL jobs"
            )

    # Auto-derive processor tags
    min_proc = jdl_fields.get("MinNumberOfProcessors", 1)
    max_proc = jdl_fields.get("MaxNumberOfProcessors")
    if min_proc and min_proc > 1:
        tags.add("MultiProcessor")
    if min_proc and max_proc and min_proc == max_proc:
        tags.add(f"{min_proc}Processors")

    if tags:
        jdl_fields["Tags"] = sorted(tags)  # sorted for a deterministic JDL

    # Sites
    if job_hint.sites:
        jdl_fields["Site"] = job_hint.sites
    if job_hint.banned_sites:
        jdl_fields["BannedSites"] = job_hint.banned_sites

    if job_hint.group:
        jdl_fields["JobGroup"] = job_hint.group

    # Resolve I/O from CWL input/output source IDs
    cwl_input_ids = {_extract_id(inp.id): inp for inp in (task.inputs or [])}
    cwl_output_ids = {_extract_id(out.id): out for out in (task.outputs or [])}

    # InputSandbox
    if job_hint.input_sandbox:
        sandbox_files = []
        for ref in job_hint.input_sandbox:
            _validate_cwl_id(ref.source, cwl_input_ids, "input", ["File", "File[]"])
            if inputs and ref.source in inputs.cwl:
                sandbox_files.extend(_extract_file_paths(inputs.cwl[ref.source]))
        if sandbox_files:
            jdl_fields["InputSandbox"] = sandbox_files

    # InputData
    if job_hint.input_data:
        lfns = []
        for ref in job_hint.input_data:
            _validate_cwl_id(ref.source, cwl_input_ids, "input", ["File", "File[]"])
            if inputs and ref.source in inputs.cwl:
                lfns.extend(_extract_lfn_paths(inputs.cwl[ref.source]))
        if lfns:
            jdl_fields["InputData"] = lfns

    # OutputSandbox
    if job_hint.output_sandbox:
        sandbox_outputs = []
        for ref in job_hint.output_sandbox:
            _validate_cwl_id(ref.source, cwl_output_ids, "output", ["File", "File[]"])
            out = cwl_output_ids[ref.source]
            if hasattr(out, "outputBinding") and out.outputBinding:
                sandbox_outputs.append(out.outputBinding.glob)
        if sandbox_outputs:
            jdl_fields["OutputSandbox"] = sandbox_outputs

    # OutputData (per-output SE and path)
    if job_hint.output_data:
        output_files = []
        all_ses = set()
        for entry in job_hint.output_data:
            _validate_cwl_id(entry.source, cwl_output_ids, "output", ["File", "File[]"])
            out = cwl_output_ids[entry.source]
            if hasattr(out, "outputBinding") and out.outputBinding:
                output_files.append(out.outputBinding.glob)
            all_ses.update(entry.output_se)
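        if output_files:
            jdl_fields["OutputData"] = output_files
            # JDL carries a single OutputPath and one OutputSE list, so only the
            # first entry's output_path is used and all entries' SEs are merged
            jdl_fields["OutputPath"] = job_hint.output_data[0].output_path
            jdl_fields["OutputSE"] = sorted(all_ses)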

    return format_as_jdl(jdl_fields)


def _extract_id(cwl_id: str) -> str:
    """Extract short ID from CWL full URI (e.g., 'file.cwl#input1' → 'input1')."""
    return cwl_id.split("#")[-1].split("/")[-1]

4. Changes to DIRAC Worker-Side Execution

Current flow (__createCWLJobWrapper):

git clone dirac-cwl → install pixi → download sandbox (gets job.json) → run wrapper

New flow:

Fetch CWL from workflows table (via diracX API) → read workflow_params from Jobs row → write job.json → run job wrapper (via importlib.resources)

Since dirac-cwl is installed as a package in the diracX environment, the job wrapper template is accessed via importlib.resources — no git clone or pixi install needed.

Changes in __createCWLJobWrapper in Utils.py:

  1. Accept jobParams as a parameter (already available in createJobWrapper)
  2. Fetch CWL definition from diracX API using workflow_id from the job
  3. Read workflow_params (per-job input parameters) from the job
  4. Write CWL + params to local files (task.cwl, params.json)
  5. Remove the git clone, pixi install, and dirac-wms-job-get-input steps
  6. Load the job wrapper template via importlib.resources.files("dirac_cwl.job")
  7. InputSandbox is still used for actual input files — just not for the workflow definition

5. Pydantic Models (in diracX)

These models live in diracX (migrated from dirac-cwl). The old SchedulingHint and ExecutionHooksHint classes are removed — there is no backward compatibility layer.

from pydantic import BaseModel


class IOSource(BaseModel):
    """Reference to a CWL input or output by its ID."""
    source: str             # CWL input/output ID
    path: str | None = None # relative path within job working directory (input_sandbox only)


class OutputDataEntry(BaseModel):
    """Output data entry with per-output SE and LFN path."""
    source: str                    # CWL output ID
    output_path: str               # LFN destination path
    output_se: list[str] = ["SE-USER"]


class JobHint(BaseModel, Hint):
    """Unified DIRAC-specific hint for job scheduling and I/O.

    Resource requirements (cores, RAM) are expressed via standard CWL
    requirements, not in this hint.

    Execution hooks are determined automatically by `type`, not
    configured by the submitter.

    I/O fields reference CWL input/output IDs via `source:` syntax,
    consistent with CWL's own referencing conventions.
    """
    schema_version: str  # required, e.g. "1.0"

    # Scheduling (DIRAC-specific, no CWL equivalent)
    priority: int = 5
    cpu_work: int | None = None   # HS06-seconds → JDL CPUTime
    platform: str | None = None
    sites: list[str] | None = None
    banned_sites: list[str] | None = None
    tags: list[str] | None = None  # merged with auto-derived tags

    # Job metadata
    type: str = "User"
    group: str = ""
    log_level: str = "INFO"

    # I/O: reference CWL input/output IDs via source:
    input_sandbox: list[IOSource] = []
    input_data: list[IOSource] = []
    output_sandbox: list[IOSource] = []
    output_data: list[OutputDataEntry] = []

    @classmethod
    def from_cwl(cls, cwl_object) -> "JobHint":
        hints = getattr(cwl_object, "hints", []) or []
        for hint in hints:
            if hint.get("class") == "dirac:Job":
                data = {k: v for k, v in hint.items() if k != "class"}
                return cls(**data)
        raise ValueError("CWL task is missing required dirac:Job hint")

6. Summary of Changes by Repository

diracx (primary)

| Change | Location |
|---|---|
| JobHint, IOSource, OutputDataEntry models | diracx-core/src/diracx/core/models/ |
| JobSubmissionModel, JobInputModel models | diracx-core/src/diracx/core/models/ |
| workflows table schema | diracx-db/src/diracx/db/sql/job/schema.py |
| workflow_id + workflow_params columns on Jobs table | diracx-db/src/diracx/db/sql/job/schema.py |
| New router POST /api/jobs/ (multipart CWL + inputs) | diracx-routers/src/diracx/routers/jobs/submission.py |
| GET /api/workflows/{workflow_id} endpoint | diracx-routers/src/diracx/routers/jobs/ |
| submit_cwl_jobs() + cwl_to_jdl() logic | diracx-logic/src/diracx/logic/jobs/submission.py |
| Job wrapper template (migrated from dirac-cwl) | diracx-logic/src/diracx/logic/jobs/ |
| dirac-cwl as dependency | pyproject.toml |

dirac-cwl

| Change | Location |
|---|---|
| Remove SchedulingHint + ExecutionHooksHint | execution_hooks/core.py |
| Remove convert_to_jdl() | job/submission_clients.py |
| Update JobWrapper.run_job() to resolve hooks from type | job/job_wrapper.py |
| Update schema with Job definition (versioned) | schemas/dirac-metadata.json |
| Models migrated to diracX; remove from dirac-cwl | submission_models.py |

DIRAC

| Change | Location |
|---|---|
| Modify __createCWLJobWrapper to fetch CWL via diracX API + read workflow_params | WorkloadManagementSystem/Utilities/Utils.py |
| Remove git clone + pixi install from bash wrapper | Same file |
| Load job wrapper via importlib.resources | Same file |

7. Migration Path

Phase 1 (this work):

  • Implement workflows table and workflow_id/workflow_params columns on Jobs
  • Implement dirac:Job hint and models in diracX
  • Implement POST /api/jobs/ endpoint + GET /api/workflows/{workflow_id}
  • Implement cwl_to_jdl() transition shim
  • Modify DIRAC __createCWLJobWrapper — remove git clone, use importlib.resources, fetch CWL from API
  • Remove old hints from dirac-cwl

Phase 2 (future, per production-plugin-system.md):

  • dirac:Transformation hint → transformation submission endpoint
  • dirac:Production hint → production orchestration endpoint
  • Migrate execution hooks plugin registry to diracX entrypoints

8. Decisions Made

| # | Decision | Rationale |
|---|---|---|
| D1 | Models live in diracX, not dirac-cwl | Natural migration path; dirac-cwl components follow |
| D2 | Endpoint accepts raw CWL YAML + input YAMLs | User-friendly; wf.cwl + input1.yaml + input2.yaml → N jobs |
| D3 | Endpoint is POST /api/jobs/ (not /api/jobs/cwl) | CWL becomes the primary submission format |
| D4 | Platform is CPU architecture (unrelated to containers) | Container support is a separate discussion |
| D5 | No git clone on worker nodes | dirac-cwl installed in diracX; wrapper via importlib.resources |
| D6 | Hook registry starts in dirac-cwl, migrates to diracX entrypoints | Incremental migration |
| D7 | cpu_work (HS06-seconds) in hint; ToolTimeLimit for local cwltool only | Avoids unit ambiguity; DB12 calculates normalization factor |
| D8 | Strict I/O validation: fail fast at submission | Prevents wasting CPU on 10^3 to 10^5 jobs with bad references |
| D9 | No backward compatibility with old hints | Nothing in production; simplifies implementation |
| D10 | Hints carry schema_version | Forward-compatible evolution |
| D11 | CWL stored in workflows table, not embedded in JDL | Content-addressed (SHA-256), immutable; 10k parametric jobs share 1 workflow row |
| D12 | Per-job params as immutable JSON column on Jobs table | Co-located with job, no extra join; immutable once set |
| D13 | JDL generation is a transition-period shim, not the target | workflows + workflow_params are source of truth; JDL generated for existing DIRAC infra compatibility |

9. Open Questions

  1. ToolTimeLimit in cwl_utils: Need to verify cwl_utils parses it. If available, it can be used for local execution wall-clock limits alongside cpu_work for DIRAC scheduling. To be investigated.

  2. Container support: How to run containerized jobs within DIRAC. Unrelated to Platform (which is CPU architecture). Separate design needed.

  3. Multipart API design: Exact multipart field naming and how to handle optional inputs (no input YAML = single job with defaults). Also: should the endpoint support JSON as an alternative to YAML?

  4. Input YAML templating: The endpoint naturally supports wf.cwl + N input YAMLs → N jobs. Future extension could support templating (e.g., parameter sweeps) — to be designed separately.

  5. Workflow cleanup policy: When persistent = FALSE, what triggers cleanup of orphaned workflow rows? Options: periodic GC that checks for referencing jobs, TTL-based expiry, or tied to existing job cleanup routines.
