From 1319fcb5ac75b448508eadb98e4997db72189c7c Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Mon, 27 Oct 2025 13:03:53 -0700
Subject: [PATCH 1/7] Initial commit for bl531

---
 config.yml                              |  17 ++
 init_work_pools.py                      | 151 +++++++++++++++++
 orchestration/flows/bl531/__init__.py   |   0
 orchestration/flows/bl531/config.py     |  13 ++
 orchestration/flows/bl531/dispatcher.py |  57 +++++++
 orchestration/flows/bl531/move.py       | 215 ++++++++++++++++++++++++
 orchestration/flows/bl531/prefect.yaml  |  31 ++++
 7 files changed, 484 insertions(+)
 create mode 100644 init_work_pools.py
 create mode 100644 orchestration/flows/bl531/__init__.py
 create mode 100644 orchestration/flows/bl531/config.py
 create mode 100644 orchestration/flows/bl531/dispatcher.py
 create mode 100644 orchestration/flows/bl531/move.py
 create mode 100644 orchestration/flows/bl531/prefect.yaml

diff --git a/config.yml b/config.yml
index e9936adc..401db1da 100644
--- a/config.yml
+++ b/config.yml
@@ -1,5 +1,22 @@
 globus:
   globus_endpoints:
+
+    # 5.3.1 ENDPOINTS
+
+    bl531-nersc_alsdev:
+      root_path: /global/cfs/cdirs/als/gsharing/data_mover/531
+      uri: nersc.gov
+      uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
+      name: bl531-nersc_alsdev
+
+    bl531-compute-dtn:
+      root_path: /
+      uri: compute-dtn.als.lbl.gov
+      uuid: TBD
+      name: bl531-compute-dtn
+
+    # 8.3.2 ENDPOINTS
+
     spot832:
       root_path: /
       uri: spot832.lbl.gov
diff --git a/init_work_pools.py b/init_work_pools.py
new file mode 100644
index 00000000..08ff007d
--- /dev/null
+++ b/init_work_pools.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python3
+"""
+init_work_pools.py
+
+Description:
+    Initializes Prefect work pools and deployments for the beamline defined by the BEAMLINE environment variable.
+    Uses orchestration/flows/bl"$BEAMLINE"/prefect.yaml as the single source of truth.
+
+Requirements:
+    - BEAMLINE must be set (e.g., 832).
+    - A prefect.yaml file must exist in orchestration/flows/bl"$BEAMLINE"/.
+    - Prefect CLI must be installed and available in PATH.
+
+Behavior:
+    - Waits until the Prefect server is reachable via its /health endpoint.
+    - Creates any missing work pools defined in the beamline's prefect.yaml.
+    - Deploys all flows defined in the beamline's prefect.yaml.
+    - Creates/updates Prefect Secret blocks for GLOBUS_CLIENT_ID and GLOBUS_CLIENT_SECRET
+      if the corresponding environment variables are present. Otherwise warns and continues.
+
+
+Environment Variables:
+    BEAMLINE         The beamline identifier (e.g., 832). Required.
+    PREFECT_API_URL  Override the Prefect server API URL.
+                     Default: http://prefect_server:4200/api
+"""
+
+import httpx
+import logging
+import os
+import subprocess
+import sys
+import time
+import yaml
+
+from prefect.blocks.system import Secret
+
+
+# ---------------- Logging Setup ---------------- #
+logger = logging.getLogger("init_work_pools")
+handler = logging.StreamHandler(sys.stdout)
+formatter = logging.Formatter(
+    fmt="%(asctime)s [%(levelname)s] %(message)s",
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+logger.setLevel(logging.INFO)
+# ------------------------------------------------ #
+
+
+def check_env() -> tuple[str, str, str]:
+    """Validate required environment variables and paths."""
+    beamline = os.environ.get("BEAMLINE")
+    if not beamline:
+        logger.error("Must set BEAMLINE (e.g., 832, 733)")
+        sys.exit(1)
+
+    prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml"
+    if not os.path.isfile(prefect_yaml):
+        logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!")
+        sys.exit(1)
+
+    api_url = os.environ.get("PREFECT_API_URL", "http://prefect_server:4200/api")
+    return beamline, prefect_yaml, api_url
+
+
+def wait_for_prefect_server(api_url: str, beamline: str, timeout: int = 180):
+    """Wait until Prefect server health endpoint responds (unless using Prefect Cloud)."""
+    if "api.prefect.cloud" in api_url:
+        logger.info(f"[Init:{beamline}] Prefect Cloud detected — skipping health check.")
+        return
+
+    health_url = f"{api_url}/health"
+    logger.info(f"[Init:{beamline}] Waiting for Prefect server at {health_url}...")
+
+    start = time.time()
+    while time.time() - start < timeout:
+        try:
+            r = httpx.get(health_url, timeout=2.0)
+            if r.status_code == 200:
+                logger.info(f"[Init:{beamline}] Prefect server is up.")
+                return
+        except Exception:
+            pass
+        logger.warning(f"[Init:{beamline}] Still waiting...")
+        time.sleep(3)
+
+    logger.error(f"[Init:{beamline}] Prefect server did not become ready in time.")
+    sys.exit(1)
+
+
+def ensure_work_pools(prefect_yaml: str, beamline: str):
+    """Ensure that all work pools from prefect.yaml exist (create if missing)."""
+    with open(prefect_yaml, "r") as f:
+        config = yaml.safe_load(f)
+
+    pools = {d["work_pool"]["name"] for d in config.get("deployments", []) if "work_pool" in d}
+
+    for pool in pools:
+        logger.info(f"[Init:{beamline}] Ensuring pool: {pool}")
+        try:
+            subprocess.run(
+                ["prefect", "work-pool", "inspect", pool],
+                check=True,
+                capture_output=True,
+            )
+            logger.info(f"[Init:{beamline}] Work pool '{pool}' already exists.")
+        except subprocess.CalledProcessError:
+            logger.info(f"[Init:{beamline}] Creating work pool: {pool}")
+            subprocess.run(
+                ["prefect", "work-pool", "create", pool, "--type", "process"],
+                check=True,
+            )
+
+
+def deploy_flows(prefect_yaml: str, beamline: str):
+    """Deploy flows defined in prefect.yaml using Prefect CLI."""
+    logger.info(f"[Init:{beamline}] Deploying flows from {prefect_yaml}...")
+    subprocess.run(
+        ["prefect", "--no-prompt", "deploy", "--prefect-file", prefect_yaml, "--all"],
+        check=True,
+    )
+    logger.info(f"[Init:{beamline}] Done.")
+
+
+def ensure_globus_secrets(beamline: str):
+    globus_client_id = os.environ.get("GLOBUS_CLIENT_ID")
+    globus_client_secret = os.environ.get("GLOBUS_CLIENT_SECRET")
+
+    if globus_client_id and globus_client_secret:
+        # Create or update Prefect Secret blocks for Globus credentials
+        try:
+            Secret(value=globus_client_id).save(name="globus-client-id", overwrite=True)
+            Secret(value=globus_client_secret).save(name="globus-client-secret", overwrite=True)
logger.info(f"[Init:{beamline}] Created/updated Prefect Secret blocks for Globus credentials.") + except Exception as e: + logger.warning(f"[Init:{beamline}] Failed to create/update Prefect Secret blocks: {str(e)}") + + +def main(): + beamline, prefect_yaml, api_url = check_env() + logger.info(f"[Init:{beamline}] Using prefect.yaml at {prefect_yaml}") + wait_for_prefect_server(api_url, beamline) + ensure_globus_secrets(beamline) + ensure_work_pools(prefect_yaml, beamline) + deploy_flows(prefect_yaml, beamline) + + +if __name__ == "__main__": + main() diff --git a/orchestration/flows/bl531/__init__.py b/orchestration/flows/bl531/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/flows/bl531/config.py b/orchestration/flows/bl531/config.py new file mode 100644 index 00000000..4f3c2ace --- /dev/null +++ b/orchestration/flows/bl531/config.py @@ -0,0 +1,13 @@ +from globus_sdk import TransferClient +from orchestration.globus import transfer + + +# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged) +class Config531: + def __init__(self) -> None: + config = transfer.get_config() + self.endpoints = transfer.build_endpoints(config) + self.apps = transfer.build_apps(config) + self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) + self.bl531_nersc_alsdev = self.endpoints["bl531-nersc-alsdev"] + self.bl531_compute_dtn = self.endpoints["bl531-compute-dtn"] diff --git a/orchestration/flows/bl531/dispatcher.py b/orchestration/flows/bl531/dispatcher.py new file mode 100644 index 00000000..c3512269 --- /dev/null +++ b/orchestration/flows/bl531/dispatcher.py @@ -0,0 +1,57 @@ +import logging +from prefect import flow +from typing import Optional, Union, Any + +from orchestration.flows.bl531.move import process_new_531_file + +logger = logging.getLogger(__name__) + + +# TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config531 +@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") +def dispatcher( + file_path: Optional[str] = None, + is_export_control: bool = False, + config: Optional[Union[dict, Any]] = None, +) -> None: + """ + Dispatcher flow for BL531 beamline that launches the new_531_file_flow. + + :param file_path: Path to the file to be processed. + :param is_export_control: Flag indicating if export control measures should be applied. + (Not used in the current BL531 processing) + :param config: Configuration settings for processing. + Expected to be an instance of Config531 or a dict that can be converted. + :raises ValueError: If no configuration is provided. + :raises TypeError: If the provided configuration is not a dict or Config531. + """ + + logger.info("Starting dispatcher flow for BL 5.3.1") + logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}") + + # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running. + if file_path is None: + logger.error("No file_path provided to dispatcher.") + raise ValueError("File path is required for processing.") + + if is_export_control: + logger.error("Data is under export control. Processing is not allowed.") + raise ValueError("Data is under export control. 
+
+    if config is None:
+        logger.error("No configuration provided to dispatcher.")
+        raise ValueError("Configuration (config) is required for processing.")
+
+    try:
+        process_new_531_file(
+            file_path=file_path,
+            config=config
+        )
+        logger.info("Dispatcher flow completed successfully.")
+    except Exception as e:
+        logger.error(f"Error during processing in dispatcher flow: {e}")
+        raise
+
+
+if __name__ == "__main__":
+    dispatcher()
diff --git a/orchestration/flows/bl531/move.py b/orchestration/flows/bl531/move.py
new file mode 100644
index 00000000..29e93822
--- /dev/null
+++ b/orchestration/flows/bl531/move.py
@@ -0,0 +1,215 @@
+import datetime
+import logging
+from typing import Optional
+
+from prefect import flow
+# from prefect.blocks.system import JSON
+
+from orchestration.flows.bl531.config import Config531
+from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
+from orchestration.prefect import schedule_prefect_flow
+from orchestration.transfer_controller import CopyMethod, get_transfer_controller
+
+logger = logging.getLogger(__name__)
+
+# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls
+# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here.
+
+
+def prune(
+    file_path: str = None,
+    source_endpoint: GlobusEndpoint = None,
+    check_endpoint: Optional[GlobusEndpoint] = None,
+    days_from_now: float = 0.0,
+    config: Config531 = None
+) -> bool:
+    """
+    Prune (delete) data from a globus endpoint.
+    If days_from_now is 0, executes pruning immediately.
+    Otherwise, schedules pruning for future execution using Prefect.
+    Args:
+        file_path (str): The path to the file or directory to prune
+        source_endpoint (GlobusEndpoint): The globus endpoint containing the data
+        check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning
+        days_from_now (float): Delay before pruning; if 0, prune immediately
+    Returns:
+        bool: True if pruning was successful or scheduled successfully, False otherwise
+    """
+    if not file_path:
+        logger.error("No file_path provided for pruning operation")
+        return False
+
+    if not source_endpoint:
+        logger.error("No source_endpoint provided for pruning operation")
+        return False
+
+    if not config:
+        config = Config531()
+
+    if days_from_now < 0:
+        raise ValueError(f"Invalid days_from_now: {days_from_now}")
+
+    # JSON blocks are deprecated, we should use what they recommend in the docs
+    # globus_settings = JSON.load("globus-settings").value
+    # max_wait_seconds = globus_settings["max_wait_seconds"]
+
+    logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'")
+
+    # convert float days → timedelta
+    delay: datetime.timedelta = datetime.timedelta(days=days_from_now)
+
+    # If days_from_now is 0, prune immediately
+    if delay.total_seconds() == 0:
+        logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'")
+        return _prune_globus_endpoint(
+            relative_path=file_path,
+            source_endpoint=source_endpoint,
+            check_endpoint=check_endpoint,
+            config=config
+        )
+    else:
+        # Otherwise, schedule pruning for future execution
+        logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' "
+                    f"in {delay.total_seconds()/86400:.1f} days")
+
+        try:
+            schedule_prefect_flow(
+                deployment_name="prune_globus_endpoint/prune_globus_endpoint",
+                parameters={
+                    "relative_path": file_path,
+                    "source_endpoint": source_endpoint,
"check_endpoint": check_endpoint, + "config": config + }, + duration_from_now=delay, + ) + logger.info(f"Successfully scheduled pruning task for {delay.total_seconds()/86400:.1f} days from now") + return True + except Exception as e: + logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True) + return False + +# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls +# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. + + +# @staticmethod +@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{{ relative_path | basename }}") +def _prune_globus_endpoint( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: Optional[GlobusEndpoint] = None, + config: Config531 = None +) -> None: + """ + Prefect flow that performs the actual Globus endpoint pruning operation. + Args: + relative_path (str): The path of the file or directory to prune + source_endpoint (GlobusEndpoint): The Globus endpoint to prune from + check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning + config (BeamlineConfig): Configuration object with transfer client + """ + logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") + + if not config: + config = Config531() + + # globus_settings = JSON.load("globus-settings").value + # max_wait_seconds = globus_settings["max_wait_seconds"] + max_wait_seconds = 600 + flow_name = f"prune_from_{source_endpoint.name}" + logger.info(f"Running flow: {flow_name}") + logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") + prune_one_safe( + file=relative_path, + if_older_than_days=0, + transfer_client=config.tc, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + logger=logger, + max_wait_seconds=max_wait_seconds + ) + + +@flow(name="new_531_file_flow", flow_run_name="process_new-{file_path}") +def process_new_531_file( + file_path: str, + config: Config531 +) -> None: + """ + Flow to process a new file at BL 5.3.1 + 1. Copy the file from the data531 to NERSC CFS. Ingest file path in SciCat. + 2. Schedule pruning from data531. 6 months from now. + 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 4. Schedule pruning from NERSC CFS. + + :param file_path: Path to the new file to be processed. + :param config: Configuration settings for processing. + """ + + logger.info(f"Processing new 531 file: {file_path}") + + if not config: + config = Config531() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + transfer_controller.copy( + file_path=file_path, + source=config.bl531_compute_dtn, + destination=config.bl531_nersc_alsdev + ) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + # Schedule pruning from QNAP + # Waiting for PR #62 to be merged (prune_controller) + # TODO: Determine scheduling days_from_now based on beamline needs + prune( + file_path=file_path, + source_endpoint=config.bl531_compute_dtn, + check_endpoint=config.bl531_nersc_alsdev, + days_from_now=180.0 + ) + + # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? 
+    # Waiting for PR #62 to be merged (transfer_controller)
+
+    # TODO: Ingest file path in SciCat
+    # Waiting for PR #62 to be merged (scicat_controller)
+
+
+@flow(name="move_531_flight_check", flow_run_name="move_531_flight_check-{file_path}")
+def move_531_flight_check(
+    file_path: str = "test_directory/test.txt",
+):
+    """Please keep your arms and legs inside the vehicle at all times."""
+    logger.info("531 flight check: testing transfer from data531 to NERSC CFS")
+
+    config = Config531()
+
+    transfer_controller = get_transfer_controller(
+        transfer_type=CopyMethod.GLOBUS,
+        config=config
+    )
+
+    success = transfer_controller.copy(
+        file_path=file_path,
+        source=config.bl531_compute_dtn,
+        destination=config.bl531_nersc_alsdev
+    )
+    if success is True:
+        logger.info("531 flight check: transfer successful")
+    else:
+        logger.error("531 flight check: transfer failed")
+
+
+if __name__ == "__main__":
+    # Example usage
+    config = Config531()
+    file_path = "test_directory/"
+    process_new_531_file(file_path, config)
diff --git a/orchestration/flows/bl531/prefect.yaml b/orchestration/flows/bl531/prefect.yaml
new file mode 100644
index 00000000..6cf35d19
--- /dev/null
+++ b/orchestration/flows/bl531/prefect.yaml
@@ -0,0 +1,31 @@
+name: bl531
+prefect-version: 3.4.2
+deployments:
+- name: new_file_531
+  entrypoint: orchestration/flows/bl531/move.py:process_new_531_file
+  work_pool:
+    name: new_file_531_pool
+    work_queue_name: new_file_531_queue
+
+- name: new_file_531_flight_check
+  entrypoint: orchestration/flows/bl531/move.py:move_531_flight_check
+  work_pool:
+    name: new_file_531_pool
+    work_queue_name: move_file_531_flight_check_queue
+  schedules:
+    - cron: "0 */12 * * *"  # Every 12 hours
+      slug: "test-move-531-flight-check"
+      timezone: America/Los_Angeles
+      active: true
+
+- name: run_531_dispatcher
+  entrypoint: orchestration/flows/bl531/dispatcher.py:dispatcher
+  work_pool:
+    name: dispatcher_531_pool
+    work_queue_name: dispatcher_531_queue
+
+- name: prune_data531
+  entrypoint: orchestration/flows/bl531/move.py:_prune_globus_endpoint
+  work_pool:
+    name: prune_531_pool
+    work_queue_name: prune_531_queue

From d9fe1a12e276ef84c235455d2f7e1bc779f2068e Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Tue, 28 Oct 2025 16:13:13 -0700
Subject: [PATCH 2/7] Adding pytests for bl531

---
 orchestration/_tests/test_bl531/__init__.py  |   0
 orchestration/_tests/test_bl531/test_move.py | 186 +++++++++++++++++++
 2 files changed, 186 insertions(+)
 create mode 100644 orchestration/_tests/test_bl531/__init__.py
 create mode 100644 orchestration/_tests/test_bl531/test_move.py

diff --git a/orchestration/_tests/test_bl531/__init__.py b/orchestration/_tests/test_bl531/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orchestration/_tests/test_bl531/test_move.py b/orchestration/_tests/test_bl531/test_move.py
new file mode 100644
index 00000000..de8f8e1d
--- /dev/null
+++ b/orchestration/_tests/test_bl531/test_move.py
@@ -0,0 +1,186 @@
+'''Pytest unit tests for BL531 move flow.
+'''
+
+import logging
+import pytest
+from uuid import uuid4
+
+from prefect.testing.utilities import prefect_test_harness
+from prefect.blocks.system import Secret, JSON
+from pytest_mock import MockFixture
+
+from orchestration._tests.test_transfer_controller import MockSecret
+from orchestration.flows.bl531.config import Config531
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture(autouse=True, scope="session")
+def prefect_test_fixture():
+    """
+    A pytest fixture that automatically sets up and tears down the Prefect test harness
+    for the entire test session. It creates and saves test secrets and configurations
+    required for Globus integration.
+
+    Yields:
+        None
+    """
+    with prefect_test_harness():
+        globus_client_id = Secret(value=str(uuid4()))
+        globus_client_id.save(name="globus-client-id", overwrite=True)
+
+        globus_client_secret = Secret(value=str(uuid4()))
+        globus_client_secret.save(name="globus-client-secret", overwrite=True)
+
+        pruning_config = JSON(value={"max_wait_seconds": 600})
+        pruning_config.save(name="pruning-config", overwrite=True)
+
+        yield
+
+# ----------------------------
+# Tests for 531
+# ----------------------------
+
+def test_process_new_531_file(mocker: MockFixture) -> None:
+    """
+    Test the process_new_531_file flow from orchestration.flows.bl531.move.
+
+    This test verifies that:
+    - The get_transfer_controller function is called (patched) with the correct parameters.
+    - The returned transfer controller's copy method is called with the expected file path,
+      source, and destination endpoints from the provided configuration.
+
+    Parameters:
+        mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
+    """
+    # Import the flow to test.
+    from orchestration.flows.bl531.move import process_new_531_file
+
+    # Patch the Secret.load and init_transfer_client in the configuration context.
+    with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()):
+        mocker.patch(
+            "orchestration.flows.bl531.config.transfer.init_transfer_client",
+            return_value=mocker.MagicMock()  # Return a dummy TransferClient
+        )
+
+        # Instantiate the dummy configuration.
+        mock_config = Config531()
+
+        # Generate a test file path.
+        test_file_path = f"/tmp/test_file_{uuid4()}.txt"
+
+        # Create a mock transfer controller with a mocked 'copy' method.
+        mock_transfer_controller = mocker.MagicMock()
+        mock_transfer_controller.copy.return_value = True
+
+        mock_prune = mocker.patch(
+            "orchestration.flows.bl531.move.prune",
+            return_value=None
+        )
+
+        # Patch get_transfer_controller where it is used in process_new_531_file.
+        mocker.patch(
+            "orchestration.flows.bl531.move.get_transfer_controller",
+            return_value=mock_transfer_controller
+        )
+
+        # Execute the move flow with the test file path and mock configuration.
+        result = process_new_531_file(file_path=test_file_path, config=mock_config)
+
+        # Verify that the transfer controller's copy method was called exactly once.
+        assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
+        assert result is None, "The flow should return None"
+        assert mock_prune.call_count == 1, "Prune function should be called exactly once"
+
+        # Reset mocks and test with config=None
+        mock_transfer_controller.copy.reset_mock()
+        mock_prune.reset_mock()
+
+        result = process_new_531_file(file_path=test_file_path, config=None)
+        assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
+        assert result is None, "The flow should return None"
+        assert mock_prune.call_count == 1, "Prune function should be called exactly once"
+
+
+def test_dispatcher_531_flow(mocker: MockFixture) -> None:
+    """
+    Test the dispatcher flow for BL531.
+
+    This test verifies that:
+    - The process_new_531_file function is called with the correct parameters
+      when the dispatcher flow is executed.
+    Parameters:
+        mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
+    """
+    # Import the dispatcher flow to test.
+    from orchestration.flows.bl531.dispatcher import dispatcher
+
+    # Create a mock configuration object.
+    class MockConfig:
+        pass
+
+    mock_config = MockConfig()
+
+    # Generate a test file path.
+    test_file_path = f"/tmp/test_file_{uuid4()}.txt"
+
+    # Patch the Secret.load and init_transfer_client in the configuration context.
+    with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()):
+        mocker.patch(
+            "orchestration.flows.bl531.config.transfer.init_transfer_client",
+            return_value=mocker.MagicMock()  # Return a dummy TransferClient
+        )
+        # Patch the schedule_prefect_flow call to avoid real Prefect interaction
+        mocker.patch(
+            "orchestration.flows.bl531.move.schedule_prefect_flow",
+            return_value=None
+        )
+
+        # Patch the process_new_531_file function to monitor its calls.
+        mock_process_new_531_file = mocker.patch(
+            "orchestration.flows.bl531.dispatcher.process_new_531_file",
+            return_value=None
+        )
+
+        # Execute the dispatcher flow with test parameters.
+        dispatcher(
+            file_path=test_file_path,
+            is_export_control=False,
+            config=mock_config
+        )
+
+        # Verify that process_new_531_file was called exactly once with the expected arguments.
+        mock_process_new_531_file.assert_called_once_with(
+            file_path=test_file_path,
+            config=mock_config
+        )
+
+        # Verify that process_new_531_file is called even when config is None
+        mock_process_new_531_file.reset_mock()
+        dispatcher(
+            file_path=test_file_path,
+            is_export_control=False,
+            config=None
+        )
+        mock_process_new_531_file.assert_called_once()
+
+        # Test error handling for missing file_path
+        mock_process_new_531_file.reset_mock()
+        with pytest.raises(ValueError):
+            dispatcher(
+                file_path=None,
+                is_export_control=False,
+                config=mock_config
+            )
+        mock_process_new_531_file.assert_not_called()
+
+        # Test error handling for export control flag
+        mock_process_new_531_file.reset_mock()
+        with pytest.raises(ValueError):
+            dispatcher(
+                file_path=test_file_path,
+                is_export_control=True,
+                config=mock_config
+            )
+        mock_process_new_531_file.assert_not_called()

From 9485d87ce2f2cb5bbc913e71855f8e32bd6b334a Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Tue, 28 Oct 2025 16:13:48 -0700
Subject: [PATCH 3/7] Adjusting for prefect 3

---
 orchestration/_tests/test_globus_flow.py         | 14 +++++++-------
 orchestration/_tests/test_sfapi_flow.py          |  4 ++--
 orchestration/_tests/test_transfer_controller.py |  4 ++--
 orchestration/flows/bl832/dispatcher.py          |  2 +-
 requirements.txt                                 |  2 +-
 5 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py
index e0823b27..0ceb94bb 100644
--- a/orchestration/_tests/test_globus_flow.py
+++ b/orchestration/_tests/test_globus_flow.py
@@ -24,26 +24,26 @@ def prefect_test_fixture():
     """
     with prefect_test_harness():
         globus_client_id = Secret(value=str(uuid4()))
-        globus_client_id.save(name="globus-client-id")
+        globus_client_id.save(name="globus-client-id", overwrite=True)
 
         globus_client_secret = Secret(value=str(uuid4()))
-        globus_client_secret.save(name="globus-client-secret")
+        globus_client_secret.save(name="globus-client-secret", overwrite=True)
 
         globus_compute_endpoint = Secret(value=str(uuid4()))
-        globus_compute_endpoint.save(name="globus-compute-endpoint")
+        globus_compute_endpoint.save(name="globus-compute-endpoint", overwrite=True)
 
         pruning_config = JSON(value={"max_wait_seconds": 600})
-        pruning_config.save(name="pruning-config")
+        pruning_config.save(name="pruning-config", overwrite=True)
 
         decision_settings = JSON(value={
             "alcf_recon_flow/alcf_recon_flow": True,
             "nersc_recon_flow/nersc_recon_flow": False,
             "new_832_file_flow/new_file_832": False
         })
-        decision_settings.save(name="decision-settings")
+        decision_settings.save(name="decision-settings", overwrite=True)
 
         alcf_allocation_root = JSON(value={"alcf-allocation-root-path": "/eagle/IRIProd/ALS"})
-        alcf_allocation_root.save(name="alcf-allocation-root-path")
+        alcf_allocation_root.save(name="alcf-allocation-root-path", overwrite=True)
 
         yield
 
@@ -150,7 +150,7 @@ class MockDeployment:
     async def mock_run_deployment(*args, **kwargs):
        return None
 
-    mocker.patch('prefect.deployments.deployments.run_deployment', new=mock_run_deployment)
+    mocker.patch('prefect.deployments.run_deployment', new=mock_run_deployment)
 
     # Mock asyncio.gather to avoid actual async task execution
     async def mock_gather(*args, **kwargs):
diff --git a/orchestration/_tests/test_sfapi_flow.py b/orchestration/_tests/test_sfapi_flow.py
index a1809686..66203d19 100644
--- a/orchestration/_tests/test_sfapi_flow.py
+++ b/orchestration/_tests/test_sfapi_flow.py
@@ -21,9 +21,9 @@ def prefect_test_fixture():
     """
    with prefect_test_harness():
        globus_client_id = Secret(value=str(uuid4()))
-        globus_client_id.save(name="globus-client-id")
+        globus_client_id.save(name="globus-client-id", overwrite=True)
 
         globus_client_secret = Secret(value=str(uuid4()))
-        globus_client_secret.save(name="globus-client-secret")
+        globus_client_secret.save(name="globus-client-secret", overwrite=True)
 
         yield
diff --git a/orchestration/_tests/test_transfer_controller.py b/orchestration/_tests/test_transfer_controller.py
index a1cae916..95c0574a 100644
--- a/orchestration/_tests/test_transfer_controller.py
+++ b/orchestration/_tests/test_transfer_controller.py
@@ -26,10 +26,10 @@ def prefect_test_fixture():
     with prefect_test_harness():
         # Create ephemeral secrets in the local Prefect test database
         globus_client_id = Secret(value=str(uuid4()))
-        globus_client_id.save(name="globus-client-id")
+        globus_client_id.save(name="globus-client-id", overwrite=True)
 
         globus_client_secret = Secret(value=str(uuid4()))
-        globus_client_secret.save(name="globus-client-secret")
+        globus_client_secret.save(name="globus-client-secret", overwrite=True)
 
         yield
diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py
index cd9da2f0..978bea6e 100644
--- a/orchestration/flows/bl832/dispatcher.py
+++ b/orchestration/flows/bl832/dispatcher.py
@@ -1,7 +1,7 @@
 import asyncio
 from prefect import flow, task, get_run_logger
 from prefect.blocks.system import JSON
-from prefect.deployments.deployments import run_deployment
+from prefect.deployments import run_deployment
 from pydantic import BaseModel, ValidationError, Field
 from typing import Any, Optional, Union
 
diff --git a/requirements.txt b/requirements.txt
index 24b8e9e4..6552c650 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,7 @@ numpy>=1.26.4
 pillow
 pydantic==2.11
 python-dotenv
-prefect==2.20.17
+prefect==3.4.2
 pyscicat
 pyyaml
 authlib

From 6d825abd7de5756ec608680217be6fd7833a9cc7 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Tue, 28 Oct 2025 16:14:10 -0700
Subject: [PATCH 4/7] Making config531 optional, as it is initialized within
 the functions if set to None

---
 orchestration/flows/bl531/config.py     | 2 +-
 orchestration/flows/bl531/dispatcher.py | 5 +++--
 orchestration/flows/bl531/move.py       | 2 +-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/orchestration/flows/bl531/config.py b/orchestration/flows/bl531/config.py
index 4f3c2ace..62d819a4 100644
--- a/orchestration/flows/bl531/config.py
+++ b/orchestration/flows/bl531/config.py
@@ -9,5 +9,5 @@ def __init__(self) -> None:
         self.endpoints = transfer.build_endpoints(config)
         self.apps = transfer.build_apps(config)
         self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"])
-        self.bl531_nersc_alsdev = self.endpoints["bl531-nersc-alsdev"]
+        self.bl531_nersc_alsdev = self.endpoints["bl531-nersc_alsdev"]
         self.bl531_compute_dtn = self.endpoints["bl531-compute-dtn"]
diff --git a/orchestration/flows/bl531/dispatcher.py b/orchestration/flows/bl531/dispatcher.py
index c3512269..c12a1f23 100644
--- a/orchestration/flows/bl531/dispatcher.py
+++ b/orchestration/flows/bl531/dispatcher.py
@@ -3,6 +3,7 @@
 from typing import Optional, Union, Any
 
 from orchestration.flows.bl531.move import process_new_531_file
+from orchestration.flows.bl531.config import Config531
 
 logger = logging.getLogger(__name__)
 
@@ -39,8 +40,8 @@ def dispatcher(
         raise ValueError("Data is under export control. Processing is not allowed.")
 
     if config is None:
-        logger.error("No configuration provided to dispatcher.")
-        raise ValueError("Configuration (config) is required for processing.")
+        config = Config531()
+        logger.info("No config provided. Using default Config531.")
 
     try:
         process_new_531_file(
diff --git a/orchestration/flows/bl531/move.py b/orchestration/flows/bl531/move.py
index 29e93822..e0f13245 100644
--- a/orchestration/flows/bl531/move.py
+++ b/orchestration/flows/bl531/move.py
@@ -134,7 +134,7 @@ def _prune_globus_endpoint(
 @flow(name="new_531_file_flow", flow_run_name="process_new-{file_path}")
 def process_new_531_file(
     file_path: str,
-    config: Config531
+    config: Optional[Config531] = None
 ) -> None:
     """
     Flow to process a new file at BL 5.3.1

From 5958bf77a35edcac0b46738b9ce0c498f724fac9 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Wed, 29 Oct 2025 10:14:27 -0700
Subject: [PATCH 5/7] Updating Dockerfile to pull prefect 3.4.2

---
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index d409f98d..a89516cb 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM prefecthq/prefect:2.20.17-python3.11
+FROM prefecthq/prefect:3.4.2-python3.13
 
 WORKDIR /app
 COPY ./requirements.txt /tmp/

From a1703d96fbbff3b27859ac645df021ac3c743222 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Thu, 6 Nov 2025 14:53:45 -0800
Subject: [PATCH 6/7] Making the move flow call the move task, and updating
 dispatcher/tests/prefect.yaml to reflect that

---
 orchestration/_tests/test_bl531/test_move.py | 38 ++++++++++----------
 orchestration/flows/bl531/dispatcher.py      |  8 ++---
 orchestration/flows/bl531/move.py            | 23 ++++++------
 orchestration/flows/bl531/prefect.yaml       |  4 +--
 4 files changed, 36 insertions(+), 37 deletions(-)

diff --git a/orchestration/_tests/test_bl531/test_move.py b/orchestration/_tests/test_bl531/test_move.py
index de8f8e1d..a1d717e0 100644
--- a/orchestration/_tests/test_bl531/test_move.py
+++ b/orchestration/_tests/test_bl531/test_move.py
@@ -42,9 +42,9 @@ def prefect_test_fixture():
 # Tests for 531
 # ----------------------------
 
-def test_process_new_531_file(mocker: MockFixture) -> None:
+def test_process_new_531_file_task(mocker: MockFixture) -> None:
     """
-    Test the process_new_531_file flow from orchestration.flows.bl531.move.
+    Test the process_new_531_file_task flow from orchestration.flows.bl531.move.
 
     This test verifies that:
     - The get_transfer_controller function is called (patched) with the correct parameters.
@@ -55,7 +55,7 @@ def test_process_new_531_file(mocker: MockFixture) -> None:
         mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
     """
     # Import the flow to test.
-    from orchestration.flows.bl531.move import process_new_531_file
+    from orchestration.flows.bl531.move import process_new_531_file_task
 
     # Patch the Secret.load and init_transfer_client in the configuration context.
     with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()):
@@ -79,14 +79,14 @@ def test_process_new_531_file(mocker: MockFixture) -> None:
             return_value=None
         )
 
-        # Patch get_transfer_controller where it is used in process_new_531_file.
+        # Patch get_transfer_controller where it is used in process_new_531_file_task.
         mocker.patch(
             "orchestration.flows.bl531.move.get_transfer_controller",
             return_value=mock_transfer_controller
         )
 
         # Execute the move flow with the test file path and mock configuration.
-        result = process_new_531_file(file_path=test_file_path, config=mock_config)
+        result = process_new_531_file_task(file_path=test_file_path, config=mock_config)
 
         # Verify that the transfer controller's copy method was called exactly once.
         assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
@@ -97,7 +97,7 @@ def test_process_new_531_file(mocker: MockFixture) -> None:
         mock_transfer_controller.copy.reset_mock()
         mock_prune.reset_mock()
 
-        result = process_new_531_file(file_path=test_file_path, config=None)
+        result = process_new_531_file_task(file_path=test_file_path, config=None)
         assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
         assert result is None, "The flow should return None"
         assert mock_prune.call_count == 1, "Prune function should be called exactly once"
@@ -108,7 +108,7 @@ def test_dispatcher_531_flow(mocker: MockFixture) -> None:
     Test the dispatcher flow for BL531.
 
     This test verifies that:
-    - The process_new_531_file function is called with the correct parameters
+    - The process_new_531_file_task function is called with the correct parameters
       when the dispatcher flow is executed.
     Parameters:
         mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
@@ -137,9 +137,9 @@ class MockConfig:
             return_value=None
         )
 
-        # Patch the process_new_531_file function to monitor its calls.
-        mock_process_new_531_file = mocker.patch(
-            "orchestration.flows.bl531.dispatcher.process_new_531_file",
+        # Patch the process_new_531_file_task function to monitor its calls.
+        mock_process_new_531_file_task = mocker.patch(
+            "orchestration.flows.bl531.dispatcher.process_new_531_file_task",
             return_value=None
         )
 
@@ -150,37 +150,37 @@ class MockConfig:
             config=mock_config
         )
 
-        # Verify that process_new_531_file was called exactly once with the expected arguments.
-        mock_process_new_531_file.assert_called_once_with(
+        # Verify that process_new_531_file_task was called exactly once with the expected arguments.
+        mock_process_new_531_file_task.assert_called_once_with(
             file_path=test_file_path,
             config=mock_config
         )
 
-        # Verify that process_new_531_file is called even when config is None
-        mock_process_new_531_file.reset_mock()
+        # Verify that process_new_531_file_task is called even when config is None
+        mock_process_new_531_file_task.reset_mock()
         dispatcher(
             file_path=test_file_path,
             is_export_control=False,
             config=None
         )
-        mock_process_new_531_file.assert_called_once()
+        mock_process_new_531_file_task.assert_called_once()
 
         # Test error handling for missing file_path
-        mock_process_new_531_file.reset_mock()
+        mock_process_new_531_file_task.reset_mock()
         with pytest.raises(ValueError):
             dispatcher(
                 file_path=None,
                 is_export_control=False,
                 config=mock_config
             )
-        mock_process_new_531_file.assert_not_called()
+        mock_process_new_531_file_task.assert_not_called()
 
         # Test error handling for export control flag
-        mock_process_new_531_file.reset_mock()
+        mock_process_new_531_file_task.reset_mock()
         with pytest.raises(ValueError):
             dispatcher(
                 file_path=test_file_path,
                 is_export_control=True,
                 config=mock_config
             )
-        mock_process_new_531_file.assert_not_called()
+        mock_process_new_531_file_task.assert_not_called()
diff --git a/orchestration/flows/bl531/dispatcher.py b/orchestration/flows/bl531/dispatcher.py
index c12a1f23..1f482377 100644
--- a/orchestration/flows/bl531/dispatcher.py
+++ b/orchestration/flows/bl531/dispatcher.py
@@ -2,7 +2,7 @@
 from prefect import flow
 from typing import Optional, Union, Any
 
-from orchestration.flows.bl531.move import process_new_531_file
+from orchestration.flows.bl531.move import process_new_531_file_task
 from orchestration.flows.bl531.config import Config531
 
 logger = logging.getLogger(__name__)
@@ -44,15 +44,11 @@ def dispatcher(
         logger.info("No config provided. Using default Config531.")
 
     try:
-        process_new_531_file(
+        process_new_531_file_task(
             file_path=file_path,
             config=config
         )
         logger.info("Dispatcher flow completed successfully.")
     except Exception as e:
         logger.error(f"Error during processing in dispatcher flow: {e}")
         raise
-
-
-if __name__ == "__main__":
-    dispatcher()
diff --git a/orchestration/flows/bl531/move.py b/orchestration/flows/bl531/move.py
index e0f13245..6bb7f7f8 100644
--- a/orchestration/flows/bl531/move.py
+++ b/orchestration/flows/bl531/move.py
@@ -2,8 +2,7 @@ import datetime
 import logging
 from typing import Optional
 
-from prefect import flow
-# from prefect.blocks.system import JSON
+from prefect import flow, task
 
 from orchestration.flows.bl531.config import Config531
 from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
@@ -132,7 +131,18 @@ def _prune_globus_endpoint(
 
 
 @flow(name="new_531_file_flow", flow_run_name="process_new-{file_path}")
-def process_new_531_file(
+def process_new_531_file_flow(
+    file_path: str,
+    config: Optional[Config531] = None
+) -> None:
+    process_new_531_file_task(
+        file_path=file_path,
+        config=config
+    )
+
+
+@task(name="new_531_file_task")
+def process_new_531_file_task(
     file_path: str,
     config: Optional[Config531] = None
 ) -> None:
@@ -206,10 +216,3 @@ def move_531_flight_check(
         logger.info("531 flight check: transfer successful")
     else:
         logger.error("531 flight check: transfer failed")
-
-
-if __name__ == "__main__":
-    # Example usage
-    config = Config531()
-    file_path = "test_directory/"
-    process_new_531_file(file_path, config)
diff --git a/orchestration/flows/bl531/prefect.yaml b/orchestration/flows/bl531/prefect.yaml
index 6cf35d19..7aac68c2 100644
--- a/orchestration/flows/bl531/prefect.yaml
+++ b/orchestration/flows/bl531/prefect.yaml
@@ -1,8 +1,8 @@
 name: bl531
 prefect-version: 3.4.2
 deployments:
-- name: new_file_531
-  entrypoint: orchestration/flows/bl531/move.py:process_new_531_file
+- name: new_file_531_flow
+  entrypoint: orchestration/flows/bl531/move.py:process_new_531_file_flow
   work_pool:
     name: new_file_531_pool
     work_queue_name: new_file_531_queue

From 80f3bf9f5e4a84a022c3c2e6b54a332d15922c2c Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Thu, 6 Nov 2025 14:54:32 -0800
Subject: [PATCH 7/7] Fixing the prune flow_run_name

---
 orchestration/flows/bl531/move.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/orchestration/flows/bl531/move.py b/orchestration/flows/bl531/move.py
index 6bb7f7f8..5b7bc52e 100644
--- a/orchestration/flows/bl531/move.py
+++ b/orchestration/flows/bl531/move.py
@@ -93,7 +93,7 @@ def prune(
 
 
 # @staticmethod
-@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{{ relative_path | basename }}")
+@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{relative_path}")
 def _prune_globus_endpoint(
     relative_path: str,
     source_endpoint: GlobusEndpoint,