Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions pycbc/workflow/splittable.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@

import os
import logging
import glob
import math

from pycbc.workflow.core import FileList, make_analysis_dir
from pycbc.workflow.core import FileList, make_analysis_dir, File
from pycbc.workflow.jobsetup import (PycbcSplitBankExecutable,
PycbcSplitBankXmlExecutable, PycbcSplitInspinjExecutable,
PycbcHDFSplitInjExecutable)

from urllib.parse import urljoin
from urllib.request import pathname2url

logger = logging.getLogger('pycbc.workflow.splittable')

def select_splitfilejob_instance(curr_exe):
Expand Down Expand Up @@ -109,18 +114,59 @@ def setup_splittable_workflow(workflow, input_tables, out_dir=None, tags=None):
logger.info("Adding split output file jobs to workflow.")
split_table_outs = setup_splittable_dax_generated(workflow,
input_tables, out_dir, tags)
elif splitMethod == "MANUAL_DIRECTORY":
logger.info("Registering pre-existing split files from directory.")
split_table_outs = setup_splittable_manual_directory(workflow, tags)
elif splitMethod == "NOOP":
# Probably better not to call the module at all, but this option will
# return the input file list.
split_table_outs = input_tables
else:
errMsg = "Splittable method not recognized. Must be one of "
errMsg += "IN_WORKFLOW or NOOP."
errMsg += "IN_WORKFLOW, MANUAL_DIRECTORY or NOOP."
raise ValueError(errMsg)

logger.info("Leaving split output files module.")
return split_table_outs

def setup_splittable_manual_directory(workflow, tags=None):
    """Register pre-existing split bank files from a directory on disk.

    Reads the ``tmpltbank-directory`` option from the
    ``[workflow-splittable]`` section of the workflow's configuration,
    globs every ``*.hdf`` file in that directory, and wraps each one as
    a workflow ``File`` with a zero-padded ``bankN`` tag. Registering a
    local PFN for each file tells the planner these are existing inputs
    rather than products to be generated by a job.

    Parameters
    ----------
    workflow : pycbc.workflow.core.Workflow
        Workflow instance supplying the config parser, the ifo list and
        the analysis time used to construct each File.
    tags : list of str, optional
        Tags used when looking up the config option; also prepended to
        each output File's tag list.

    Returns
    -------
    FileList
        One File per discovered ``.hdf`` bank, in sorted path order.

    Raises
    ------
    ValueError
        If the configured directory contains no ``.hdf`` files.
    """
    tags = [] if tags is None else tags

    # Directory holding the pre-split template banks, from the ini file.
    bank_dir = workflow.cp.get_opt_tags(
        "workflow-splittable", "tmpltbank-directory", tags)

    hdf_paths = sorted(glob.glob(os.path.join(bank_dir, '*.hdf')))
    if not hdf_paths:
        raise ValueError(f"No .hdf files found in {bank_dir}")

    # Width of the zero-padded bank index so the tags sort
    # lexicographically (e.g. bank00 ... bank12 for 13 files).
    width = math.ceil(math.log10(max(len(hdf_paths), 2)))

    banks = FileList([])
    for idx, bank_path in enumerate(hdf_paths):
        # file:// URL pointing at the bank on the local filesystem.
        url = urljoin('file:', pathname2url(os.path.abspath(bank_path)))

        bank_file = File(
            workflow.ifos,
            'TMPLTBANK',
            workflow.analysis_time,
            file_url=url,
            tags=tags + [f'bank{idx:0{width}d}']
        )
        # Record the physical location so Pegasus treats this as an
        # already-existing input rather than scheduling a job for it.
        bank_file.add_pfn(url, site='local')
        banks.append(bank_file)

    return banks

def setup_splittable_dax_generated(workflow, input_tables, out_dir, tags):
'''
Function for setting up the splitting jobs as part of the workflow.
Expand Down Expand Up @@ -178,4 +224,3 @@ def setup_splittable_dax_generated(workflow, input_tables, out_dir, tags):
workflow.add_node(node)
out_file_groups += node.output_files
return out_file_groups

Loading