From cc8fde245fc6e3c5fc43e507ea37fedafe6afcf8 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 7 Jan 2026 13:19:21 -0800 Subject: [PATCH 01/11] Updating bl733 config to use the Lamarr endpoint --- config.yml | 14 ++++---------- orchestration/flows/bl733/config.py | 2 +- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/config.yml b/config.yml index d8d990a4..508058b5 100644 --- a/config.yml +++ b/config.yml @@ -41,17 +41,11 @@ globus: uuid: 26b3d2cf-fd80-4a64-a78f-38a155aca926 name: data733_raw - bl733-lamarr-global: - root_path: /bl733/ + bl733-lamarr-data: + root_path: /beamlines/bl733/ uri: lamarr.als.lbl.gov - uuid: dbaac176-b1f7-4134-979a-0b1668786d11 - name: bl733-lamarr-global - - bl733-lamarr-beamlines: - root_path: /bl733/ - uri: lamarr.als.lbl.gov - uuid: aee983fc-826e-4081-bfb2-62529970540d - name: bl733-lamarr-beamlines + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl733-lamarr-data # 8.3.2 ENDPOINTS diff --git a/orchestration/flows/bl733/config.py b/orchestration/flows/bl733/config.py index 3e1637ca..b796f09c 100644 --- a/orchestration/flows/bl733/config.py +++ b/orchestration/flows/bl733/config.py @@ -12,4 +12,4 @@ def __init__(self) -> None: self.data733 = self.endpoints["bl733-als-data733"] self.data733_raw = self.endpoints["bl733-als-data733_raw"] self.nersc733_alsdev_raw = self.endpoints["bl733-nersc-alsdev_raw"] - self.lamarr733 = self.endpoints["bl733-lamarr-beamlines"] + self.lamarr733 = self.endpoints["bl733-lamarr-data"] From 3ea19a283a0f0ca54028ec2c2df4d2e5de2ca90f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 7 Jan 2026 13:23:17 -0800 Subject: [PATCH 02/11] Renaming the folder on Lamarr's root_path to match the other endpoints (periods between the numbers in the beamline id) --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index 508058b5..fdabf0c2 100644 --- a/config.yml +++ b/config.yml @@ -42,7 +42,7 @@ globus: name: data733_raw bl733-lamarr-data: - root_path: /beamlines/bl733/ + root_path: /beamlines/7.3.3/ uri: lamarr.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl733-lamarr-data From ec39b80abda060181b0e6830abfa55e149a25fe7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 7 Jan 2026 17:16:00 -0800 Subject: [PATCH 03/11] Fixing schedule prefect flow flow_run_name typo --- orchestration/flows/bl733/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 296bf875..d06833c2 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -75,7 +75,7 @@ def prune( try: schedule_prefect_flow( deployment_name="prune_globus_endpoint/prune_globus_endpoint", - flow_name=f"prune_globus-{source_endpoint.name}-{file_path}", + flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}", parameters={ "relative_path": file_path, "source_endpoint": source_endpoint, From 032191d944cf055f45e06458d5b1b40d30a3bb56 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 7 Jan 2026 17:27:20 -0800 Subject: [PATCH 04/11] Fixing prune deployment name --- orchestration/flows/bl733/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index d06833c2..9bab3264 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -74,7 +74,7 @@ def prune( try: schedule_prefect_flow( - deployment_name="prune_globus_endpoint/prune_globus_endpoint", + deployment_name="_prune_globus_endpoint/prune_data733", flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}", parameters={ "relative_path": file_path, From a598e1d92db8b52b8ff2af06c70658bf769d565b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 7 Jan 2026 17:37:03 -0800 Subject: [PATCH 05/11] removing underscore from flow name --- orchestration/flows/bl733/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 9bab3264..849a6cb9 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -74,7 +74,7 @@ def prune( try: schedule_prefect_flow( - deployment_name="_prune_globus_endpoint/prune_data733", + deployment_name="prune_globus_endpoint/prune_data733", flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}", parameters={ "relative_path": file_path, From ecef981b6c4da0a30907c88091b3dae7698c9402 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 7 Jan 2026 17:44:26 -0800 Subject: [PATCH 06/11] removing config from schedule_prefect_flow call --- orchestration/flows/bl733/move.py | 1 - 1 file changed, 1 deletion(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 849a6cb9..3ad4d287 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -80,7 +80,6 @@ def prune( "relative_path": file_path, "source_endpoint": source_endpoint, "check_endpoint": check_endpoint, - "config": config }, duration_from_now=delay, ) From 6a9261bb412f99229c6f1903c9df631a326c5ab9 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 7 Jan 2026 17:55:03 -0800 Subject: [PATCH 07/11] Adding more verbose logging to move flow --- orchestration/flows/bl733/move.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 3ad4d287..6e5caf36 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -175,31 +175,42 @@ def process_new_733_file_task( logger.info(f"Processing new 733 file: {file_path}") if not config: + logger.info("No config provided, creating default Config733") config = Config733() + logger.info("Initializing Globus transfer controller") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) + logger.info(f"Step 1: Copying {file_path} from data733 to Lamarr ({config.lamarr733.name})") transfer_controller.copy( file_path=file_path, source=config.data733_raw, destination=config.lamarr733 ) + logger.info("Step 1 complete: File copied to Lamarr") + logger.info(f"Step 2: Copying {file_path} from data733 to NERSC CFS ({config.nersc733_alsdev_raw.name})") transfer_controller.copy( file_path=file_path, source=config.data733_raw, destination=config.nersc733_alsdev_raw ) + logger.info("Step 2 complete: File copied to NERSC CFS") + + logger.info(f"Step 3: Ingesting {file_path} into SciCat") # Note that the SciCat ingester assumes the data is on Lamarr. try: scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs") + logger.info("Step 3 complete: SciCat ingest successful") except Exception as e: logger.error(f"SciCat ingest failed with {e}") + logger.info("Step 4: Scheduling pruning from data733") + # Waiting for PR #62 to be merged (prune_controller) bl733_settings = Variable.get("bl733-settings", _sync=True) prune( @@ -209,6 +220,9 @@ def process_new_733_file_task( days_from_now=bl733_settings["delete_data733_files_after_days"] # 180 days ) + logger.info("Step 4 complete: Pruning scheduled") + logger.info(f"All steps complete for {file_path}") + # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller) From 504e5abb927941912c751692b8beb5fb135969a6 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 12 Jan 2026 10:34:04 -0800 Subject: [PATCH 08/11] renaming Lamarr references to beegfs --- config.yml | 7 +++---- orchestration/flows/bl733/config.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/config.yml b/config.yml index fdabf0c2..33b78daa 100644 --- a/config.yml +++ b/config.yml @@ -41,12 +41,11 @@ globus: uuid: 26b3d2cf-fd80-4a64-a78f-38a155aca926 name: data733_raw - bl733-lamarr-data: + bl733-beegfs-data: root_path: /beamlines/7.3.3/ - uri: lamarr.als.lbl.gov + uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a - name: bl733-lamarr-data - + name: bl733-beegfs-data # 8.3.2 ENDPOINTS spot832: diff --git a/orchestration/flows/bl733/config.py b/orchestration/flows/bl733/config.py index b796f09c..49d1606e 100644 --- a/orchestration/flows/bl733/config.py +++ b/orchestration/flows/bl733/config.py @@ -12,4 +12,4 @@ def __init__(self) -> None: self.data733 = self.endpoints["bl733-als-data733"] self.data733_raw = self.endpoints["bl733-als-data733_raw"] self.nersc733_alsdev_raw = self.endpoints["bl733-nersc-alsdev_raw"] - self.lamarr733 = self.endpoints["bl733-lamarr-data"] + self.beegfs733 = self.endpoints["bl733-beegfs-data"] From f1a1b012236ff43d663da4db04806ee4f5d73368 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 12 Jan 2026 14:52:50 -0800 Subject: [PATCH 09/11] Updating bl733-beegfs-data root path --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index 33b78daa..3f26a4f0 100644 --- a/config.yml +++ b/config.yml @@ -42,7 +42,7 @@ globus: name: data733_raw bl733-beegfs-data: - root_path: /beamlines/7.3.3/ + root_path: /beamline_staging/bl733/raw/ uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl733-beegfs-data From 3d61bde177e7cb59c611b82b4b2339e441bfb825 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 12 Jan 2026 16:42:48 -0800 Subject: [PATCH 10/11] updating dispatcher to handle a list of files, and generating a flow run name based on whether one or more files are sent --- orchestration/flows/bl733/dispatcher.py | 40 ++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/orchestration/flows/bl733/dispatcher.py b/orchestration/flows/bl733/dispatcher.py index f2f82bcc..9f652d23 100644 --- a/orchestration/flows/bl733/dispatcher.py +++ b/orchestration/flows/bl733/dispatcher.py @@ -1,5 +1,6 @@ import logging -from prefect import flow +from pathlib import Path +from prefect import flow, runtime from typing import Optional, Union, Any from orchestration.flows.bl733.move import process_new_733_file_task @@ -8,17 +9,32 @@ logger = logging.getLogger(__name__) +def generate_flow_run_name() -> str: + """Generate flow run name from runtime parameters.""" + params = runtime.flow_run.parameters + file_path = params.get("file_path") + if file_path is None: + return "dispatcher-no_file" + elif isinstance(file_path, str): + return f"dispatcher-{Path(file_path).name}" + elif len(file_path) == 1: + return f"dispatcher-{Path(file_path[0]).name}" + else: + return f"dispatcher-{Path(file_path[0]).name}_+{len(file_path)-1}_more" + + # TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config733 -@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") +@flow(name="dispatcher", flow_run_name=generate_flow_run_name) def dispatcher( - file_path: Optional[str] = None, + file_path: Optional[Union[str, list[str]]] = None, is_export_control: bool = False, config: Optional[Union[dict, Any]] = None, ) -> None: """ Dispatcher flow for BL733 beamline that launches the new_733_file_flow. - :param file_path: Path to the file to be processed. + :param file_path: Path(s) to the file(s) to be processed. Can be a single path (str) + or multiple paths (list[str]). :param is_export_control: Flag indicating if export control measures should be applied. (Not used in the current BL733 processing) :param config: Configuration settings for processing. @@ -26,11 +42,21 @@ def dispatcher( :raises ValueError: If no file_path is provided. """ - logger.info("Starting dispatcher flow for BL 7.3.3") + # Normalize file_path to a list + if file_path is None: + file_paths = [] + elif isinstance(file_path, str): + file_paths = [file_path] + else: + file_paths = file_path + + num_files = len(file_paths) + + logger.info(f"Starting dispatcher flow for BL 7.3.3 with {num_files} file(s)") logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}") # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running. - if file_path is None: + if not file_paths: # returns True for empty list logger.error("No file_path provided to dispatcher.") raise ValueError("File path is required for processing.") @@ -46,7 +72,7 @@ def dispatcher( file_path=file_path, config=config ) - logger.info("Dispatcher flow completed successfully.") + logger.info(f"Dispatcher flow completed successfully for {num_files} file(s).") except Exception as e: logger.error(f"Error during processing in dispatcher flow: {e}") raise From 1cd4e32067fd0e81238c47ce6be273148f8c94a6 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 12 Jan 2026 16:44:02 -0800 Subject: [PATCH 11/11] Updating move.py so the move task/flows handle a list of files. Added a function to find the common root path between the files. Sends one globus transfer per endpoint for that common path. Send the list of files to scicat ingester using the path they should be on beegfs. --- orchestration/flows/bl733/move.py | 98 +++++++++++++++++++++++-------- 1 file changed, 74 insertions(+), 24 deletions(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 6e5caf36..9029e2c8 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -1,6 +1,7 @@ import datetime import logging from pathlib import Path +import os from typing import Optional from prefect import flow, get_run_logger, task @@ -18,6 +19,24 @@ # Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. +def get_common_parent_path(file_paths: list[str]) -> str: + """ + Find the highest common parent directory for a list of file paths. + + :param file_paths: List of file paths + :return: Common parent directory path + """ + if not file_paths: + raise ValueError("No file paths provided") + + if len(file_paths) == 1: + # Single file - return its parent directory + return str(Path(file_paths[0]).parent) + + # Use os.path.commonpath for multiple files + return os.path.commonpath(file_paths) + + def prune( file_path: str = None, source_endpoint: GlobusEndpoint = None, @@ -132,9 +151,9 @@ def _prune_globus_endpoint( ) -@flow(name="new_733_file_flow", flow_run_name="process_new-{file_path}") +@flow(name="new_733_file_flow", flow_run_name="process_new-{file_path[0]}") def process_new_733_file_flow( - file_path: str, + file_path: list[str], config: Optional[Config733] = None ) -> None: """ @@ -144,7 +163,7 @@ def process_new_733_file_flow( 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. 4. Schedule pruning from NERSC CFS. - :param file_path: Path to the new file to be processed. + :param file_paths: Paths to the new files to be processed. :param config: Configuration settings for processing. :return: None """ @@ -156,55 +175,85 @@ def process_new_733_file_flow( @task(name="new_733_file_task") def process_new_733_file_task( - file_path: str, + file_path: list[str], config: Optional[Config733] = None ) -> None: """ Task to process new data at BL 7.3.3 - 1. Copy the data from data733 to Lamarr (our common staging area). + 1. Copy the data from data733 to beegfs (our common staging area). 2. Copy the file from the data733 to NERSC CFS. - 3. Ingest the data from Lamarr into SciCat. + 3. Ingest the data from beegfs into SciCat. 4. Schedule pruning from data733 for 6 months from now. 5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future. - :param file_path: Path to the new file to be processed. + :param file_paths: Path(s) to the new file(s) to be processed. :param config: Configuration settings for processing. """ logger = get_run_logger() - logger.info(f"Processing new 733 file: {file_path}") + if file_path is None: + file_paths = [] + elif isinstance(file_path, str): + file_paths = [file_path] + else: + file_paths = file_path + + if not file_paths: + logger.error("No file_paths provided") + raise ValueError("No file_paths provided") + + logger.info(f"Processing new 733 files: {file_paths}") if not config: logger.info("No config provided, creating default Config733") config = Config733() + common_path = get_common_parent_path(file_paths) + logger.info(f"Common parent path: {common_path}") + logger.info("Initializing Globus transfer controller") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) - logger.info(f"Step 1: Copying {file_path} from data733 to Lamarr ({config.lamarr733.name})") + logger.info(f"Step 1: Copying {common_path} from data733 to beegfs ({config.beegfs733.name})") + transfer_controller.copy( - file_path=file_path, + file_path=common_path, source=config.data733_raw, - destination=config.lamarr733 + destination=config.beegfs733 ) - logger.info("Step 1 complete: File copied to Lamarr") + logger.info("Step 1 complete: File copied to beegfs") - logger.info(f"Step 2: Copying {file_path} from data733 to NERSC CFS ({config.nersc733_alsdev_raw.name})") + logger.info(f"Step 2: Copying {common_path} from data733 to NERSC CFS ({config.nersc733_alsdev_raw.name})") transfer_controller.copy( - file_path=file_path, + file_path=common_path, source=config.data733_raw, destination=config.nersc733_alsdev_raw ) logger.info("Step 2 complete: File copied to NERSC CFS") - logger.info(f"Step 3: Ingesting {file_path} into SciCat") - # Note that the SciCat ingester assumes the data is on Lamarr. + logger.info(f"Step 3: Ingesting {len(file_paths)} into SciCat") + + # Build beegfs paths for SciCat ingestion + beegfs_file_paths = [] + for fp in file_paths: + # Get relative path from source root + try: + rel_path = str(Path(fp).relative_to(config.data733_raw.root_path)) + except ValueError: + # Already a relative path + rel_path = fp.lstrip("/") + + # Build full beegfs path + beegfs_path = "/global/" + config.beegfs733.root_path.strip("/") + "/" + rel_path + beegfs_file_paths.append(beegfs_path) + + logger.info(f"Beegfs paths: {beegfs_file_paths}") try: - scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs") + scicat_ingest_flow(file_paths=beegfs_file_paths, ingester_spec="als733_saxs") logger.info("Step 3 complete: SciCat ingest successful") except Exception as e: logger.error(f"SciCat ingest failed with {e}") @@ -213,15 +262,16 @@ def process_new_733_file_task( # Waiting for PR #62 to be merged (prune_controller) bl733_settings = Variable.get("bl733-settings", _sync=True) - prune( - file_path=file_path, - source_endpoint=config.data733_raw, - check_endpoint=config.nersc733_alsdev_raw, - days_from_now=bl733_settings["delete_data733_files_after_days"] # 180 days - ) + for file_path in file_paths: + prune( + file_path=file_path, + source_endpoint=config.data733_raw, + check_endpoint=config.nersc733_alsdev_raw, + days_from_now=bl733_settings["delete_data733_files_after_days"] # 180 days + ) logger.info("Step 4 complete: Pruning scheduled") - logger.info(f"All steps complete for {file_path}") + logger.info(f"All steps complete for {len(file_paths)} file(s)") # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller)