diff --git a/setup.py b/setup.py index ff4079b5..1fd86a81 100644 --- a/setup.py +++ b/setup.py @@ -73,7 +73,7 @@ def check_provided(distribution, min_version, max_version=None, optional=False): author_email='cgl-toil@googlegroups.com', url="https://github.com/BD2KGenomics/toil-scripts", install_requires=[ - 'tqdm==3.8.0', # FIXME: Remove once ADAM stops using it (superfluous import) + 'toil-lib==1.0.3', 'pyyaml==3.11'], tests_require=[ 'pytest==2.8.3'], diff --git a/src/toil_scripts/__init__.py b/src/toil_scripts/__init__.py index 4ecdf424..af5c6eed 100644 --- a/src/toil_scripts/__init__.py +++ b/src/toil_scripts/__init__.py @@ -2,7 +2,7 @@ _log = logging.getLogger(__name__) -# TODO: replace this with toil_scripts.lib.urls._download_s3_url +# TODO: replace this with toil_lib.urls._download_s3_url def download_from_s3_url(file_path, url): from urlparse import urlparse diff --git a/src/toil_scripts/adam_gatk_pipeline/align_and_call.py b/src/toil_scripts/adam_gatk_pipeline/align_and_call.py index e3fd9eed..85785537 100644 --- a/src/toil_scripts/adam_gatk_pipeline/align_and_call.py +++ b/src/toil_scripts/adam_gatk_pipeline/align_and_call.py @@ -123,6 +123,8 @@ # these don't seem necessary! but, must be imported here due to a serialization issue from toil.lib.spark import spawn_spark_cluster +from toil_lib.programs import mock_mode + # import job steps from other toil pipelines from toil_scripts.adam_pipeline.adam_preprocessing import * #static_adam_preprocessing_dag from toil_scripts.bwa_alignment.bwa_alignment import * #download_shared_files @@ -130,7 +132,6 @@ from toil_scripts.gatk_processing.gatk_preprocessing import * #download_gatk_files from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file -from toil_scripts.lib.programs import mock_mode def sample_loop(job, uuid_list, inputs): """ diff --git a/src/toil_scripts/adam_kmers/count_kmers.py b/src/toil_scripts/adam_kmers/count_kmers.py index 994945d9..3b961256 100644 --- a/src/toil_scripts/adam_kmers/count_kmers.py +++ b/src/toil_scripts/adam_kmers/count_kmers.py @@ -11,8 +11,8 @@ from toil.lib.spark import spawn_spark_cluster # imports from toil_scripts -from toil_scripts.lib import require -from toil_scripts.tools.spark_tools import call_adam, call_conductor, \ +from toil_lib import require +from toil_lib.tools.spark_tools import call_adam, call_conductor, \ MasterAddress, HDFS_MASTER_PORT, SPARK_MASTER_PORT diff --git a/src/toil_scripts/adam_pipeline/adam_preprocessing.py b/src/toil_scripts/adam_pipeline/adam_preprocessing.py index 7e6bf1e8..933168ea 100644 --- a/src/toil_scripts/adam_pipeline/adam_preprocessing.py +++ b/src/toil_scripts/adam_pipeline/adam_preprocessing.py @@ -39,11 +39,12 @@ from toil.job import Job from toil.lib.spark import spawn_spark_cluster -from toil_scripts.lib import require -from toil_scripts.lib.files import copy_files, move_files -from toil_scripts.lib.programs import docker_call, mock_mode +from toil_lib import require +from toil_lib.files import copy_files, move_files +from toil_lib.programs import docker_call, mock_mode +from toil_lib.tools.spark_tools import call_adam, call_conductor, MasterAddress, HDFS_MASTER_PORT, SPARK_MASTER_PORT + from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file -from toil_scripts.tools.spark_tools import call_adam, call_conductor, MasterAddress, HDFS_MASTER_PORT, SPARK_MASTER_PORT log = logging.getLogger(__name__) diff --git a/src/toil_scripts/bwa_alignment/bwa_alignment.py b/src/toil_scripts/bwa_alignment/bwa_alignment.py index 149310cf..c0bd6e7a 100644 --- a/src/toil_scripts/bwa_alignment/bwa_alignment.py +++ b/src/toil_scripts/bwa_alignment/bwa_alignment.py @@ -8,14 +8,14 @@ import yaml from toil.job import Job +from toil_lib import require, required_length +from toil_lib.files import copy_file_job +from toil_lib.jobs import map_job +from toil_lib.tools.aligners import run_bwakit +from toil_lib.tools.indexing import run_samtools_faidx, run_bwa_index +from toil_lib.urls import download_url_job, s3am_upload_job -from toil_scripts.lib import require, required_length -from toil_scripts.lib.files import copy_file_job -from toil_scripts.lib.jobs import map_job -from toil_scripts.lib.urls import download_url_job, s3am_upload_job from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file -from toil_scripts.tools.aligners import run_bwakit -from toil_scripts.tools.indexing import run_samtools_faidx, run_bwa_index def download_reference_files(job, inputs, samples): diff --git a/src/toil_scripts/exome_variant_pipeline/exome_variant_pipeline.py b/src/toil_scripts/exome_variant_pipeline/exome_variant_pipeline.py index 666cd171..9c485b95 100644 --- a/src/toil_scripts/exome_variant_pipeline/exome_variant_pipeline.py +++ b/src/toil_scripts/exome_variant_pipeline/exome_variant_pipeline.py @@ -12,18 +12,17 @@ from bd2k.util.files import mkdir_p from bd2k.util.processes import which from toil.job import Job - -from toil_scripts.lib import require -from toil_scripts.lib.files import copy_files -from toil_scripts.lib.jobs import map_job -from toil_scripts.lib.urls import download_url_job, s3am_upload -from toil_scripts.tools.mutation_callers import run_muse -from toil_scripts.tools.mutation_callers import run_mutect -from toil_scripts.tools.mutation_callers import run_pindel -from toil_scripts.tools.preprocessing import run_gatk_preprocessing -from toil_scripts.tools.preprocessing import run_picard_create_sequence_dictionary -from toil_scripts.tools.preprocessing import run_samtools_faidx -from toil_scripts.tools.preprocessing import run_samtools_index +from toil_lib import require +from toil_lib.files import copy_files +from toil_lib.jobs import map_job +from toil_lib.tools.mutation_callers import run_muse +from toil_lib.tools.mutation_callers import run_mutect +from toil_lib.tools.mutation_callers import run_pindel +from toil_lib.tools.preprocessing import run_gatk_preprocessing +from toil_lib.tools.preprocessing import run_picard_create_sequence_dictionary +from toil_lib.tools.preprocessing import run_samtools_faidx +from toil_lib.tools.preprocessing import run_samtools_index +from toil_lib.urls import download_url_job, s3am_upload # Start of Job Functions diff --git a/src/toil_scripts/gatk_germline/germline.py b/src/toil_scripts/gatk_germline/germline.py index b9b8712c..99b21c42 100644 --- a/src/toil_scripts/gatk_germline/germline.py +++ b/src/toil_scripts/gatk_germline/germline.py @@ -42,10 +42,11 @@ import yaml from toil.job import Job +from toil_lib import require +from toil_lib.programs import docker_call +from toil_lib.urls import s3am_upload + from toil_scripts import download_from_s3_url -from toil_scripts.lib import require -from toil_scripts.lib.programs import docker_call -from toil_scripts.lib.urls import s3am_upload from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file diff --git a/src/toil_scripts/gatk_processing/gatk_preprocessing.py b/src/toil_scripts/gatk_processing/gatk_preprocessing.py index 3df4701b..afe83908 100755 --- a/src/toil_scripts/gatk_processing/gatk_preprocessing.py +++ b/src/toil_scripts/gatk_processing/gatk_preprocessing.py @@ -33,10 +33,11 @@ import yaml from toil.job import Job +from toil_lib import require +from toil_lib.programs import docker_call +from toil_lib.urls import s3am_upload + from toil_scripts import download_from_s3_url -from toil_scripts.lib import require -from toil_scripts.lib.programs import docker_call -from toil_scripts.lib.urls import s3am_upload from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file _log = logging.getLogger(__name__) diff --git a/src/toil_scripts/lib/__init__.py b/src/toil_scripts/lib/__init__.py deleted file mode 100644 index c95e534d..00000000 --- a/src/toil_scripts/lib/__init__.py +++ /dev/null @@ -1,63 +0,0 @@ -import argparse -import os -import tempfile - -def flatten(x): - """ - Flattens a nested array into a single list - - :param list x: The nested list/tuple to be flattened. - """ - result = [] - for el in x: - if hasattr(el, "__iter__") and not isinstance(el, basestring): - result.extend(flatten(el)) - else: - result.append(el) - return result - - -def partitions(l, partition_size): - """ - >>> list(partitions([], 10)) - [] - >>> list(partitions([1,2,3,4,5], 1)) - [[1], [2], [3], [4], [5]] - >>> list(partitions([1,2,3,4,5], 2)) - [[1, 2], [3, 4], [5]] - >>> list(partitions([1,2,3,4,5], 5)) - [[1, 2, 3, 4, 5]] - - :param list l: List to be partitioned - :param int partition_size: Size of partitions - """ - for i in xrange(0, len(l), partition_size): - yield l[i:i + partition_size] - - -class UserError(Exception): - pass - - -def require(expression, message): - if not expression: - raise UserError('\n\n' + message + '\n\n') - - -def required_length(nmin, nmax): - """ - For use with argparse's action argument. Allows setting a range for nargs. - Example: nargs='+', action=required_length(2, 3) - - :param int nmin: Minimum number of arguments - :param int nmax: Maximum number of arguments - :return: RequiredLength object - """ - class RequiredLength(argparse.Action): - def __call__(self, parser, args, values, option_string=None): - if not nmin <= len(values) <= nmax: - msg = 'argument "{f}" requires between {nmin} and {nmax} arguments'.format( - f=self.dest, nmin=nmin, nmax=nmax) - raise argparse.ArgumentTypeError(msg) - setattr(args, self.dest, values) - return RequiredLength diff --git a/src/toil_scripts/lib/files.py b/src/toil_scripts/lib/files.py deleted file mode 100644 index 9dbff314..00000000 --- a/src/toil_scripts/lib/files.py +++ /dev/null @@ -1,107 +0,0 @@ -from contextlib import closing -import os -import shutil -import tarfile - - -def tarball_files(tar_name, file_paths, output_dir='.', prefix=''): - """ - Creates a tarball from a group of files - - :param str tar_name: Name of tarball - :param list[str] file_paths: Absolute file paths to include in the tarball - :param str output_dir: Output destination for tarball - :param str prefix: Optional prefix for files in tarball - """ - with tarfile.open(os.path.join(output_dir, tar_name), 'w:gz') as f_out: - for file_path in file_paths: - if not file_path.startswith('/'): - raise ValueError('Path provided is relative not absolute.') - arcname = prefix + os.path.basename(file_path) - f_out.add(file_path, arcname=arcname) - - -def __forall_files(file_paths, output_dir, op): - """ - Applies a function to a set of files and an output directory. - - :param str output_dir: Output directory - :param list[str] file_paths: Absolute file paths to move - """ - for file_path in file_paths: - if not file_path.startswith('/'): - raise ValueError('Path provided (%s) is relative not absolute.' % file_path) - dest = os.path.join(output_dir, os.path.basename(file_path)) - op(file_path, dest) - - -def copy_file_job(job, name, file_id, output_dir): - """ - Job version of move_files for one file - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str name: Name of output file (including extension) - :param str file_id: FileStoreID of file - :param str output_dir: Location to place output file - """ - work_dir = job.fileStore.getLocalTempDir() - fpath = job.fileStore.readGlobalFile(file_id, os.path.join(work_dir, name)) - copy_files([fpath], output_dir) - - -def move_files(file_paths, output_dir): - """ - Moves files from the working directory to the output directory. - - Important note: this function can couple dangerously with caching. - Specifically, if this function is called on a file in the cache, the cache - will contain a broken reference. This may lead to a non-existent file path - being passed to later jobs. Don't call this function on files that are in - the cache, unless you know for sure that the input file will not be used by - any later jobs. - - :param str output_dir: Output directory - :param list[str] file_paths: Absolute file paths to move - """ - __forall_files(file_paths, output_dir, shutil.move) - - -def copy_files(file_paths, output_dir): - """ - Moves files from the working directory to the output directory. - - :param str output_dir: Output directory - :param list[str] file_paths: Absolute file paths to move - """ - __forall_files(file_paths, output_dir, shutil.copy) - - -def consolidate_tarballs_job(job, fname_to_id): - """ - Combine the contents of separate tarballs into one. - Subdirs within the tarball will be named the keys in **fname_to_id - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param dict[str,str] fname_to_id: Dictionary of the form: file-name-prefix=FileStoreID - :return: The file store ID of the generated tarball - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - # Retrieve output file paths to consolidate - tar_paths = [] - for fname, file_store_id in fname_to_id.iteritems(): - p = job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, fname + '.tar.gz')) - tar_paths.append((p, fname)) - # I/O - # output_name is arbitrary as this job function returns a FileStoreId - output_name = 'foo.tar.gz' - out_tar = os.path.join(work_dir, output_name) - # Consolidate separate tarballs into one - with tarfile.open(os.path.join(work_dir, out_tar), 'w:gz') as f_out: - for tar, fname in tar_paths: - with tarfile.open(tar, 'r') as f_in: - for tarinfo in f_in: - with closing(f_in.extractfile(tarinfo)) as f_in_file: - tarinfo.name = os.path.join(output_name, fname, os.path.basename(tarinfo.name)) - f_out.addfile(tarinfo, fileobj=f_in_file) - return job.fileStore.writeGlobalFile(out_tar) diff --git a/src/toil_scripts/lib/jobs.py b/src/toil_scripts/lib/jobs.py deleted file mode 100644 index f9637bc9..00000000 --- a/src/toil_scripts/lib/jobs.py +++ /dev/null @@ -1,23 +0,0 @@ -from toil_scripts.lib import partitions - - -def map_job(job, func, inputs, *args): - """ - Spawns a tree of jobs to avoid overloading the number of jobs spawned by a single parent. - This function is appropriate to use when batching samples greater than 1,000. - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param function func: Function to spawn dynamically, passes one sample as first argument - :param list inputs: Array of samples to be batched - :param list args: any arguments to be passed to the function - """ - # num_partitions isn't exposed as an argument in order to be transparent to the user. - # The value for num_partitions is a tested value - num_partitions = 100 - partition_size = len(inputs) / num_partitions - if partition_size > 1: - for partition in partitions(inputs, partition_size): - job.addChildJobFn(map_job, func, partition, *args) - else: - for sample in inputs: - job.addChildJobFn(func, sample, *args) diff --git a/src/toil_scripts/lib/programs.py b/src/toil_scripts/lib/programs.py deleted file mode 100644 index bb03a416..00000000 --- a/src/toil_scripts/lib/programs.py +++ /dev/null @@ -1,126 +0,0 @@ -import os -import subprocess -import logging -from bd2k.util.exceptions import panic - -_log = logging.getLogger(__name__) - - -def mock_mode(): - """ - Checks whether the ADAM_GATK_MOCK_MODE environment variable is set. - In mock mode, all docker calls other than those to spin up and submit jobs to the spark cluster - are stubbed out and dummy files are used as inputs and outputs. - """ - return True if int(os.environ.get('TOIL_SCRIPTS_MOCK_MODE', '0')) else False - - -def docker_call(tool, - parameters=None, - work_dir='.', - rm=True, - env=None, - outfile=None, - inputs=None, - outputs=None, - docker_parameters=None, - check_output=False, - mock=None): - """ - Calls Docker, passing along parameters and tool. - - :param str tool: Name of the Docker image to be used (e.g. quay.io/ucsc_cgl/samtools) - :param list[str] parameters: Command line arguments to be passed to the tool - :param str work_dir: Directory to mount into the container via `-v`. Destination convention is /data - :param bool rm: Set to True to pass `--rm` flag. - :param dict[str,str] env: Environment variables to be added (e.g. dict(JAVA_OPTS='-Xmx15G')) - :param bool sudo: If True, prepends `sudo` to the docker call - :param file outfile: Pipe output of Docker call to file handle - :param list[str] inputs: A list of the input files. - :param dict[str,str] outputs: A dictionary containing the outputs files as keys with either None - or a url. The value is only used if mock=True - :param dict[str,str] docker_parameters: Parameters to pass to docker - :param bool check_output: When True, this function returns docker's output - :param bool mock: Whether to run in mock mode. If this variable is unset, its value will be determined by - the environment variable. - """ - from toil_scripts.lib.urls import download_url - - if mock is None: - mock = mock_mode() - if parameters is None: - parameters = [] - if inputs is None: - inputs = [] - if outputs is None: - outputs = {} - - for filename in inputs: - assert(os.path.isfile(os.path.join(work_dir, filename))) - - if mock: - for filename, url in outputs.items(): - file_path = os.path.join(work_dir, filename) - if url is None: - # create mock file - if not os.path.exists(file_path): - f = open(file_path, 'w') - f.write("contents") # FIXME - f.close() - - else: - file_path = os.path.join(work_dir, filename) - if not os.path.exists(file_path): - outfile = download_url(url, work_dir=work_dir, name=filename) - assert os.path.exists(file_path) - return - - base_docker_call = ['docker', 'run', - '--log-driver=none', - '-v', '{}:/data'.format(os.path.abspath(work_dir))] - if rm: - base_docker_call.append('--rm') - if env: - for e, v in env.iteritems(): - base_docker_call.extend(['-e', '{}={}'.format(e, v)]) - if docker_parameters: - base_docker_call += docker_parameters - - _log.debug("Calling docker with %s." % " ".join(base_docker_call + [tool] + parameters)) - - docker_call = base_docker_call + [tool] + parameters - - try: - if outfile: - subprocess.check_call(docker_call, stdout=outfile) - else: - if check_output: - return subprocess.check_output(docker_call) - else: - subprocess.check_call(docker_call) - # Fix root ownership of output files - except: - # Panic avoids hiding the exception raised in the try block - with panic(): - _fix_permissions(base_docker_call, tool, work_dir) - else: - _fix_permissions(base_docker_call, tool, work_dir) - - for filename in outputs.keys(): - if not os.path.isabs(filename): - filename = os.path.join(work_dir, filename) - assert(os.path.isfile(filename)) - - -def _fix_permissions(base_docker_call, tool, work_dir): - """ - Fix permission of a mounted Docker directory by reusing the tool - - :param list base_docker_call: Docker run parameters - :param str tool: Name of tool - :param str work_dir: Path of work directory to recursively chown - """ - base_docker_call.append('--entrypoint=chown') - stat = os.stat(work_dir) - command = base_docker_call + [tool] + ['-R', '{}:{}'.format(stat.st_uid, stat.st_gid), '/data'] - subprocess.check_call(command) diff --git a/src/toil_scripts/lib/test/__init__.py b/src/toil_scripts/lib/test/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/toil_scripts/lib/test/test_files.py b/src/toil_scripts/lib/test/test_files.py deleted file mode 100644 index 3167a3e9..00000000 --- a/src/toil_scripts/lib/test/test_files.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -import tarfile -from toil.job import Job - - -def test_tarball_files(tmpdir): - from toil_scripts.lib.files import tarball_files - work_dir = str(tmpdir) - fpath = os.path.join(work_dir, 'output_file') - with open(fpath, 'wb') as fout: - fout.write(os.urandom(1024)) - tarball_files(output_dir=work_dir, tar_name='test.tar', file_paths=[fpath]) - assert os.path.exists(os.path.join(work_dir, 'test.tar')) - - -def test_copy_files(tmpdir): - from toil_scripts.lib.files import copy_files - work_dir = str(tmpdir) - os.mkdir(os.path.join(work_dir, 'test')) - fpath = os.path.join(work_dir, 'output_file') - with open(fpath, 'wb') as fout: - fout.write(os.urandom(1024)) - copy_files([fpath], os.path.join(work_dir, 'test')) - assert os.path.exists(os.path.join(work_dir, 'test', 'output_file')) - - -def test_consolidate_tarballs_job(tmpdir): - options = Job.Runner.getDefaultOptions(os.path.join(str(tmpdir), 'test_store')) - Job.Runner.startToil(Job.wrapJobFn(_consolidate_tarball_job_setup), options) - - -def _consolidate_tarball_job_setup(job): - from toil_scripts.lib.files import consolidate_tarballs_job - # Create test file - work_dir = job.fileStore.getLocalTempDir() - fpath = os.path.join(work_dir, 'output_file') - with open(fpath, 'wb') as fout: - fout.write(os.urandom(1024)) - # Create test tarballs - fpath1 = os.path.join(work_dir, 'test1.tar.gz') - fpath2 = os.path.join(work_dir, 'test2.tar.gz') - with tarfile.open(fpath1, 'w:gz') as f_out: - f_out.add(fpath) - with tarfile.open(fpath2, 'w:gz') as f_out: - f_out.add(fpath) - id1 = job.fileStore.writeGlobalFile(fpath1) - id2 = job.fileStore.writeGlobalFile(fpath2) - job.addChildJobFn(consolidate_tarballs_job, dict(test1=id1, test2=id2)) diff --git a/src/toil_scripts/lib/test/test_jobs.py b/src/toil_scripts/lib/test/test_jobs.py deleted file mode 100644 index 94cf4132..00000000 --- a/src/toil_scripts/lib/test/test_jobs.py +++ /dev/null @@ -1,21 +0,0 @@ -import tempfile - -import os -from toil.job import Job - - -def test_map_job(): - from toil_scripts.lib.jobs import map_job - work_dir = tempfile.mkdtemp() - options = Job.Runner.getDefaultOptions(os.path.join(work_dir, 'test_store')) - options.workDir = work_dir - samples = [x for x in xrange(200)] - j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c', disk='1K') - Job.Runner.startToil(j, options) - - -def _test_batch(job, sample, a, b, c): - assert str(sample).isdigit() - assert a == 'a' - assert b == 'b' - assert c == 'c' diff --git a/src/toil_scripts/lib/test/test_lib.py b/src/toil_scripts/lib/test/test_lib.py deleted file mode 100644 index 9d1acf45..00000000 --- a/src/toil_scripts/lib/test/test_lib.py +++ /dev/null @@ -1,15 +0,0 @@ -def test_flatten(): - from toil_scripts.lib import flatten - x = [(1, 2), (3, 4, (5, 6))] - y = (1, (2, (3, 4, (5)))) - assert flatten(x) == [1, 2, 3, 4, 5, 6] - assert flatten(y) == [1, 2, 3, 4, 5] - - -def test_partitions(): - from toil_scripts.lib import partitions - x = [z for z in xrange(100)] - assert len(list(partitions(x, 10))) == 10 - assert len(list(partitions(x, 20))) == 5 - assert len(list(partitions(x, 100))) == 1 - assert list(partitions([], 10)) == [] diff --git a/src/toil_scripts/lib/test/test_programs.py b/src/toil_scripts/lib/test/test_programs.py deleted file mode 100644 index 3c75ac4f..00000000 --- a/src/toil_scripts/lib/test/test_programs.py +++ /dev/null @@ -1,14 +0,0 @@ -import os - - -def test_docker_call(tmpdir): - from toil_scripts.lib.programs import docker_call - work_dir = str(tmpdir) - parameter = ['--help'] - tool = 'quay.io/ucsc_cgl/samtools' - docker_call(work_dir=work_dir, parameters=parameter, tool=tool) - # Test outfile - fpath = os.path.join(work_dir, 'test') - with open(fpath, 'w') as f: - docker_call(tool='ubuntu', env=dict(foo='bar'), parameters=['printenv', 'foo'], outfile=f) - assert open(fpath).read() == 'bar\n' diff --git a/src/toil_scripts/lib/test/test_urls.py b/src/toil_scripts/lib/test/test_urls.py deleted file mode 100644 index 2bab4cf0..00000000 --- a/src/toil_scripts/lib/test/test_urls.py +++ /dev/null @@ -1,54 +0,0 @@ -import os -import subprocess -import filecmp -from contextlib import closing -from uuid import uuid4 - -from toil.job import Job - - -def test_download_url_job(tmpdir): - from toil_scripts.lib.urls import download_url_job - options = Job.Runner.getDefaultOptions(os.path.join(str(tmpdir), 'test_store')) - j = Job.wrapJobFn(download_url_job, 'www.google.com') - Job.Runner.startToil(j, options) - - -def test_download_url(tmpdir): - from toil_scripts.lib.urls import download_url - work_dir = str(tmpdir) - download_url(work_dir=work_dir, url='www.google.com', name='testy') - assert os.path.exists(os.path.join(work_dir, 'testy')) - - -def test_upload_and_download_with_encryption(tmpdir): - from toil_scripts.lib.urls import s3am_upload - from toil_scripts.lib.urls import download_url - from boto.s3.connection import S3Connection, Bucket, Key - work_dir = str(tmpdir) - # Create temporary encryption key - key_path = os.path.join(work_dir, 'foo.key') - subprocess.check_call(['dd', 'if=/dev/urandom', 'bs=1', 'count=32', - 'of={}'.format(key_path)]) - # Create test file - upload_fpath = os.path.join(work_dir, 'upload_file') - with open(upload_fpath, 'wb') as fout: - fout.write(os.urandom(1024)) - # Upload file - random_key = os.path.join('test/', str(uuid4()), 'upload_file') - s3_url = os.path.join('s3://cgl-driver-projects/', random_key) - try: - s3_dir = os.path.split(s3_url)[0] - s3am_upload(fpath=upload_fpath, s3_dir=s3_dir, s3_key_path=key_path) - # Download the file - download_url(url=s3_url, name='download_file', work_dir=work_dir, s3_key_path=key_path) - download_fpath = os.path.join(work_dir, 'download_file') - assert os.path.exists(download_fpath) - assert filecmp.cmp(upload_fpath, download_fpath) - finally: - # Delete the Key. Key deletion never fails so we don't need to catch any exceptions - with closing(S3Connection()) as conn: - b = Bucket(conn, 'cgl-driver-projects') - k = Key(b) - k.key = random_key - k.delete() diff --git a/src/toil_scripts/lib/urls.py b/src/toil_scripts/lib/urls.py deleted file mode 100644 index 63040f72..00000000 --- a/src/toil_scripts/lib/urls.py +++ /dev/null @@ -1,111 +0,0 @@ -import glob -import os -import shutil -import subprocess -from urlparse import urlparse - -from toil_scripts.lib import require -from toil_scripts.lib.programs import docker_call - - -def download_url(url, work_dir='.', name=None, s3_key_path=None, cghub_key_path=None): - """ - Downloads URL, can pass in file://, http://, s3://, or ftp://, gnos://cghub/analysisID, or gnos:///analysisID - - :param str url: URL to download from - :param str work_dir: Directory to download file to - :param str name: Name of output file, if None, basename of URL is used - :param str s3_key_path: Path to 32-byte encryption key if url points to S3 file that uses SSE-C - :param str cghub_key_path: Path to cghub key used to download from CGHub. - :return: Path to the downloaded file - :rtype: str - """ - file_path = os.path.join(work_dir, name) if name else os.path.join(work_dir, os.path.basename(url)) - if cghub_key_path: - _download_with_genetorrent(url, file_path, cghub_key_path) - elif urlparse(url).scheme == 's3': - _s3am_with_retry(num_cores=1, file_path=file_path, s3_url=url, mode='download', s3_key_path=s3_key_path) - elif urlparse(url).scheme == 'file': - shutil.copy(urlparse(url).path, file_path) - else: - subprocess.check_call(['curl', '-fs', '--retry', '5', '--create-dir', url, '-o', file_path]) - assert os.path.exists(file_path) - return file_path - - -def download_url_job(job, url, name=None, s3_key_path=None, cghub_key_path=None): - """Job version of `download_url`""" - work_dir = job.fileStore.getLocalTempDir() - fpath = download_url(url, work_dir=work_dir, name=name, - s3_key_path=s3_key_path, cghub_key_path=cghub_key_path) - return job.fileStore.writeGlobalFile(fpath) - - -def _download_with_genetorrent(url, file_path, cghub_key_path): - parsed_url = urlparse(url) - analysis_id = parsed_url.path[1:] - assert parsed_url.scheme == 'gnos', 'Improper format. gnos://cghub/ID. User supplied: {}'.format(parsed_url) - work_dir = os.path.dirname(file_path) - folder_path = os.path.join(work_dir, os.path.basename(analysis_id)) - parameters = ['-vv', '-c', cghub_key_path, '-d', analysis_id] - docker_call(tool='quay.io/ucsc_cgl/genetorrent:3.8.7--9911761265b6f08bc3ef09f53af05f56848d805b', - work_dir=work_dir, parameters=parameters) - sample = glob.glob(os.path.join(folder_path, '*tar*')) - assert len(sample) == 1, 'More than one sample tar in CGHub download: {}'.format(analysis_id) - - -def s3am_upload(fpath, s3_dir, num_cores=1, s3_key_path=None): - """ - Uploads a file to s3 via S3AM - For SSE-C encryption: provide a path to a 32-byte file - - :param str fpath: Path to file to upload - :param str s3_dir: Ouptut S3 path. Format: s3://bucket/[directory] - :param int num_cores: Number of cores to use for up/download with S3AM - :param str s3_key_path: (OPTIONAL) Path to 32-byte key to be used for SSE-C encryption - """ - require(s3_dir.startswith('s3://'), 'Format of s3_dir (s3://) is incorrect: {}'.format(s3_dir)) - s3_dir = os.path.join(s3_dir, os.path.basename(fpath)) - _s3am_with_retry(num_cores, file_path=fpath, s3_url=s3_dir, mode='upload', s3_key_path=s3_key_path) - - -def s3am_upload_job(job, file_id, file_name, s3_dir, s3_key_path=None): - """Job version of s3am_upload""" - work_dir = job.fileStore.getLocalTempDir() - fpath = job.fileStore.readGlobalFile(file_id, os.path.join(work_dir, file_name)) - s3am_upload(fpath=fpath, s3_dir=s3_dir, num_cores=job.cores, s3_key_path=s3_key_path) - - -def _s3am_with_retry(num_cores, file_path, s3_url, mode='upload', s3_key_path=None): - """ - Run s3am with 3 retries - - :param int num_cores: Number of cores to pass to upload/download slots - :param str file_path: Full path to the file - :param str s3_url: S3 URL - :param str mode: Mode to run s3am in. Either "upload" or "download" - :param str s3_key_path: Path to the SSE-C key if using encryption - """ - command = ['s3am'] - if mode == 'upload': - command.extend(['upload', '--force', '--upload-slots={}'.format(num_cores), - '--exists=overwrite', 'file://' + file_path, s3_url]) - elif mode == 'download': - command.extend(['download', '--file-exists=overwrite', - '--download-exists=discard', s3_url, 'file://' + file_path]) - else: - raise ValueError('Improper mode specified. mode must be equal to "upload" or "download".') - if s3_key_path: - for arg in [s3_key_path, '--sse-key-file', '--sse-key-is-master']: - command.insert(2, arg) - for arg in ['--part-size=50M', '--download-slots={}'.format(num_cores)]: - command.insert(2, arg) - # Run s3am with retries - retry_count = 3 - for i in xrange(retry_count): - ret_code = subprocess.call(command) - if ret_code == 0: - return - else: - print 'S3AM failed with status code: {}'.format(ret_code) - raise RuntimeError('S3AM failed to {} after {} retries.'.format(mode, retry_count)) diff --git a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py index fa53d2a6..60cf012b 100644 --- a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py +++ b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py @@ -16,15 +16,14 @@ from bd2k.util.files import mkdir_p from bd2k.util.processes import which from toil.job import Job - -from toil_scripts.lib import require, UserError -from toil_scripts.lib.files import copy_files -from toil_scripts.lib.jobs import map_job -from toil_scripts.lib.urls import download_url_job, s3am_upload -from toil_scripts.tools.QC import run_fastqc -from toil_scripts.tools.aligners import run_star -from toil_scripts.tools.preprocessing import run_cutadapt -from toil_scripts.tools.quantifiers import run_kallisto, run_rsem, run_rsem_postprocess +from toil_lib import require, UserError +from toil_lib.files import copy_files +from toil_lib.jobs import map_job +from toil_lib.tools.QC import run_fastqc +from toil_lib.tools.aligners import run_star +from toil_lib.tools.preprocessing import run_cutadapt +from toil_lib.tools.quantifiers import run_kallisto, run_rsem, run_rsem_postprocess +from toil_lib.urls import download_url_job, s3am_upload schemes = ('http', 'file', 's3', 'ftp', 'gnos') diff --git a/src/toil_scripts/rnaseq_unc/rnaseq_unc_pipeline.py b/src/toil_scripts/rnaseq_unc/rnaseq_unc_pipeline.py index 5aa7a04a..54f2d4e8 100644 --- a/src/toil_scripts/rnaseq_unc/rnaseq_unc_pipeline.py +++ b/src/toil_scripts/rnaseq_unc/rnaseq_unc_pipeline.py @@ -55,17 +55,18 @@ """ import argparse import base64 -from collections import OrderedDict -from contextlib import closing +import errno import glob import hashlib -import os -import subprocess -import errno import multiprocessing +import os import shutil +import subprocess import tarfile +from collections import OrderedDict +from contextlib import closing from urlparse import urlparse + from toil.job import Job diff --git a/src/toil_scripts/spladder_pipeline/spladder_pipeline.py b/src/toil_scripts/spladder_pipeline/spladder_pipeline.py index 2bfa88d7..14e520c1 100644 --- a/src/toil_scripts/spladder_pipeline/spladder_pipeline.py +++ b/src/toil_scripts/spladder_pipeline/spladder_pipeline.py @@ -29,16 +29,15 @@ from contextlib import closing from glob import glob -from toil.job import Job - -from toil_scripts.lib.files import tarball_files -from toil_scripts.lib.jobs import map_job -from toil_scripts.lib.programs import docker_call -from toil_scripts.lib.urls import s3am_upload_job -from toil_scripts.lib.urls import download_url_job -from toil_scripts.lib.urls import download_url -from bd2k.util.processes import which from bd2k.util.files import mkdir_p +from bd2k.util.processes import which +from toil.job import Job +from toil_lib.files import tarball_files +from toil_lib.jobs import map_job +from toil_lib.programs import docker_call +from toil_lib.urls import download_url +from toil_lib.urls import download_url_job +from toil_lib.urls import s3am_upload_job def parse_input_samples(job, inputs): diff --git a/src/toil_scripts/tools/QC.py b/src/toil_scripts/tools/QC.py deleted file mode 100644 index 447f7b06..00000000 --- a/src/toil_scripts/tools/QC.py +++ /dev/null @@ -1,29 +0,0 @@ -import os - -from toil_scripts.lib.files import tarball_files -from toil_scripts.lib.programs import docker_call - - -def run_fastqc(job, r1_id, r2_id): - """ - Run Fastqc on the input reads - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str r1_id: FileStoreID of fastq read 1 - :param str r2_id: FileStoreID of fastq read 2 - :return: FileStoreID of fastQC output (tarball) - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1.fastq')) - parameters = ['/data/R1.fastq'] - output_names = ['R1_fastqc.html', 'R1_fastqc.zip'] - if r2_id: - job.fileStore.readGlobalFile(r2_id, os.path.join(work_dir, 'R2.fastq')) - parameters.extend(['-t', '2', '/data/R2.fastq']) - output_names.extend(['R2_fastqc.html', 'R2_fastqc.zip']) - docker_call(tool='quay.io/ucsc_cgl/fastqc:0.11.5--be13567d00cd4c586edf8ae47d991815c8c72a49', - work_dir=work_dir, parameters=parameters) - output_files = [os.path.join(work_dir, x) for x in output_names] - tarball_files(tar_name='fastqc.tar.gz', file_paths=output_files, output_dir=work_dir) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'fastqc.tar.gz')) diff --git a/src/toil_scripts/tools/__init__.py b/src/toil_scripts/tools/__init__.py deleted file mode 100644 index a3a192aa..00000000 --- a/src/toil_scripts/tools/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -import os -import subprocess - - -def get_mean_insert_size(work_dir, bam_name): - """Function taken from MC3 Pipeline""" - cmd = "docker run --log-driver=none --rm -v {}:/data quay.io/ucsc_cgl/samtools " \ - "view -f66 {}".format(work_dir, os.path.join(work_dir, bam_name)) - process = subprocess.Popen(args=cmd, shell=True, stdout=subprocess.PIPE) - b_sum = 0.0 - b_count = 0.0 - while True: - line = process.stdout.readline() - if not line: - break - tmp = line.split("\t") - if abs(long(tmp[8])) < 10000: - b_sum += abs(long(tmp[8])) - b_count += 1 - process.wait() - try: - mean = b_sum / b_count - except ZeroDivisionError: - mean = 150 - print "Using insert size: %d" % mean - return int(mean) diff --git a/src/toil_scripts/tools/aligners.py b/src/toil_scripts/tools/aligners.py deleted file mode 100644 index db3cc369..00000000 --- a/src/toil_scripts/tools/aligners.py +++ /dev/null @@ -1,150 +0,0 @@ -import os - -import subprocess - -from toil_scripts.lib.programs import docker_call -from toil_scripts.lib.urls import download_url - - -def run_star(job, r1_id, r2_id, star_index_url, wiggle=False): - """ - Performs alignment of fastqs to bam via STAR - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str r1_id: FileStoreID of fastq (pair 1) - :param str r2_id: FileStoreID of fastq (pair 2 if applicable, else pass None) - :param str star_index_url: STAR index tarball - :param bool wiggle: If True, will output a wiggle file and return it - :return: FileStoreID from RSEM - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - download_url(url=star_index_url, name='starIndex.tar.gz', work_dir=work_dir) - subprocess.check_call(['tar', '-xvf', os.path.join(work_dir, 'starIndex.tar.gz'), '-C', work_dir]) - os.remove(os.path.join(work_dir, 'starIndex.tar.gz')) - # Determine tarball structure - star index contains are either in a subdir or in the tarball itself - star_index = os.path.join('/data', os.listdir(work_dir)[0]) if len(os.listdir(work_dir)) == 1 else '/data' - # Parameter handling for paired / single-end data - parameters = ['--runThreadN', str(job.cores), - '--genomeDir', star_index, - '--outFileNamePrefix', 'rna', - '--outSAMtype', 'BAM', 'SortedByCoordinate', - '--outSAMunmapped', 'Within', - '--quantMode', 'TranscriptomeSAM', - '--outSAMattributes', 'NH', 'HI', 'AS', 'NM', 'MD', - '--outFilterType', 'BySJout', - '--outFilterMultimapNmax', '20', - '--outFilterMismatchNmax', '999', - '--outFilterMismatchNoverReadLmax', '0.04', - '--alignIntronMin', '20', - '--alignIntronMax', '1000000', - '--alignMatesGapMax', '1000000', - '--alignSJoverhangMin', '8', - '--alignSJDBoverhangMin', '1', - '--sjdbScore', '1'] - if wiggle: - parameters.extend(['--outWigType', 'bedGraph', - '--outWigStrand', 'Unstranded', - '--outWigReferencesPrefix', 'chr']) - if r1_id and r2_id: - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1.fastq')) - job.fileStore.readGlobalFile(r2_id, os.path.join(work_dir, 'R2.fastq')) - parameters.extend(['--readFilesIn', '/data/R1.fastq', '/data/R2.fastq']) - else: - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1.fastq')) - parameters.extend(['--readFilesIn', '/data/R1.fastq']) - # Call: STAR Mapping - docker_call(tool='quay.io/ucsc_cgl/star:2.4.2a--bcbd5122b69ff6ac4ef61958e47bde94001cfe80', - work_dir=work_dir, parameters=parameters) - # Write to fileStore - transcriptome_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'rnaAligned.toTranscriptome.out.bam')) - sorted_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'rnaAligned.sortedByCoord.out.bam')) - if wiggle: - wiggle_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'rnaSignal.UniqueMultiple.str1.out.bg')) - return transcriptome_id, sorted_id, wiggle_id - else: - return transcriptome_id, sorted_id - - -def run_bwakit(job, config, sort=True, trim=False): - """ - Runs BWA-Kit to align a fastq file or fastq pair into a BAM file. - - :param JobFunctionWrappingJob job: Passed by Toil automatically - :param Namespace config: A configuration object that holds strings as attributes. - The attributes must be accessible via the dot operator. - The config must have: - config.r1 FileStoreID for 1st fastq file - config.r2 FileStoreID for 2nd fastq file (or None if single-ended) - config.ref FileStoreID for the reference genome - config.fai FileStoreID for the reference index file - config.amb FileStoreID for the reference amb file - config.ann FileStoreID for the reference ann file - config.bwt FileStoreID for the reference bwt file - config.pac FileStoreID for the reference pac file - config.sa FileStoreID for the reference sa file - config.alt FileStoreID for the reference alt (or None) - config.rg_line The read group value to use (or None -- see below) - config.library Read group attribute: library - config.platform Read group attribute: platform - config.program_unit Read group attribute: program unit - config.uuid Read group attribute: sample ID - - If specifying config.rg_line, use the following format: - BAM read group header line (@RG), as defined on page 3 of the SAM spec. - Tabs should be escaped, e.g., @RG\\tID:foo\\tLB:bar... - for the read group "foo" from sequencing library "bar". - Multiple @RG lines can be defined, but should be split by an escaped newline \\n, - e.g., @RG\\tID:foo\\t:LB:bar\\n@RG\\tID:santa\\tLB:cruz. - - :param bool sort: If True, sorts the BAM - :param bool trim: If True, performs adapter trimming - :return: FileStoreID of BAM - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - file_names = ['r1.fq.gz', 'ref.fa.fai', 'ref.fa', 'ref.fa.amb', 'ref.fa.ann', - 'ref.fa.bwt', 'ref.fa.pac', 'ref.fa.sa'] - ids = [config.r1, config.ref, config.fai, config.amb, config.ann, config.bwt, config.pac, config.sa] - # If a fastq pair was provided - if getattr(config, 'r2', None): - file_names.insert(1, 'r2.fq.gz') - ids.insert(1, config.r2) - # If an alt file was provided - if getattr(config, 'alt', None): - file_names.append('ref.fa.alt') - ids.append(config.alt) - for fileStoreID, name in zip(ids, file_names): - job.fileStore.readGlobalFile(fileStoreID, os.path.join(work_dir, name)) - # If a read group line was provided - if getattr(config, 'rg_line', None): - rg = config.rg_line - # Otherwise, generate a read group line to place in the BAM. - else: - rg = "@RG\\tID:{0}".format(config.uuid) # '\' character is escaped so bwakit gets passed '\t' properly - rg_attributes = [config.library, config.platform, config.program_unit, config.uuid] - for tag, info in zip(['LB', 'PL', 'PU', 'SM'], rg_attributes): - rg += '\\t{0}:{1}'.format(tag, info) - # BWA Options - opt_args = [] - if sort: - opt_args.append('-s') - if trim: - opt_args.append('-a') - # Call: bwakit - parameters = (['-t', str(job.cores), - '-R', rg] + - opt_args + - ['-o', '/data/aligned', - '/data/ref.fa', - '/data/r1.fq.gz']) - if getattr(config, 'r2', None): # If a fastq pair was provided - parameters.append('/data/r2.fq.gz') - mock_bam = config.uuid + '.bam' - outputs = {'aligned.aln.bam': mock_bam} - docker_call(tool='quay.io/ucsc_cgl/bwakit:0.7.12--528bb9bf73099a31e74a7f5e6e3f2e0a41da486e', - parameters=parameters, inputs=file_names, outputs=outputs, work_dir=work_dir) - - # Either write file to local output directory or upload to S3 cloud storage - job.fileStore.logToMaster('Aligned sample: {}'.format(config.uuid)) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'aligned.aln.bam')) diff --git a/src/toil_scripts/tools/indexing.py b/src/toil_scripts/tools/indexing.py deleted file mode 100644 index 97acdbf0..00000000 --- a/src/toil_scripts/tools/indexing.py +++ /dev/null @@ -1,42 +0,0 @@ -import os - -from toil_scripts.lib.programs import docker_call - - -def run_bwa_index(job, ref_id): - """ - Use BWA to create reference index files - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str ref_id: FileStoreID for the reference genome - :return: FileStoreIDs for BWA index files - :rtype: tuple(str, str, str, str, str) - """ - job.fileStore.logToMaster('Created BWA index files') - work_dir = job.fileStore.getLocalTempDir() - job.fileStore.readGlobalFile(ref_id, os.path.join(work_dir, 'ref.fa')) - command = ['index', '/data/ref.fa'] - docker_call(work_dir=work_dir, parameters=command, - tool='quay.io/ucsc_cgl/bwa:0.7.12--256539928ea162949d8a65ca5c79a72ef557ce7c') - ids = {} - for output in ['ref.fa.amb', 'ref.fa.ann', 'ref.fa.bwt', 'ref.fa.pac', 'ref.fa.sa']: - ids[output.split('.')[-1]] = (job.fileStore.writeGlobalFile(os.path.join(work_dir, output))) - return ids['amb'], ids['ann'], ids['bwt'], ids['pac'], ids['sa'] - - -def run_samtools_faidx(job, ref_id): - """ - Use Samtools to create reference index file - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str ref_id: FileStoreID for the reference genome - :return: FileStoreID for reference index - :rtype: str - """ - job.fileStore.logToMaster('Created reference index') - work_dir = job.fileStore.getLocalTempDir() - job.fileStore.readGlobalFile(ref_id, os.path.join(work_dir, 'ref.fasta')) - command = ['faidx', '/data/ref.fasta'] - docker_call(work_dir=work_dir, parameters=command, - tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e') - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.fasta.fai')) diff --git a/src/toil_scripts/tools/mutation_callers.py b/src/toil_scripts/tools/mutation_callers.py deleted file mode 100644 index 0fb631f6..00000000 --- a/src/toil_scripts/tools/mutation_callers.py +++ /dev/null @@ -1,129 +0,0 @@ -import os -from glob import glob - -from toil_scripts.tools import get_mean_insert_size -from toil_scripts.lib.files import tarball_files -from toil_scripts.lib.programs import docker_call - - -def run_mutect(job, normal_bam, normal_bai, tumor_bam, tumor_bai, ref, ref_dict, fai, cosmic, dbsnp): - """ - Calls MuTect to perform variant analysis - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str normal_bam: Normal BAM FileStoreID - :param str normal_bai: Normal BAM index FileStoreID - :param str tumor_bam: Tumor BAM FileStoreID - :param str tumor_bai: Tumor BAM Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str ref_dict: Reference dictionary FileStoreID - :param str fai: Reference index FileStoreID - :param str cosmic: Cosmic VCF FileStoreID - :param str dbsnp: DBSNP VCF FileStoreID - :return: MuTect output (tarball) FileStoreID - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - file_ids = [normal_bam, normal_bai, tumor_bam, tumor_bai, ref, fai, ref_dict, cosmic, dbsnp] - file_names = ['normal.bam', 'normal.bai', 'tumor.bam', 'tumor.bai', 'ref.fasta', - 'ref.fasta.fai', 'ref.dict', 'cosmic.vcf', 'dbsnp.vcf'] - for file_store_id, name in zip(file_ids, file_names): - job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) - # Call: MuTect - parameters = ['--analysis_type', 'MuTect', - '--reference_sequence', 'ref.fasta', - '--cosmic', '/data/cosmic.vcf', - '--dbsnp', '/data/dbsnp.vcf', - '--input_file:normal', '/data/normal.bam', - '--input_file:tumor', '/data/tumor.bam', - '--tumor_lod', str(10), # Taken from MC3 pipeline - '--initial_tumor_lod', str(4.0), # Taken from MC3 pipeline - '--out', 'mutect.out', - '--coverage_file', 'mutect.cov', - '--vcf', 'mutect.vcf'] - docker_call(work_dir=work_dir, parameters=parameters, - tool='quay.io/ucsc_cgl/mutect:1.1.7--e8bf09459cf0aecb9f55ee689c2b2d194754cbd3') - # Write output to file store - output_file_names = ['mutect.vcf', 'mutect.cov', 'mutect.out'] - output_file_paths = [os.path.join(work_dir, x) for x in output_file_names] - tarball_files('mutect.tar.gz', file_paths=output_file_paths, output_dir=work_dir) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'mutect.tar.gz')) - - -def run_muse(job, normal_bam, normal_bai, tumor_bam, tumor_bai, ref, ref_dict, fai, dbsnp): - """ - Calls MuSe to find variants - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str normal_bam: Normal BAM FileStoreID - :param str normal_bai: Normal BAM index FileStoreID - :param str tumor_bam: Tumor BAM FileStoreID - :param str tumor_bai: Tumor BAM Index FileStoreID - :param str tumor_bai: Tumor BAM Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str ref_dict: Reference genome dictionary FileStoreID - :param str fai: Reference index FileStoreID - :param str dbsnp: DBSNP VCF FileStoreID - :return: MuSe output (tarball) FileStoreID - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - file_ids = [normal_bam, normal_bai, tumor_bam, tumor_bai, ref, ref_dict, fai, dbsnp] - file_names = ['normal.bam', 'normal.bai', 'tumor.bam', 'tumor.bai', - 'ref.fasta', 'ref.dict', 'ref.fasta.fai', 'dbsnp.vcf'] - for file_store_id, name in zip(file_ids, file_names): - job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) - # Call: MuSE - parameters = ['--mode', 'wxs', - '--dbsnp', '/data/dbsnp.vcf', - '--fafile', '/data/ref.fasta', - '--tumor-bam', '/data/tumor.bam', - '--tumor-bam-index', '/data/tumor.bai', - '--normal-bam', '/data/normal.bam', - '--normal-bam-index', '/data/normal.bai', - '--outfile', '/data/muse.vcf', - '--cpus', str(job.cores)] - docker_call(tool='quay.io/ucsc_cgl/muse:1.0--6add9b0a1662d44fd13bbc1f32eac49326e48562', - work_dir=work_dir, parameters=parameters) - # Return fileStore ID - tarball_files('muse.tar.gz', file_paths=[os.path.join(work_dir, 'muse.vcf')], output_dir=work_dir) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'muse.tar.gz')) - - -def run_pindel(job, normal_bam, normal_bai, tumor_bam, tumor_bai, ref, fai): - """ - Calls Pindel to compute indels / deletions - - :param JobFunctionWrappingJob job: Passed automatically by Toil - :param str normal_bam: Normal BAM FileStoreID - :param str normal_bai: Normal BAM index FileStoreID - :param str tumor_bam: Tumor BAM FileStoreID - :param str tumor_bai: Tumor BAM Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str fai: Reference index FileStoreID - :return: Pindel output (tarball) FileStoreID - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - file_ids = [normal_bam, normal_bai, tumor_bam, tumor_bai, ref, fai] - file_names = ['normal.bam', 'normal.bai', 'tumor.bam', 'tumor.bai', 'ref.fasta', 'ref.fasta.fai'] - for file_store_id, name in zip(file_ids, file_names): - job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) - # Create Pindel config - with open(os.path.join(work_dir, 'pindel-config.txt'), 'w') as f: - for bam in ['normal', 'tumor']: - f.write('/data/{} {} {}\n'.format(bam + '.bam', get_mean_insert_size(work_dir, bam + '.bam'), bam)) - # Call: Pindel - parameters = ['-f', '/data/ref.fasta', - '-i', '/data/pindel-config.txt', - '--number_of_threads', str(job.cores), - '--minimum_support_for_event', '3', - '--report_long_insertions', 'true', - '--report_breakpoints', 'true', - '-o', 'pindel'] - docker_call(tool='quay.io/ucsc_cgl/pindel:0.2.5b6--4e8d1b31d4028f464b3409c6558fb9dfcad73f88', - work_dir=work_dir, parameters=parameters) - # Collect output files and write to file store - output_files = glob(os.path.join(work_dir, 'pindel*')) - tarball_files('pindel.tar.gz', file_paths=output_files, output_dir=work_dir) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'pindel.tar.gz')) diff --git a/src/toil_scripts/tools/preprocessing.py b/src/toil_scripts/tools/preprocessing.py deleted file mode 100644 index 18d192ce..00000000 --- a/src/toil_scripts/tools/preprocessing.py +++ /dev/null @@ -1,302 +0,0 @@ -import os - -from toil_scripts.lib import require -from toil_scripts.lib.programs import docker_call - - -def run_cutadapt(job, r1_id, r2_id, fwd_3pr_adapter, rev_3pr_adapter): - """ - Adapter triming for RNA-seq data - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str r1_id: FileStoreID of fastq read 1 - :param str r2_id: FileStoreID of fastq read 2 (if paired data) - :param str fwd_3pr_adapter: Adapter sequence for the forward 3' adapter - :param str rev_3pr_adapter: Adapter sequence for the reverse 3' adapter (second fastq pair) - :return: R1 and R2 FileStoreIDs - :rtype: tuple - """ - work_dir = job.fileStore.getLocalTempDir() - if r2_id: - require(rev_3pr_adapter, "Paired end data requires a reverse 3' adapter sequence.") - # Retrieve files - parameters = ['-a', fwd_3pr_adapter, - '-m', '35'] - if r1_id and r2_id: - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1.fastq')) - job.fileStore.readGlobalFile(r2_id, os.path.join(work_dir, 'R2.fastq')) - parameters.extend(['-A', rev_3pr_adapter, - '-o', '/data/R1_cutadapt.fastq', - '-p', '/data/R2_cutadapt.fastq', - '/data/R1.fastq', '/data/R2.fastq']) - else: - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1.fastq')) - parameters.extend(['-o', '/data/R1_cutadapt.fastq', '/data/R1.fastq']) - # Call: CutAdapt - docker_call(tool='quay.io/ucsc_cgl/cutadapt:1.9--6bd44edd2b8f8f17e25c5a268fedaab65fa851d2', - work_dir=work_dir, parameters=parameters) - # Write to fileStore - if r1_id and r2_id: - r1_cut_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R1_cutadapt.fastq')) - r2_cut_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R2_cutadapt.fastq')) - else: - r1_cut_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R1_cutadapt.fastq')) - r2_cut_id = None - return r1_cut_id, r2_cut_id - - -def run_samtools_faidx(job, ref_id): - """ - Use Samtools to create reference index file - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str ref_id: FileStoreID for the reference genome - :return: FileStoreID for reference index - :rtype: str - """ - job.fileStore.logToMaster('Created reference index') - work_dir = job.fileStore.getLocalTempDir() - job.fileStore.readGlobalFile(ref_id, os.path.join(work_dir, 'ref.fasta')) - command = ['faidx', 'ref.fasta'] - docker_call(work_dir=work_dir, parameters=command, - tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e') - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.fasta.fai')) - - -def run_samtools_index(job, bam_id): - """ - Runs samtools index to create (.bai) files - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str bam_id: FileStoreID of the bam file - :return: BAM index FileStoreID - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, 'sample.bam')) - # Call: index the bam - parameters = ['index', '/data/sample.bam'] - docker_call(work_dir=work_dir, parameters=parameters, - tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e') - # Write to fileStore - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam.bai')) - - -def run_picard_create_sequence_dictionary(job, ref_id): - """ - Use Picard-tools to create reference dictionary - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str ref_id: FileStoreID for the reference genome - :return: FileStoreID for reference dictionary - :rtype: str - """ - job.fileStore.logToMaster('Created reference dictionary') - work_dir = job.fileStore.getLocalTempDir() - job.fileStore.readGlobalFile(ref_id, os.path.join(work_dir, 'ref.fasta')) - command = ['CreateSequenceDictionary', 'R=ref.fasta', 'O=ref.dict'] - docker_call(work_dir=work_dir, parameters=command, - tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e') - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.dict')) - - -def run_gatk_preprocessing(job, bam, bai, ref, ref_dict, fai, phase, mills, dbsnp, mem='10G', unsafe=False): - """ - Convenience method for grouping together GATK preprocessing - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str bam: Sample BAM FileStoreID - :param str bai: Bam Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str ref_dict: Reference dictionary FileStoreID - :param str fai: Reference index FileStoreID - :param str phase: Phase VCF FileStoreID - :param str mills: Mills VCF FileStoreID - :param str dbsnp: DBSNP VCF FileStoreID - :param str mem: Memory value to be passed to children. Needed for CI tests - :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" - :return: BAM and BAI FileStoreIDs from Print Reads - :rtype: tuple(str, str) - """ - rtc = job.wrapJobFn(run_realigner_target_creator, bam, bai, ref, ref_dict, - fai, phase, mills, mem, unsafe, cores=job.cores) - ir = job.wrapJobFn(run_indel_realignment, rtc.rv(), bam, bai, ref, ref_dict, - fai, phase, mills, mem, unsafe, cores=job.cores) - br = job.wrapJobFn(run_base_recalibration, ir.rv(0), ir.rv(1), ref, ref_dict, - fai, dbsnp, mem, unsafe, cores=job.cores) - pr = job.wrapJobFn(run_print_reads, br.rv(), ir.rv(0), ir.rv(1), ref, ref_dict, - fai, mem, unsafe, cores=job.cores) - # Wiring - job.addChild(rtc) - rtc.addChild(ir) - ir.addChild(br) - br.addChild(pr) - return pr.rv(0), pr.rv(1) - - -def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe=False): - """ - Creates intervals file needed for indel realignment - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str bam: Sample BAM FileStoreID - :param str bai: Bam Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str ref_dict: Reference dictionary FileStoreID - :param str fai: Reference index FileStoreID - :param str phase: Phase VCF FileStoreID - :param str mills: Mills VCF FileStoreID - :param str mem: Memory value to be passed to children. Needed for CI tests - :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" - :return: FileStoreID for the processed bam - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - file_ids = [ref, fai, ref_dict, bam, bai, phase, mills] - inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf'] - for file_store_id, name in zip(file_ids, inputs): - job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) - # Call: GATK -- RealignerTargetCreator - parameters = ['-T', 'RealignerTargetCreator', - '-nt', str(job.cores), - '-R', '/data/ref.fasta', - '-I', '/data/sample.bam', - '-known', '/data/phase.vcf', - '-known', '/data/mills.vcf', - '--downsampling_type', 'NONE', - '-o', '/data/sample.intervals'] - if unsafe: - parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) - docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', - inputs=inputs, - outputs={'sample.intervals': None}, - work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) - # Write to fileStore - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.intervals')) - - -def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe=False): - """ - Creates realigned bams using the intervals file from previous step - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str intervals: Indel interval FileStoreID - :param str bam: Sample BAM FileStoreID - :param str bai: Bam Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str ref_dict: Reference dictionary FileStoreID - :param str fai: Reference index FileStoreID - :param str phase: Phase VCF FileStoreID - :param str mills: Mills VCF FileStoreID - :param str mem: Memory value to be passed to children. Needed for CI tests - :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" - :return: FileStoreID for the processed bam - :rtype: tuple(str, str) - """ - work_dir = job.fileStore.getLocalTempDir() - file_ids = [ref, fai, ref_dict, intervals, bam, bai, phase, mills] - inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.intervals', - 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf'] - for file_store_id, name in zip(file_ids, inputs): - job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) - # Call: GATK -- IndelRealigner - parameters = ['-T', 'IndelRealigner', - '-R', '/data/ref.fasta', - '-I', '/data/sample.bam', - '-known', '/data/phase.vcf', - '-known', '/data/mills.vcf', - '-targetIntervals', '/data/sample.intervals', - '--downsampling_type', 'NONE', - '-maxReads', str(720000), # Taken from MC3 pipeline - '-maxInMemory', str(5400000), # Taken from MC3 pipeline - '-o', '/data/sample.indel.bam'] - if unsafe: - parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) - docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', - inputs=inputs, - outputs={'sample.indel.bam': None, 'sample.indel.bai': None}, - work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) - # Write to fileStore - indel_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.indel.bam')) - indel_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.indel.bai')) - return indel_bam, indel_bai - - -def run_base_recalibration(job, indel_bam, indel_bai, ref, ref_dict, fai, dbsnp, mem, unsafe=False): - """ - Creates recal table used in Base Quality Score Recalibration - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str indel_bam: Indel interval FileStoreID - :param str indel_bai: Bam Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str ref_dict: Reference dictionary FileStoreID - :param str fai: Reference index FileStoreID - :param str dbsnp: DBSNP VCF FileStoreID - :param str mem: Memory value to be passed to children. Needed for CI tests - :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" - :return: FileStoreID for the processed bam - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - file_ids = [ref, fai, ref_dict, indel_bam, indel_bai, dbsnp] - inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.indel.bam', 'sample.indel.bai', 'dbsnp.vcf'] - for file_store_id, name in zip(file_ids, inputs): - job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) - # Call: GATK -- IndelRealigner - parameters = ['-T', 'BaseRecalibrator', - '-nct', str(job.cores), - '-R', '/data/ref.fasta', - '-I', '/data/sample.indel.bam', - '-knownSites', '/data/dbsnp.vcf', - '-o', '/data/sample.recal.table'] - if unsafe: - parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) - docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', - inputs=inputs, - outputs={'sample.recal.table': None}, - work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) - # Write output to file store - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.recal.table')) - - -def run_print_reads(job, table, indel_bam, indel_bai, ref, ref_dict, fai, mem, unsafe=False): - """ - Creates BAM that has had the base quality scores recalibrated - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str table: Recalibration table FileStoreID - :param str indel_bam: Indel interval FileStoreID - :param str indel_bai: Bam Index FileStoreID - :param str ref: Reference genome FileStoreID - :param str ref_dict: Reference dictionary FileStoreID - :param str fai: Reference index FileStoreID - :param str mem: Memory value to be passed to children. Needed for CI tests - :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" - :return: FileStoreID for the processed bam - :rtype: tuple(str, str) - """ - work_dir = job.fileStore.getLocalTempDir() - file_ids = [ref, fai, ref_dict, table, indel_bam, indel_bai] - inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.recal.table', - 'sample.indel.bam', 'sample.indel.bai'] - for file_store_id, name in zip(file_ids, inputs): - job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) - # Call: GATK -- PrintReads - parameters = ['-T', 'PrintReads', - '-nct', str(job.cores), - '-R', '/data/ref.fasta', - '--emit_original_quals', - '-I', '/data/sample.indel.bam', - '-BQSR', '/data/sample.recal.table', - '-o', '/data/sample.bqsr.bam'] - if unsafe: - parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) - docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', - inputs=inputs, - outputs={'sample.bqsr.bam': None, 'sample.bqsr.bai': None}, - work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) - # Write ouptut to file store - bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bqsr.bam')) - bai_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bqsr.bai')) - return bam_id, bai_id diff --git a/src/toil_scripts/tools/quantifiers.py b/src/toil_scripts/tools/quantifiers.py deleted file mode 100644 index 983e7233..00000000 --- a/src/toil_scripts/tools/quantifiers.py +++ /dev/null @@ -1,127 +0,0 @@ -import os - -import subprocess - -from toil_scripts.lib.files import tarball_files -from toil_scripts.lib.programs import docker_call -from toil_scripts.lib.urls import download_url - - -def run_kallisto(job, r1_id, r2_id, kallisto_index_url): - """ - RNA quantification via Kallisto - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str r1_id: FileStoreID of fastq (pair 1) - :param str r2_id: FileStoreID of fastq (pair 2 if applicable, otherwise pass None for single-end) - :param str kallisto_index_url: FileStoreID for Kallisto index file - :return: FileStoreID from Kallisto output - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - download_url(url=kallisto_index_url, name='kallisto_hg38.idx', work_dir=work_dir) - # Retrieve files - parameters = ['quant', - '-i', '/data/kallisto_hg38.idx', - '-t', str(job.cores), - '-o', '/data/', - '-b', '100'] - if r1_id and r2_id: - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1_cutadapt.fastq')) - job.fileStore.readGlobalFile(r2_id, os.path.join(work_dir, 'R2_cutadapt.fastq')) - parameters.extend(['/data/R1_cutadapt.fastq', '/data/R2_cutadapt.fastq']) - else: - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1_cutadapt.fastq')) - parameters.extend(['--single', '-l', '200', '-s', '15', '/data/R1_cutadapt.fastq']) - - # Call: Kallisto - docker_call(tool='quay.io/ucsc_cgl/kallisto:0.42.4--35ac87df5b21a8e8e8d159f26864ac1e1db8cf86', - work_dir=work_dir, parameters=parameters) - # Tar output files together and store in fileStore - output_files = [os.path.join(work_dir, x) for x in ['run_info.json', 'abundance.tsv', 'abundance.h5']] - tarball_files(tar_name='kallisto.tar.gz', file_paths=output_files, output_dir=work_dir) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'kallisto.tar.gz')) - - -def run_rsem(job, bam_id, rsem_ref_url, paired=True): - """ - RNA quantification with RSEM - - :param JobFunctionWrappingJob job: Passed automatically by Toil - :param str bam_id: FileStoreID of transcriptome bam for quantification - :param str rsem_ref_url: URL of RSEM reference (tarball) - :param bool paired: If True, uses parameters for paired end data - :return: FileStoreIDs for RSEM's gene and isoform output - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - download_url(url=rsem_ref_url, name='rsem_ref.tar.gz', work_dir=work_dir) - subprocess.check_call(['tar', '-xvf', os.path.join(work_dir, 'rsem_ref.tar.gz'), '-C', work_dir]) - os.remove(os.path.join(work_dir, 'rsem_ref.tar.gz')) - # Determine tarball structure - based on it, ascertain folder name and rsem reference prefix - rsem_files = [] - for root, directories, files in os.walk(work_dir): - rsem_files.extend([os.path.join(root, x) for x in files]) - # "grp" is a required RSEM extension that should exist in the RSEM reference - ref_prefix = [os.path.basename(os.path.splitext(x)[0]) for x in rsem_files if 'grp' in x][0] - ref_folder = os.path.join('/data', os.listdir(work_dir)[0]) if len(os.listdir(work_dir)) == 1 else '/data' - # I/O - job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, 'transcriptome.bam')) - output_prefix = 'rsem' - # Call: RSEM - parameters = ['--quiet', - '--no-qualities', - '-p', str(job.cores), - '--forward-prob', '0.5', - '--seed-length', '25', - '--fragment-length-mean', '-1.0', - '--bam', '/data/transcriptome.bam', - os.path.join(ref_folder, ref_prefix), - output_prefix] - if paired: - parameters = ['--paired-end'] + parameters - docker_call(tool='quay.io/ucsc_cgl/rsem:1.2.25--d4275175cc8df36967db460b06337a14f40d2f21', - parameters=parameters, work_dir=work_dir) - os.rename(os.path.join(work_dir, output_prefix + '.genes.results'), os.path.join(work_dir, 'rsem_gene.tab')) - os.rename(os.path.join(work_dir, output_prefix + '.isoforms.results'), os.path.join(work_dir, 'rsem_isoform.tab')) - # Write to FileStore - gene_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'rsem_gene.tab')) - isoform_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'rsem_isoform.tab')) - return gene_id, isoform_id - - -def run_rsem_postprocess(job, uuid, rsem_gene_id, rsem_isoform_id): - """ - Parses RSEMs output to produce the separate .tab files (TPM, FPKM, counts) for both gene and isoform. - These are two-column files: Genes and Quantifications. - HUGO files are also provided that have been mapped from Gencode/ENSEMBLE names. - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str uuid: UUID to mark the samples with - :param str rsem_gene_id: FileStoreID of rsem_gene_ids - :param str rsem_isoform_id: FileStoreID of rsem_isoform_ids - :return: FileStoreID from RSEM post process tarball - :rytpe: str - """ - work_dir = job.fileStore.getLocalTempDir() - # I/O - job.fileStore.readGlobalFile(rsem_gene_id, os.path.join(work_dir, 'rsem_gene.tab'), mutable=True) - job.fileStore.readGlobalFile(rsem_isoform_id, os.path.join(work_dir, 'rsem_isoform.tab'), mutable=True) - # Convert RSEM files into individual .tab files. - docker_call(tool='jvivian/rsem_postprocess', parameters=[uuid], work_dir=work_dir) - os.rename(os.path.join(work_dir, 'rsem_gene.tab'), os.path.join(work_dir, 'rsem_genes.results')) - os.rename(os.path.join(work_dir, 'rsem_isoform.tab'), os.path.join(work_dir, 'rsem_isoforms.results')) - output_files = ['rsem.genes.norm_counts.tab', 'rsem.genes.raw_counts.tab', 'rsem.isoform.norm_counts.tab', - 'rsem.isoform.raw_counts.tab', 'rsem_genes.results', 'rsem_isoforms.results'] - # Perform HUGO gene / isoform name mapping - genes = [x for x in output_files if 'rsem.genes' in x] - isoforms = [x for x in output_files if 'rsem.isoform' in x] - command = ['-g'] + genes + ['-i'] + isoforms - docker_call(tool='jvivian/gencode_hugo_mapping', parameters=command, work_dir=work_dir) - hugo_files = [os.path.splitext(x)[0] + '.hugo' + os.path.splitext(x)[1] for x in genes + isoforms] - # Create tarballs for outputs - tarball_files('rsem.tar.gz', file_paths=[os.path.join(work_dir, x) for x in output_files], output_dir=work_dir) - tarball_files('rsem_hugo.tar.gz', [os.path.join(work_dir, x) for x in hugo_files], output_dir=work_dir) - rsem_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'rsem.tar.gz')) - hugo_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'rsem_hugo.tar.gz')) - return rsem_id, hugo_id diff --git a/src/toil_scripts/tools/spark_tools.py b/src/toil_scripts/tools/spark_tools.py deleted file mode 100644 index c6830bfd..00000000 --- a/src/toil_scripts/tools/spark_tools.py +++ /dev/null @@ -1,190 +0,0 @@ -""" -Functions for calling raw tools in the UCSC Computational Genomics Lab -ADAM/Spark pipeline - -@author Audrey Musselman-Brown, almussel@ucsc.edu -@author Frank Austin Nothaft, fnothaft@berkeley. -""" - -import os.path - -from toil_scripts.lib import require -from toil_scripts.lib.programs import docker_call - - -SPARK_MASTER_PORT = "7077" -HDFS_MASTER_PORT = "8020" - - -class MasterAddress(str): - """ - A string containing the hostname or IP of the Spark/HDFS master. The Spark master expects its own address to - match what the client uses to connect to it. For example, if the master is configured with a host name, - the driver can't use an IP address to connect to it, and vice versa. This class works around by distinguishing - between the notional master address (self) and the actual one (self.actual) and adds support for the special - master address "auto" in order to implement auto-discovery of the master of a standalone. - - >>> foo = MasterAddress('foo') - >>> foo == 'foo' - True - >>> foo.actual == 'foo' - True - >>> foo.actual == foo - True - """ - def __init__(self, master_ip): - super(MasterAddress, self).__init__(master_ip) - self.actual = self - - def docker_parameters(self, docker_parameters=None): - """ - Augment a list of "docker run" arguments with those needed to map the notional Spark master address to the - real one, if they are different. - """ - if self != self.actual: - add_host_option = '--add-host=spark-master:' + self.actual - if docker_parameters is None: - docker_parameters = [add_host_option] - else: - docker_parameters.append(add_host_option) - return docker_parameters - -def _make_parameters(master_ip, default_parameters, memory, arguments, override_parameters): - """ - Makes a Spark Submit style job submission line. - - :param masterIP: The Spark leader IP address. - :param default_parameters: Application specific Spark configuration parameters. - :param memory: The memory to allocate to each Spark driver and executor. - :param arguments: Arguments to pass to the submitted job. - :param override_parameters: Parameters passed by the user, that override our defaults. - - :type masterIP: MasterAddress - :type default_parameters: list of string - :type arguments: list of string - :type memory: int or None - :type override_parameters: list of string or None - """ - - # python doesn't support logical xor? - # anywho, exactly one of memory or override_parameters must be defined - require((override_parameters is not None or memory is not None) and - (override_parameters is None or memory is None), - "Either the memory setting must be defined or you must provide Spark configuration parameters.") - - # if the user hasn't provided overrides, set our defaults - parameters = [] - if memory is not None: - parameters = ["--master", "spark://%s:%s" % (master_ip, SPARK_MASTER_PORT), - "--conf", "spark.driver.memory=%sg" % memory, - "--conf", "spark.executor.memory=%sg" % memory, - "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT))] - else: - parameters.extend(override_parameters) - - # add the tool specific spark parameters - parameters.extend(default_parameters) - - # spark submit expects a '--' to split the spark conf arguments from tool arguments - parameters.append('--') - - # now add the tool arguments and return - parameters.extend(arguments) - - return parameters - - -def call_conductor(master_ip, src, dst, memory=None, override_parameters=None): - """ - Invokes the Conductor container to copy files between S3 and HDFS and vice versa. - Find Conductor at https://github.com/BD2KGenomics/conductor. - - :param masterIP: The Spark leader IP address. - :param src: URL of file to copy. - :param src: URL of location to copy file to. - :param memory: Gigabytes of memory to provision for Spark driver/worker. - :param override_parameters: Parameters passed by the user, that override our defaults. - - :type masterIP: MasterAddress - :type src: string - :type dst: string - :type memory: int or None - :type override_parameters: list of string or None - """ - - arguments = ["-C", src, dst] - - docker_call(rm=False, - tool="quay.io/ucsc_cgl/conductor", - docker_parameters=master_ip.docker_parameters(["--net=host"]), - parameters=_make_parameters(master_ip, - [], # no conductor specific spark configuration - memory, - arguments, - override_parameters), - mock=False) - - -def call_adam(master_ip, arguments, - memory=None, - override_parameters=None, - run_local=False, - native_adam_path=None): - """ - Invokes the ADAM container. Find ADAM at https://github.com/bigdatagenomics/adam. - - :param masterIP: The Spark leader IP address. - :param arguments: Arguments to pass to ADAM. - :param memory: Gigabytes of memory to provision for Spark driver/worker. - :param override_parameters: Parameters passed by the user, that override our defaults. - :param native_adam_path: Path to ADAM executable. If not provided, Docker is used. - :param run_local: If true, runs Spark with the --master local[*] setting, which uses - all cores on the local machine. The master_ip will be disregarded. - - :type masterIP: MasterAddress - :type arguments: list of string - :type memory: int or None - :type override_parameters: list of string or None - :type native_adam_path: string or None - :type run_local: boolean - """ - if local: - master = ["--master", "local[*]"] - else: - master = ["--master", - ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), - "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),] - - default_params = (master + [ - # set max result size to unlimited, see #177 - "--conf", "spark.driver.maxResultSize=0", - # these memory tuning parameters were derived in the course of running the - # experiments for the ADAM sigmod paper: - # - # Nothaft, Frank Austin, et al. "Rethinking data-intensive science using scalable - # analytics systems." Proceedings of the 2015 ACM SIGMOD International Conference - # on Management of Data. ACM, 2015. - # - # the memory tunings reduce the amount of memory dedicated to caching, which we don't - # take advantage of, and the network timeout flag reduces the number of job failures - # caused by heavy gc load - "--conf", "spark.storage.memoryFraction=0.3", - "--conf", "spark.storage.unrollFraction=0.1", - "--conf", "spark.network.timeout=300s"]) - - # are we running adam via docker, or do we have a native path? - if native_adam_path is None: - docker_call(rm=False, - tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", - docker_parameters=master_ip.docker_parameters(["--net=host"]), - parameters=_make_parameters(master_ip, - default_params, - memory, - arguments, - override_parameters), - mock=False) - else: - check_call([os.path.join(native_adam_path, "bin/adam-submit")] + - default_params + - arguments) -