From 20bf2aedd29fd1f63ada909387914149fef95a08 Mon Sep 17 00:00:00 2001 From: kkacanja Date: Thu, 26 Mar 2026 10:39:43 -0400 Subject: [PATCH] adding file to allow splitable bank dirs --- pycbc/workflow/splittable.py | 51 +++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/pycbc/workflow/splittable.py b/pycbc/workflow/splittable.py index eed0ead5af5..57d0a9944d0 100644 --- a/pycbc/workflow/splittable.py +++ b/pycbc/workflow/splittable.py @@ -30,12 +30,17 @@ import os import logging +import glob +import math -from pycbc.workflow.core import FileList, make_analysis_dir +from pycbc.workflow.core import FileList, make_analysis_dir, File from pycbc.workflow.jobsetup import (PycbcSplitBankExecutable, PycbcSplitBankXmlExecutable, PycbcSplitInspinjExecutable, PycbcHDFSplitInjExecutable) +from urllib.parse import urljoin +from urllib.request import pathname2url + logger = logging.getLogger('pycbc.workflow.splittable') def select_splitfilejob_instance(curr_exe): @@ -109,18 +114,59 @@ def setup_splittable_workflow(workflow, input_tables, out_dir=None, tags=None): logger.info("Adding split output file jobs to workflow.") split_table_outs = setup_splittable_dax_generated(workflow, input_tables, out_dir, tags) + elif splitMethod == "MANUAL_DIRECTORY": + logger.info("Registering pre-existing split files from directory.") + split_table_outs = setup_splittable_manual_directory(workflow, tags) elif splitMethod == "NOOP": # Probably better not to call the module at all, but this option will # return the input file list. split_table_outs = input_tables else: errMsg = "Splittable method not recognized. Must be one of " - errMsg += "IN_WORKFLOW or NOOP." + errMsg += "IN_WORKFLOW, MANUAL_DIRECTORY or NOOP." raise ValueError(errMsg) logger.info("Leaving split output files module.") return split_table_outs +def setup_splittable_manual_directory(workflow, tags=None): + """ + New function to glob a directory and register existing files as + workflow products. + """ + if tags is None: + tags = [] + cp = workflow.cp + + # Get directory from config + bank_dir = cp.get_opt_tags("workflow-splittable", "tmpltbank-directory", tags) + + # Glob all HDF files + bank_paths = sorted(glob.glob(os.path.join(bank_dir, '*.hdf'))) + if not bank_paths: + raise ValueError(f"No .hdf files found in {bank_dir}") + + n_dp = math.ceil(math.log10(max(len(bank_paths), 2))) + tmplt_banks = FileList([]) + + for i, path in enumerate(bank_paths): + bank_tag = ('bank%0{}d'.format(n_dp)) % i + abs_path = os.path.abspath(path) + pfn_local = urljoin('file:', pathname2url(abs_path)) + + # Create a File object that Pegasus recognizes as an existing input + curr_file = File( + workflow.ifos, + 'TMPLTBANK', + workflow.analysis_time, + file_url=pfn_local, + tags=tags + [bank_tag] + ) + curr_file.add_pfn(pfn_local, site='local') + tmplt_banks.append(curr_file) + + return tmplt_banks + def setup_splittable_dax_generated(workflow, input_tables, out_dir, tags): ''' Function for setting up the splitting jobs as part of the workflow. @@ -178,4 +224,3 @@ def setup_splittable_dax_generated(workflow, input_tables, out_dir, tags): workflow.add_node(node) out_file_groups += node.output_files return out_file_groups -