diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/AlignerIndexUtil.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/AlignerIndexUtil.java index ddbaba6ed..1e55f7e1a 100644 --- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/AlignerIndexUtil.java +++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/AlignerIndexUtil.java @@ -40,7 +40,7 @@ public static boolean copyIndexIfExists(PipelineContext ctx, AlignmentOutputImpl public static boolean copyIndexIfExists(PipelineContext ctx, AlignmentOutputImpl output, String localName, String webserverName, ReferenceGenome genome, boolean forceCopyLocal) throws PipelineJobException { - ctx.getLogger().debug("copying index to shared dir if exists: " + localName); + ctx.getLogger().debug("checking if index exists: " + localName + ". copy local: " + forceCopyLocal); if (ctx.getWorkDir() == null) { throw new PipelineJobException("PipelineContext.getWorkDir() is null"); diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/PedigreeToolParameterDescriptor.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/PedigreeToolParameterDescriptor.java index 3bb26661b..01e0cd580 100644 --- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/PedigreeToolParameterDescriptor.java +++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/PedigreeToolParameterDescriptor.java @@ -6,11 +6,25 @@ public class PedigreeToolParameterDescriptor extends ToolParameterDescriptor { public static String NAME = "pedigreeSource"; + private final boolean _isRequired; + public PedigreeToolParameterDescriptor() + { + this(true); + } + + public PedigreeToolParameterDescriptor(final boolean isRequired) { super(null, NAME, "Pedigree Source", "This is the table used for pedigree data", "laboratory-pedigreeselectorfield", "laboratory.subjects", new JSONObject(){{ - put("allowBlank", false); + put("allowBlank", !isRequired); }}); + + _isRequired = isRequired; + } + + public boolean isRequired() + { + return _isRequired; } public static String getClientDependencyPath() diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ReferenceGenome.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ReferenceGenome.java index eeb5ec226..f0e943867 100644 --- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ReferenceGenome.java +++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ReferenceGenome.java @@ -79,7 +79,7 @@ public interface ReferenceGenome extends Serializable /** * @param name The name used by the aligner to identify its cached directory - * @return The folder expected containing the cached index, which is not guarenteed to exist. See AlignerIndexUtil for related methods. + * @return The folder expected containing the cached index, which is not guaranteed to exist. See AlignerIndexUtil for related methods. 
*/ File getAlignerIndexDir(String name); diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/SequencePipelineService.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/SequencePipelineService.java index dd19bba68..b57555a1c 100644 --- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/SequencePipelineService.java +++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/SequencePipelineService.java @@ -100,6 +100,8 @@ static public void setInstance(SequencePipelineService instance) */ abstract public String getDockerCommand(); + abstract public boolean useLocalDockerContainerStorage(); + abstract public Collection getDockerVolumes(Container c); /** diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ToolParameterDescriptor.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ToolParameterDescriptor.java index dc46f0fc8..12a054d57 100644 --- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ToolParameterDescriptor.java +++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/ToolParameterDescriptor.java @@ -41,7 +41,7 @@ public class ToolParameterDescriptor private String _label; private String _description; private final String _fieldXtype; - private final JSONObject _additionalExtConfig; + protected JSONObject _additionalExtConfig; private final Object _defaultValue; public ToolParameterDescriptor(CommandLineParam ca, String name, String label, String description, String fieldXtype, @Nullable Object defaultValue, @Nullable JSONObject additionalExtConfig) diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java index 4b77e5dd7..545815b0b 100644 --- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java +++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java @@ -72,21 +72,46 @@ interface Output extends PipelineStepOutput File getVCF(); } - interface RequiresPedigree + interface RequiresPedigree extends SupportsPedigree + { + @Override + default boolean isRequired() + { + return true; + } + } + + interface SupportsPedigree { default String getDemographicsProviderName(PipelineStepProvider provider, PipelineJob job, int stepIdx) { return provider.getParameterByName(PedigreeToolParameterDescriptor.NAME).extractValue(job, provider, stepIdx, String.class); } - default DemographicsProvider getDemographicsProvider(PipelineStepProvider provider, PipelineJob job, int stepIdx) + default @Nullable DemographicsProvider getDemographicsProvider(PipelineStepProvider provider, PipelineJob job, int stepIdx) { if (PipelineJobService.get().getLocationType() != PipelineJobService.LocationType.WebServer) { throw new IllegalStateException("getDemographicsProvider() can only be run from the webserver"); } - return LaboratoryService.get().getDemographicsProviderByName(job.getContainer(), job.getUser(), getDemographicsProviderName(provider, job, stepIdx)); + String dpn = getDemographicsProviderName(provider, job, stepIdx); + if (dpn == null) + { + if (isRequired()) + { + throw new IllegalArgumentException("The DemographicsProvider name cannot be null"); + } + + return null; + } + + return LaboratoryService.get().getDemographicsProviderByName(job.getContainer(), job.getUser(), dpn); + } + + default boolean isRequired() + { + return 
false;
    }
}
diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/run/DockerWrapper.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/run/DockerWrapper.java
index a91cea5c7..dfc2bacc1 100644
--- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/run/DockerWrapper.java
+++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/run/DockerWrapper.java
@@ -1,5 +1,6 @@
 package org.labkey.api.sequenceanalysis.run;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
@@ -7,6 +8,7 @@
 import org.labkey.api.sequenceanalysis.pipeline.PipelineContext;
 import org.labkey.api.sequenceanalysis.pipeline.PipelineOutputTracker;
 import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService;
+import org.labkey.api.util.FileUtil;
 import org.labkey.api.writer.PrintWriters;
 import java.io.File;
@@ -28,7 +30,8 @@ public class DockerWrapper extends AbstractCommandWrapper
     private final PipelineContext _ctx;
     private File _tmpDir = null;
     private String _entryPoint = null;
-    private boolean _runPrune = true;
+    private boolean _runPrune = false;
+    private boolean _useLocalContainerStorage;
     private String _alternateUserHome = null;
     private final Map<String, String> _dockerEnvironment = new HashMap<>();
@@ -37,6 +40,8 @@ public DockerWrapper(String containerName, Logger log, PipelineContext ctx)
         super(log);
         _containerName = containerName;
         _ctx = ctx;
+
+        _useLocalContainerStorage = SequencePipelineService.get().useLocalDockerContainerStorage();
     }

     public void setAlternateUserHome(String alternateUserHome)
@@ -59,6 +64,11 @@ public void setRunPrune(boolean runPrune)
         _runPrune = runPrune;
     }

+    public void setUseLocalContainerStorage(boolean useLocalContainerStorage)
+    {
+        _useLocalContainerStorage = useLocalContainerStorage;
+    }
+
     public void executeWithDocker(List<String> containerArgs, File workDir, PipelineOutputTracker tracker) throws PipelineJobException
     {
         executeWithDocker(containerArgs, workDir, tracker, null);
     }
@@ -79,14 +89,34 @@ public void executeWithDocker(List<String> containerArgs, File workDir, Pipeline
         writer.println("set -e");
         writer.println("DOCKER='" + SequencePipelineService.get().getDockerCommand() + "'");
-        writer.println("$DOCKER pull " + _containerName);
+
+        writer.println("IMAGE_EXISTS=`$DOCKER images -q \"" + getEffectiveContainerName() + "\" | wc -l`");
+        writer.println("LOCAL=not_present");
+        writer.println("if [[ $IMAGE_EXISTS -gt 0 ]];then");
+        writer.println("\tLOCAL=`$DOCKER inspect --format='{{.Digest}}' " + getEffectiveContainerName() + "`");
+        writer.println("fi");
+        writer.println("LATEST=`regctl image digest --list " + getEffectiveContainerName() + "`");
+        writer.println("if [ \"$LOCAL\" != \"$LATEST\" ];then");
+        writer.println("\t$DOCKER pull " + getLocalStorageArgs() + getEffectiveContainerName());
+        writer.println("else");
+        writer.println("\techo 'Image up to date'");
+        writer.println("fi");
+
         if (_runPrune)
         {
-            writer.println("$DOCKER image prune -f");
+            writer.println("$DOCKER image prune " + getLocalStorageArgs() + "-f");
         }

         writer.println("$DOCKER run --rm=true \\");
         writer.println("\t--group-add keep-groups \\");
+        writer.println("\t--transient-store \\");
+
+        if (_useLocalContainerStorage)
+        {
+            getLogger().debug("Using local container storage: " + getLocalContainerDir().getPath());
+            prepareLocalStorage();
+            writer.println("\t" + getLocalStorageArgs() + "\\");
+        }
        // NOTE: getDockerVolumes() should be refactored to remove the -v and this logic should be
updated accordingly: File homeDir = new File(System.getProperty("user.home")); @@ -149,7 +179,7 @@ public void executeWithDocker(List containerArgs, File workDir, Pipeline { writer.println("\t-e " + key + "='" + _dockerEnvironment.get(key) + "' \\"); } - writer.println("\t" + _containerName + " \\"); + writer.println("\t" + getEffectiveContainerName() + " \\"); writer.println("\t" + dockerBashScript.getPath()); writer.println("DOCKER_EXIT_CODE=$?"); writer.println("echo 'Docker run exit code: '$DOCKER_EXIT_CODE"); @@ -170,6 +200,23 @@ public void executeWithDocker(List containerArgs, File workDir, Pipeline localBashScript.setExecutable(true); dockerBashScript.setExecutable(true); execute(Arrays.asList("/bin/bash", localBashScript.getPath())); + + if (_useLocalContainerStorage) + { + try + { + FileUtils.deleteDirectory(getLocalContainerDir()); + } + catch (IOException e) + { + throw new PipelineJobException(e); + } + } + } + + private String getEffectiveContainerName() + { + return _containerName; } public void addToDockerEnvironment(String key, String value) @@ -203,4 +250,39 @@ private Collection inspectInputFiles(Collection inputFiles) return Collections.emptySet(); } + + private File getLocalContainerDir() + { + return new File(SequencePipelineService.get().getJavaTempDir(), "containers"); + } + + private File prepareLocalStorage() throws PipelineJobException + { + try + { + if (getLocalContainerDir().exists()) + { + getLogger().debug("Deleting existing container dir: " + getLocalContainerDir()); + FileUtils.deleteDirectory(getLocalContainerDir()); + } + + FileUtil.createDirectory(getLocalContainerDir().toPath()); + + return getLocalContainerDir(); + } + catch (IOException e) + { + throw new PipelineJobException(e); + } + } + + private String getLocalStorageArgs() + { + if (!_useLocalContainerStorage) + { + return ""; + } + + return "--root=" + getLocalContainerDir().getPath() + " "; + } } diff --git a/SequenceAnalysis/pipeline_code/extra_tools_install.sh b/SequenceAnalysis/pipeline_code/extra_tools_install.sh index 33686252b..823aa7dcd 100755 --- a/SequenceAnalysis/pipeline_code/extra_tools_install.sh +++ b/SequenceAnalysis/pipeline_code/extra_tools_install.sh @@ -228,4 +228,80 @@ then python3 -m pip install --user multiqc else echo "Already installed" -fi \ No newline at end of file +fi + + +if [[ ! -e ${LKTOOLS_DIR}/gt || ! -z $FORCE_REINSTALL ]]; +then + echo "Cleaning up previous installs" + rm -Rf gt* + rm -Rf $LKTOOLS_DIR/gt* + + wget https://github.com/genometools/genometools/releases/download/v1.6.5/gt-1.6.5-Linux_x86_64-64bit-complete.tar.gz + tar -xf gt-1.6.5-Linux_x86_64-64bit-complete.tar.gz + + install ./gt-1.6.5-Linux_x86_64-64bit-complete/bin/gt $LKTOOLS_DIR/ + mv ./gt-1.6.5-Linux_x86_64-64bit-complete/gtdata $LKTOOLS_DIR/ +else + echo "Already installed" +fi + +if [[ ! -e ${LKTOOLS_DIR}/king || ! -z $FORCE_REINSTALL ]]; +then + echo "Cleaning up previous installs" + rm -Rf king* + rm -Rf Linux-king* + rm -Rf $LKTOOLS_DIR/king* + + wget https://www.kingrelatedness.com/Linux-king.tar.gz + tar -xf Linux-king.tar.gz + + install king $LKTOOLS_DIR/ +else + echo "Already installed" +fi + +if [[ ! -e ${LKTOOLS_DIR}/regctl || ! -z $FORCE_REINSTALL ]]; +then + echo "Cleaning up previous installs" + rm -Rf regctl* + rm -Rf $LKTOOLS_DIR/regctl* + + curl -L https://github.com/regclient/regclient/releases/latest/download/regctl-linux-amd64 > regctl + chmod 755 regctl + + install regctl $LKTOOLS_DIR/ +else + echo "Already installed" +fi + +if [[ ! -e ${LKTOOLS_DIR}/svtyper || ! 
-z $FORCE_REINSTALL ]]; +then + echo "Cleaning up previous installs" + rm -Rf $LKTOOLS_DIR/svtyper* + + # NOTE: this fork is used to ensure python3 compatibility + #python3 -m pip install --user git+https://github.com/hall-lab/svtyper.git + python3 -m pip install --user git+https://github.com/bbimber/svtyper.git + + SVTYPER=`which svtyper` + ln -s $SVTYPER ${LKTOOLS_DIR}/svtyper + + SVTYPER=`which svtyper-sso` + ln -s $SVTYPER ${LKTOOLS_DIR}/svtyper-sso +else + echo "Already installed" +fi + +if [[ ! -e ${LKTOOLS_DIR}/graphtyper || ! -z $FORCE_REINSTALL ]]; +then + echo "Cleaning up previous installs" + rm -Rf $LKTOOLS_DIR/graphtyper* + + wget https://github.com/DecodeGenetics/graphtyper/releases/download/v2.7.7/graphtyper + chmod a+x graphtyper + + mv ./graphtyper $LKTOOLS_DIR/ +else + echo "Already installed" +fi diff --git a/SequenceAnalysis/pipeline_code/sequence_tools_install.sh b/SequenceAnalysis/pipeline_code/sequence_tools_install.sh index d3d768a04..3e31985f4 100755 --- a/SequenceAnalysis/pipeline_code/sequence_tools_install.sh +++ b/SequenceAnalysis/pipeline_code/sequence_tools_install.sh @@ -985,15 +985,15 @@ then rm -Rf $LKTOOLS_DIR/blast_formatter rm -Rf $LKTOOLS_DIR/makeblastdb - wget $WGET_OPTS ftp://ftp.ncbi.nlm.nih.gov/blast/executables/blast+/2.2.31/ncbi-blast-2.2.31+-x64-linux.tar.gz - gunzip ncbi-blast-2.2.31+-x64-linux.tar.gz - tar -xf ncbi-blast-2.2.31+-x64-linux.tar - gzip ncbi-blast-2.2.31+-x64-linux.tar - - install ./ncbi-blast-2.2.31+/bin/blastn $LKTOOLS_DIR/blastn - install ./ncbi-blast-2.2.31+/bin/blast_formatter $LKTOOLS_DIR/blast_formatter - install ./ncbi-blast-2.2.31+/bin/makeblastdb $LKTOOLS_DIR/makeblastdb - install ./ncbi-blast-2.2.31+/bin/makembindex $LKTOOLS_DIR/makembindex + wget $WGET_OPTS ftp://ftp.ncbi.nlm.nih.gov/blast/executables/blast+/2.16.0/ncbi-blast-2.16.0+-x64-linux.tar.gz + gunzip ncbi-blast-2.16.0+-x64-linux.tar.gz + tar -xf ncbi-blast-2.16.0+-x64-linux.tar + gzip ncbi-blast-2.16.0+-x64-linux.tar + + install ./ncbi-blast-2.16.0+/bin/blastn $LKTOOLS_DIR/blastn + install ./ncbi-blast-2.16.0+/bin/blast_formatter $LKTOOLS_DIR/blast_formatter + install ./ncbi-blast-2.16.0+/bin/makeblastdb $LKTOOLS_DIR/makeblastdb + install ./ncbi-blast-2.16.0+/bin/makembindex $LKTOOLS_DIR/makembindex else echo "Already installed" fi diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisMaintenanceTask.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisMaintenanceTask.java index e6b593698..57ebce8cb 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisMaintenanceTask.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisMaintenanceTask.java @@ -63,7 +63,7 @@ public SequenceAnalysisMaintenanceTask() @Override public String getDescription() { - return "Delete SequenceAnalysis Artifacts"; + return "SequenceAnalysis File Maintenance"; } @Override @@ -293,12 +293,12 @@ else if (sf.getFilePath() == null) private void processContainer(Container c, Logger log) throws IOException, PipelineJobException { + if (!c.isWorkbook()) + log.info("processing container: " + c.getPath()); + PipeRoot root = PipelineService.get().getPipelineRootSetting(c); if (root != null && !root.isCloudRoot()) { - if (!c.isWorkbook()) - log.info("processing container: " + c.getPath()); - //first sequences File sequenceDir = new File(root.getRootPath(), ".sequences"); TableInfo tableRefNtSequences = SequenceAnalysisSchema.getTable(SequenceAnalysisSchema.TABLE_REF_NT_SEQUENCES); diff --git 
a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java index 61c9a0fe1..2d7ea845f 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java @@ -98,9 +98,11 @@ import org.labkey.sequenceanalysis.run.alignment.Bowtie2Wrapper; import org.labkey.sequenceanalysis.run.alignment.BowtieWrapper; import org.labkey.sequenceanalysis.run.alignment.GSnapWrapper; +import org.labkey.sequenceanalysis.run.alignment.Graphtyper; import org.labkey.sequenceanalysis.run.alignment.MosaikWrapper; import org.labkey.sequenceanalysis.run.alignment.ParagraphStep; import org.labkey.sequenceanalysis.run.alignment.Pbmm2Wrapper; +import org.labkey.sequenceanalysis.run.alignment.SVTyperStep; import org.labkey.sequenceanalysis.run.alignment.StarWrapper; import org.labkey.sequenceanalysis.run.alignment.VulcanWrapper; import org.labkey.sequenceanalysis.run.analysis.BamIterator; @@ -403,6 +405,8 @@ public static void registerPipelineSteps() SequenceAnalysisService.get().registerFileHandler(new DeepVariantHandler()); SequenceAnalysisService.get().registerFileHandler(new GLNexusHandler()); SequenceAnalysisService.get().registerFileHandler(new ParagraphStep()); + SequenceAnalysisService.get().registerFileHandler(new SVTyperStep()); + SequenceAnalysisService.get().registerFileHandler(new Graphtyper()); SequenceAnalysisService.get().registerFileHandler(new UpdateReadsetFilesHandler()); SequenceAnalysisService.get().registerReadsetHandler(new MultiQCHandler()); diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequencePipelineServiceImpl.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequencePipelineServiceImpl.java index bfaaaba78..60f02bd15 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequencePipelineServiceImpl.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequencePipelineServiceImpl.java @@ -459,6 +459,18 @@ public String getDockerCommand() return "docker"; } + @Override + public boolean useLocalDockerContainerStorage() + { + String value = PipelineJobService.get().getConfigProperties().getSoftwarePackagePath("USE_LOCAL_DOCKER_STORAGE"); + if (StringUtils.trimToNull(value) == null) + { + return false; + } + + return Boolean.parseBoolean(value); + } + @Override public Collection getDockerVolumes(Container c) { diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/CacheAlignerIndexesTask.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/CacheAlignerIndexesTask.java index 9663f4570..08fcaca50 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/CacheAlignerIndexesTask.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/CacheAlignerIndexesTask.java @@ -63,7 +63,7 @@ public List getProtocolActionNames() } @Override - public PipelineJob.Task createTask(PipelineJob job) + public PipelineJob.Task createTask(PipelineJob job) { return new CacheAlignerIndexesTask(this, job); } diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/OrphanFilePipelineJob.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/OrphanFilePipelineJob.java index 9ffea44dc..026177561 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/OrphanFilePipelineJob.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/OrphanFilePipelineJob.java @@ -396,11 +396,10 @@ public void 
getOrphanFilesForContainer(Container c, User u, Set orphanFile } } - //TODO: look for .deleted and /archive File deletedDir = new File(root.getRootPath().getParentFile(), ".deleted"); if (deletedDir.exists()) { - messages.add("## .deleted dir found: " + deletedDir.getPath()); + getJob().getLogger().warn("WARNING: .deleted dir found: " + deletedDir.getPath()); } File assayData = new File(root.getRootPath(), "assaydata"); @@ -513,12 +512,6 @@ private void getOrphanFilesForDirectory(Set knownExpDatas, Map knownExpDatas, Map (200*1024*1024)) + { + getJob().getLogger().warn("WARNING: Extremely large log file: " + f.getPath() + ", " + FileUtils.byteCountToDisplaySize(f.length())); + } + + if (f.getName().endsWith(".rds")) + { + if (!dataMap.containsKey(f.toURI())) + { + getJob().getLogger().warn("WARNING: Unknown RDS file: " + f.getPath()); + } + } + //orphan index if (f.getPath().toLowerCase().endsWith(".bai") || f.getPath().toLowerCase().endsWith(".tbi") || f.getPath().toLowerCase().endsWith(".idx") || f.getPath().toLowerCase().endsWith(".crai")) { diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java index c29fd8e05..1d28158d9 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java @@ -259,9 +259,13 @@ public static void initVariantProcessing(PipelineJob job, SequenceAnalysisJobSup } } - if (stepCtx.getProvider() instanceof VariantProcessingStep.RequiresPedigree rp) + if (stepCtx.getProvider() instanceof VariantProcessingStep.SupportsPedigree rp) { - demographicsProviders.add(rp.getDemographicsProvider(stepCtx.getProvider(), job, stepCtx.getStepIdx())); + DemographicsProvider dp = rp.getDemographicsProvider(stepCtx.getProvider(), job, stepCtx.getStepIdx()); + if (dp != null) + { + demographicsProviders.add(dp); + } } if (useScatterGather) diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java index b254cb86d..19ea891a7 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java @@ -702,6 +702,20 @@ private void alignSet(Readset rs, String basename, Map sraIDs = new HashSet<>(); for (ReadData rd : rs.getReadData()) { @@ -1933,14 +1973,29 @@ else if (sraIDs.contains(rd.getSra_accession())) continue; } - File outDir = new File(getHelper().getWorkingDirectory(), "cachedReadData"); - getTaskFileManagerImpl().addDeferredIntermediateFile(outDir); + File unzippedOutDir = new File(SequencePipelineService.get().getJavaTempDir(), "cachedReadDataGz"); + getTaskFileManagerImpl().addIntermediateFile(unzippedOutDir); + if (unzippedOutDir.exists()) + { + try + { + FileUtils.deleteDirectory(unzippedOutDir); + } + catch (IOException e) + { + throw new PipelineJobException(e); + } + } + + File outDir = getCachedReadDataDir(); + getTaskFileManagerImpl().addDeferredIntermediateFile(outDir); // NOTE: this must be deferred so it remains until the end File doneFile = new File(outDir, rd.getSra_accession() + ".done"); sraIDs.add(rd.getSra_accession()); RestoreSraDataHandler.FastqDumpWrapper sra = new RestoreSraDataHandler.FastqDumpWrapper(getJob().getLogger()); if (doneFile.exists()) { + 
getPipelineJob().getLogger().debug("Re-using existing SRA download: " + rd.getSra_accession()); rdi.setFile(new File(outDir, rd.getSra_accession() + (rd.isPairedEnd() ? "_1" : "") + ".fastq.gz"), 1); if (rd.getFileId2() != null) { @@ -1949,18 +2004,53 @@ else if (sraIDs.contains(rd.getSra_accession())) } else { - if (!outDir.exists()) + try { - outDir.mkdirs(); - } + if (outDir.exists()) + { + getHelper().getLogger().debug("Inspecting existing folder: " + outDir.getPath()); + Arrays.stream(outDir.listFiles()).forEach(f -> { + if (f.getName().startsWith(rd.getSra_accession())) + { + getPipelineJob().getLogger().debug("Deleting existing file: " + f.getPath()); + f.delete(); + } + }); + } + else + { + outDir.mkdirs(); + } - Pair downloaded = sra.downloadSra(rd.getSra_accession(), outDir, rd.isPairedEnd()); - rdi.setFile(downloaded.first, 1); - rdi.setFile(downloaded.second, 2); + Pair downloaded = sra.downloadSra(rd.getSra_accession(), unzippedOutDir, rd.isPairedEnd(), false); + File moved1 = new File(outDir, downloaded.first.getName()); + if (moved1.exists()) + { + moved1.delete(); + } + + getPipelineJob().getLogger().debug("Moving gzipped file to: " + moved1.getPath()); + FileUtils.moveFile(downloaded.first, moved1); + rdi.setFile(moved1, 1); + + if (downloaded.second != null) + { + File moved2 = new File(outDir, downloaded.second.getName()); + if (moved2.exists()) + { + moved2.delete(); + } + getPipelineJob().getLogger().debug("Moving gzipped file to: " + moved2.getPath()); + FileUtils.moveFile(downloaded.second, moved2); + rdi.setFile(moved2, 2); + } - try - { Files.touch(doneFile); + + if (unzippedOutDir.exists()) + { + FileUtils.deleteDirectory(unzippedOutDir); + } } catch (IOException e) { diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/RestoreSraDataHandler.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/RestoreSraDataHandler.java index c70726ec6..8779afda1 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/RestoreSraDataHandler.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/RestoreSraDataHandler.java @@ -384,7 +384,7 @@ public void processFilesRemote(List readsets, JobContext ctx) throws Un File expectedFile2 = rd.getFileId2() == null ? 
null : ctx.getSequenceSupport().getCachedData(rd.getFileId2()); FastqDumpWrapper wrapper = new FastqDumpWrapper(ctx.getLogger()); - Pair files = wrapper.downloadSra(accession, ctx.getOutputDir(), rd.isPairedEnd()); + Pair files = wrapper.downloadSra(accession, ctx.getOutputDir(), rd.isPairedEnd(), false); long lines1 = SequenceUtil.getLineCount(files.first) / 4; ctx.getJob().getLogger().debug("Reads in " + files.first.getName() + ": " + lines1); @@ -459,13 +459,16 @@ public FastqDumpWrapper(@Nullable Logger logger) super(logger); } - public Pair downloadSra(String dataset, File outDir, boolean expectPaired) throws PipelineJobException + public Pair downloadSra(String dataset, File outDir, boolean expectPaired, boolean includeTechnical) throws PipelineJobException { List args = new ArrayList<>(); args.add(getExe().getPath()); - args.add("-S"); - args.add("--include-technical"); + // NOTE: we probably want the --split-3 behavior, which is the default for fasterq-dump + if (includeTechnical) + { + args.add("--include-technical"); + } Integer threads = SequencePipelineService.get().getMaxThreads(getLogger()); if (threads != null) diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMem2Wrapper.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMem2Wrapper.java index d7dc80fd1..73d1e6412 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMem2Wrapper.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMem2Wrapper.java @@ -42,9 +42,23 @@ public static class BWAMem2AlignmentStep extends BWAMemAlignmentStep public BWAMem2AlignmentStep(AlignmentStepProvider provider, PipelineContext ctx) { super(provider, ctx, new BWAMem2Wrapper(ctx.getLogger())); + + _addBtwswArg = false; + } + + @Override + public String getIndexCachedDirName(PipelineJob job) + { + return "bwamem2"; } } + @Override + protected String getIndexDirName() + { + return("bwamem2"); + } + public static class Provider extends AbstractAlignmentStepProvider { public Provider() @@ -60,6 +74,8 @@ public Provider() }}, null) ), null, "https://github.com/bwa-mem2/bwa-mem2", true, true); + + setAlwaysCacheIndex(true); } @Override diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMemWrapper.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMemWrapper.java index 68599ecdd..3b2962329 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMemWrapper.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAMemWrapper.java @@ -116,7 +116,7 @@ public void performMemAlignment(PipelineJob job, AlignmentOutputImpl output, Fil } appendThreads(job, bwaArgs); - bwaArgs.add("'" + new File(referenceGenome.getAlignerIndexDir("bwa"), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile().getName()) + ".bwa.index").getPath() + "'"); + bwaArgs.add("'" + new File(referenceGenome.getAlignerIndexDir(getIndexDirName()), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile().getName()) + ".bwa.index").getPath() + "'"); bwaArgs.add("'" + inputFastq1.getPath() + "'"); if (inputFastq2 != null) diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAWrapper.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAWrapper.java index d9662dea8..70e240a8c 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAWrapper.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/BWAWrapper.java @@ 
-41,7 +41,6 @@ public BWAWrapper(@Nullable Logger logger) super(logger); } - public static class Provider extends AbstractAlignmentStepProvider { public Provider() @@ -77,13 +76,15 @@ public String getDescription() @Override public AlignmentStep create(PipelineContext context) { - return new BWAAlignmentStep(this, context, new BWAWrapper(context.getLogger())); + return new BWAAlignmentStep<>(this, context, new BWAWrapper(context.getLogger())); } } public static class BWAAlignmentStep extends AbstractAlignmentPipelineStep implements AlignmentStep { - public BWAAlignmentStep(AlignmentStepProvider provider, PipelineContext ctx, WrapperType wrapper) + protected boolean _addBtwswArg = true; + + public BWAAlignmentStep(AlignmentStepProvider provider, PipelineContext ctx, WrapperType wrapper) { super(provider, ctx, wrapper); } @@ -103,7 +104,7 @@ public String getIndexCachedDirName(PipelineJob job) @Override public IndexOutput createIndex(ReferenceGenome referenceGenome, File outputDir) throws PipelineJobException { - getPipelineCtx().getLogger().info("Creating BWA index"); + getPipelineCtx().getLogger().info("Creating " + getProvider().getName() + " index"); IndexOutputImpl output = new IndexOutputImpl(referenceGenome); File indexDir = new File(outputDir, getIndexCachedDirName(getPipelineCtx().getJob())); @@ -116,8 +117,11 @@ public IndexOutput createIndex(ReferenceGenome referenceGenome, File outputDir) args.add("index"); //necessary for DBs larger than 2gb - args.add("-a"); - args.add("bwtsw"); + if (_addBtwswArg) + { + args.add("-a"); + args.add("bwtsw"); + } args.add("-p"); @@ -135,7 +139,7 @@ public IndexOutput createIndex(ReferenceGenome referenceGenome, File outputDir) output.appendOutputs(referenceGenome.getWorkingFastaFile(), indexDir); //recache if not already - AlignerIndexUtil.saveCachedIndex(hasCachedIndex, getPipelineCtx(), indexDir, "bwa", referenceGenome); + AlignerIndexUtil.saveCachedIndex(hasCachedIndex, getPipelineCtx(), indexDir, getIndexCachedDirName(getPipelineCtx().getJob()), referenceGenome); return output; } @@ -147,7 +151,7 @@ public final AlignmentOutput performAlignment(Readset rs, List inputFastqs File inputFastq2 = assertSingleFile(inputFastqs2); AlignmentOutputImpl output = new AlignmentOutputImpl(); - AlignerIndexUtil.copyIndexIfExists(this.getPipelineCtx(), output, "bwa", referenceGenome); + AlignerIndexUtil.copyIndexIfExists(this.getPipelineCtx(), output, getIndexCachedDirName(getPipelineCtx().getJob()), referenceGenome); doPerformAlignment(output, inputFastq1, inputFastq2, outputDirectory, referenceGenome, basename, rs, readGroupId, platformUnit); output.addCommandsExecuted(getWrapper().getCommandsExecuted()); @@ -193,7 +197,7 @@ private File runBWAAln(PipelineJob job, ReferenceGenome referenceGenome, File in args.add("aln"); appendThreads(job, args); args.addAll(bwaAlnArgs); - args.add(new File(referenceGenome.getAlignerIndexDir("bwa"), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile().getName()) + ".bwa.index").getPath()); + args.add(new File(referenceGenome.getAlignerIndexDir(getIndexDirName()), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile().getName()) + ".bwa.index").getPath()); args.add(inputFile.getPath()); File output = new File(getOutputDir(inputFile), inputFile.getName() + ".sai"); @@ -226,12 +230,12 @@ protected void performBwaAlignment(PipelineJob job, AlignmentOutputImpl output, throw new PipelineJobException("Reference FASTA does not exist: " + referenceGenome.getWorkingFastaFile().getPath()); } - File expectedIndex = new 
File(referenceGenome.getAlignerIndexDir("bwa"), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile()) + ".bwa.index.sa");
+        File expectedIndex = new File(referenceGenome.getAlignerIndexDir(getIndexDirName()), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile()) + ".bwa.index.sa");
         if (!expectedIndex.exists())
         {
             throw new PipelineJobException("Expected index does not exist: " + expectedIndex);
         }
-        args.add(new File(referenceGenome.getAlignerIndexDir("bwa"), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile().getName()) + ".bwa.index").getPath());
+        args.add(new File(referenceGenome.getAlignerIndexDir(getIndexDirName()), FileUtil.getBaseName(referenceGenome.getWorkingFastaFile().getName()) + ".bwa.index").getPath());

         //add SAI
         args.add(sai1.getPath());
@@ -279,4 +283,9 @@ public File getExe()
     {
         return SequencePipelineService.get().getExeForPackage("BWAPATH", "bwa");
     }
+
+    protected String getIndexDirName()
+    {
+        return("bwa");
+    }
 }
diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/Graphtyper.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/Graphtyper.java
new file mode 100644
index 000000000..4c0b47120
--- /dev/null
+++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/Graphtyper.java
@@ -0,0 +1,203 @@
+package org.labkey.sequenceanalysis.run.alignment;
+
+import htsjdk.samtools.util.CloseableIterator;
+import htsjdk.variant.variantcontext.VariantContext;
+import htsjdk.variant.vcf.VCFFileReader;
+import org.json.JSONObject;
+import org.labkey.api.module.ModuleLoader;
+import org.labkey.api.pipeline.PipelineJob;
+import org.labkey.api.pipeline.PipelineJobException;
+import org.labkey.api.pipeline.RecordedAction;
+import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
+import org.labkey.api.sequenceanalysis.SequenceOutputFile;
+import org.labkey.api.sequenceanalysis.pipeline.AbstractParameterizedOutputHandler;
+import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
+import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
+import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
+import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService;
+import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
+import org.labkey.api.sequenceanalysis.run.AbstractCommandWrapper;
+import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
+import org.labkey.api.writer.PrintWriters;
+import org.labkey.sequenceanalysis.SequenceAnalysisModule;
+import org.labkey.sequenceanalysis.util.SequenceUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+//https://github.com/DecodeGenetics/graphtyper
+
+public class Graphtyper extends AbstractParameterizedOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>
+{
+    public Graphtyper()
+    {
+        super(ModuleLoader.getInstance().getModule(SequenceAnalysisModule.class), "Graphtyper (SV Genotyping)", "This will run Graphtyper on one or more BAM files to genotype SVs", null, Arrays.asList(
+                ToolParameterDescriptor.createExpDataParam("svVCF", "Input VCF", "This is the DataId of the VCF containing the SVs to genotype", "ldk-expdatafield", new JSONObject()
+                {{
+                    put("allowBlank", false);
+                }}, null),
+                ToolParameterDescriptor.create("useOutputFileContainer", "Submit to Source File Workbook", "If checked, each job will be submitted to the same workbook as the input file, as opposed to submitting all jobs to the same workbook. This is primarily useful if submitting a large batch of files to process separately. This only applies if 'Run Separately' is selected.", "checkbox", new JSONObject(){{
+                    put("checked", true);
+                }}, true)
+        ));
+    }
+
+    @Override
+    public boolean doSplitJobs()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean canProcess(SequenceOutputFile o)
+    {
+        return o.getFile() != null && o.getFile().exists() && SequenceUtil.FILETYPE.bamOrCram.getFileType().isType(o.getFile());
+    }
+
+    @Override
+    public boolean doRunRemote()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean doRunLocal()
+    {
+        return false;
+    }
+
+    @Override
+    public SequenceOutputProcessor getProcessor()
+    {
+        return new Processor();
+    }
+
+    public static class Processor implements SequenceOutputProcessor
+    {
+        @Override
+        public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport support, List<SequenceOutputFile> inputFiles, JSONObject params, File outputDir, List<RecordedAction> actions, List<SequenceOutputFile> outputsToCreate) throws UnsupportedOperationException, PipelineJobException
+        {
+
+        }
+
+        @Override
+        public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext ctx) throws UnsupportedOperationException, PipelineJobException
+        {
+            int svVcfId = ctx.getParams().optInt("svVCF", 0);
+            if (svVcfId == 0)
+            {
+                throw new PipelineJobException("svVCF param was null");
+            }
+
+            File svVcf = ctx.getSequenceSupport().getCachedData(svVcfId);
+            if (svVcf == null)
+            {
+                throw new PipelineJobException("File not found for ID: " + svVcfId);
+            }
+            else if (!svVcf.exists())
+            {
+                throw new PipelineJobException("Missing file: " + svVcf.getPath());
+            }
+
+            Integer threads = SequencePipelineService.get().getMaxThreads(ctx.getLogger());
+
+            for (SequenceOutputFile so : inputFiles)
+            {
+                List<String> args = new ArrayList<>();
+                SimpleScriptWrapper wrapper = new SimpleScriptWrapper(ctx.getLogger());
+                args.add(AbstractCommandWrapper.resolveFileInPath("graphtyper", null, true).getPath());
+                args.add("genotype_sv");
+
+                ReferenceGenome rg = ctx.getSequenceSupport().getCachedGenome(so.getLibrary_id());
+                if (rg == null)
+                {
+                    throw new PipelineJobException("Missing reference genome: " + so.getLibrary_id());
+                }
+
+                args.add(rg.getWorkingFastaFile().getPath());
+                args.add(svVcf.toString());
+
+                args.add("--sam");
+                args.add(so.getFile().getPath());
+
+                if (threads != null)
+                {
+                    args.add("--threads");
+                    args.add(threads.toString());
+                }
+
+                args.add("--region_file");
+                File regionFile = new File(ctx.getWorkingDirectory(), "regions.txt");
+                args.add(regionFile.getPath());
+
+                generateRegionFile(svVcf, regionFile);
+                ctx.getFileManager().addIntermediateFile(regionFile);
+                wrapper.execute(args);
+
+                File genotypes = new File(ctx.getWorkingDirectory(), "sv_results/" + SequenceAnalysisService.get().getUnzippedBaseName(so.getName()) + ".vcf.gz");
+                if (!genotypes.exists())
+                {
+                    throw new PipelineJobException("Missing file: " + genotypes.getPath());
+                }
+
+                try
+                {
+                    SequenceAnalysisService.get().ensureVcfIndex(genotypes, ctx.getLogger());
+                }
+                catch (IOException e)
+                {
+                    throw new PipelineJobException(e);
+                }
+
+                ctx.getFileManager().addSequenceOutput(genotypes, "Graphtyper Genotypes: " + so.getName(), "Graphtyper Genotypes", so.getReadset(), null, so.getLibrary_id(), "Input VCF: " + svVcf.getName() + " (" + svVcfId + ")");
+            }
+        }
+    }
+
+    private static void generateRegionFile(File svVcf, File regionFile) throws PipelineJobException
+    {
+        try (PrintWriter writer = PrintWriters.getPrintWriter(regionFile))
+        {
+            try (VCFFileReader reader = new VCFFileReader(svVcf, true); CloseableIterator<VariantContext>
iterator = reader.iterator()) + { + String chr = null; + int start = 1; + int end = -1; + while (iterator.hasNext()) + { + VariantContext vc = iterator.next(); + if (chr == null) + { + chr = vc.getContig(); + } + + if (!vc.getContig().equals(chr)) + { + writer.println(chr + ":" + start + "-" + end); + + // Reset + chr = vc.getContig(); + start = 1; + end = -1; + } + + start = Math.min(start, vc.getStart()); + end = Math.max(end, vc.getEnd()); + } + + if (chr != null) + { + writer.println(chr + ":" + start + "-" + end); + } + } + } + catch (IOException e) + { + throw new PipelineJobException(e); + } + } +} diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/SVTyperStep.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/SVTyperStep.java new file mode 100644 index 000000000..2c5b7a629 --- /dev/null +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/SVTyperStep.java @@ -0,0 +1,181 @@ +package org.labkey.sequenceanalysis.run.alignment; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONObject; +import org.labkey.api.module.ModuleLoader; +import org.labkey.api.pipeline.PipelineJob; +import org.labkey.api.pipeline.PipelineJobException; +import org.labkey.api.pipeline.RecordedAction; +import org.labkey.api.sequenceanalysis.SequenceAnalysisService; +import org.labkey.api.sequenceanalysis.SequenceOutputFile; +import org.labkey.api.sequenceanalysis.pipeline.AbstractParameterizedOutputHandler; +import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport; +import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler; +import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService; +import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor; +import org.labkey.api.sequenceanalysis.run.AbstractCommandWrapper; +import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper; +import org.labkey.sequenceanalysis.SequenceAnalysisModule; +import org.labkey.sequenceanalysis.util.SequenceUtil; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +//https://github.com/hall-lab/svtyper + +public class SVTyperStep extends AbstractParameterizedOutputHandler +{ + public SVTyperStep() + { + super(ModuleLoader.getInstance().getModule(SequenceAnalysisModule.class), "SVTyper SV Genotyping", "This will run SVTyper on one or more BAM files to genotype SVs", null, Arrays.asList( + ToolParameterDescriptor.createExpDataParam("svVCF", "Input VCF", "This is the DataId of the VCF containing the SVs to genotype", "ldk-expdatafield", new JSONObject() + {{ + put("allowBlank", false); + }}, null), + ToolParameterDescriptor.create("useOutputFileContainer", "Submit to Source File Workbook", "If checked, each job will be submitted to the same workbook as the input file, as opposed to submitting all jobs to the same workbook. This is primarily useful if submitting a large batch of files to process separately. 
This only applies if 'Run Separately' is selected.", "checkbox", new JSONObject(){{ + put("checked", true); + }}, true) + )); + } + + @Override + public boolean doSplitJobs() + { + return true; + } + + @Override + public boolean canProcess(SequenceOutputFile o) + { + return o.getFile() != null && o.getFile().exists() && SequenceUtil.FILETYPE.bamOrCram.getFileType().isType(o.getFile()); + } + + @Override + public boolean doRunRemote() + { + return true; + } + + @Override + public boolean doRunLocal() + { + return false; + } + + @Override + public SequenceOutputProcessor getProcessor() + { + return new Processor(); + } + + public static class Processor implements SequenceOutputProcessor + { + @Override + public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport support, List inputFiles, JSONObject params, File outputDir, List actions, List outputsToCreate) throws UnsupportedOperationException, PipelineJobException + { + + } + + @Override + public void processFilesRemote(List inputFiles, JobContext ctx) throws UnsupportedOperationException, PipelineJobException + { + int svVcfId = ctx.getParams().optInt("svVCF", 0); + if (svVcfId == 0) + { + throw new PipelineJobException("svVCF param was null"); + } + + File svVcf = ctx.getSequenceSupport().getCachedData(svVcfId); + if (svVcf == null) + { + throw new PipelineJobException("File not found for ID: " + svVcfId); + } + else if (!svVcf.exists()) + { + throw new PipelineJobException("Missing file: " + svVcf.getPath()); + } + + Integer threads = SequencePipelineService.get().getMaxThreads(ctx.getLogger()); + + for (SequenceOutputFile so : inputFiles) + { + List jsonArgs = new ArrayList<>(); + SimpleScriptWrapper wrapper = new SimpleScriptWrapper(ctx.getLogger()); + jsonArgs.add(AbstractCommandWrapper.resolveFileInPath("svtyper", null, true).getPath()); + jsonArgs.add("-B"); + jsonArgs.add(so.getFile().getPath()); + + File coverageJson = new File(ctx.getWorkingDirectory(), "bam.json"); + jsonArgs.add("-l"); + jsonArgs.add(coverageJson.getPath()); + + File doneFile = new File(ctx.getWorkingDirectory(), "json.done"); + ctx.getFileManager().addIntermediateFile(doneFile); + if (doneFile.exists()) + { + ctx.getLogger().info("BAM json already generated, skipping"); + } + else + { + wrapper.execute(jsonArgs); + try + { + FileUtils.touch(doneFile); + ctx.getFileManager().addIntermediateFile(doneFile); + } + catch (IOException e) + { + throw new PipelineJobException(e); + } + } + + if (!coverageJson.exists()) + { + throw new PipelineJobException("Missing file: " + coverageJson.getPath()); + } + ctx.getFileManager().addIntermediateFile(coverageJson); + + List svtyperArgs = new ArrayList<>(); + svtyperArgs.add(AbstractCommandWrapper.resolveFileInPath("svtyper-sso", null, true).getPath()); + + svtyperArgs.add("-i"); + svtyperArgs.add(svVcf.getPath()); + + svtyperArgs.add("-B"); + svtyperArgs.add(so.getFile().getPath()); + + svtyperArgs.add("-l"); + svtyperArgs.add(coverageJson.getPath()); + + if (threads != null) + { + svtyperArgs.add("--core"); + svtyperArgs.add(threads.toString()); + } + + File genotypes = new File(ctx.getWorkingDirectory(), SequenceAnalysisService.get().getUnzippedBaseName(so.getName()) + ".svtyper.vcf.gz"); + wrapper.execute(Arrays.asList("/bin/bash", "-c", StringUtils.join(svtyperArgs, " ") + "| bgzip -c"), ProcessBuilder.Redirect.to(genotypes)); + + if (!genotypes.exists()) + { + throw new PipelineJobException("Missing file: " + genotypes.getPath()); + } + + try + { + 
SequenceAnalysisService.get().ensureVcfIndex(genotypes, ctx.getLogger());
+            }
+            catch (IOException e)
+            {
+                throw new PipelineJobException(e);
+            }
+
+            ctx.getFileManager().addSequenceOutput(genotypes, "SVTyper Genotypes: " + so.getName(), "SVTyper Genotypes", so.getReadset(), null, so.getLibrary_id(), "Input VCF: " + svVcf.getName() + " (" + svVcfId + ")");
+        }
+    }
+}
diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/KingInferenceStep.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/KingInferenceStep.java
index 62c4a3c0e..f67b412ae 100644
--- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/KingInferenceStep.java
+++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/KingInferenceStep.java
@@ -10,10 +10,12 @@
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
 import org.json.JSONObject;
+import org.labkey.api.collections.CaseInsensitiveHashMap;
 import org.labkey.api.pipeline.PipelineJobException;
+import org.labkey.api.reader.Readers;
 import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
 import org.labkey.api.sequenceanalysis.pipeline.AbstractVariantProcessingStepProvider;
-import org.labkey.api.sequenceanalysis.pipeline.CommandLineParam;
+import org.labkey.api.sequenceanalysis.pipeline.PedigreeToolParameterDescriptor;
 import org.labkey.api.sequenceanalysis.pipeline.PipelineContext;
 import org.labkey.api.sequenceanalysis.pipeline.PipelineStepProvider;
 import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
@@ -23,11 +25,19 @@
 import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStepOutputImpl;
 import org.labkey.api.sequenceanalysis.run.AbstractCommandPipelineStep;
 import org.labkey.api.sequenceanalysis.run.AbstractCommandWrapper;
+import org.labkey.api.util.Compress;
+import org.labkey.api.writer.PrintWriters;
+import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 public class KingInferenceStep extends AbstractCommandPipelineStep implements VariantProcessingStep
 {
@@ -36,7 +46,7 @@ public KingInferenceStep(PipelineStepProvider provider, PipelineContext ctx)
         super(provider, ctx, new KingInferenceStep.KingWrapper(ctx.getLogger()));
     }

-    public static class Provider extends AbstractVariantProcessingStepProvider
+    public static class Provider extends AbstractVariantProcessingStepProvider implements VariantProcessingStep.SupportsPedigree
     {
         public Provider()
         {
@@ -47,7 +57,8 @@ public Provider()
             }}, true),
             ToolParameterDescriptor.create("excludedContigs", "Excluded Contigs", "A comma separated list of contigs to exclude, such as X,Y,MT.", "textfield", new JSONObject(){{
-            }}, "X,Y,MT")
+            }}, "X,Y,MT"),
+            new PedigreeToolParameterDescriptor(false)
             ), null, "https://www.kingrelatedness.com/manual.shtml");
         }
@@ -150,6 +161,23 @@ public Output processVariants(File inputVCF, File outputDirectory, ReferenceGeno
         kingArgs.add("--prefix");
         kingArgs.add(SequenceAnalysisService.get().getUnzippedBaseName(inputVCF.getName()));

+        // Update the pedigree / fam file:
+        String demographicsProviderName = getProvider().getParameterByName(PedigreeToolParameterDescriptor.NAME).extractValue(getPipelineCtx().getJob(), getProvider(), getStepIdx());
+        if (demographicsProviderName != null)
+        {
+            File pedFile =
ProcessVariantsHandler.getPedigreeFile(getPipelineCtx().getSourceDirectory(true), demographicsProviderName);
+            if (!pedFile.exists())
+            {
+                throw new PipelineJobException("Unable to find pedigree file: " + pedFile.getPath());
+            }
+
+            File kingFam = createFamFile(pedFile, new File(plinkOutBed.getParentFile(), "plink.fam"));
+            kingArgs.add("--ped");
+            kingArgs.add(kingFam.getPath());
+
+            output.addIntermediateFile(kingFam);
+        }
+
         if (threads != null)
         {
             kingArgs.add("--cpus");
@@ -165,7 +193,7 @@ public Output processVariants(File inputVCF, File outputDirectory, ReferenceGeno
             throw new PipelineJobException("Unable to find file: " + kinshipOutput.getPath());
         }

-        File kinshipOutputTxt = new File(kinshipOutput.getPath() + ".txt");
+        File kinshipOutputTxt = new File(kinshipOutput.getPath() + ".txt.gz");
         if (kinshipOutputTxt.exists())
         {
             kinshipOutputTxt.delete();
@@ -173,6 +201,7 @@
         try
         {
+            kinshipOutput = Compress.compressGzip(kinshipOutput);
             FileUtils.moveFile(kinshipOutput, kinshipOutputTxt);
         }
         catch (IOException e)
@@ -185,6 +214,61 @@
         return output;
     }

+    private File createFamFile(File pedFile, File famFile) throws PipelineJobException
+    {
+        File newFamFile = new File(famFile.getParentFile(), "king.fam");
+
+        Map<String, String> pedMap = new CaseInsensitiveHashMap<>();
+        try (BufferedReader reader = Readers.getReader(pedFile))
+        {
+            String line;
+            while ((line = reader.readLine()) != null)
+            {
+                String[] tokens = line.split(" ");
+                if (tokens.length != 6)
+                {
+                    throw new PipelineJobException("Improper ped line length: " + tokens.length);
+                }
+
+                pedMap.put(tokens[1], StringUtils.join(Arrays.asList("0", tokens[1], tokens[2], tokens[3], tokens[4], "-9"), "\t"));
+            }
+        }
+        catch (IOException e)
+        {
+            throw new PipelineJobException(e);
+        }
+
+        try (BufferedReader reader = Readers.getReader(famFile); PrintWriter writer = PrintWriters.getPrintWriter(newFamFile))
+        {
+            String line;
+            while ((line = reader.readLine()) != null)
+            {
+                String[] tokens = line.split("\t");
+                if (tokens.length != 6)
+                {
+                    throw new PipelineJobException("Improper fam line length: " + tokens.length);
+                }
+
+                String newRow = pedMap.get(tokens[1]);
+                if (newRow == null)
+                {
+                    getPipelineCtx().getLogger().warn("Unable to find pedigree entry for: " + tokens[1] + ", reusing original");
+                    writer.println(line);
+                }
+                else
+                {
+                    writer.println(newRow);
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new PipelineJobException(e);
+        }
+
+        return newFamFile;
+    }
+
     public static class KingWrapper extends AbstractCommandWrapper
     {
         public KingWrapper(@Nullable Logger logger)
diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/PlinkPcaStep.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/PlinkPcaStep.java
index 0a2ab89f6..01ce28d43 100644
--- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/PlinkPcaStep.java
+++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/PlinkPcaStep.java
@@ -80,6 +80,10 @@ public Provider()
             put("valueField", "application");
             put("sortField", "application");
             }}, null),
+            ToolParameterDescriptor.create("kingCutoff", "KING Cutoff", "Can be used to prune closely related samples, identified using KING", "ldk-numberfield", new JSONObject(){{
+                put("minValue", 0);
+                put("maxValue", 1);
+            }}, 0.25),
             ToolParameterDescriptor.create("allowMissingSamples", "Allow Missing Samples", "When using split by application, this controls
whether or not the job should fail if a matching readset cannot be found for specific samples.", "checkbox", null, false),
             ToolParameterDescriptor.create("fileSets", "File Set(s) For Readset Query", "If Split By Application is used, the system needs to query each sample name in the VCF and find the corresponding readset. If this is provided, this query will be limited to readsets where a gVCF exists and is tagged as one of these file sets", "sequenceanalysis-trimmingtextarea", null, null),
             ToolParameterDescriptor.create(SelectSamplesStep.SAMPLE_INCLUDE, "Sample(s) Include", "Only the following samples will be included in the analysis.", "sequenceanalysis-trimmingtextarea", null, null),
@@ -178,6 +182,13 @@ private void runBatch(File inputVCF, File outputDirectory, VariantProcessingStep
         String samplesToExclude = getProvider().getParameterByName(SelectSamplesStep.SAMPLE_EXCLUDE).extractValue(getPipelineCtx().getJob(), getProvider(), getStepIdx(), String.class);
         addSubjectSelectOptions(samplesToExclude, args, "--exclude", new File(outputDirectory, "samplesToExclude.txt"), output);

+        Double kingCutoff = getProvider().getParameterByName("kingCutoff").extractValue(getPipelineCtx().getJob(), getProvider(), getStepIdx(), Double.class);
+        if (kingCutoff != null)
+        {
+            args.add("--king-cutoff");
+            args.add(kingCutoff.toString());
+        }
+
         args.add("--vcf");
         args.add(inputVCF.getPath());
diff --git a/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java b/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java
index a6f314dd3..9939f706c 100644
--- a/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java
+++ b/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java
@@ -104,6 +104,7 @@ public static class CellHashingParameters
         public Integer minCountPerCell = 5;
         public Double majorityConsensusThreshold = null;
         public Double minAllowableDoubletRateFilter = null;
+        public Double minAllowableSingletRate = null;
         public Double callerDisagreementThreshold = null;
         public List<CALLING_METHOD> methods = CALLING_METHOD.getDefaultConsensusMethods(); //Default to just executing the set used for default consensus calls, rather than additional ones
         public List<CALLING_METHOD> consensusMethods = null;
@@ -127,6 +128,7 @@ public static CellHashingService.CellHashingParameters createFromStep(SequenceOu
         ret.minCountPerCell = step.getProvider().getParameterByName("minCountPerCell").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Integer.class, 3);
         ret.majorityConsensusThreshold = step.getProvider().getParameterByName("majorityConsensusThreshold").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null);
         ret.minAllowableDoubletRateFilter = step.getProvider().getParameterByName("minAllowableDoubletRateFilter").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null);
+        ret.minAllowableSingletRate = step.getProvider().getParameterByName("minAllowableSingletRate").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null);
         ret.callerDisagreementThreshold = step.getProvider().getParameterByName("callerDisagreementThreshold").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null);
         ret.doTSNE = step.getProvider().getParameterByName("doTSNE").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Boolean.class, false);
         ret.doNotAllowResume = step.getProvider().getParameterByName("doNotAllowResume").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(),
diff --git a/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java b/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java index a6f314dd3..9939f706c 100644 --- a/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java +++ b/singlecell/api-src/org/labkey/api/singlecell/CellHashingService.java @@ -104,6 +104,7 @@ public static class CellHashingParameters public Integer minCountPerCell = 5; public Double majorityConsensusThreshold = null; public Double minAllowableDoubletRateFilter = null; + public Double minAllowableSingletRate = null; public Double callerDisagreementThreshold = null; public List<CALLING_METHOD> methods = CALLING_METHOD.getDefaultConsensusMethods(); //Default to just executing the set used for default consensus calls, rather than additional ones public List<CALLING_METHOD> consensusMethods = null; @@ -127,6 +128,7 @@ public static CellHashingService.CellHashingParameters createFromStep(SequenceOu ret.minCountPerCell = step.getProvider().getParameterByName("minCountPerCell").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Integer.class, 3); ret.majorityConsensusThreshold = step.getProvider().getParameterByName("majorityConsensusThreshold").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null); ret.minAllowableDoubletRateFilter = step.getProvider().getParameterByName("minAllowableDoubletRateFilter").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null); + ret.minAllowableSingletRate = step.getProvider().getParameterByName("minAllowableSingletRate").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null); ret.callerDisagreementThreshold = step.getProvider().getParameterByName("callerDisagreementThreshold").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Double.class, null); ret.doTSNE = step.getProvider().getParameterByName("doTSNE").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Boolean.class, false); ret.doNotAllowResume = step.getProvider().getParameterByName("doNotAllowResume").extractValue(ctx.getJob(), step.getProvider(), step.getStepIdx(), Boolean.class, false); @@ -171,6 +173,7 @@ public static CellHashingParameters createFromJson(BARCODE_TYPE type, File webse ret.minCountPerCell = params.optInt("minCountPerCell", 3); ret.majorityConsensusThreshold = params.get("majorityConsensusThreshold") == null ? null : params.getDouble("majorityConsensusThreshold"); ret.minAllowableDoubletRateFilter = params.get("minAllowableDoubletRateFilter") == null ? null : params.getDouble("minAllowableDoubletRateFilter"); + ret.minAllowableSingletRate = params.get("minAllowableSingletRate") == null ? null : params.getDouble("minAllowableSingletRate"); ret.callerDisagreementThreshold = params.get("callerDisagreementThreshold") == null ? null : params.getDouble("callerDisagreementThreshold"); ret.doTSNE = params.optBoolean("doTSNE", false); ret.doNotAllowResume = params.optBoolean("doNotAllowResume", false); diff --git a/singlecell/resources/chunks/AppendMetadata.R b/singlecell/resources/chunks/AppendMetadata.R index addefee43..f14a25f65 100644 --- a/singlecell/resources/chunks/AppendMetadata.R +++ b/singlecell/resources/chunks/AppendMetadata.R @@ -11,7 +11,13 @@ for (datasetId in names(seuratObjects)) { printName(datasetId) seuratObj <- readSeuratRDS(seuratObjects[[datasetId]]) - seuratObj <- Rdiscvr::QueryAndApplyCdnaMetadata(seuratObj) + if ('BarcodePrefix' %in% names(seuratObj@meta.data) && !any(is.na(as.integer(seuratObj$BarcodePrefix)))) { + seuratObj <- Rdiscvr::QueryAndApplyCdnaMetadata(seuratObj) + } else if ('cDNA_ID' %in% names(seuratObj@meta.data)) { + seuratObj <- Rdiscvr::QueryAndApplyMetadataUsingCDNA(seuratObj) + } else { + stop('Unable to find either BarcodePrefix or cDNA_ID in meta.data') + } saveData(seuratObj, datasetId) diff --git a/singlecell/resources/chunks/AppendNimble.R b/singlecell/resources/chunks/AppendNimble.R index 7a913d757..d24ac73bf 100644 --- a/singlecell/resources/chunks/AppendNimble.R +++ b/singlecell/resources/chunks/AppendNimble.R @@ -17,8 +17,10 @@ for (datasetId in names(seuratObjects)) { seuratObj <- readSeuratRDS(seuratObjects[[datasetId]]) for (genomeId in names(nimbleGenomes)) { - maxAmbiguityAllowed <- !nimbleGenomeAmbiguousPreference[[genomeId]] - seuratObj <- Rdiscvr::DownloadAndAppendNimble(seuratObject = seuratObj, allowableGenomes = genomeId, ensureSamplesShareAllGenomes = ensureSamplesShareAllGenomes, targetAssayName = nimbleGenomes[[genomeId]], enforceUniqueFeatureNames = TRUE, maxAmbiguityAllowed = maxAmbiguityAllowed, maxLibrarySizeRatio = maxLibrarySizeRatio) + maxAmbiguityAllowed <- nimbleGenomeAmbiguousPreference[[genomeId]] + queryDatabaseForLineageUpdates <- queryDatabaseForLineageUpdatesPreference[[genomeId]] + replaceExistingAssayData <- replaceExistingAssayDataByGenome[[genomeId]] + seuratObj <- Rdiscvr::DownloadAndAppendNimble(seuratObject = seuratObj, allowableGenomes = genomeId, ensureSamplesShareAllGenomes = ensureSamplesShareAllGenomes, targetAssayName = nimbleGenomes[[genomeId]], enforceUniqueFeatureNames = TRUE, maxAmbiguityAllowed = maxAmbiguityAllowed, maxLibrarySizeRatio = maxLibrarySizeRatio, queryDatabaseForLineageUpdates = queryDatabaseForLineageUpdates, replaceExistingAssayData = replaceExistingAssayData) } saveData(seuratObj, datasetId) diff --git a/singlecell/resources/chunks/MergeSeurat.R b/singlecell/resources/chunks/MergeSeurat.R index 8b0d72d7c..134005614 100644 --- a/singlecell/resources/chunks/MergeSeurat.R +++ b/singlecell/resources/chunks/MergeSeurat.R @@ -5,15 +5,21 @@ if (!doDiet && length(seuratObjects) > 20 && !disableAutoDietSeurat) { doDiet <- TRUE }
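Because large merges are memory-bound, the chunk above force-enables doDiet whenever more than 20 objects are queued (unless disableAutoDietSeurat is set). For reference, a minimal sketch of what DietSeurat strips, assuming the Seurat v5 signature:

# Keeps only the named layers and drops reductions/graphs, shrinking each object before merge:
seuratObj <- Seurat::DietSeurat(seuratObj, layers = c('counts', 'data'), dimreducs = NULL, graphs = NULL)
gc()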
-mergeBatch <- function(dat) { +filesToDelete <- c() + +mergeBatchInMemory <- function(datasetIdToFilePath, saveFile) { + if (all(is.null(names(datasetIdToFilePath)))) { + stop('No names provided on datasetIdToFilePath') + } + toMerge <- list() - for (datasetId in names(dat)) { + for (datasetId in names(datasetIdToFilePath)) { print(paste0('Loading: ', datasetId)) if (doDiet) { - toMerge[[datasetId]] <- Seurat::DietSeurat(readSeuratRDS(dat[[datasetId]])) + toMerge[[datasetId]] <- Seurat::DietSeurat(readSeuratRDS(datasetIdToFilePath[[datasetId]])) gc() } else { - toMerge[[datasetId]] <- readSeuratRDS(dat[[datasetId]]) + toMerge[[datasetId]] <- readSeuratRDS(datasetIdToFilePath[[datasetId]]) } if (ncol(toMerge[[datasetId]]) == 1) { @@ -37,60 +43,88 @@ mergeBatch <- function(dat) { stop('There were no passing seurat objects!') } - seuratObj <- CellMembrane::MergeSeuratObjs(toMerge, projectName = projectName, doGC = doDiet, errorOnBarcodeSuffix = errorOnBarcodeSuffix) - return(seuratObj) + saveRDS(CellMembrane::MergeSeuratObjs(toMerge, projectName = projectName, doGC = doDiet, errorOnBarcodeSuffix = errorOnBarcodeSuffix), file = saveFile) + filesToDelete <<- c(filesToDelete, saveFile) + + logger::log_info(paste0('mem used: ', R.utils::hsize(as.numeric(pryr::mem_used())))) + gc() + logger::log_info(paste0('after gc: ', R.utils::hsize(as.numeric(pryr::mem_used())))) + + return(saveFile) } -if (length(seuratObjects) == 1) { - print('There is only one seurat object, no need to merge') - datasetId <- names(seuratObjects)[[1]] - saveData(readSeuratRDS(seuratObjects[[datasetId]]), datasetId) -} else { - batchSize <- 20 - numBatches <- ceiling(length(seuratObjects) / batchSize) - mergedObjectFiles <- list() - for (i in 1:numBatches) { - logger::log_info(paste0('Merging batch ', i, ' of ', numBatches)) - start <- 1 + (i-1)*batchSize - end <- min(start+batchSize-1, length(seuratObjects)) - logger::log_info(paste0('processing: ', start, ' to ', end, ' of ', length(seuratObjects))) - - fn <- paste0('mergeBatch.', i, '.rds') - saveRDS(mergeBatch(seuratObjects[start:end]), file = fn) - mergedObjectFiles[[i]] <- fn - - logger::log_info(paste0('mem used: ', R.utils::hsize(as.numeric(pryr::mem_used())))) - gc() - logger::log_info(paste0('after gc: ', R.utils::hsize(as.numeric(pryr::mem_used())))) +mergeBatch <- function(seuratObjects, outerBatchIdx, maxBatchSize = 20, maxInputFileSizeMb = maxAllowableInputFileSizeMb) { + logger::log_info(paste0('Beginning outer batch: ', outerBatchIdx, ' with total files: ', length(seuratObjects))) + + if (length(seuratObjects) == 1) { + print('Single file, nothing to do') + return(seuratObjects) } - logger::log_info('Done with batches') - if (length(mergedObjectFiles) == 1) { - seuratObj <- readRDS(mergedObjectFiles[[1]]) - unlink(mergedObjectFiles[[1]]) - } else { - logger::log_info('performing final merge') - # TODO: check for single cell in object - seuratObj <- readRDS(mergedObjectFiles[[1]]) - unlink(mergedObjectFiles[[1]]) - - for (i in 2:length(mergedObjectFiles)) { - logger::log_info(paste0('Merging final file ', i, ' of ', length(mergedObjectFiles))) - seuratObj <- merge(x = seuratObj, y = readRDS(mergedObjectFiles[[i]]), project = seuratObj@project.name) - if (HasSplitLayers(seuratObj)) { - seuratObj <- MergeSplitLayers(seuratObj) - } + # Phase 1: group into batches: + batchList <- list() + activeBatch <- list() + sizeOfBatch <- 0 + batchIdx <- 1 + for (datasetId in names(seuratObjects)) { + activeBatch[[datasetId]] <- seuratObjects[[datasetId]] + sizeInMb <- 
(file.size(seuratObjects[[datasetId]]) / 1024^2) + sizeOfBatch <- sizeOfBatch + sizeInMb - unlink(mergedObjectFiles[[i]]) + if (length(activeBatch) >= maxBatchSize || (sizeOfBatch >= maxInputFileSizeMb && length(activeBatch) > 1)) { + logger::log_info(paste0('closing batch with ', length(activeBatch), ' files and ', sizeOfBatch, ' MB')) + batchList[[batchIdx]] <- activeBatch + activeBatch <- list() + sizeOfBatch <- 0 + batchIdx <- batchIdx + 1 + next + } + } - logger::log_info(paste0('mem used: ', R.utils::hsize(as.numeric(pryr::mem_used())))) - logger::log_info(paste0('seurat object: ', R.utils::hsize(as.numeric(utils::object.size(seuratObj))))) - gc() - logger::log_info(paste0('after gc: ', R.utils::hsize(as.numeric(pryr::mem_used())))) + # Account for final files: + if (length(activeBatch) > 0) { + logger::log_info(paste0('finalizing batch with ', length(activeBatch), ' files and ', sizeOfBatch, ' MB')) + batchList[[batchIdx]] <- activeBatch + } + + if (length(batchList) == 0) { + stop('Error: zero length batchList') + } + + mergedObjectFiles <- list() + for (i in 1:length(batchList)) { + activeBatch <- batchList[[i]] + logger::log_info(paste0('Merging inner batch ', i, ' of ', length(batchList), ' with ', length(activeBatch), ' files')) + + batchName <- paste0('merge.', outerBatchIdx, '.', i) + saveFile <- paste0(batchName, '.rds') + if (length(activeBatch) == 1) { + print('Only single file in batch, skipping merge') + mergedObjectFiles[[batchName]] <- activeBatch[[1]] + next } + mergedObjectFiles[[batchName]] <- mergeBatchInMemory(activeBatch, saveFile = saveFile) } + logger::log_info('Done with inner batch') - gc() + if (length(mergedObjectFiles) > 1) { + return(mergeBatch(mergedObjectFiles, outerBatchIdx = (outerBatchIdx + 1), maxInputFileSizeMb = maxInputFileSizeMb, maxBatchSize = maxBatchSize)) + } + + return(mergedObjectFiles) +} + +mergedObjectFiles <- mergeBatch(seuratObjects, outerBatchIdx = 1) +if (length(mergedObjectFiles) != 1) { + stop(paste0('Expected single file, found: ', length(mergedObjectFiles))) +} + +print('Overall merge complete') +gc() + +saveData(readRDS(mergedObjectFiles[[1]]), projectName) - saveData(seuratObj, projectName) +# Cleanup: +for (fn in filesToDelete) { + unlink(fn) } \ No newline at end of file
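To make the grouping rule above concrete: a batch closes once it holds maxBatchSize files, or once its cumulative size crosses the cap while holding at least two files, and any remainder becomes the final batch. A self-contained sketch of the same partitioning with toy sizes (names and cap hypothetical):

partitionBatches <- function(sizesMb, maxBatchSize = 20, maxInputFileSizeMb = 100) {
  batches <- list()
  active <- c()
  activeMb <- 0
  for (nm in names(sizesMb)) {
    active <- c(active, nm)
    activeMb <- activeMb + sizesMb[[nm]]
    if (length(active) >= maxBatchSize || (activeMb >= maxInputFileSizeMb && length(active) > 1)) {
      batches[[length(batches) + 1]] <- active
      active <- c()
      activeMb <- 0
    }
  }
  if (length(active) > 0) {
    batches[[length(batches) + 1]] <- active
  }
  return(batches)
}

partitionBatches(c(s1 = 60, s2 = 55, s3 = 10, s4 = 90))
# -> two batches: c('s1', 's2') closes at 115 MB, then c('s3', 's4')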
diff --git a/singlecell/resources/chunks/PerformDefaultNimbleAppend.R b/singlecell/resources/chunks/PerformDefaultNimbleAppend.R new file mode 100644 index 000000000..f4f37dcd1 --- /dev/null +++ b/singlecell/resources/chunks/PerformDefaultNimbleAppend.R @@ -0,0 +1,26 @@ +netRc <- paste0(Sys.getenv('USER_HOME'), '/.netrc') +if (!file.exists(netRc)) { + print(list.files(Sys.getenv('USER_HOME'))) + stop(paste0('Unable to find file: ', netRc)) +} + +invisible(Rlabkey::labkey.setCurlOptions(NETRC_FILE = netRc)) +Rdiscvr::SetLabKeyDefaults(baseUrl = serverBaseUrl, defaultFolder = defaultLabKeyFolder) + +# NOTE: this file is created by DownloadAndAppendNimble if there was an error. It might exist if a job failed and then was restarted +if (file.exists('debug.nimble.txt.gz')) { + unlink('debug.nimble.txt.gz') +} + +for (datasetId in names(seuratObjects)) { + printName(datasetId) + seuratObj <- readSeuratRDS(seuratObjects[[datasetId]]) + + seuratObj <- Rdiscvr::PerformDefaultNimbleAppend(seuratObj) + + saveData(seuratObj, datasetId) + + # Cleanup + rm(seuratObj) + gc() +} \ No newline at end of file diff --git a/singlecell/resources/chunks/RunCelltypist.R b/singlecell/resources/chunks/RunCelltypist.R index 571d3028e..1f98079a3 100644 --- a/singlecell/resources/chunks/RunCelltypist.R +++ b/singlecell/resources/chunks/RunCelltypist.R @@ -1,3 +1,31 @@ +if (!reticulate::py_module_available(module = 'celltypist')) { + logger::log_warn('python celltypist not found!') + logger::log_warn(paste0('Python available: ', reticulate::py_available())) + logger::log_warn('Python config') + pyConfig <- reticulate::py_config() + for (pn in names(pyConfig)) { + logger::log_warn(paste0(pn, ': ', paste0(pyConfig[[pn]], collapse = ','))) + } + + logger::log_warn(paste0('pythonpath: ', reticulate::py_config()$pythonpath)) + + logger::log_warn('Python packages:') + for (pn in reticulate::py_list_packages()$package) { + logger::log_warn(pn) + } + + if ('conga' %in% reticulate::py_list_packages()$package) { + tryCatch({ + logger::log_warn(reticulate::import('celltypist')) + }, error = function(e){ + logger::log_warn("Error with reticulate::import('celltypist')") + logger::log_warn(reticulate::py_last_error()) + logger::log_warn(conditionMessage(e)) + traceback() + }) + } +} + for (datasetId in names(seuratObjects)) { printName(datasetId) seuratObj <- readSeuratRDS(seuratObjects[[datasetId]])
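The block above only logs state when the import fails. When reproducing interactively, the same plumbing can be probed directly; a short sketch, assuming the job's python environment is active:

library(reticulate)
print(py_config()$python)                  # which interpreter reticulate bound
print(py_module_available('celltypist'))   # TRUE if the module can be imported
result <- tryCatch(import('celltypist'), error = function(e) conditionMessage(e))
print(result)                              # the module on success, else the error message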
diff --git a/singlecell/resources/chunks/RunDecoupler.R b/singlecell/resources/chunks/RunDecoupler.R new file mode 100644 index 000000000..6e4656164 --- /dev/null +++ b/singlecell/resources/chunks/RunDecoupler.R @@ -0,0 +1,12 @@ +for (datasetId in names(seuratObjects)) { + printName(datasetId) + seuratObj <- readSeuratRDS(seuratObjects[[datasetId]]) + + seuratObj <- CellMembrane::RunDecoupleR(seuratObj) + + saveData(seuratObj, datasetId) + + # Cleanup + rm(seuratObj) + gc() +} \ No newline at end of file diff --git a/singlecell/resources/chunks/SaveData.R b/singlecell/resources/chunks/SaveData.R index 307704ae8..46002cb05 100644 --- a/singlecell/resources/chunks/SaveData.R +++ b/singlecell/resources/chunks/SaveData.R @@ -11,8 +11,8 @@ if (length(errorMessages) > 0) { write(errorMessages, file = 'seuratErrors.txt') } -if (file.exists('savedSeuratObjects.txt')) { - print(paste0('Total lines in savedSeuratObjects.txt:', length(readLines('savedSeuratObjects.txt')))) +if (file.exists(trackerFile)) { + print(paste0('Total lines in ', trackerFile, ':', length(readLines(trackerFile)))) } else { - print('File does not exist: savedSeuratObjects.txt') + print(paste0('File does not exist: ', trackerFile)) } \ No newline at end of file diff --git a/singlecell/resources/chunks/StudyMetadata.R b/singlecell/resources/chunks/StudyMetadata.R index fd8d4e931..b64a2c66e 100644 --- a/singlecell/resources/chunks/StudyMetadata.R +++ b/singlecell/resources/chunks/StudyMetadata.R @@ -21,6 +21,10 @@ for (datasetId in names(seuratObjects)) { seuratObj <- Rdiscvr::ApplyPC531Metadata(seuratObj, errorIfUnknownIdsFound = errorIfUnknownIdsFound) } else if (studyName == 'AcuteNx') { seuratObj <- Rdiscvr::ApplyAcuteNxMetadata(seuratObj, errorIfUnknownIdsFound = errorIfUnknownIdsFound) + } else if (studyName == 'EC') { + seuratObj <- Rdiscvr::ApplyEC_Metadata(seuratObj, errorIfUnknownIdsFound = errorIfUnknownIdsFound) + } else if (studyName == 'PPG_Stims') { + seuratObj <- Rdiscvr::ApplyPPG_Stim_Metadata(seuratObj, errorIfUnknownIdsFound = errorIfUnknownIdsFound) } else { stop(paste0('Unknown study: ', studyName)) } diff --git a/singlecell/resources/web/singlecell/panel/NimbleAppendPanel.js b/singlecell/resources/web/singlecell/panel/NimbleAppendPanel.js index f01e307ce..9f346ff14 100644 --- a/singlecell/resources/web/singlecell/panel/NimbleAppendPanel.js +++ b/singlecell/resources/web/singlecell/panel/NimbleAppendPanel.js @@ -10,7 +10,7 @@ Ext4.define('SingleCell.panel.NimbleAppendPanel', { initComponent: function(){ Ext4.apply(this, { style: 'padding: 10px;margins: 5px;', - minWidth: 850, + minWidth: 1025, border: true, items: [{ html: 'This step will query nimble results for the selected genome(s). It will then append these results to the seurat object on the target assay.', @@ -20,7 +20,7 @@ Ext4.define('SingleCell.panel.NimbleAppendPanel', { },{ xtype: 'ldk-gridpanel', clicksToEdit: 1, - width: 775, + width: 1000, tbar: [{ text: 'Add', handler: function(btn){ @@ -40,7 +40,7 @@ Ext4.define('SingleCell.panel.NimbleAppendPanel', { },LABKEY.ext4.GRIDBUTTONS.DELETERECORD()], store: { type: 'array', - fields: ['genomeId', 'targetAssay','maxAmbiguityAllowed'] + fields: ['genomeId', 'targetAssay','maxAmbiguityAllowed', 'queryDatabaseForLineageUpdates', 'replaceExistingAssayData'] }, columns: [{ dataIndex: 'genomeId', @@ -77,6 +77,25 @@ Ext4.define('SingleCell.panel.NimbleAppendPanel', { allowBlank: true, minValue: 0 } + },{ + dataIndex: 'queryDatabaseForLineageUpdates', + width: 175, + header: 'Check for Lineage Updates', + editor: { + xtype: 'checkbox', + allowBlank: true, + value: false + } + },{ + dataIndex: 'replaceExistingAssayData', + width: 150, + header: 'Replace Existing Data?', + editor: { + xtype: 'checkbox', + allowBlank: true, + value: true + } + }] }] }); @@ -87,7 +106,7 @@ Ext4.define('SingleCell.panel.NimbleAppendPanel', { getValue: function(){ var ret = []; this.down('ldk-gridpanel').store.each(function(r, i) { - ret.push([r.data.genomeId, r.data.targetAssay, r.data.maxAmbiguityAllowed ?? '']); + ret.push([r.data.genomeId, r.data.targetAssay, r.data.maxAmbiguityAllowed ?? '', !!r.data.queryDatabaseForLineageUpdates, !!r.data.replaceExistingAssayData]); }, this); return Ext4.isEmpty(ret) ? null : JSON.stringify(ret); @@ -123,7 +142,9 @@ Ext4.define('SingleCell.panel.NimbleAppendPanel', { var rec = grid.store.createModel({ genomeId: row[0], targetAssay: row[1], - maxAmbiguityAllowed: row[2] + maxAmbiguityAllowed: row[2], + queryDatabaseForLineageUpdates: !!row[3], + replaceExistingAssayData: !!row[4] }); grid.store.add(rec); }, this);
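getValue above serializes each grid row as a five-element array ([genomeId, targetAssay, maxAmbiguityAllowed, queryDatabaseForLineageUpdates, replaceExistingAssayData]) that setValue and the server-side step parse back positionally, so the element order must stay in sync on both ends. A quick shape check of a hypothetical payload, using R's jsonlite:

payload <- '[[118, "Nimble.MHC", "", true, false]]'  # values hypothetical
rows <- jsonlite::fromJSON(payload, simplifyVector = FALSE)
stopifnot(length(rows) == 1, length(rows[[1]]) == 5)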
diff --git a/singlecell/src/org/labkey/singlecell/CellHashingServiceImpl.java b/singlecell/src/org/labkey/singlecell/CellHashingServiceImpl.java index 81629915c..a291d1aa7 100644 --- a/singlecell/src/org/labkey/singlecell/CellHashingServiceImpl.java +++ b/singlecell/src/org/labkey/singlecell/CellHashingServiceImpl.java @@ -979,6 +979,11 @@ public List<ToolParameterDescriptor> getHashingCallingParams(boolean allowMethod put("maxValue", 1); put("decimalPrecision", 2); }}, 0.2), + ToolParameterDescriptor.create("minAllowableSingletRate", "Min Allowable Singlet Rate", "This is an optional threshold designed to automatically discard poorly performing callers. If a given algorithm returns a singlet rate below this level, it is discarded from the consensus.", "ldk-numberfield", new JSONObject(){{ + put("minValue", 0); + put("maxValue", 1); + put("decimalPrecision", 2); + }}, 0.05), ToolParameterDescriptor.create("skipNormalizationQc", "Skip Normalization QC", null, "checkbox", null, true), ToolParameterDescriptor.create("doNotAllowResume", "Do Not Allow Resume", "If checked, on resume the job will repeat hashing scoring, rather than allowing resume from a saved state", "checkbox", null, false), ToolParameterDescriptor.create("doTSNE", "Do tSNE", "If true, tSNE will be performed as part of QC", "checkbox", null, false), @@ -1233,15 +1238,8 @@ public File generateCellHashingCalls(File citeSeqCountOutDir, File outputDir, St Set<String> allowableBarcodes = parameters.getAllowableBarcodeNames(); String allowableBarcodeParam = allowableBarcodes != null ? "c('" + StringUtils.join(allowableBarcodes, "','") + "')" : "NULL"; - List<String> methodNames = parameters.methods.stream().filter(m -> { - if (totalCellBarcodes < m.getMinCells()) - { - ctx.getLogger().debug("Dropping method due to insufficient cells: " + m.name()); - return false; - } - - return true; - }).map(CALLING_METHOD::getLabel).distinct().toList(); + // NOTE: we do not need to filter total methods on min cells: + List<String> methodNames = parameters.methods.stream().map(CALLING_METHOD::getLabel).distinct().toList(); List<String> consensusMethodNames = parameters.consensusMethods == null ? Collections.emptyList() : parameters.consensusMethods.stream().filter(m -> { if (totalCellBarcodes < m.getMinCells()) @@ -1275,6 +1273,7 @@ public File generateCellHashingCalls(File citeSeqCountOutDir, File outputDir, St ", majorityConsensusThreshold = " + (parameters.majorityConsensusThreshold == null ? "NULL" : parameters.majorityConsensusThreshold) + ", callerDisagreementThreshold = " + (parameters.callerDisagreementThreshold == null ? "NULL" : parameters.callerDisagreementThreshold) + (parameters.minAllowableDoubletRateFilter == null ? "" : ", minAllowableDoubletRateFilter = " + parameters.minAllowableDoubletRateFilter) + + (parameters.minAllowableSingletRate == null ?
"" : ", minAllowableSingletRate = " + parameters.minAllowableSingletRate) + ", doTSNE = " + doTSNE + ")"); writer.println("print('Rmarkdown complete')"); diff --git a/singlecell/src/org/labkey/singlecell/SingleCellModule.java b/singlecell/src/org/labkey/singlecell/SingleCellModule.java index 922c93ee9..724b15efa 100644 --- a/singlecell/src/org/labkey/singlecell/SingleCellModule.java +++ b/singlecell/src/org/labkey/singlecell/SingleCellModule.java @@ -71,6 +71,7 @@ import org.labkey.singlecell.pipeline.singlecell.IntegrateData; import org.labkey.singlecell.pipeline.singlecell.MergeSeurat; import org.labkey.singlecell.pipeline.singlecell.NormalizeAndScale; +import org.labkey.singlecell.pipeline.singlecell.PerformDefaultNimbleAppend; import org.labkey.singlecell.pipeline.singlecell.PhenotypePlots; import org.labkey.singlecell.pipeline.singlecell.PlotAssayFeatures; import org.labkey.singlecell.pipeline.singlecell.PlotAverageCiteSeqCounts; @@ -82,6 +83,7 @@ import org.labkey.singlecell.pipeline.singlecell.RunCelltypistCustomModel; import org.labkey.singlecell.pipeline.singlecell.RunConga; import org.labkey.singlecell.pipeline.singlecell.RunCsCore; +import org.labkey.singlecell.pipeline.singlecell.RunDecoupler; import org.labkey.singlecell.pipeline.singlecell.RunEscape; import org.labkey.singlecell.pipeline.singlecell.RunLDA; import org.labkey.singlecell.pipeline.singlecell.RunPCA; @@ -291,6 +293,8 @@ public static void registerPipelineSteps() SequencePipelineService.get().registerPipelineStep(new CustomGSEA.Provider()); SequencePipelineService.get().registerPipelineStep(new StudyMetadata.Provider()); SequencePipelineService.get().registerPipelineStep(new UpdateSeuratPrototype.Provider()); + SequencePipelineService.get().registerPipelineStep(new RunDecoupler.Provider()); + SequencePipelineService.get().registerPipelineStep(new PerformDefaultNimbleAppend.Provider()); SequenceAnalysisService.get().registerReadsetListener(new SingleCellReadsetListener()); } diff --git a/singlecell/src/org/labkey/singlecell/pipeline/singlecell/AppendNimble.java b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/AppendNimble.java index b023676bb..106464839 100644 --- a/singlecell/src/org/labkey/singlecell/pipeline/singlecell/AppendNimble.java +++ b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/AppendNimble.java @@ -1,5 +1,6 @@ package org.labkey.singlecell.pipeline.singlecell; +import org.apache.commons.lang3.StringUtils; import org.json.JSONArray; import org.json.JSONObject; import org.labkey.api.pipeline.PipelineJobException; @@ -76,9 +77,9 @@ protected Chunk createParamChunk(SequenceOutputHandler.JobContext ctx, List + { + public Provider() + { + super("PerformDefaultNimbleAppend", "Default Nimble Append", "RDiscvr", "This uses Rdiscvr to run the default nimble append, adding MHC, KIR, NKG, Viral and Ig data", null, null, null); + } + + @Override + public PerformDefaultNimbleAppend create(PipelineContext ctx) + { + return new PerformDefaultNimbleAppend(ctx, this); + } + } + + @Override + public String getFileSuffix() + { + return "nbl"; + } +} + diff --git a/singlecell/src/org/labkey/singlecell/pipeline/singlecell/RunDecoupler.java b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/RunDecoupler.java new file mode 100644 index 000000000..211d8d935 --- /dev/null +++ b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/RunDecoupler.java @@ -0,0 +1,37 @@ +package org.labkey.singlecell.pipeline.singlecell; + +import org.labkey.api.sequenceanalysis.pipeline.AbstractPipelineStepProvider; 
+import org.labkey.api.sequenceanalysis.pipeline.PipelineContext; +import org.labkey.api.singlecell.pipeline.SingleCellStep; + +import java.util.Arrays; + +public class RunDecoupler extends AbstractCellMembraneStep +{ + public RunDecoupler(PipelineContext ctx, RunDecoupler.Provider provider) + { + super(provider, ctx); + } + + public static class Provider extends AbstractPipelineStepProvider + { + public Provider() + { + super("RunDecoupler", "Run decoupleR", "decoupleR", "This will run decoupleR to score transcription factor enrichment.", Arrays.asList( + + ), null, null); + } + + @Override + public RunDecoupler create(PipelineContext ctx) + { + return new RunDecoupler(ctx, this); + } + } + + @Override + public String getFileSuffix() + { + return "decoupler"; + } +} \ No newline at end of file diff --git a/singlecell/src/org/labkey/singlecell/pipeline/singlecell/StudyMetadata.java b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/StudyMetadata.java index 666250a2a..27050d587 100644 --- a/singlecell/src/org/labkey/singlecell/pipeline/singlecell/StudyMetadata.java +++ b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/StudyMetadata.java @@ -24,7 +24,7 @@ public Provider() {{ put("multiSelect", false); put("allowBlank", false); - put("storeValues", "PC475;PC531;TB;Malaria;AcuteNx"); + put("storeValues", "PC475;PC531;TB;Malaria;AcuteNx;EC;PPG_Stims"); put("delimiter", ";"); }}, null, null, false, false), SeuratToolParameter.create("errorIfUnknownIdsFound", "Error If Unknown Ids Found", "If true, the job will fail if the seurat object contains ID not present in the metadata", "checkbox", null, true) diff --git a/singlecell/src/org/labkey/singlecell/run/NimbleHelper.java b/singlecell/src/org/labkey/singlecell/run/NimbleHelper.java index fd6fef01d..b48c5cb14 100644 --- a/singlecell/src/org/labkey/singlecell/run/NimbleHelper.java +++ b/singlecell/src/org/labkey/singlecell/run/NimbleHelper.java @@ -292,17 +292,9 @@ public void doNimbleAlign(File bam, PipelineStepOutput output, Readset rs, Strin output.addSequenceOutput(results, basename + ": nimble align", "Nimble Results", rs.getRowId(), null, genome.getGenomeId(), description); + // NOTE: situations like zero alignments would result in no report being created. Rely on the code in doAlign to verify proper execution of nimble File reportHtml = getReportHtmlFileFromResults(results); - if (!reportHtml.exists()) - { - if (SequencePipelineService.get().hasMinLineCount(results, 2)) - { - long lineCount = SequencePipelineService.get().getLineCount(results); - _ctx.getLogger().debug("Found {} lines in file {}", lineCount, results.getPath()); - throw new PipelineJobException("Unable to find file: " + reportHtml.getPath()); - } - } - else + if (reportHtml.exists()) { output.addSequenceOutput(reportHtml, basename + ": nimble report", NIMBLE_REPORT_CATEGORY, rs.getRowId(), null, genome.getGenomeId(), description); } @@ -339,7 +331,7 @@ private void updateNimbleConfigFile(File configFile, NimbleGenome genome) throws } catch (IOException e) { - throw new PipelineJobException(e); + throw new PipelineJobException("Unable to parse JSON: " + configFile.getPath(), e); } JSONObject config = json.getJSONObject(0); @@ -460,6 +452,21 @@ private Map doAlignment(List genomes, List