diff --git a/config.yml b/config.yml
index d8d990a4..3f26a4f0 100644
--- a/config.yml
+++ b/config.yml
@@ -41,18 +41,11 @@ globus:
     uuid: 26b3d2cf-fd80-4a64-a78f-38a155aca926
     name: data733_raw
 
-  bl733-lamarr-global:
-    root_path: /bl733/
-    uri: lamarr.als.lbl.gov
-    uuid: dbaac176-b1f7-4134-979a-0b1668786d11
-    name: bl733-lamarr-global
-
-  bl733-lamarr-beamlines:
-    root_path: /bl733/
-    uri: lamarr.als.lbl.gov
-    uuid: aee983fc-826e-4081-bfb2-62529970540d
-    name: bl733-lamarr-beamlines
-
+  bl733-beegfs-data:
+    root_path: /beamline_staging/bl733/raw/
+    uri: beegfs.als.lbl.gov
+    uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a
+    name: bl733-beegfs-data
 
   # 8.3.2 ENDPOINTS
   spot832:
diff --git a/orchestration/flows/bl733/config.py b/orchestration/flows/bl733/config.py
index 3e1637ca..49d1606e 100644
--- a/orchestration/flows/bl733/config.py
+++ b/orchestration/flows/bl733/config.py
@@ -12,4 +12,4 @@ def __init__(self) -> None:
         self.data733 = self.endpoints["bl733-als-data733"]
         self.data733_raw = self.endpoints["bl733-als-data733_raw"]
         self.nersc733_alsdev_raw = self.endpoints["bl733-nersc-alsdev_raw"]
-        self.lamarr733 = self.endpoints["bl733-lamarr-beamlines"]
+        self.beegfs733 = self.endpoints["bl733-beegfs-data"]
diff --git a/orchestration/flows/bl733/dispatcher.py b/orchestration/flows/bl733/dispatcher.py
index f2f82bcc..9f652d23 100644
--- a/orchestration/flows/bl733/dispatcher.py
+++ b/orchestration/flows/bl733/dispatcher.py
@@ -1,5 +1,6 @@
 import logging
-from prefect import flow
+from pathlib import Path
+from prefect import flow, runtime
 from typing import Optional, Union, Any
 
 from orchestration.flows.bl733.move import process_new_733_file_task
@@ -8,17 +9,32 @@
 logger = logging.getLogger(__name__)
 
 
+def generate_flow_run_name() -> str:
+    """Generate flow run name from runtime parameters."""
+    params = runtime.flow_run.parameters
+    file_path = params.get("file_path")
+    if not file_path:
+        # Covers None, "" and an empty list
+        return "dispatcher-no_file"
+    elif isinstance(file_path, str):
+        return f"dispatcher-{Path(file_path).name}"
+    elif len(file_path) == 1:
+        return f"dispatcher-{Path(file_path[0]).name}"
+    else:
+        return f"dispatcher-{Path(file_path[0]).name}_+{len(file_path)-1}_more"
+
+
 # TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config733
-@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}")
+@flow(name="dispatcher", flow_run_name=generate_flow_run_name)
 def dispatcher(
-    file_path: Optional[str] = None,
+    file_path: Optional[Union[str, list[str]]] = None,
     is_export_control: bool = False,
     config: Optional[Union[dict, Any]] = None,
 ) -> None:
     """
     Dispatcher flow for BL733 beamline that launches the new_733_file_flow.
 
-    :param file_path: Path to the file to be processed.
+    :param file_path: Path(s) to the file(s) to be processed. Can be a single path (str)
+        or multiple paths (list[str]).
     :param is_export_control: Flag indicating if export control measures should be applied.
         (Not used in the current BL733 processing)
     :param config: Configuration settings for processing.
@@ -26,11 +42,21 @@ def dispatcher(
 
     :raises ValueError: If no file_path is provided.
""" - logger.info("Starting dispatcher flow for BL 7.3.3") + # Normalize file_path to a list + if file_path is None: + file_paths = [] + elif isinstance(file_path, str): + file_paths = [file_path] + else: + file_paths = file_path + + num_files = len(file_paths) + + logger.info(f"Starting dispatcher flow for BL 7.3.3 with {num_files} file(s)") logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}") # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running. - if file_path is None: + if not file_paths: # returns True for empty list logger.error("No file_path provided to dispatcher.") raise ValueError("File path is required for processing.") @@ -46,7 +72,7 @@ def dispatcher( file_path=file_path, config=config ) - logger.info("Dispatcher flow completed successfully.") + logger.info(f"Dispatcher flow completed successfully for {num_files} file(s).") except Exception as e: logger.error(f"Error during processing in dispatcher flow: {e}") raise diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 296bf875..9029e2c8 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -1,6 +1,7 @@ import datetime import logging from pathlib import Path +import os from typing import Optional from prefect import flow, get_run_logger, task @@ -18,6 +19,24 @@ # Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. +def get_common_parent_path(file_paths: list[str]) -> str: + """ + Find the highest common parent directory for a list of file paths. + + :param file_paths: List of file paths + :return: Common parent directory path + """ + if not file_paths: + raise ValueError("No file paths provided") + + if len(file_paths) == 1: + # Single file - return its parent directory + return str(Path(file_paths[0]).parent) + + # Use os.path.commonpath for multiple files + return os.path.commonpath(file_paths) + + def prune( file_path: str = None, source_endpoint: GlobusEndpoint = None, @@ -74,13 +93,12 @@ def prune( try: schedule_prefect_flow( - deployment_name="prune_globus_endpoint/prune_globus_endpoint", - flow_name=f"prune_globus-{source_endpoint.name}-{file_path}", + deployment_name="prune_globus_endpoint/prune_data733", + flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}", parameters={ "relative_path": file_path, "source_endpoint": source_endpoint, "check_endpoint": check_endpoint, - "config": config }, duration_from_now=delay, ) @@ -133,9 +151,9 @@ def _prune_globus_endpoint( ) -@flow(name="new_733_file_flow", flow_run_name="process_new-{file_path}") +@flow(name="new_733_file_flow", flow_run_name="process_new-{file_path[0]}") def process_new_733_file_flow( - file_path: str, + file_path: list[str], config: Optional[Config733] = None ) -> None: """ @@ -145,7 +163,7 @@ def process_new_733_file_flow( 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. 4. Schedule pruning from NERSC CFS. - :param file_path: Path to the new file to be processed. + :param file_paths: Paths to the new files to be processed. :param config: Configuration settings for processing. :return: None """ @@ -157,58 +175,103 @@ def process_new_733_file_flow( @task(name="new_733_file_task") def process_new_733_file_task( - file_path: str, + file_path: list[str], config: Optional[Config733] = None ) -> None: """ Task to process new data at BL 7.3.3 - 1. Copy the data from data733 to Lamarr (our common staging area). 
+    1. Copy the data from data733 to beegfs (our common staging area).
     2. Copy the file from the data733 to NERSC CFS.
-    3. Ingest the data from Lamarr into SciCat.
+    3. Ingest the data from beegfs into SciCat.
     4. Schedule pruning from data733 for 6 months from now.
     5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future.
 
-    :param file_path: Path to the new file to be processed.
+    :param file_path: Path(s) to the new file(s) to be processed.
     :param config: Configuration settings for processing.
     """
     logger = get_run_logger()
-    logger.info(f"Processing new 733 file: {file_path}")
+    if file_path is None:
+        file_paths = []
+    elif isinstance(file_path, str):
+        file_paths = [file_path]
+    else:
+        file_paths = file_path
+
+    if not file_paths:
+        logger.error("No file_paths provided")
+        raise ValueError("No file_paths provided")
+
+    logger.info(f"Processing new 733 files: {file_paths}")
 
     if not config:
+        logger.info("No config provided, creating default Config733")
         config = Config733()
 
+    common_path = get_common_parent_path(file_paths)
+    logger.info(f"Common parent path: {common_path}")
+
+    logger.info("Initializing Globus transfer controller")
     transfer_controller = get_transfer_controller(
         transfer_type=CopyMethod.GLOBUS,
        config=config
    )
 
+    logger.info(f"Step 1: Copying {common_path} from data733 to beegfs ({config.beegfs733.name})")
+
     transfer_controller.copy(
-        file_path=file_path,
+        file_path=common_path,
         source=config.data733_raw,
-        destination=config.lamarr733
+        destination=config.beegfs733
     )
+    logger.info("Step 1 complete: File copied to beegfs")
 
+    logger.info(f"Step 2: Copying {common_path} from data733 to NERSC CFS ({config.nersc733_alsdev_raw.name})")
     transfer_controller.copy(
-        file_path=file_path,
+        file_path=common_path,
         source=config.data733_raw,
         destination=config.nersc733_alsdev_raw
     )
-    # Note that the SciCat ingester assumes the data is on Lamarr.
+ logger.info("Step 2 complete: File copied to NERSC CFS") + + logger.info(f"Step 3: Ingesting {len(file_paths)} into SciCat") + + # Build beegfs paths for SciCat ingestion + beegfs_file_paths = [] + for fp in file_paths: + # Get relative path from source root + try: + rel_path = str(Path(fp).relative_to(config.data733_raw.root_path)) + except ValueError: + # Already a relative path + rel_path = fp.lstrip("/") + + # Build full beegfs path + beegfs_path = "/global/" + config.beegfs733.root_path.strip("/") + "/" + rel_path + beegfs_file_paths.append(beegfs_path) + + logger.info(f"Beegfs paths: {beegfs_file_paths}") try: - scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs") + scicat_ingest_flow(file_paths=beegfs_file_paths, ingester_spec="als733_saxs") + logger.info("Step 3 complete: SciCat ingest successful") except Exception as e: logger.error(f"SciCat ingest failed with {e}") + logger.info("Step 4: Scheduling pruning from data733") + # Waiting for PR #62 to be merged (prune_controller) bl733_settings = Variable.get("bl733-settings", _sync=True) - prune( - file_path=file_path, - source_endpoint=config.data733_raw, - check_endpoint=config.nersc733_alsdev_raw, - days_from_now=bl733_settings["delete_data733_files_after_days"] # 180 days - ) + for file_path in file_paths: + prune( + file_path=file_path, + source_endpoint=config.data733_raw, + check_endpoint=config.nersc733_alsdev_raw, + days_from_now=bl733_settings["delete_data733_files_after_days"] # 180 days + ) + + logger.info("Step 4 complete: Pruning scheduled") + logger.info(f"All steps complete for {len(file_paths)} file(s)") # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller)