Includes a new orchestration/flows/bl7011 directory containing a Config7011() class
/global/cfs/cdirs/als/gsharing/data_mover/7012 - uri: nersc.gov - uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 - name: nersc7012 - - data7012: - root_path: / - uri: hpc.lbl.gov - uuid: 639c49be-604f-423c-9c5d-82a53afe1bf1 - name: data7012 - globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index dd411238..cf40edc7 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -51,6 +51,117 @@ def __init__(self, root_path, uuid_value=None): self.uri = f"mock_endpoint_uri_{self.uuid}" +# Mock the Client class to avoid real network calls +class MockGlobusComputeClient: + def __init__(self, *args, **kwargs): + # No real initialization, as this is a mock + pass + + def version_check(self): + # Mock version check to do nothing + pass + + def run(self, *args, **kwargs): + # Return a mock task ID + return "mock_task_id" + + def get_task(self, task_id): + # Return a mock task response + return { + "pending": False, + "status": "success", + "result": "mock_result" + } + + def get_result(self, task_id): + # Return a mock result + return "mock_result" + + +class MockSecret: + value = str(uuid4()) + + +# ---------------------------- +# Tests for 7011 +# ---------------------------- + +class MockConfig7011: + def __init__(self) -> None: + """ + Dummy configuration for 7011 flows. 
+ """ + # Create mock endpoints + self.endpoints = { + "data7011_raw": MockEndpoint(root_path="mock_data7011_raw_path", uuid_value=str(uuid4())), + "nersc7011_alsdev_raw": MockEndpoint(root_path="mock_nersc7011_alsdev_raw_path", uuid_value=str(uuid4())), + } + + # Define mock apps + self.apps = { + "als_transfer": "mock_als_transfer_app" + } + + # Use the mock transfer client instead of the real TransferClient + self.tc = MockTransferClient() + + # Set attributes for easy access + self.data7011_raw = self.endpoints["data7011_raw"] + self.nersc7011_alsdev_raw = self.endpoints["nersc7011_alsdev_raw"] + + +def test_process_new_7011_file(mocker: MockFixture) -> None: + """ + Test the process_new_7011_file flow from orchestration.flows.bl7011.move. + + This test verifies that: + - The get_transfer_controller function is called (patched) with the correct parameters. + - The returned transfer controller's copy method is called with the expected file path, + source, and destination endpoints from the provided configuration. + + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the flow to test. + from orchestration.flows.bl7011.move import process_new_7011_file + + # Patch the Secret.load and init_transfer_client in the configuration context. + with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): + mocker.patch( + "orchestration.flows.bl7011.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) + from orchestration.flows.bl7011.config import Config7011 + + # Instantiate the dummy configuration. + mock_config = Config7011() + + # Generate a test file path. + test_file_path = f"/tmp/test_file_{uuid4()}.txt" + + # Create a mock transfer controller with a mocked 'copy' method. 
# Patch get_transfer_controller where it is used in process_new_7011_file.
so HPC calls won't blow up from malformed UUID - mock_secret = mocker.MagicMock() - mock_secret.get.return_value = str(uuid4()) - - with mocker.patch('prefect.blocks.system.Secret.load', return_value=mock_secret): + with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): # 2) Patch out the calls in Config832 that do real Globus auth: # a) init_transfer_client(...) used in the constructor mocker.patch( diff --git a/orchestration/flows/bl7011/__init__.py b/orchestration/flows/bl7011/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/flows/bl7011/config.py b/orchestration/flows/bl7011/config.py new file mode 100644 index 00000000..59750c1e --- /dev/null +++ b/orchestration/flows/bl7011/config.py @@ -0,0 +1,13 @@ +from globus_sdk import TransferClient +from orchestration.globus import transfer + + +class Config7011: + def __init__(self) -> None: + config = transfer.get_config() + self.endpoints = transfer.build_endpoints(config) + self.apps = transfer.build_apps(config) + self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) + self.data7011 = self.endpoints["data7011"] + self.data7011_raw = self.endpoints["data7011_raw"] + self.nersc7011_alsdev_raw = self.endpoints["nersc7011_alsdev_raw"] diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py new file mode 100644 index 00000000..8a43999c --- /dev/null +++ b/orchestration/flows/bl7011/move.py @@ -0,0 +1,30 @@ +import logging +from prefect import flow + +from orchestration.flows.bl7011.config import Config7011 +from orchestration.transfer_controller import CopyMethod, get_transfer_controller + +logger = logging.getLogger(__name__) + + +@flow(name="new_7011_file_flow") +def process_new_7011_file( + file_path: str, + config: Config7011 +) -> None: + """ + + """ + + logger.info(f"Processing new 733 file: {file_path}") + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + 
config=Config7011() + ) + + transfer_controller.copy( + file_path=file_path, + source=config.data7011_raw, + destination=config.nersc7011_alsdev_raw + ) From 13ed2600b58fe016f3b5effa2ca9d3f4cd7874fe Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 18 Feb 2025 12:03:38 -0800 Subject: [PATCH 2/2] Continuing to develop 7.0.1.1 flows. Updated docs, added a dispatcher.py script, added a create_deployments_7011.sh script, added comments indicating next steps. --- config.yml | 12 ++--- create_deployments_7011.sh | 28 ++++++++++++ docs/mkdocs/docs/bl7011.md | 40 +++++++++++++++++ docs/mkdocs/mkdocs.yml | 6 ++- orchestration/flows/bl7011/config.py | 5 ++- orchestration/flows/bl7011/dispatcher.py | 56 ++++++++++++++++++++++++ orchestration/flows/bl7011/move.py | 23 ++++++++-- 7 files changed, 157 insertions(+), 13 deletions(-) create mode 100644 create_deployments_7011.sh create mode 100644 docs/mkdocs/docs/bl7011.md create mode 100644 orchestration/flows/bl7011/dispatcher.py diff --git a/config.yml b/config.yml index 94836b3e..0adf9b03 100644 --- a/config.yml +++ b/config.yml @@ -15,17 +15,17 @@ globus: uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 name: nersc7011_alsdev_raw - data7011: + qnap7011: root_path: / - uri: data7011.lbl.gov + uri: qnap7011.lbl.gov uuid: tbd - name: data7011 + name: qnap7011 - data7011_raw: + qnap7011_raw: root_path: /data/raw - uri: data7011.lbl.gov + uri: qnap7011.lbl.gov uuid: tbd - name: data7011_raw + name: qnap7011_raw # 7.0.1.2 ENDPOINTS diff --git a/create_deployments_7011.sh b/create_deployments_7011.sh new file mode 100644 index 00000000..f078c7a3 --- /dev/null +++ b/create_deployments_7011.sh @@ -0,0 +1,28 @@ +export $(grep -v '^#' .env | xargs) + + +# Create work pools. 
prefect deployment build ./orchestration/flows/bl7011/dispatcher.py:dispatcher -n run_7011_dispatcher -p dispatcher_7011_flow_pool -q dispatcher_7011_queue
As soon as the File Watcher detects that a new file is written, it calls the `dispatcher()` Flow.
+  - 7.0.1.1 COSMIC Scattering: bl7011.md
Dispatcher flow for the BL 7.0.1.1 beamline that launches the new_7011_file_flow. + + :param file_path: Path to the file to be processed. + :param is_export_control: Flag indicating if export control measures should be applied. + (Not used in the current BL 7.0.1.1 processing) + :param config: Configuration settings for processing. + Expected to be an instance of Config7011 or a dict that can be converted. + :raises ValueError: If no configuration is provided. + :raises TypeError: If the provided configuration is not a dict or Config7011.
Ingest file path in SciCat. + 2. Schedule pruning from QNAP. + 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 4. Schedule pruning from NERSC CFS. + + :param file_path: Path to the new file to be processed. + :param config: Configuration settings for processing. """ - logger.info(f"Processing new 733 file: {file_path}") + logger.info(f"Processing new BL 7.0.1.1 file: {file_path}") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, @@ -25,6 +32,16 @@ def process_new_7011_file( transfer_controller.copy( file_path=file_path, - source=config.data7011_raw, + source=config.qnap7011_raw, destination=config.nersc7011_alsdev_raw ) + + # TODO: Ingest file path in SciCat + + # TODO: Schedule pruning from QNAP + # Waiting for PR #62 to be merged (prune_controller) + + # TODO: Copy the file from NERSC CFS to NERSC HPSS + # Waiting for PR #62 to be merged (transfer_controller) + + # TODO: Ingest file path in SciCat