-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrfhap.nf
More file actions
345 lines (271 loc) · 10.9 KB
/
rfhap.nf
File metadata and controls
345 lines (271 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
// Enable Nextflow DSL2 (process/workflow blocks and channel operators).
nextflow.enable.dsl=2
// Print usage information (defaults interpolated from params) and terminate
// the run. Called when --help is given or a required input is missing.
def help_function() {
    // Use a local variable: the original assigned `help` without `def`,
    // leaking it into the global script binding.
    def help = """rfhap.nf: haplotype binning using trios, multiple-k and random forest.
|Required arguments:
| --paternal_reads paternal short reads
| [default: ${params.paternal_reads}]
| --maternal_reads maternal short reads
| [default: ${params.maternal_reads}]
| --child_reads child long reads
| [default: ${params.child_reads}]
|
|Optional arguments:
| --outdir The rfhap result directory.
| [default: ${params.outdir}]
| --kmers The size of k-mers (24 27 30 33)
| [default: ${params.kmers}]
| --num_cases Random forest training matrix size
| [default ${params.num_cases}]
| --size Random forest prediction matrix size
| [default ${params.size}]
| """.stripMargin()
    // Print the help with the stripped margin and exit
    println(help)
    exit(0)
}
// Build the haplotype-specific unique k-mer databases for one k value.
// Runs find_unique_kmers.py (backed by kmc) over the paternal and maternal
// short-read sets and emits the hapA-only / hapB-only k-mer lists, their
// .counts companion files, and the run's stdout/stderr logs.
process create_kmers_database {
publishDir (
path: "${params.outdir}/ukmers_per_haplotype",
mode: "copy",
pattern: "uk*/*.txt*",
// Identity saveAs: files are published under their work-dir names.
saveAs: { fn -> fn }
)
//singularity and nextflow container
container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'oras://community.wave.seqera.io/library/kmc_mappy_seqtk_r-caret_pruned:46de1e7eb6bf9c6d' :
'community.wave.seqera.io/library/kmc_mappy_seqtk_r-caret_pruned:27ab4d2f39aac56f' }"
input:
val kmer_size
path paternal
path maternal
output:
path "uk${kmer_size}/hapA_only_kmers.txt", emit: hpA
path "uk${kmer_size}/hapB_only_kmers.txt", emit: hpB
path "uk${kmer_size}/uk${kmer_size}.log.txt", emit : log
path "uk${kmer_size}/uk${kmer_size}.err.txt", emit : err
path "uk${kmer_size}/hapA_only_kmers.txt.counts" , emit : countsA
path "uk${kmer_size}/hapB_only_kmers.txt.counts" , emit : countsB
script:
// Staged read files are passed to the helper script as comma-separated lists.
def fm=maternal.join(",")
def fp=paternal.join(",")
// Debug mode only creates the expected (empty) outputs and echoes the
// command that would have run.
if(params.debug){
"""
mkdir uk${kmer_size}
touch uk${kmer_size}/hapA_only_kmers.txt
touch uk${kmer_size}/hapB_only_kmers.txt
touch uk${kmer_size}/uk${kmer_size}.log.txt
touch uk${kmer_size}/uk${kmer_size}.err.txt
touch uk${kmer_size}/hapA_only_kmers.txt.counts
touch uk${kmer_size}/hapB_only_kmers.txt.counts
echo $paternal
echo $maternal
echo $fm
echo find_unique_kmers.py -k${kmer_size} -p $task.cpus -o uk${kmer_size} --path-to-kmc kmc ${fm} ${fp}
"""
}else{
"""
mkdir uk${kmer_size}
${baseDir}/aux_scripts/find_unique_kmers.py -k${kmer_size} -p $task.cpus -o uk${kmer_size} --path-to-kmc kmc ${fm} ${fp} > uk${kmer_size}/uk${kmer_size}.log.txt 2> uk${kmer_size}/uk${kmer_size}.err.txt
"""
}
}
// Write a two-line index (kmerdb.txt) describing both unique-kmer files:
// each line is "<staged path> <k> <haplotype letter>", where k is parsed
// from the ukNN directory name and the letter from hap[AB]_only_kmers.txt.
process print_paths{
publishDir "$params.outdir/ukmers_per_haplotype/" , mode : "copy"
input:
val pathsA
val pathsB
output:
path("kmerdb.txt"), emit : kmerdb
script:
// NOTE(review): the regex 'uk([1-9][0-9]|100)' only matches k values in
// 10..100, so a single-digit --kmers value would yield an empty k column —
// confirm the supported k range.
"""
echo "${pathsA} \$(echo "${pathsA}" | grep -oE 'uk([1-9][0-9]|100)' | grep -oE '[0-9]+') \$(echo "${pathsA}" | grep -oE 'hap[AB]_only_kmers.txt' | grep -oE 'A|B')" > kmerdb.txt
echo "${pathsB} \$(echo "${pathsB}" | grep -oE 'uk([1-9][0-9]|100)' | grep -oE '[0-9]+') \$(echo "${pathsB}" | grep -oE 'hap[AB]_only_kmers.txt' | grep -oE 'A|B')" >> kmerdb.txt
"""
}
// Sort the collected kmerdb index numerically by its second column (the
// k-mer size), dropping any blank lines, so downstream consumers see the
// databases in ascending-k order.
process sort_kmer_db{
    publishDir "$params.outdir/ukmers_per_haplotype/" , mode : "copy"

    input:
    val(kmerdb)

    output:
    path("kmerdb_sorted.txt"),emit: kmerdb_sorted

    script:
    // Filter empty lines first, then numeric-sort on column 2.
    """
    grep -v '^\$' ${kmerdb} | sort -k2,2 -n > kmerdb_sorted.txt
    """
}
//todo add cpus and memory
//cmd ./FastKM kmerdb.txt long-reads.fastq.gz <number_of_cores>
// Score each long read against every unique-kmer database: FastKM writes a
// per-read match matrix to stdout and per-database summary lines ("#"-prefixed)
// to stderr, which are reshaped into the kbz (k-mer database size) table.
process FastKM {
    publishDir "${params.outdir}/fastkm/", mode: "copy"

    input:
    tuple path(long_reads), val(kmer_db)

    output:
    tuple path("${long_reads.baseName}_fastkm_matrix.txt"), path("${long_reads.baseName}_fastkm_kbz.txt") , emit:fkm
    path("${long_reads.baseName}_fastkm_matrix.log") , emit:kf
    //path("${long_reads.baseName}_fastkm_kbz.txt") , emit:kbz //size of k-mers database

    script:
    // Output files are named after the input reads file. The original text
    // had these interpolations corrupted to "$(unknown)", which bash would
    // run as a (nonexistent) command substitution; restore ${filename} so
    // the produced files match the declared outputs above.
    def filename=long_reads.baseName
    if(params.debug){
    """
    echo ${baseDir}/bin/FastKM ${kmer_db} ${long_reads} $task.cpus > ${filename}_fastkm_matrix.txt
    touch ${filename}_fastkm_matrix.txt
    touch ${filename}_fastkm_matrix.log
    touch ${filename}_fastkm_kbz.txt
    """
    }else{
    """
    ${baseDir}/bin/FastKM ${kmer_db} ${long_reads} $task.cpus > ${filename}_fastkm_matrix.txt 2> ${filename}_fastkm_matrix.log
    grep "^#" ${filename}_fastkm_matrix.log | awk 'BEGIN{print "ks hap n"}{print \$2" "\$4" "\$3}' > ${filename}_fastkm_kbz.txt
    """
    }
}
// Train a random-forest classifier on a random subsample of the FastKM
// matrix: shuffle the data rows, keep params.num_cases of them, re-attach
// the header, and hand the training matrix to train-rf-v03.R.
process trainRF{
    publishDir "${params.outdir}/rf/train/", mode: "copy"
    //singularity and nextflow container
    container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
    'oras://community.wave.seqera.io/library/kmc_mappy_seqtk_r-caret_pruned:46de1e7eb6bf9c6d' :
    'community.wave.seqera.io/library/kmc_mappy_seqtk_r-caret_pruned:27ab4d2f39aac56f' }"

    input:
    tuple path(matrix), path(kmer_number)

    output:
    tuple val(matrix.baseName), path("${matrix.baseName}_rf_model.rds"), emit: model
    tuple path("${matrix.baseName}_confusion_matrix.rds"), path("${matrix.baseName}_confusion_matrix.txt"), emit: dc
    path("${matrix.baseName}.train_matrix.txt"), emit: tmatrix
    path("${matrix.baseName}_feature_importance.txt"), emit: fimport

    script:
    // Outputs are named after the matrix file. The original text had these
    // interpolations corrupted to "$(unknown)"; restore ${filename} so the
    // created files match the declared outputs above.
    def filename=matrix.baseName
    def r_script = "${baseDir}/aux_scripts/train-rf-v03.R"
    if(params.debug){
    """
    echo Rscript ${r_script} ${matrix} ${kmer_number} ${matrix.baseName}
    """
    }else{
    """
    cat ${matrix} | grep -v "^id"| shuf -n ${params.num_cases} > small_fastkm_matrix.tmp
    head -1 ${matrix} > h
    cat h small_fastkm_matrix.tmp > ${filename}.train_matrix.txt
    Rscript ${r_script} ${filename}.train_matrix.txt ${kmer_number} ${filename}
    """
    }
}
// Apply the trained random-forest model to one chunk of the FastKM matrix,
// producing per-read haplotype predictions via predict_rf-v03.R.
process predictRF{
    publishDir "${params.outdir}/rf/prediction/", mode: "copy"
    //singularity and nextflow container
    container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
    'oras://community.wave.seqera.io/library/kmc_mappy_seqtk_r-caret_pruned:46de1e7eb6bf9c6d' :
    'community.wave.seqera.io/library/kmc_mappy_seqtk_r-caret_pruned:27ab4d2f39aac56f' }"

    input:
    tuple val(id), path(matrix), path(kmer_number), path(model)

    output:
    tuple val(id), path("${matrix.baseName}_rf_prediction.txt"), emit: pred

    script:
    def r_script = "${baseDir}/aux_scripts/predict_rf-v03.R"
    if(params.debug){
    // The original text had the touch target corrupted to "$(unknown)";
    // restore ${matrix.baseName} so the stub file matches the declared output.
    """
    echo Rscript ${r_script} ${matrix} ${kmer_number} ${model} ${matrix.baseName}
    touch ${matrix.baseName}_rf_prediction.txt
    """
    }else{
    """
    Rscript ${r_script} ${matrix} ${kmer_number} ${model} ${matrix.baseName}
    """
    }
}
//split the haplotypes using a threshold of 0.7
// Merge all per-chunk prediction files with filter_predictions.awk and split
// the read ids into three lists: hapA, hapB and hapU (unassigned).
process setHaplotypes{
    publishDir "${params.outdir}/rf/haplotypes/" , mode : "copy"

    input:
    tuple val(id),path(predfiles)

    output:
    tuple val(id), path("${id}.hap*.txt") , emit : hap

    script:
    def awk_script = "${baseDir}/aux_scripts/filter_predictions.awk"
    // One pass per haplotype label: keep lines starting with the label and
    // emit their second column (the read id).
    """
    awk -f ${awk_script} ${predfiles} | grep -v "idr" > hap.tmp.txt
    for h in A B U; do
        grep "^\${h}" hap.tmp.txt | awk '{print \$2}' > ${id}.hap\${h}.txt
    done
    """
}
//get the file list for each file
// Extract the reads listed in one haplotype id file from the child fastq
// (seqtk subseq) and recompress the result with pigz.
process seqtk{
publishDir "${params.outdir}/hap_fastq/" , mode : "copy"
//singularity and nextflow container
container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'oras://community.wave.seqera.io/library/seqtk_pigz:831a2b8e7799c179' :
'community.wave.seqera.io/library/seqtk_pigz:7b301904d63da1f1' }"
input:
tuple val(id),path(hap),path(fastq)
output:
tuple val(id), path("${hap.baseName}.fq.gz"), emit: fastq
script:
"""
seqtk subseq ${fastq} ${hap} | pigz -p $task.cpus > ${hap.baseName}.fq.gz
"""
}
//we init the workflow
// Main entry point: wires k-mer database construction, FastKM scoring,
// random-forest training/prediction, haplotype assignment, and fastq
// extraction into a single DSL2 pipeline.
workflow{
// Show usage on --help or when any required input sheet is missing.
if(params.help){
help_function()
}
if(params.child_reads == null || params.maternal_reads == null || params.paternal_reads == null){
help_function()
}
// One channel element per k value (params.kmers is a space-separated list).
input_ch = Channel.of(params.kmers.split(" "))
// Each reads sheet is a CSV with a 'path' column listing the fastq files.
paternal_reads_ch=Channel.fromPath(params.paternal_reads) \
| splitCsv(header:true) \
| map {row->file(row.path)}
maternal_reads_ch=Channel.fromPath(params.maternal_reads) \
| splitCsv(header:true) \
| map {row->file(row.path)}
//child reads
long_reads_ch=Channel.fromPath(params.child_reads) \
| splitCsv(header:true) \
| map {row->file(row.path)}
// Build a unique-kmer database per k, using all parental reads at once.
haplotypesA=create_kmers_database(input_ch,paternal_reads_ch.collect(),maternal_reads_ch.collect()).hpA
haplotypesB=create_kmers_database.out.hpB
print_paths(haplotypesA,haplotypesB)
// Merge the per-k kmerdb.txt fragments into one file, then sort it by k.
kmerdb=print_paths.out.kmerdb.collectFile(name :"kmerdb.txt",newLine: false, storeDir : "$baseDir/$params.outdir/trio_binning/")
sort_kmer_db(kmerdb)
//we run fastKm
// Pair every long-read fastq with the sorted k-mer database index.
lr_ch=long_reads_ch.combine(sort_kmer_db.out.kmerdb_sorted)
FastKM(lr_ch)
//we train with a small subset of the data a random forest classifier
trainRF(FastKM.out.fkm)
//we split the fastKM matrix into chunks
// Chunks of params.size rows each, header repeated in every chunk.
chunks = FastKM.out.fkm.splitText(by: params.size , file : true, keepHeader: true)
// chunks.view()
chunks=chunks.map { paths ->
// Extract the basename from the first file path
// (drop the trailing chunk-number token so it matches the trainRF key).
def bn = paths[0].baseName.toString().tokenize('.')[0..-2].join('.')
// Create a new label with the basename at the beginning, followed by the paths
["$bn"] + paths
}
//chunks.view()
//we prepare the input for running the random forest id, matrix, nkmers, model
// Join each chunk with the model trained from the same matrix (key = basename).
pred_rf=chunks.combine(trainRF.out.model, by:0)
//pred_rf.view()
predictRF(pred_rf)
//we group all predictions
// Collect every chunk's predictions back under the originating matrix id.
mark_hap=predictRF.out.pred.groupTuple(by:0)
//mark_hap.view()
setHaplotypes(mark_hap)
//setHaplotypes.out.hap.view()
//we match the long reads with the haplotypes
// Rebuild the matrix-id key from the fastq basename so it can be joined
// against setHaplotypes output.
lrc=long_reads_ch.map{ lr ->
def bn = lr.baseName+"_fastkm_matrix"
tuple(bn,lr)
}
hapseq=setHaplotypes.out.hap.combine(lrc,by:0)
//we prepare a tuple of id, hap, fastq
// Fan out the three haplotype files (A, B, U) into separate (id, hap, fastq)
// tuples so seqtk runs once per haplotype.
hapseq=hapseq.map { id, fileList, singleFile ->
// Return three separate tuples, one for each file in fileList
[
[id, fileList[0], singleFile],
[id, fileList[1], singleFile],
[id, fileList[2], singleFile]
]
}.flatten().collate(3)
//we create the fastq file for each element
seqtk(hapseq)
}