From bac02a4a906753a5718e99f233565c6f8724f500 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 10 Sep 2025 11:35:37 -0700 Subject: [PATCH 01/28] init commit for bl 9.3.1 --- orchestration/flows/bl931/__init__.py | 0 orchestration/flows/bl931/config.py | 14 ++ orchestration/flows/bl931/dispatcher.py | 57 +++++++ orchestration/flows/bl931/move.py | 215 ++++++++++++++++++++++++ orchestration/flows/bl931/prefect.yaml | 31 ++++ 5 files changed, 317 insertions(+) create mode 100644 orchestration/flows/bl931/__init__.py create mode 100644 orchestration/flows/bl931/config.py create mode 100644 orchestration/flows/bl931/dispatcher.py create mode 100644 orchestration/flows/bl931/move.py create mode 100644 orchestration/flows/bl931/prefect.yaml diff --git a/orchestration/flows/bl931/__init__.py b/orchestration/flows/bl931/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/flows/bl931/config.py b/orchestration/flows/bl931/config.py new file mode 100644 index 00000000..8db548bf --- /dev/null +++ b/orchestration/flows/bl931/config.py @@ -0,0 +1,14 @@ +from globus_sdk import TransferClient +from orchestration.globus import transfer + + +# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged) +class Config931: + def __init__(self) -> None: + config = transfer.get_config() + self.endpoints = transfer.build_endpoints(config) + self.apps = transfer.build_apps(config) + self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) + self.data931 = self.endpoints["data931"] + self.data931_raw = self.endpoints["data931_raw"] + self.nersc931_alsdev_raw = self.endpoints["nersc931_alsdev_raw"] diff --git a/orchestration/flows/bl931/dispatcher.py b/orchestration/flows/bl931/dispatcher.py new file mode 100644 index 00000000..3a1bb46f --- /dev/null +++ b/orchestration/flows/bl931/dispatcher.py @@ -0,0 +1,57 @@ +import logging +from prefect import flow +from typing import Optional, Union, Any + +from orchestration.flows.bl931.move import process_new_931_file + +logger = logging.getLogger(__name__) + + +# TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config931 +@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") +def dispatcher( + file_path: Optional[str] = None, + is_export_control: bool = False, + config: Optional[Union[dict, Any]] = None, +) -> None: + """ + Dispatcher flow for BL931 beamline that launches the new_931_file_flow. + + :param file_path: Path to the file to be processed. + :param is_export_control: Flag indicating if export control measures should be applied. + (Not used in the current BL931 processing) + :param config: Configuration settings for processing. + Expected to be an instance of Config931 or a dict that can be converted. + :raises ValueError: If no configuration is provided. + :raises TypeError: If the provided configuration is not a dict or Config931. + """ + + logger.info("Starting dispatcher flow for BL 9.3.1") + logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}") + + # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running. + if file_path is None: + logger.error("No file_path provided to dispatcher.") + raise ValueError("File path is required for processing.") + + if is_export_control: + logger.error("Data is under export control. Processing is not allowed.") + raise ValueError("Data is under export control. Processing is not allowed.") + + if config is None: + logger.error("No configuration provided to dispatcher.") + raise ValueError("Configuration (config) is required for processing.") + + try: + process_new_931_file( + file_path=file_path, + config=config + ) + logger.info("Dispatcher flow completed successfully.") + except Exception as e: + logger.error(f"Error during processing in dispatcher flow: {e}") + raise + + +if __name__ == "__main__": + dispatcher() diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py new file mode 100644 index 00000000..e2df737e --- /dev/null +++ b/orchestration/flows/bl931/move.py @@ -0,0 +1,215 @@ +import datetime +import logging +from typing import Optional + +from prefect import flow +# from prefect.blocks.system import JSON + +from orchestration.flows.bl931.config import Config931 +from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe +from orchestration.prefect import schedule_prefect_flow +from orchestration.transfer_controller import CopyMethod, get_transfer_controller + +logger = logging.getLogger(__name__) + +# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls +# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. + + +def prune( + file_path: str = None, + source_endpoint: GlobusEndpoint = None, + check_endpoint: Optional[GlobusEndpoint] = None, + days_from_now: float = 0.0, + config: Config931 = None +) -> bool: + """ + Prune (delete) data from a globus endpoint. + If days_from_now is 0, executes pruning immediately. + Otherwise, schedules pruning for future execution using Prefect. + Args: + file_path (str): The path to the file or directory to prune + source_endpoint (GlobusEndpoint): The globus endpoint containing the data + check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning + days_from_now (float): Delay before pruning; if 0, prune immediately + Returns: + bool: True if pruning was successful or scheduled successfully, False otherwise + """ + if not file_path: + logger.error("No file_path provided for pruning operation") + return False + + if not source_endpoint: + logger.error("No source_endpoint provided for pruning operation") + return False + + if not config: + config = Config931() + + if days_from_now < 0: + raise ValueError(f"Invalid days_from_now: {days_from_now}") + + # JSON blocks are deprecated, we should use what they recommend in the docs + # globus_settings = JSON.load("globus-settings").value + # max_wait_seconds = globus_settings["max_wait_seconds"] + + logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'") + + # convert float days → timedelta + delay: datetime.timedelta = datetime.timedelta(days=days_from_now) + + # If days_from_now is 0, prune immediately + if delay.total_seconds() == 0: + logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'") + return _prune_globus_endpoint( + relative_path=file_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config + ) + else: + # Otherwise, schedule pruning for future execution + logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' " + f"in {delay.total_seconds()/86400:.1f} days") + + try: + schedule_prefect_flow( + deployment_name="prune_globus_endpoint/prune_globus_endpoint", + parameters={ + "relative_path": file_path, + "source_endpoint": source_endpoint, + "check_endpoint": check_endpoint, + "config": config + }, + duration_from_now=delay, + ) + logger.info(f"Successfully scheduled pruning task for {delay.total_seconds()/86400:.1f} days from now") + return True + except Exception as e: + logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True) + return False + +# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls +# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. + + +# @staticmethod +@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{{ relative_path | basename }}") +def _prune_globus_endpoint( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: Optional[GlobusEndpoint] = None, + config: Config931 = None +) -> None: + """ + Prefect flow that performs the actual Globus endpoint pruning operation. + Args: + relative_path (str): The path of the file or directory to prune + source_endpoint (GlobusEndpoint): The Globus endpoint to prune from + check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning + config (BeamlineConfig): Configuration object with transfer client + """ + logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") + + if not config: + config = Config931() + + # globus_settings = JSON.load("globus-settings").value + # max_wait_seconds = globus_settings["max_wait_seconds"] + max_wait_seconds = 600 + flow_name = f"prune_from_{source_endpoint.name}" + logger.info(f"Running flow: {flow_name}") + logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") + prune_one_safe( + file=relative_path, + if_older_than_days=0, + transfer_client=config.tc, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + logger=logger, + max_wait_seconds=max_wait_seconds + ) + + +@flow(name="new_931_file_flow", flow_run_name="process_new-{file_path}") +def process_new_931_file( + file_path: str, + config: Config931 +) -> None: + """ + Flow to process a new file at BL 9.3.1 + 1. Copy the file from the data931 to NERSC CFS. Ingest file path in SciCat. + 2. Schedule pruning from data931. 6 months from now. + 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 4. Schedule pruning from NERSC CFS. + + :param file_path: Path to the new file to be processed. + :param config: Configuration settings for processing. + """ + + logger.info(f"Processing new 931 file: {file_path}") + + if not config: + config = Config931() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + transfer_controller.copy( + file_path=file_path, + source=config.data931_raw, + destination=config.nersc931_alsdev_raw + ) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + # Schedule pruning from QNAP + # Waiting for PR #62 to be merged (prune_controller) + # TODO: Determine scheduling days_from_now based on beamline needs + prune( + file_path=file_path, + source_endpoint=config.data931_raw, + check_endpoint=config.nersc931_alsdev_raw, + days_from_now=180.0 # determine appropriate value: currently 6 months + ) + + # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? + # Waiting for PR #62 to be merged (transfer_controller) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + +@flow(name="move_931_flight_check", flow_run_name="move_931_flight_check-{file_path}") +def move_931_flight_check( + file_path: str = "test_directory/test.txt", +): + """Please keep your arms and legs inside the vehicle at all times.""" + logger.info("931 flight check: testing transfer from data931 to NERSC CFS") + + config = Config931() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + success = transfer_controller.copy( + file_path=file_path, + source=config.data931_raw, + destination=config.nersc931_alsdev_raw + ) + if success is True: + logger.info("931 flight check: transfer successful") + else: + logger.error("931 flight check: transfer failed") + + +if __name__ == "__main__": + # Example usage + config = Config931() + file_path = "test_directory/" + process_new_931_file(file_path, config) diff --git a/orchestration/flows/bl931/prefect.yaml b/orchestration/flows/bl931/prefect.yaml new file mode 100644 index 00000000..b97a3e75 --- /dev/null +++ b/orchestration/flows/bl931/prefect.yaml @@ -0,0 +1,31 @@ +name: bl931 +prefect-version: 3.4.2 +deployments: +- name: new_file_931 + entrypoint: orchestration/flows/bl931/move.py:process_new_931_file + work_pool: + name: new_file_931_pool + work_queue_name: new_file_931_queue + +- name: new_file_931_flight_check + entrypoint: orchestration/flows/bl931/move.py:move_931_flight_check + work_pool: + name: new_file_931_pool + work_queue_name: move_file_931_flight_check_queue + schedules: + - cron: "0 */12 * * *" # Every 12 hours + slug: "test-move-931-flight-check" + timezone: America/Los_Angeles + active: true + +- name: run_931_dispatcher + entrypoint: orchestration/flows/bl931/dispatcher.py:dispatcher + work_pool: + name: dispatcher_931_pool + work_queue_name: dispatcher_931_queue + +- name: prune_data931 + entrypoint: orchestration/flows/bl931/move.py:_prune_globus_endpoint + work_pool: + name: prune_931_pool + work_queue_name: prune_931_queue From 373810699dd25e80ef4f68c9a62b7241b17bd23b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 27 Oct 2025 13:15:58 -0700 Subject: [PATCH 02/28] Adjusting 9.3.1 endpoint configuration to use the compute-dtn globus collection --- config.yml | 13 +++++++++++++ orchestration/flows/bl931/config.py | 5 ++--- orchestration/flows/bl931/move.py | 8 ++++---- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/config.yml b/config.yml index d8d990a4..af5548b6 100644 --- a/config.yml +++ b/config.yml @@ -52,6 +52,19 @@ globus: uri: lamarr.als.lbl.gov uuid: aee983fc-826e-4081-bfb2-62529970540d name: bl733-lamarr-beamlines + # 9.3.1 ENDPOINTS + + bl931-compute-dtn: + root_path: / + uri: compute-dtn.als.lbl.gov + uuid: 23af478e-d459-4e78-9753-5091b5fb432a + name: bl931-compute-dtn + + bl931-nersc_alsdev_raw: + root_path: /global/cfs/cdirs/als/data_mover/9.3.1/raw + uri: nersc.gov + uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 + name: bl931-nersc_alsdev_raw # 8.3.2 ENDPOINTS diff --git a/orchestration/flows/bl931/config.py b/orchestration/flows/bl931/config.py index 8db548bf..47370cad 100644 --- a/orchestration/flows/bl931/config.py +++ b/orchestration/flows/bl931/config.py @@ -9,6 +9,5 @@ def __init__(self) -> None: self.endpoints = transfer.build_endpoints(config) self.apps = transfer.build_apps(config) self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) - self.data931 = self.endpoints["data931"] - self.data931_raw = self.endpoints["data931_raw"] - self.nersc931_alsdev_raw = self.endpoints["nersc931_alsdev_raw"] + self.bl931_compute_dtn = self.endpoints["bl931-compute-dtn"] + self.bl931_nersc_alsdev_raw = self.endpoints["bl931-nersc_alsdev_raw"] diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index e2df737e..57b25476 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -159,8 +159,8 @@ def process_new_931_file( transfer_controller.copy( file_path=file_path, - source=config.data931_raw, - destination=config.nersc931_alsdev_raw + source=config.bl931_compute_dtn, + destination=config.bl931_nersc_alsdev_raw ) # TODO: Ingest file path in SciCat @@ -171,8 +171,8 @@ def process_new_931_file( # TODO: Determine scheduling days_from_now based on beamline needs prune( file_path=file_path, - source_endpoint=config.data931_raw, - check_endpoint=config.nersc931_alsdev_raw, + source_endpoint=config.bl931_compute_dtn, + check_endpoint=config.bl931_nersc_alsdev_raw, days_from_now=180.0 # determine appropriate value: currently 6 months ) From 8d6e8340c9d8d0d1915fd5647e95e212d38e581c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 15:24:44 -0700 Subject: [PATCH 03/28] Adding pytests for bl931 flows --- orchestration/_tests/test_bl931/__init__.py | 0 orchestration/_tests/test_bl931/test_move.py | 191 +++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 orchestration/_tests/test_bl931/__init__.py create mode 100644 orchestration/_tests/test_bl931/test_move.py diff --git a/orchestration/_tests/test_bl931/__init__.py b/orchestration/_tests/test_bl931/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/_tests/test_bl931/test_move.py b/orchestration/_tests/test_bl931/test_move.py new file mode 100644 index 00000000..1e325431 --- /dev/null +++ b/orchestration/_tests/test_bl931/test_move.py @@ -0,0 +1,191 @@ +'''Pytest unit tests for BL931 move flow. ''' + +import logging +import pytest +from uuid import uuid4 + +from prefect.testing.utilities import prefect_test_harness +from prefect.blocks.system import Secret, JSON +from pytest_mock import MockFixture + +from orchestration._tests.test_transfer_controller import MockSecret +from orchestration.flows.bl931.config import Config931 + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True, scope="session") +def prefect_test_fixture(): + """ + A pytest fixture that automatically sets up and tears down the Prefect test harness + for the entire test session. It creates and saves test secrets and configurations + required for Globus integration. + + Yields: + None + """ + with prefect_test_harness(): + globus_client_id = Secret(value=str(uuid4())) + globus_client_id.save(name="globus-client-id", overwrite=True) + + globus_client_secret = Secret(value=str(uuid4())) + globus_client_secret.save(name="globus-client-secret", overwrite=True) + + pruning_config = JSON(value={"max_wait_seconds": 600}) + pruning_config.save(name="pruning-config", overwrite=True) + + yield + + +# ---------------------------- +# Tests for 931 +# ---------------------------- + +def test_process_new_931_file(mocker: MockFixture) -> None: + """ + Test the process_new_931_file flow from orchestration.flows.bl931.move. + + This test verifies that: + - The get_transfer_controller function is called (patched) with the correct parameters. + - The returned transfer controller's copy method is called with the expected file path, + source, and destination endpoints from the provided configuration. + + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the flow to test. + from orchestration.flows.bl931.move import process_new_931_file + + # Patch the Secret.load and init_transfer_client in the configuration context. + with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): + mocker.patch( + "orchestration.flows.bl931.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) + # Patch the schedule_prefect_flow call to avoid real Prefect interaction + mocker.patch( + "orchestration.flows.bl931.move.schedule_prefect_flow", + return_value=None + ) + + # Instantiate the dummy configuration. + mock_config = Config931() + + # Generate a test file path. + test_file_path = f"/tmp/test_file_{uuid4()}.txt" + + # Create a mock transfer controller with a mocked 'copy' method. + mock_transfer_controller = mocker.MagicMock() + mock_transfer_controller.copy.return_value = True + + mock_prune = mocker.patch( + "orchestration.flows.bl931.move.prune", + return_value=None + ) + + # Patch get_transfer_controller where it is used in process_new_931_file. + mocker.patch( + "orchestration.flows.bl931.move.get_transfer_controller", + return_value=mock_transfer_controller + ) + + # Execute the move flow with the test file path and mock configuration. + result = process_new_931_file(file_path=test_file_path, config=mock_config) + + # Verify that the transfer controller's copy method was called exactly once. + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + # Reset mocks and test with config=None + mock_transfer_controller.copy.reset_mock() + mock_prune.reset_mock() + + result = process_new_931_file(file_path=test_file_path, config=None) + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + +def test_dispatcher_931_flow(mocker: MockFixture) -> None: + """ + Test the dispatcher flow for BL931. + + This test verifies that: + - The process_new_931_file function is called with the correct parameters + when the dispatcher flow is executed. + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the dispatcher flow to test. + from orchestration.flows.bl931.dispatcher import dispatcher + + # Create a mock configuration object. + class MockConfig: + pass + + mock_config = MockConfig() + + # Generate a test file path. + test_file_path = f"/tmp/test_file_{uuid4()}.txt" + + # Patch the schedule_prefect_flow call to avoid real Prefect interaction + with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): + mocker.patch( + "orchestration.flows.bl931.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) + # Patch the schedule_prefect_flow call to avoid real Prefect interaction + mocker.patch( + "orchestration.flows.bl931.move.schedule_prefect_flow", + return_value=None + ) + + # Patch the process_new_931_file function to monitor its calls. + mock_process_new_931_file = mocker.patch( + "orchestration.flows.bl931.dispatcher.process_new_931_file", + return_value=None + ) + + # Execute the dispatcher flow with test parameters. + dispatcher( + file_path=test_file_path, + is_export_control=False, + config=mock_config + ) + + # Verify that process_new_931_file was called exactly once with the expected arguments. + mock_process_new_931_file.assert_called_once_with( + file_path=test_file_path, + config=mock_config + ) + + # Verify that process_new_931_file is called even when config is None + mock_process_new_931_file.reset_mock() + dispatcher( + file_path=test_file_path, + is_export_control=False, + config=None + ) + mock_process_new_931_file.assert_called_once() + + # Test error handling for missing file_path + mock_process_new_931_file.reset_mock() + with pytest.raises(ValueError): + dispatcher( + file_path=None, + is_export_control=False, + config=mock_config + ) + mock_process_new_931_file.assert_not_called() + + # Test error handling for export control flag + mock_process_new_931_file.reset_mock() + with pytest.raises(ValueError): + dispatcher( + file_path=test_file_path, + is_export_control=True, + config=mock_config + ) + mock_process_new_931_file.assert_not_called() From 9b35bd3ad0da2ae8df2bf6b73d45b1f2f6bc4585 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 15:25:22 -0700 Subject: [PATCH 04/28] Making Config931 optional, since it is initialized within the methods if it is set to None --- orchestration/flows/bl931/dispatcher.py | 5 +++-- orchestration/flows/bl931/move.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/orchestration/flows/bl931/dispatcher.py b/orchestration/flows/bl931/dispatcher.py index 3a1bb46f..a478caa0 100644 --- a/orchestration/flows/bl931/dispatcher.py +++ b/orchestration/flows/bl931/dispatcher.py @@ -2,6 +2,7 @@ from prefect import flow from typing import Optional, Union, Any +from orchestration.flows.bl931.config import Config931 from orchestration.flows.bl931.move import process_new_931_file logger = logging.getLogger(__name__) @@ -39,8 +40,8 @@ def dispatcher( raise ValueError("Data is under export control. Processing is not allowed.") if config is None: - logger.error("No configuration provided to dispatcher.") - raise ValueError("Configuration (config) is required for processing.") + logger.info("No config provided, initializing default Config931.") + config = Config931() try: process_new_931_file( diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 57b25476..b8c7e355 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -134,7 +134,7 @@ def _prune_globus_endpoint( @flow(name="new_931_file_flow", flow_run_name="process_new-{file_path}") def process_new_931_file( file_path: str, - config: Config931 + config: Optional[Config931] = None ) -> None: """ Flow to process a new file at BL 9.3.1 From 4da35bfb2a8626c9b7a9bba7712355f0b2ce051b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 16:41:01 -0700 Subject: [PATCH 05/28] Adding documentation for bl 9.3.1 flows --- docs/mkdocs/docs/bl931.md | 38 ++++++++++++++++++++++++++++++++++++++ docs/mkdocs/mkdocs.yml | 6 ++++-- 2 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 docs/mkdocs/docs/bl931.md diff --git a/docs/mkdocs/docs/bl931.md b/docs/mkdocs/docs/bl931.md new file mode 100644 index 00000000..7df5834f --- /dev/null +++ b/docs/mkdocs/docs/bl931.md @@ -0,0 +1,38 @@ +# Beamline 9.3.1 Flows + +This page documents the workflows supported by Splash Flows at [ALS Beamline 9.3.1 (Tender X-ray Spectroscopy)](https://als.lbl.gov/beamlines/9-3-1/). + +## Data at 9.3.1 + +At Beamline 9.3.1, users generate data in an HDF5 format containing a background subtracted stack of 2D images with associated Labview metadata. Depending on the experiment, the file sizes can be greater than 100GB. A ROI is exported for each dataset. + +## File Watcher + +There is a file watcher on the acquisition system that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps: +- Copy scans in real time from a Globus collection on the `compute-dtn` server to `NERSC CFS` using Globus Transfer. +- Copy project data to `NERSC HPSS` for long-term storage (TBD). +- Analysis on HPC systems (TBD). +- Ingest into SciCat (TBD). +- Schedule data pruning from `compute-dtn` and `NERSC CFS`. + +## Prefect Configuration + +### Registered Flows + +#### `dispatcher.py` + +The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. Once a new file is written, the `dispatcher()` Flow is called. In this case, the dispatcher handles the synchronous call to `move.py`, with a potential to add additional steps (e.g. scheduling remote HPC analysis code). + +#### `move.py` + +Flow to process a new file at BL 9.3.1 +1. Copy the file from `compute-dtn` to `NERSC CFS` and ingest the file path and metadata into SciCat. +2. Schedule pruning from `compute-dtn`. +3. Copy the file from `NERSC CFS` to `NERSC HPSS`. Ingest the archived file path in SciCat. +4. Schedule pruning from `NERSC CFS`. + +## VM Details + +The computing backend runs on a VM in the B15 server room that is managed by ALS IT staff. + +`flow-931.als.lbl.gov` \ No newline at end of file diff --git a/docs/mkdocs/mkdocs.yml b/docs/mkdocs/mkdocs.yml index 05e6a2d2..3ed99823 100644 --- a/docs/mkdocs/mkdocs.yml +++ b/docs/mkdocs/mkdocs.yml @@ -15,8 +15,10 @@ nav: - Getting Started: getting_started.md - Beamline Implementations: - 7.3.3 SAXS/WAXS/GISAXS: bl733.md - - 8.3.2 Micro Tomography - Compute at ALCF: alcf832.md - - 8.3.2 Micro Tomography - Compute at NERSC: nersc832.md + - Beamline 8.3.2 - Microtomography: + - Compute at ALCF: alcf832.md + - Compute at NERSC: nersc832.md + - Beamline 9.3.1 - Tender X-ray Spectroscopy: bl931.md - Orchestration: orchestration.md - Configuration: configuration.md # - Troubleshooting: troubleshooting.md From 19128a871af9742e0c8fe1be79f19622629e08c5 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 16:43:29 -0700 Subject: [PATCH 06/28] changing the data description in the documentation --- docs/mkdocs/docs/bl931.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/mkdocs/docs/bl931.md b/docs/mkdocs/docs/bl931.md index 7df5834f..4e300e5a 100644 --- a/docs/mkdocs/docs/bl931.md +++ b/docs/mkdocs/docs/bl931.md @@ -4,7 +4,7 @@ This page documents the workflows supported by Splash Flows at [ALS Beamline 9.3 ## Data at 9.3.1 -At Beamline 9.3.1, users generate data in an HDF5 format containing a background subtracted stack of 2D images with associated Labview metadata. Depending on the experiment, the file sizes can be greater than 100GB. A ROI is exported for each dataset. +Generates spectroscopy data. ## File Watcher From 62fd03589b9a41e4f097e6ef8e0c9a29fb81e129 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Thu, 6 Nov 2025 15:38:48 -0800 Subject: [PATCH 07/28] Making the move flow call the move task, and updating the pytest/dispatcher/prefect.yaml reflect the change --- orchestration/_tests/test_bl931/test_move.py | 36 ++++++++++---------- orchestration/flows/bl931/dispatcher.py | 8 ++--- orchestration/flows/bl931/move.py | 24 +++++++------ orchestration/flows/bl931/prefect.yaml | 4 +-- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/orchestration/_tests/test_bl931/test_move.py b/orchestration/_tests/test_bl931/test_move.py index 1e325431..240b1213 100644 --- a/orchestration/_tests/test_bl931/test_move.py +++ b/orchestration/_tests/test_bl931/test_move.py @@ -42,7 +42,7 @@ def prefect_test_fixture(): # Tests for 931 # ---------------------------- -def test_process_new_931_file(mocker: MockFixture) -> None: +def test_process_new_931_file_task(mocker: MockFixture) -> None: """ Test the process_new_931_file flow from orchestration.flows.bl931.move. @@ -55,7 +55,7 @@ def test_process_new_931_file(mocker: MockFixture) -> None: mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. """ # Import the flow to test. - from orchestration.flows.bl931.move import process_new_931_file + from orchestration.flows.bl931.move import process_new_931_file_task # Patch the Secret.load and init_transfer_client in the configuration context. with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): @@ -84,14 +84,14 @@ def test_process_new_931_file(mocker: MockFixture) -> None: return_value=None ) - # Patch get_transfer_controller where it is used in process_new_931_file. + # Patch get_transfer_controller where it is used in process_new_931_file_task. mocker.patch( "orchestration.flows.bl931.move.get_transfer_controller", return_value=mock_transfer_controller ) # Execute the move flow with the test file path and mock configuration. - result = process_new_931_file(file_path=test_file_path, config=mock_config) + result = process_new_931_file_task(file_path=test_file_path, config=mock_config) # Verify that the transfer controller's copy method was called exactly once. assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" @@ -102,7 +102,7 @@ def test_process_new_931_file(mocker: MockFixture) -> None: mock_transfer_controller.copy.reset_mock() mock_prune.reset_mock() - result = process_new_931_file(file_path=test_file_path, config=None) + result = process_new_931_file_task(file_path=test_file_path, config=None) assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once" @@ -113,7 +113,7 @@ def test_dispatcher_931_flow(mocker: MockFixture) -> None: Test the dispatcher flow for BL931. This test verifies that: - - The process_new_931_file function is called with the correct parameters + - The process_new_931_file_task function is called with the correct parameters when the dispatcher flow is executed. Parameters: mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. @@ -142,9 +142,9 @@ class MockConfig: return_value=None ) - # Patch the process_new_931_file function to monitor its calls. - mock_process_new_931_file = mocker.patch( - "orchestration.flows.bl931.dispatcher.process_new_931_file", + # Patch the process_new_931_file_task function to monitor its calls. + mock_process_new_931_file_task = mocker.patch( + "orchestration.flows.bl931.dispatcher.process_new_931_file_task", return_value=None ) @@ -155,37 +155,37 @@ class MockConfig: config=mock_config ) - # Verify that process_new_931_file was called exactly once with the expected arguments. - mock_process_new_931_file.assert_called_once_with( + # Verify that process_new_931_file_task was called exactly once with the expected arguments. + mock_process_new_931_file_task.assert_called_once_with( file_path=test_file_path, config=mock_config ) - # Verify that process_new_931_file is called even when config is None - mock_process_new_931_file.reset_mock() + # Verify that process_new_931_file_task is called even when config is None + mock_process_new_931_file_task.reset_mock() dispatcher( file_path=test_file_path, is_export_control=False, config=None ) - mock_process_new_931_file.assert_called_once() + mock_process_new_931_file_task.assert_called_once() # Test error handling for missing file_path - mock_process_new_931_file.reset_mock() + mock_process_new_931_file_task.reset_mock() with pytest.raises(ValueError): dispatcher( file_path=None, is_export_control=False, config=mock_config ) - mock_process_new_931_file.assert_not_called() + mock_process_new_931_file_task.assert_not_called() # Test error handling for export control flag - mock_process_new_931_file.reset_mock() + mock_process_new_931_file_task.reset_mock() with pytest.raises(ValueError): dispatcher( file_path=test_file_path, is_export_control=True, config=mock_config ) - mock_process_new_931_file.assert_not_called() + mock_process_new_931_file_task.assert_not_called() diff --git a/orchestration/flows/bl931/dispatcher.py b/orchestration/flows/bl931/dispatcher.py index a478caa0..e5d476ca 100644 --- a/orchestration/flows/bl931/dispatcher.py +++ b/orchestration/flows/bl931/dispatcher.py @@ -3,7 +3,7 @@ from typing import Optional, Union, Any from orchestration.flows.bl931.config import Config931 -from orchestration.flows.bl931.move import process_new_931_file +from orchestration.flows.bl931.move import process_new_931_file_task logger = logging.getLogger(__name__) @@ -44,7 +44,7 @@ def dispatcher( config = Config931() try: - process_new_931_file( + process_new_931_file_task( file_path=file_path, config=config ) @@ -52,7 +52,3 @@ def dispatcher( except Exception as e: logger.error(f"Error during processing in dispatcher flow: {e}") raise - - -if __name__ == "__main__": - dispatcher() diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index b8c7e355..f6f61cbe 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -2,7 +2,7 @@ import logging from typing import Optional -from prefect import flow +from prefect import flow, task # from prefect.blocks.system import JSON from orchestration.flows.bl931.config import Config931 @@ -94,7 +94,7 @@ def prune( # @staticmethod -@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{{ relative_path | basename }}") +@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{relative_path}") def _prune_globus_endpoint( relative_path: str, source_endpoint: GlobusEndpoint, @@ -132,7 +132,18 @@ def _prune_globus_endpoint( @flow(name="new_931_file_flow", flow_run_name="process_new-{file_path}") -def process_new_931_file( +def process_new_931_file_flow( + file_path: str, + config: Optional[Config931] = None +) -> None: + process_new_931_file_task( + file_path=file_path, + config=config + ) + + +@task(name="new_931_file_task") +def process_new_931_file_task( file_path: str, config: Optional[Config931] = None ) -> None: @@ -206,10 +217,3 @@ def move_931_flight_check( logger.info("931 flight check: transfer successful") else: logger.error("931 flight check: transfer failed") - - -if __name__ == "__main__": - # Example usage - config = Config931() - file_path = "test_directory/" - process_new_931_file(file_path, config) diff --git a/orchestration/flows/bl931/prefect.yaml b/orchestration/flows/bl931/prefect.yaml index b97a3e75..50980fe5 100644 --- a/orchestration/flows/bl931/prefect.yaml +++ b/orchestration/flows/bl931/prefect.yaml @@ -1,8 +1,8 @@ name: bl931 prefect-version: 3.4.2 deployments: -- name: new_file_931 - entrypoint: orchestration/flows/bl931/move.py:process_new_931_file +- name: new_file_931_flow + entrypoint: orchestration/flows/bl931/move.py:process_new_931_file_flow work_pool: name: new_file_931_pool work_queue_name: new_file_931_queue From 8948b1d4a5cd671f1602b22e25413fcfa2a45cfc Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 3 Dec 2025 13:09:10 -0800 Subject: [PATCH 08/28] Adjusting endpoint names in the test movement flow --- orchestration/flows/bl931/move.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index f6f61cbe..99fb687c 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -210,8 +210,8 @@ def move_931_flight_check( success = transfer_controller.copy( file_path=file_path, - source=config.data931_raw, - destination=config.nersc931_alsdev_raw + source=config.bl931_compute_dtn, + destination=config.bl931_nersc_alsdev_raw ) if success is True: logger.info("931 flight check: transfer successful") From 560259aff0f36bcf8d92d9ee17833d0a6515e492 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 3 Dec 2025 13:13:41 -0800 Subject: [PATCH 09/28] Ensuring get_run_logger() is set in the flight check --- orchestration/flows/bl931/move.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 99fb687c..41a2704d 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -2,7 +2,7 @@ import logging from typing import Optional -from prefect import flow, task +from prefect import flow, get_run_logger, task # from prefect.blocks.system import JSON from orchestration.flows.bl931.config import Config931 @@ -199,6 +199,7 @@ def move_931_flight_check( file_path: str = "test_directory/test.txt", ): """Please keep your arms and legs inside the vehicle at all times.""" + logger = get_run_logger() logger.info("931 flight check: testing transfer from data931 to NERSC CFS") config = Config931() From d395dcd9a9b09a8f49e12d0e0bf7b8f33f0bc287 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 3 Dec 2025 13:21:43 -0800 Subject: [PATCH 10/28] if the transfer test fails, it should throw a runtime error --- orchestration/flows/bl931/move.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 41a2704d..2d6dc220 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -218,3 +218,4 @@ def move_931_flight_check( logger.info("931 flight check: transfer successful") else: logger.error("931 flight check: transfer failed") + raise RuntimeError("931 flight check: transfer failed") From ffcb3b44f49c8d7fe43b8f18cb41dcf91a3d3c1a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 3 Dec 2025 16:06:24 -0800 Subject: [PATCH 11/28] updating init_work_pools to handle when beamline id is a number (e.g. 733 -> bl733 is the folder name) or a word (dichroism -> dichroism is the folder name) --- init_work_pools.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/init_work_pools.py b/init_work_pools.py index de150b67..f65aa3ea 100644 --- a/init_work_pools.py +++ b/init_work_pools.py @@ -49,10 +49,15 @@ def check_env() -> tuple[str, str, str]: """Validate required environment variables and paths.""" beamline = os.environ.get("BEAMLINE") if not beamline: - logger.error("Must set BEAMLINE (e.g., 832, 733)") + logger.error("Must set BEAMLINE (e.g., 832, 733, dichroism)") sys.exit(1) - prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml" + # Check if the beamline identifier is a number or a string to get the correct flows folder name + if beamline.isdigit(): + prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml" + else: + prefect_yaml = f"orchestration/flows/{beamline}/prefect.yaml" + if not os.path.isfile(prefect_yaml): logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!") sys.exit(1) From 0058581173178ef626428b367734700612509a15 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 8 Dec 2025 11:11:34 -0800 Subject: [PATCH 12/28] adjusting the test directory path for 931 --- orchestration/flows/bl931/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 2d6dc220..660b3aab 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -196,7 +196,7 @@ def process_new_931_file_task( @flow(name="move_931_flight_check", flow_run_name="move_931_flight_check-{file_path}") def move_931_flight_check( - file_path: str = "test_directory/test.txt", + file_path: str = "test/", ): """Please keep your arms and legs inside the vehicle at all times.""" logger = get_run_logger() From 965df4200fd60d9aff5804f150b8cf72924cc482 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 8 Dec 2025 11:31:37 -0800 Subject: [PATCH 13/28] Pinning fastapi==0.116.1; breaks with fastapi==0.124.0 --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 3ed219ca..9254d57d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ authlib globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk +fastapi==0.116.1 globus-sdk>=3.0 griffe>=0.49.0,<2.0.0 h5py From 4832eb8125520bfddbbb9c645db18d950ba5ada4 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 15:00:37 -0800 Subject: [PATCH 14/28] Switching JSON Blocks to Variable Blocks --- orchestration/_tests/test_bl931/test_move.py | 40 +++++++++++++------- orchestration/flows/bl931/move.py | 14 +++---- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/orchestration/_tests/test_bl931/test_move.py b/orchestration/_tests/test_bl931/test_move.py index 240b1213..6dbb1898 100644 --- a/orchestration/_tests/test_bl931/test_move.py +++ b/orchestration/_tests/test_bl931/test_move.py @@ -5,7 +5,8 @@ from uuid import uuid4 from prefect.testing.utilities import prefect_test_harness -from prefect.blocks.system import Secret, JSON +from prefect.blocks.system import Secret +from prefect.variables import Variable from pytest_mock import MockFixture from orchestration._tests.test_transfer_controller import MockSecret @@ -32,8 +33,21 @@ def prefect_test_fixture(): globus_client_secret = Secret(value=str(uuid4())) globus_client_secret.save(name="globus-client-secret", overwrite=True) - pruning_config = JSON(value={"max_wait_seconds": 600}) - pruning_config.save(name="pruning-config", overwrite=True) + Variable.set( + name="globus-settings", + value={"max_wait_seconds": 600}, + overwrite=True, + _sync=True + ) + + Variable.set( + name="bl931-settings", + value={ + "delete_data931_files_after_days": 180 + }, + overwrite=True, + _sync=True + ) yield @@ -58,11 +72,11 @@ def test_process_new_931_file_task(mocker: MockFixture) -> None: from orchestration.flows.bl931.move import process_new_931_file_task # Patch the Secret.load and init_transfer_client in the configuration context. - with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): - mocker.patch( - "orchestration.flows.bl931.config.transfer.init_transfer_client", - return_value=mocker.MagicMock() # Return a dummy TransferClient - ) + mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()) + mocker.patch( + "orchestration.flows.bl931.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) # Patch the schedule_prefect_flow call to avoid real Prefect interaction mocker.patch( "orchestration.flows.bl931.move.schedule_prefect_flow", @@ -131,11 +145,11 @@ class MockConfig: test_file_path = f"/tmp/test_file_{uuid4()}.txt" # Patch the schedule_prefect_flow call to avoid real Prefect interaction - with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): - mocker.patch( - "orchestration.flows.bl931.config.transfer.init_transfer_client", - return_value=mocker.MagicMock() # Return a dummy TransferClient - ) + mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()) + mocker.patch( + "orchestration.flows.bl931.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) # Patch the schedule_prefect_flow call to avoid real Prefect interaction mocker.patch( "orchestration.flows.bl931.move.schedule_prefect_flow", diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 660b3aab..3231457e 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -3,8 +3,7 @@ from typing import Optional from prefect import flow, get_run_logger, task -# from prefect.blocks.system import JSON - +from prefect.variables import Variable from orchestration.flows.bl931.config import Config931 from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe from orchestration.prefect import schedule_prefect_flow @@ -116,7 +115,8 @@ def _prune_globus_endpoint( # globus_settings = JSON.load("globus-settings").value # max_wait_seconds = globus_settings["max_wait_seconds"] - max_wait_seconds = 600 + + max_wait_seconds = Variable.get("globus-settings")["max_wait_seconds"] flow_name = f"prune_from_{source_endpoint.name}" logger.info(f"Running flow: {flow_name}") logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") @@ -174,17 +174,15 @@ def process_new_931_file_task( destination=config.bl931_nersc_alsdev_raw ) - # TODO: Ingest file path in SciCat - # Waiting for PR #62 to be merged (scicat_controller) - - # Schedule pruning from QNAP # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs + + bl931_settings = Variable.get("bl931-settings") prune( file_path=file_path, source_endpoint=config.bl931_compute_dtn, check_endpoint=config.bl931_nersc_alsdev_raw, - days_from_now=180.0 # determine appropriate value: currently 6 months + days_from_now=bl931_settings.get("delete_data931_files_after_days", 180) ) # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? From d925c8947c5694adc50223328b9134ea7afa6b61 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 23 Dec 2025 11:43:03 -0800 Subject: [PATCH 15/28] Adding flow diagram to 9.3.1 documentation --- docs/mkdocs/docs/bl931.md | 66 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/docs/mkdocs/docs/bl931.md b/docs/mkdocs/docs/bl931.md index 4e300e5a..6838b3d0 100644 --- a/docs/mkdocs/docs/bl931.md +++ b/docs/mkdocs/docs/bl931.md @@ -35,4 +35,68 @@ Flow to process a new file at BL 9.3.1 The computing backend runs on a VM in the B15 server room that is managed by ALS IT staff. -`flow-931.als.lbl.gov` \ No newline at end of file +`flow-931.als.lbl.gov` + +## Flow Diagram + +```mermaid +sequenceDiagram + participant DET as Detector/
File Watcher + participant DISP as Prefect
Dispatcher + participant DTN as compute-dtn
Storage + participant GLOB as Globus
Transfer + participant CFS as NERSC
CFS + participant CAT as SciCat
Metadata + participant HPSS as NERSC
HPSS + + %% Initial Trigger + DET->>DET: Monitor filesystem + DET->>DISP: Trigger on new file + DISP->>DISP: Coordinate flows + + %% Flow 1: new_file_931 + rect rgb(220, 230, 255) + note over DISP,CAT: FLOW 1: new_file_931 + DISP->>GLOB: Init transfer + activate GLOB + GLOB->>DTN: Read from compute-dtn + DTN-->>GLOB: Data + GLOB->>CFS: Write to NERSC CFS + GLOB-->>DISP: Transfer complete + deactivate GLOB + + DISP->>CAT: Register metadata (TBD) + end + + %% Flow 2: HPSS Archive + rect rgb(220, 255, 230) + note over DISP,HPSS: FLOW 2: HPSS Archive (TBD) + DISP->>GLOB: Init archive transfer + activate GLOB + GLOB->>CFS: Read from CFS + CFS-->>GLOB: Data + GLOB->>HPSS: Write to tape + GLOB-->>DISP: Archive complete + deactivate GLOB + + DISP->>CAT: Update metadata (TBD) + end + + %% Flow 3: Scheduled Pruning + rect rgb(255, 255, 220) + note over DISP,CFS: FLOW 3: Scheduled Pruning + DISP->>DISP: Schedule prune tasks + + DISP->>DTN: Prune from compute-dtn + activate DTN + DTN->>DTN: Delete expired data + DTN-->>DISP: Pruning complete + deactivate DTN + + DISP->>CFS: Prune from CFS (after HPSS) + activate CFS + CFS->>CFS: Delete expired data + CFS-->>DISP: Pruning complete + deactivate CFS + end +``` \ No newline at end of file From 42365170f490a6cbfc73167853c6471085a33a5b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 09:59:56 -0800 Subject: [PATCH 16/28] Updating flight scheck to move globus_test/test.txt --- orchestration/flows/bl931/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 3231457e..8c3def4d 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -194,7 +194,7 @@ def process_new_931_file_task( @flow(name="move_931_flight_check", flow_run_name="move_931_flight_check-{file_path}") def move_931_flight_check( - file_path: str = "test/", + file_path: str = "globus_test/test.txt", ): """Please keep your arms and legs inside the vehicle at all times.""" logger = get_run_logger() From fc437c6b3a0ac3149a2a6344f95febeef86c316f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 10:22:58 -0800 Subject: [PATCH 17/28] removing commented out code --- orchestration/flows/bl931/move.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 8c3def4d..f268a079 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -34,6 +34,8 @@ def prune( Returns: bool: True if pruning was successful or scheduled successfully, False otherwise """ + logger = get_run_logger() + if not file_path: logger.error("No file_path provided for pruning operation") return False @@ -48,10 +50,6 @@ def prune( if days_from_now < 0: raise ValueError(f"Invalid days_from_now: {days_from_now}") - # JSON blocks are deprecated, we should use what they recommend in the docs - # globus_settings = JSON.load("globus-settings").value - # max_wait_seconds = globus_settings["max_wait_seconds"] - logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'") # convert float days → timedelta From c1689fcacfa602e5792336f0f6ec8d118f65a513 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 10:26:22 -0800 Subject: [PATCH 18/28] adding _sync=True to Variable.get() call and added missing flow_run_name from schedule_prefect_flow --- orchestration/flows/bl931/move.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index f268a079..5b6234b5 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -72,6 +72,7 @@ def prune( try: schedule_prefect_flow( deployment_name="prune_globus_endpoint/prune_globus_endpoint", + flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}", parameters={ "relative_path": file_path, "source_endpoint": source_endpoint, @@ -106,15 +107,16 @@ def _prune_globus_endpoint( check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning config (BeamlineConfig): Configuration object with transfer client """ + logger = get_run_logger() + logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") if not config: config = Config931() - # globus_settings = JSON.load("globus-settings").value - # max_wait_seconds = globus_settings["max_wait_seconds"] + globus_settings = Variable.get("globus-settings", _sync=True) + max_wait_seconds = globus_settings["max_wait_seconds"] - max_wait_seconds = Variable.get("globus-settings")["max_wait_seconds"] flow_name = f"prune_from_{source_endpoint.name}" logger.info(f"Running flow: {flow_name}") logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") From ccaad7450bf8af6afaa9f080052dce9e082c9412 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 10:28:54 -0800 Subject: [PATCH 19/28] Adding verbose logging to process_new_931_file_task --- orchestration/flows/bl931/move.py | 50 +++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 5b6234b5..59abc9ce 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -157,33 +157,59 @@ def process_new_931_file_task( :param file_path: Path to the new file to be processed. :param config: Configuration settings for processing. """ + logger = get_run_logger() + + if not file_path: + logger.error("No file_path provided") + raise ValueError("No file_path provided") logger.info(f"Processing new 931 file: {file_path}") if not config: + logger.info("No config provided, creating default Config931") config = Config931() + logger.info("Initializing Globus transfer controller") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) - transfer_controller.copy( - file_path=file_path, - source=config.bl931_compute_dtn, - destination=config.bl931_nersc_alsdev_raw - ) + logger.info(f"Step 1: Copying {file_path} from data931 ({config.bl931_compute_dtn.name}) " + f"to NERSC CFS ({config.bl931_nersc_alsdev_raw.name})") + + try: + transfer_controller.copy( + file_path=file_path, + source=config.bl931_compute_dtn, + destination=config.bl931_nersc_alsdev_raw + ) + logger.info("Step 1 complete: File copied to NERSC CFS") + except Exception as e: + logger.error(f"Step 1 failed: Could not copy file to NERSC CFS: {e}", exc_info=True) + raise # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs - bl931_settings = Variable.get("bl931-settings") - prune( - file_path=file_path, - source_endpoint=config.bl931_compute_dtn, - check_endpoint=config.bl931_nersc_alsdev_raw, - days_from_now=bl931_settings.get("delete_data931_files_after_days", 180) - ) + logger.info("Step 2: Scheduling pruning from data931") + try: + bl931_settings = Variable.get("bl931-settings", _sync=True) + days_from_now = bl931_settings.get("delete_data931_files_after_days", 180) + logger.info(f"Pruning scheduled for {days_from_now} days from now") + + prune( + file_path=file_path, + source_endpoint=config.bl931_compute_dtn, + check_endpoint=config.bl931_nersc_alsdev_raw, + days_from_now=days_from_now + ) + logger.info("Step 2 complete: Pruning scheduled") + except Exception as e: + logger.error(f"Step 2 failed: Could not schedule pruning: {e}", exc_info=True) + raise + + logger.info(f"All steps complete for file: {file_path}") # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller) From 79f280fe8cd0fa78e7a9d355c44dee43af55a97d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 10:46:02 -0800 Subject: [PATCH 20/28] Adding transfer_success RuntimeError if False --- orchestration/flows/bl931/move.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 59abc9ce..038f48ee 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -179,11 +179,15 @@ def process_new_931_file_task( f"to NERSC CFS ({config.bl931_nersc_alsdev_raw.name})") try: - transfer_controller.copy( + transfer_success = transfer_controller.copy( file_path=file_path, source=config.bl931_compute_dtn, destination=config.bl931_nersc_alsdev_raw ) + if not transfer_success: + logger.error("Step 1 failed: Transfer was not successful") + raise RuntimeError("Transfer failed") + logger.info("Step 1 complete: File copied to NERSC CFS") except Exception as e: logger.error(f"Step 1 failed: Could not copy file to NERSC CFS: {e}", exc_info=True) From 4b24dcba9606e587388d3dfcccf523ff2f02cb91 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 10:51:23 -0800 Subject: [PATCH 21/28] Fixing the deployment name when scheduling prune_globus_endpoint --- orchestration/flows/bl931/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 038f48ee..9b046da1 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -71,7 +71,7 @@ def prune( try: schedule_prefect_flow( - deployment_name="prune_globus_endpoint/prune_globus_endpoint", + deployment_name="prune_globus_endpoint/prune_data931", flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}", parameters={ "relative_path": file_path, From 8fa493962db173de9be9d4d0a7421d1ee339a5a6 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 11:43:10 -0800 Subject: [PATCH 22/28] Adding beegfs endpoint for 931 --- config.yml | 6 ++++++ orchestration/flows/bl931/config.py | 1 + 2 files changed, 7 insertions(+) diff --git a/config.yml b/config.yml index af5548b6..e0ed2654 100644 --- a/config.yml +++ b/config.yml @@ -54,6 +54,12 @@ globus: name: bl733-lamarr-beamlines # 9.3.1 ENDPOINTS + bl931-beegfs-data: + root_path: /beamline_staging/bl931/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl931-beegfs-data + bl931-compute-dtn: root_path: / uri: compute-dtn.als.lbl.gov diff --git a/orchestration/flows/bl931/config.py b/orchestration/flows/bl931/config.py index 47370cad..23b7686c 100644 --- a/orchestration/flows/bl931/config.py +++ b/orchestration/flows/bl931/config.py @@ -11,3 +11,4 @@ def __init__(self) -> None: self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) self.bl931_compute_dtn = self.endpoints["bl931-compute-dtn"] self.bl931_nersc_alsdev_raw = self.endpoints["bl931-nersc_alsdev_raw"] + self.bl931_beegfs = self.endpoints["bl931-beegfs-data"] From b596ea06b1c2f9ba62e10aa517041541018189ae Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 11:43:36 -0800 Subject: [PATCH 23/28] Adding placeholder for transfer to beegfs --- orchestration/flows/bl931/move.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 9b046da1..373a5a17 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -175,28 +175,41 @@ def process_new_931_file_task( config=config ) - logger.info(f"Step 1: Copying {file_path} from data931 ({config.bl931_compute_dtn.name}) " + # logger.info(f"Step 1: Copying {file_path} from data931 to beegfs ({config.bl931_beegfs.name})") + + # beegfs_transfer_success = transfer_controller.copy( + # file_path=file_path, + # source=config.bl931_compute_dtn, + # destination=config.bl931_beegfs + # ) + # if not beegfs_transfer_success: + # logger.error("Step 1 failed: Beegfs transfer was not successful") + # raise Warning("Beegfs transfer failed") + # else: + # logger.info("Step 1 complete: File copied to beegfs") + + logger.info(f"Step 2: Copying {file_path} from data931 ({config.bl931_compute_dtn.name}) " f"to NERSC CFS ({config.bl931_nersc_alsdev_raw.name})") try: - transfer_success = transfer_controller.copy( + nersc_transfer_success = transfer_controller.copy( file_path=file_path, source=config.bl931_compute_dtn, destination=config.bl931_nersc_alsdev_raw ) - if not transfer_success: - logger.error("Step 1 failed: Transfer was not successful") + if not nersc_transfer_success: + logger.error("Step 2 failed: NERSC transfer was not successful") raise RuntimeError("Transfer failed") - logger.info("Step 1 complete: File copied to NERSC CFS") + logger.info("Step 2 complete: File copied to NERSC CFS") except Exception as e: - logger.error(f"Step 1 failed: Could not copy file to NERSC CFS: {e}", exc_info=True) + logger.error(f"Step 2 failed: Could not copy file to NERSC CFS: {e}", exc_info=True) raise # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs - logger.info("Step 2: Scheduling pruning from data931") + logger.info("Step 3: Scheduling pruning from data931") try: bl931_settings = Variable.get("bl931-settings", _sync=True) days_from_now = bl931_settings.get("delete_data931_files_after_days", 180) @@ -208,9 +221,9 @@ def process_new_931_file_task( check_endpoint=config.bl931_nersc_alsdev_raw, days_from_now=days_from_now ) - logger.info("Step 2 complete: Pruning scheduled") + logger.info("Step 3 complete: Pruning scheduled") except Exception as e: - logger.error(f"Step 2 failed: Could not schedule pruning: {e}", exc_info=True) + logger.error(f"Step 3 failed: Could not schedule pruning: {e}", exc_info=True) raise logger.info(f"All steps complete for file: {file_path}") From 1f6d97609eebe03e83ab0ba427e0dcd27c6ca1a0 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 11:59:26 -0800 Subject: [PATCH 24/28] reorganizing config.yml to be in numerical order by beamline number --- config.yml | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/config.yml b/config.yml index e0ed2654..ab350b04 100644 --- a/config.yml +++ b/config.yml @@ -52,25 +52,6 @@ globus: uri: lamarr.als.lbl.gov uuid: aee983fc-826e-4081-bfb2-62529970540d name: bl733-lamarr-beamlines - # 9.3.1 ENDPOINTS - - bl931-beegfs-data: - root_path: /beamline_staging/bl931/raw/ - uri: beegfs.als.lbl.gov - uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a - name: bl931-beegfs-data - - bl931-compute-dtn: - root_path: / - uri: compute-dtn.als.lbl.gov - uuid: 23af478e-d459-4e78-9753-5091b5fb432a - name: bl931-compute-dtn - - bl931-nersc_alsdev_raw: - root_path: /global/cfs/cdirs/als/data_mover/9.3.1/raw - uri: nersc.gov - uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 - name: bl931-nersc_alsdev_raw # 8.3.2 ENDPOINTS @@ -164,6 +145,26 @@ globus: uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 name: nersc832 + # 9.3.1 ENDPOINTS + + bl931-beegfs-data: + root_path: /beamline_staging/bl931/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl931-beegfs-data + + bl931-compute-dtn: + root_path: / + uri: compute-dtn.als.lbl.gov + uuid: 23af478e-d459-4e78-9753-5091b5fb432a + name: bl931-compute-dtn + + bl931-nersc_alsdev_raw: + root_path: /global/cfs/cdirs/als/data_mover/9.3.1/raw + uri: nersc.gov + uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 + name: bl931-nersc_alsdev_raw + globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} From 11a185e449e14d68fc0cfccddc10c6024fb78be2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 12:03:48 -0800 Subject: [PATCH 25/28] Adding placeholder for 9.3.1 scicat ingestion --- orchestration/flows/bl931/move.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index 373a5a17..c60dd94a 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -1,10 +1,13 @@ import datetime import logging +# from pathlib import Path from typing import Optional from prefect import flow, get_run_logger, task from prefect.variables import Variable + from orchestration.flows.bl931.config import Config931 +# from orchestration.flows.scicat.ingest import scicat_ingest_flow from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe from orchestration.prefect import schedule_prefect_flow from orchestration.transfer_controller import CopyMethod, get_transfer_controller @@ -234,6 +237,27 @@ def process_new_931_file_task( # TODO: Ingest file path in SciCat # Waiting for PR #62 to be merged (scicat_controller) + # logger.info(f"Step 3: Ingesting {file_path} into SciCat") + + # # Build beegfs path for SciCat ingestion + # # Get relative path from source root + # try: + # rel_path = str(Path(file_path).relative_to(config.data733_raw.root_path)) + # except ValueError: + # # Already a relative path + # rel_path = file_path.lstrip("/") + + # # Build full beegfs path + # beegfs_path = "/global/" + config.beegfs931.root_path.strip("/") + "/" + rel_path + + # logger.info(f"Beegfs path: {beegfs_path}") + # try: + # scicat_ingest_flow(file_path=beegfs_path, ingester_spec="als931_ingester") + # logger.info("Step 3 complete: SciCat ingest successful") + # except Exception as e: + # logger.error(f"SciCat ingest failed with {e}") + + @flow(name="move_931_flight_check", flow_run_name="move_931_flight_check-{file_path}") def move_931_flight_check( From 87b11dc73cf65f6587fcf6d5368fe569da48ae33 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 13:05:43 -0800 Subject: [PATCH 26/28] removing config from the schedule_prefect_flow call due to serialization --- orchestration/flows/bl931/move.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index c60dd94a..af8f7e29 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -80,7 +80,6 @@ def prune( "relative_path": file_path, "source_endpoint": source_endpoint, "check_endpoint": check_endpoint, - "config": config }, duration_from_now=delay, ) @@ -258,7 +257,6 @@ def process_new_931_file_task( # logger.error(f"SciCat ingest failed with {e}") - @flow(name="move_931_flight_check", flow_run_name="move_931_flight_check-{file_path}") def move_931_flight_check( file_path: str = "globus_test/test.txt", From ac14b71c62a0bc336fc38b6e83bb7c135d010415 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 13:45:11 -0800 Subject: [PATCH 27/28] copy data to beegfs as part of the flow --- orchestration/flows/bl931/move.py | 75 +++++++++++++++---------------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py index af8f7e29..5398eec4 100644 --- a/orchestration/flows/bl931/move.py +++ b/orchestration/flows/bl931/move.py @@ -1,13 +1,13 @@ import datetime import logging -# from pathlib import Path +from pathlib import Path from typing import Optional from prefect import flow, get_run_logger, task from prefect.variables import Variable from orchestration.flows.bl931.config import Config931 -# from orchestration.flows.scicat.ingest import scicat_ingest_flow +from orchestration.flows.scicat.ingest import scicat_ingest_flow from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe from orchestration.prefect import schedule_prefect_flow from orchestration.transfer_controller import CopyMethod, get_transfer_controller @@ -177,18 +177,18 @@ def process_new_931_file_task( config=config ) - # logger.info(f"Step 1: Copying {file_path} from data931 to beegfs ({config.bl931_beegfs.name})") + logger.info(f"Step 1: Copying {file_path} from data931 to beegfs ({config.bl931_beegfs.name})") - # beegfs_transfer_success = transfer_controller.copy( - # file_path=file_path, - # source=config.bl931_compute_dtn, - # destination=config.bl931_beegfs - # ) - # if not beegfs_transfer_success: - # logger.error("Step 1 failed: Beegfs transfer was not successful") - # raise Warning("Beegfs transfer failed") - # else: - # logger.info("Step 1 complete: File copied to beegfs") + beegfs_transfer_success = transfer_controller.copy( + file_path=file_path, + source=config.bl931_compute_dtn, + destination=config.bl931_beegfs + ) + if not beegfs_transfer_success: + logger.error("Step 1 failed: Beegfs transfer was not successful") + raise Warning("Beegfs transfer failed") + else: + logger.info("Step 1 complete: File copied to beegfs") logger.info(f"Step 2: Copying {file_path} from data931 ({config.bl931_compute_dtn.name}) " f"to NERSC CFS ({config.bl931_nersc_alsdev_raw.name})") @@ -208,10 +208,30 @@ def process_new_931_file_task( logger.error(f"Step 2 failed: Could not copy file to NERSC CFS: {e}", exc_info=True) raise + # logger.info(f"Step 3: Ingesting {file_path} into SciCat") + + # Build beegfs path for SciCat ingestion + # Get relative path from source root + # try: + # rel_path = str(Path(file_path).relative_to(config.bl931_compute_dtn.root_path)) + # except ValueError: + # # Already a relative path + # rel_path = file_path.lstrip("/") + + # # Build full beegfs path + # beegfs_path = "/global/" + config.bl931_beegfs.root_path.strip("/") + "/" + rel_path + + # logger.info(f"Beegfs path: {beegfs_path}") + # try: + # scicat_ingest_flow(dataset_path=beegfs_path, ingester_spec="als931_ingester") + # logger.info("Step 3 complete: SciCat ingest successful") + # except Exception as e: + # logger.error(f"SciCat ingest failed with {e}") + # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs - logger.info("Step 3: Scheduling pruning from data931") + logger.info("Step 4: Scheduling pruning from data931") try: bl931_settings = Variable.get("bl931-settings", _sync=True) days_from_now = bl931_settings.get("delete_data931_files_after_days", 180) @@ -223,9 +243,9 @@ def process_new_931_file_task( check_endpoint=config.bl931_nersc_alsdev_raw, days_from_now=days_from_now ) - logger.info("Step 3 complete: Pruning scheduled") + logger.info("Step 4 complete: Pruning scheduled") except Exception as e: - logger.error(f"Step 3 failed: Could not schedule pruning: {e}", exc_info=True) + logger.error(f"Step 4 failed: Could not schedule pruning: {e}", exc_info=True) raise logger.info(f"All steps complete for file: {file_path}") @@ -233,29 +253,6 @@ def process_new_931_file_task( # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller) - # TODO: Ingest file path in SciCat - # Waiting for PR #62 to be merged (scicat_controller) - - # logger.info(f"Step 3: Ingesting {file_path} into SciCat") - - # # Build beegfs path for SciCat ingestion - # # Get relative path from source root - # try: - # rel_path = str(Path(file_path).relative_to(config.data733_raw.root_path)) - # except ValueError: - # # Already a relative path - # rel_path = file_path.lstrip("/") - - # # Build full beegfs path - # beegfs_path = "/global/" + config.beegfs931.root_path.strip("/") + "/" + rel_path - - # logger.info(f"Beegfs path: {beegfs_path}") - # try: - # scicat_ingest_flow(file_path=beegfs_path, ingester_spec="als931_ingester") - # logger.info("Step 3 complete: SciCat ingest successful") - # except Exception as e: - # logger.error(f"SciCat ingest failed with {e}") - @flow(name="move_931_flight_check", flow_run_name="move_931_flight_check-{file_path}") def move_931_flight_check( From a2590c464a787afbe2719673b93df975d94bef64 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 14 Jan 2026 14:02:00 -0800 Subject: [PATCH 28/28] Updating 931 pytest to check for two transfers (1 to beegfs, another to NERSC) --- orchestration/_tests/test_bl931/test_move.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/_tests/test_bl931/test_move.py b/orchestration/_tests/test_bl931/test_move.py index 6dbb1898..72be36ac 100644 --- a/orchestration/_tests/test_bl931/test_move.py +++ b/orchestration/_tests/test_bl931/test_move.py @@ -108,7 +108,7 @@ def test_process_new_931_file_task(mocker: MockFixture) -> None: result = process_new_931_file_task(file_path=test_file_path, config=mock_config) # Verify that the transfer controller's copy method was called exactly once. - assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once" @@ -117,7 +117,7 @@ def test_process_new_931_file_task(mocker: MockFixture) -> None: mock_prune.reset_mock() result = process_new_931_file_task(file_path=test_file_path, config=None) - assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once"