From 4ecbc3f8c120182e2f45bb58ff79e25a6548bea7 Mon Sep 17 00:00:00 2001 From: Kevin Rue-Albrecht Date: Mon, 10 Dec 2018 21:19:16 +0000 Subject: [PATCH 1/5] Split loading of feature count table in two: concatenate as cluster job, database load on headnode --- pipelines/pipeline_scrnaseq.py | 60 ++++++++++++++++++++---- pipelines/pipeline_scrnaseq/pipeline.yml | 1 + 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/pipelines/pipeline_scrnaseq.py b/pipelines/pipeline_scrnaseq.py index 728410b..ae7629a 100644 --- a/pipelines/pipeline_scrnaseq.py +++ b/pipelines/pipeline_scrnaseq.py @@ -935,19 +935,61 @@ def featureCounts(infiles, outfile): @merge(featureCounts, - "featureCounts.dir/featurecounts.load") -def loadFeatureCounts(infiles, outfile): + "featureCounts.dir/featurecounts.txt") +def concatenateFeatureCounts(infiles, outfile): + ''' + Combine count data in the project database. + ''' + + infiles = " ".join(infiles) + + cat="track,gene_id,counts" + + cat_options = ["--regex-filename='%s'" % ".*/(.*).counts.gz", + "--no-titles", + "--header-names=track,gene_id,counts"] + + cat_options = " ".join(cat_options) + + missing_value = "na" + + statement = '''python -m cgatcore.tables + --cat=track + --missing-value=na + --regex-filename='.*/(.*).counts.gz' + --no-titles + %(infiles)s + > %(outfile)s + ''' + + P.run(statement, job_memory=PARAMS["sql_himem"]) + + +@transform(concatenateFeatureCounts, + regex(r"featureCounts.dir/(.*).txt"), + r"featureCounts.dir/\1.load") +def loadFeatureCounts(infile, outfile): ''' Combine and load count data in the project database. ''' - P.concatenate_and_load(infiles, outfile, - regex_filename=".*/(.*).counts.gz", - has_titles=False, - cat="track", - header="track,gene_id,counts", - options='-i "gene_id"', - job_memory=PARAMS["sql_himem"]) + tablename = infile.replace(".load", "") + + database_url = PARAMS["database"]["url"] + + statement = '''cat %(infile)s + | python -m cgatcore.csv2db + --retry + --database-url=%(database_url)s + --add-index=track + --header-names=track,gene_id,counts -i "gene_id" + --table=featurecounts + > %(outfile)s + ''' + + to_cluster = False + + P.run(statement) @files(loadFeatureCounts, diff --git a/pipelines/pipeline_scrnaseq/pipeline.yml b/pipelines/pipeline_scrnaseq/pipeline.yml index 61047e4..135dd07 100644 --- a/pipelines/pipeline_scrnaseq/pipeline.yml +++ b/pipelines/pipeline_scrnaseq/pipeline.yml @@ -77,6 +77,7 @@ strandedness: none # location of the local sqlite3 database database: file: csvdb + url: sqlite:///./csvdb # Spike-in options From 958dccbb7e5d5a150a5f77b7e2a5b033ccc3ed85 Mon Sep 17 00:00:00 2001 From: Kevin Rue-Albrecht Date: Mon, 10 Dec 2018 21:21:36 +0000 Subject: [PATCH 2/5] Discard unused code --- pipelines/pipeline_scrnaseq.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pipelines/pipeline_scrnaseq.py b/pipelines/pipeline_scrnaseq.py index ae7629a..4c60a3e 100644 --- a/pipelines/pipeline_scrnaseq.py +++ b/pipelines/pipeline_scrnaseq.py @@ -943,16 +943,6 @@ def concatenateFeatureCounts(infiles, outfile): infiles = " ".join(infiles) - cat="track,gene_id,counts" - - cat_options = ["--regex-filename='%s'" % ".*/(.*).counts.gz", - "--no-titles", - "--header-names=track,gene_id,counts"] - - cat_options = " ".join(cat_options) - - missing_value = "na" - statement = '''python -m cgatcore.tables --cat=track --missing-value=na From c37b63d543b6a4fa32406efa5f180afcbf4c0857 Mon Sep 17 00:00:00 2001 From: Kevin Rue-Albrecht Date: Mon, 10 Dec 2018 21:26:22 +0000 Subject: [PATCH 3/5] Fix code indentation --- pipelines/pipeline_scrnaseq.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/pipeline_scrnaseq.py b/pipelines/pipeline_scrnaseq.py index 4c60a3e..2c3222e 100644 --- a/pipelines/pipeline_scrnaseq.py +++ b/pipelines/pipeline_scrnaseq.py @@ -956,8 +956,8 @@ def concatenateFeatureCounts(infiles, outfile): @transform(concatenateFeatureCounts, - regex(r"featureCounts.dir/(.*).txt"), - r"featureCounts.dir/\1.load") + regex(r"featureCounts.dir/(.*).txt"), + r"featureCounts.dir/\1.load") def loadFeatureCounts(infile, outfile): ''' Combine and load count data in the project database. From 960c56bd659333edadacaec9955198c6fe0520d4 Mon Sep 17 00:00:00 2001 From: Kevin Rue-Albrecht Date: Mon, 10 Dec 2018 21:30:07 +0000 Subject: [PATCH 4/5] compress text file --- pipelines/pipeline_scrnaseq.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pipelines/pipeline_scrnaseq.py b/pipelines/pipeline_scrnaseq.py index 2c3222e..91c0f80 100644 --- a/pipelines/pipeline_scrnaseq.py +++ b/pipelines/pipeline_scrnaseq.py @@ -935,7 +935,7 @@ def featureCounts(infiles, outfile): @merge(featureCounts, - "featureCounts.dir/featurecounts.txt") + "featureCounts.dir/featurecounts.txt.gz") def concatenateFeatureCounts(infiles, outfile): ''' Combine count data in the project database. @@ -949,6 +949,7 @@ def concatenateFeatureCounts(infiles, outfile): --regex-filename='.*/(.*).counts.gz' --no-titles %(infiles)s + | gzip -c > %(outfile)s ''' @@ -956,7 +957,7 @@ def concatenateFeatureCounts(infiles, outfile): @transform(concatenateFeatureCounts, - regex(r"featureCounts.dir/(.*).txt"), + regex(r"featureCounts.dir/(.*).txt.gz"), r"featureCounts.dir/\1.load") def loadFeatureCounts(infile, outfile): ''' @@ -967,7 +968,7 @@ def loadFeatureCounts(infile, outfile): database_url = PARAMS["database"]["url"] - statement = '''cat %(infile)s + statement = '''zcat %(infile)s | python -m cgatcore.csv2db --retry --database-url=%(database_url)s From 47c00b7588a94f1ed8718c8d361f7bd121233bd2 Mon Sep 17 00:00:00 2001 From: Kevin Rue-Albrecht Date: Fri, 14 Dec 2018 20:51:15 +0000 Subject: [PATCH 5/5] Optionally disable tasks that involve large sqlite tables --- pipelines/pipeline_scrnaseq.py | 21 +++++++++++++++++++++ pipelines/pipeline_scrnaseq/pipeline.yml | 2 ++ 2 files changed, 23 insertions(+) diff --git a/pipelines/pipeline_scrnaseq.py b/pipelines/pipeline_scrnaseq.py index 91c0f80..2797e14 100644 --- a/pipelines/pipeline_scrnaseq.py +++ b/pipelines/pipeline_scrnaseq.py @@ -934,6 +934,7 @@ def featureCounts(infiles, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(featureCounts, "featureCounts.dir/featurecounts.txt.gz") def concatenateFeatureCounts(infiles, outfile): @@ -983,6 +984,7 @@ def loadFeatureCounts(infile, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @files(loadFeatureCounts, "featureCounts.dir/featurecounts_counts.txt") def featurecountsGeneCounts(infile, outfile): @@ -1003,6 +1005,7 @@ def featurecountsGeneCounts(infile, outfile): df.to_csv(outfile, sep="\t", index=True, index_label="gene_id") +@active_if(PARAMS["sql_load_concatenated_table"]) @transform(featurecountsGeneCounts, suffix(".txt"), ".load") @@ -1067,6 +1070,7 @@ def salmon(infiles, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @active_if(fastqMode) @merge(salmon, "salmon.dir/salmon.transcripts.load") def loadSalmonTranscriptQuant(infiles, outfile): @@ -1083,6 +1087,7 @@ def loadSalmonTranscriptQuant(infiles, outfile): job_memory=PARAMS["sql_himem"]) +@active_if(PARAMS["sql_load_concatenated_table"]) @active_if(fastqMode) @merge(salmon, "salmon.dir/salmon.genes.load") def loadSalmonGeneQuant(infiles, outfile): @@ -1336,6 +1341,7 @@ def loadCuffNormUQ(infile, outfile): run_copy_number_estimation = False +@active_if(PARAMS["sql_load_concatenated_table"]) @active_if(run_copy_number_estimation) @follows(mkdir("copy.number.dir"), loadSalmonTPMs) @files("salmon.dir/salmon.genes.tpms.txt", @@ -1357,6 +1363,7 @@ def estimateCopyNumber(infile, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @active_if(run_copy_number_estimation) @transform(estimateCopyNumber, suffix(".txt"), @@ -1441,6 +1448,7 @@ def collectRnaSeqMetrics(infiles, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(collectRnaSeqMetrics, "qc.dir/qc_rnaseq_metrics.load") def loadCollectRnaSeqMetrics(infiles, outfile): @@ -1481,6 +1489,7 @@ def threePrimeBias(infile, outfile): out_file.write("%.2f\n" % bias) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(threePrimeBias, "qc.dir/qc_three_prime_bias.load") def loadThreePrimeBias(infiles, outfile): @@ -1531,6 +1540,7 @@ def estimateLibraryComplexity(infile, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @active_if(PAIRED) @merge(estimateLibraryComplexity, "qc.dir/qc_library_complexity.load") @@ -1586,6 +1596,7 @@ def alignmentSummaryMetrics(infile, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(alignmentSummaryMetrics, "qc.dir/qc_alignment_summary_metrics.load") def loadAlignmentSummaryMetrics(infiles, outfile): @@ -1657,6 +1668,7 @@ def insertSizeMetricsAndHistograms(infile, outfiles): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(insertSizeMetricsAndHistograms, "qc.dir/qc_insert_size_metrics.load") def loadInsertSizeMetrics(infiles, outfile): @@ -1680,6 +1692,7 @@ def loadInsertSizeMetrics(infiles, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(insertSizeMetricsAndHistograms, "qc.dir/qc_insert_size_histogram.load") def loadInsertSizeHistograms(infiles, outfile): @@ -1735,6 +1748,7 @@ def spikeVsGenome(infile, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(spikeVsGenome, "qc.dir/qc_spike_vs_genome.load") def loadSpikeVsGenome(infiles, outfile): @@ -1751,6 +1765,7 @@ def loadSpikeVsGenome(infiles, outfile): # ------------------------- No. genes detected ------------------------------ # +@active_if(PARAMS["sql_load_concatenated_table"]) @active_if(fastqMode) @follows(mkdir("qc.dir/"), loadSalmonTPMs, loadEnsemblAnnotations) @files("salmon.dir/salmon.genes.tpms.load", @@ -1786,6 +1801,7 @@ def numberGenesDetectedSalmon(infile, outfile): count_df.to_csv(outfile, index=False, sep="\t") +@active_if(PARAMS["sql_load_concatenated_table"]) @active_if(fastqMode) @follows(annotations) @files(numberGenesDetectedSalmon, @@ -1799,6 +1815,7 @@ def loadNumberGenesDetectedSalmon(infile, outfile): options='-i "sample_id"') +@active_if(PARAMS["sql_load_concatenated_table"]) @follows(annotations) @files(loadFeatureCounts, "qc.dir/number.genes.detected.featurecounts") @@ -1832,6 +1849,7 @@ def numberGenesDetectedFeatureCounts(infile, outfile): count_df.to_csv(outfile, index=False, sep="\t") +@active_if(PARAMS["sql_load_concatenated_table"]) @files(numberGenesDetectedFeatureCounts, "qc.dir/qc_no_genes_featurecounts.load") def loadNumberGenesDetectedFeatureCounts(infile, outfile): @@ -1869,6 +1887,7 @@ def fractionReadsSpliced(infile, outfile): P.run(statement) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge(fractionReadsSpliced, "qc.dir/qc_fraction_spliced.load") def loadFractionReadsSpliced(infiles, outfile): @@ -1906,6 +1925,7 @@ def loadSampleInformation(infile, outfile): P.load(infile, outfile) +@active_if(PARAMS["sql_load_concatenated_table"]) @merge([loadSampleInformation, loadCollectRnaSeqMetrics, loadThreePrimeBias, @@ -2001,6 +2021,7 @@ def qcSummary(infiles, outfile): df.to_csv(outfile, sep="\t", index=False) +@active_if(PARAMS["sql_load_concatenated_table"]) @transform(qcSummary, suffix(".txt"), ".load") diff --git a/pipelines/pipeline_scrnaseq/pipeline.yml b/pipelines/pipeline_scrnaseq/pipeline.yml index 135dd07..6a444a8 100644 --- a/pipelines/pipeline_scrnaseq/pipeline.yml +++ b/pipelines/pipeline_scrnaseq/pipeline.yml @@ -288,3 +288,5 @@ sql: # RAM required for high memory operations (e.g. 5000M) himem: 10000M + + load_concatenated_table: True