-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy pathsbatch_cluster.py
More file actions
executable file
·106 lines (79 loc) · 3.8 KB
/
sbatch_cluster.py
File metadata and controls
executable file
·106 lines (79 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python3
"""
Snakemake cluster-submission wrapper that submits each job to SLURM with sbatch.

Use this script as snakemake's cluster command, for example:
snakemake -j 99 --debug --cluster-config cluster.json --cluster 'sbatch_cluster.py'
(give a path such as ./sbatch_cluster.py if the script is not on your PATH).

The cluster properties of every submitted job must provide the keys
time, N, n, mem and p; they are passed to sbatch as --time, -N, -n, --mem and -p.
"""
## Snakemake needs a wrapper like this one to hand every job to the queuing system.
## This wrapper is inspired by Daniel Park's LSF submitter: https://github.com/broadinstitute/viral-ngs/blob/master/pipes/Broad_LSF/cluster-submitter.py
## He kindly answered questions about it on the snakemake Google group: https://groups.google.com/forum/#!topic/snakemake/1QelazgzilY
import errno
import os
import re
import subprocess
import sys

from snakemake.utils import read_job_properties
## snakemake generates a jobscript containing the (shell) commands for one job of
## your Snakefile and passes its path as the last command-line argument.
## https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-job-properties

# Directory (relative to the working directory) that receives the SLURM -o/-e logs.
LOG_DIR = "sbatch_log"

# The jobscript file name looks like snakejob.index_bam.23.sh; group 1 is the job id.
_JOBSCRIPT_RE = re.compile(r'snakejob\.\S+\.(\d+)\.sh')


def _snakemake_jobid(jobscript):
    """Return the snakemake job id embedded in the jobscript's file name.

    Only the base name is inspected, so the script may live in any directory.

    Raises:
        ValueError: if the name does not look like ``snakejob.<rule>.<id>.sh``.
    """
    mo = _JOBSCRIPT_RE.fullmatch(os.path.basename(jobscript))
    if mo is None:
        # Explicit error instead of `assert`, which disappears under `python -O`.
        raise ValueError("unexpected jobscript name: {!r}".format(jobscript))
    return mo.group(1)


def _job_name(job_properties, sm_jobid):
    """Build the SLURM job name ``<rule>-<jobid>`` plus an optional ``-<tag>``.

    The tag comes from ``params: jobname=...`` in the Snakefile rule (e.g. a
    sample name). ``.get`` is used because most rules do not define it.
    """
    jobname = "{rule}-{jobid}".format(rule=job_properties["rule"], jobid=sm_jobid)
    tag = job_properties.get("params", {}).get("jobname")
    if tag:
        jobname = jobname + "-" + tag
    return jobname


def build_sbatch_command(jobscript, job_properties):
    """Return the sbatch argument list that submits *jobscript*.

    Args:
        jobscript: path of the snakemake-generated jobscript.
        job_properties: the job's property dict (as read by
            ``read_job_properties``). ``job_properties["cluster"]`` must hold the
            keys from the cluster config (cluster.json, Snakemake >= 3.6.0):
            ``time`` (--time), ``N`` (nodes), ``n`` (tasks), ``mem`` (--mem,
            converted with ``int``) and ``p`` (partition).

    Returns:
        A list of strings suitable for ``subprocess`` (no shell involved, so
        job names containing shell metacharacters are passed through verbatim).

    Raises:
        ValueError: if the jobscript name is not recognised.
        KeyError: if a required cluster property is missing.
    """
    sm_jobid = _snakemake_jobid(jobscript)
    jobname = _job_name(job_properties, sm_jobid)
    cluster = job_properties["cluster"]
    return [
        "sbatch",
        "-N", str(cluster["N"]),
        "-n", str(cluster["n"]),
        # A single '=': "--time==X" would hand SLURM the invalid value "=X".
        "--time={}".format(cluster["time"]),
        "-p", str(cluster["p"]),
        "-J", jobname,
        "-o", "{}/{}.out".format(LOG_DIR, jobname),
        "-e", "{}/{}.err".format(LOG_DIR, jobname),
        "--mem={}".format(int(cluster["mem"])),
        # The jobscript must be its own argument, not glued onto --mem.
        jobscript,
    ]


# NOTE: job dependencies are not forwarded to SLURM. The commented-out LSF
# ("bsub -w 'done(...)'") draft that used to live here does not apply to sbatch.
# If dependencies are ever needed (presumably with --immediate-submit and
# --cluster 'sbatch_cluster.py {dependencies}'), the upstream ids would arrive in
# sys.argv[1:-1], and the sbatch form is --dependency=afterok:<id>[:<id>...].


def main(argv=None):
    """Submit the jobscript given as the last element of *argv* with sbatch.

    Returns sbatch's exit status, so a failed submission yields a non-zero exit
    code (the previous os.system call always exited 0).
    """
    argv = sys.argv if argv is None else argv
    jobscript = argv[-1]
    # Create the log directory up front so sbatch's -o/-e files can be written.
    os.makedirs(LOG_DIR, exist_ok=True)
    job_properties = read_job_properties(jobscript)
    cmd = build_sbatch_command(jobscript, job_properties)
    # Echo on stderr: stdout is reserved for sbatch's own output, which
    # snakemake may use as the external job id.
    print(" ".join(cmd), file=sys.stderr, flush=True)
    return subprocess.call(cmd)


if __name__ == "__main__":
    sys.exit(main())