From a835b600b166bd267c73451cc2d4a9478550ebf0 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 10 May 2023 09:23:38 -0500 Subject: [PATCH 1/3] Walk file tree on directory publish Signed-off-by: Ben Sherman --- .../nextflow/processor/PublishDir.groovy | 29 ++++++++++++++++--- .../processor/PublishDirS3Test.groovy | 2 +- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index b1353db3fe..8aeb11613f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -19,11 +19,14 @@ package nextflow.processor import java.nio.file.FileAlreadyExistsException import java.nio.file.FileSystem import java.nio.file.FileSystems +import java.nio.file.FileVisitResult import java.nio.file.Files import java.nio.file.LinkOption import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.PathMatcher +import java.nio.file.SimpleFileVisitor +import java.nio.file.attribute.BasicFileAttributes import java.util.concurrent.ExecutorService import groovy.transform.CompileDynamic @@ -326,10 +329,10 @@ class PublishDir { } if( inProcess ) { - safeProcessFile(source, destination) + safeProcessPath(source, destination) } else { - threadPool.submit({ safeProcessFile(source, destination) } as Runnable) + threadPool.submit({ safeProcessPath(source, destination) } as Runnable) } } @@ -354,9 +357,9 @@ class PublishDir { } @CompileStatic - protected void safeProcessFile(Path source, Path target) { + protected void safeProcessPath(Path source, Path target) { try { - processFile(source, target) + processPath(source, target) } catch( Throwable e ) { log.warn "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details", e @@ -367,6 +370,24 @@ class PublishDir { } } + @CompileStatic + protected void processPath( Path source, Path target ) { + + // publish each file in the directory tree + if( Files.isDirectory(source) ) + Files.walkFileTree(source, new SimpleFileVisitor() { + FileVisitResult visitFile(Path sourceFile, BasicFileAttributes attrs) { + final targetFile = target.resolve(source.relativize(sourceFile)) + processFile(sourceFile, targetFile) + FileVisitResult.CONTINUE + } + }) + + // otherwise publish file directly + else + processFile(source, target) + } + @CompileStatic protected void processFile( Path source, Path destination ) { diff --git a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy index 460de36581..e60909da96 100644 --- a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy +++ b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy @@ -63,7 +63,7 @@ class PublishDirS3Test extends Specification { when: spy.apply1(source, true) then: - 1 * spy.safeProcessFile(source, _) >> { sourceFile, s3File -> + 1 * spy.safeProcessPath(source, _) >> { sourceFile, s3File -> assert s3File instanceof S3Path assert (s3File as S3Path).getTagsList().find{ it.getKey()=='FOO'}.value == 'this' assert (s3File as S3Path).getTagsList().find{ it.getKey()=='BAR'}.value == 'that' From b7b69674fb28c33b0e68f5e8f233dc288fcc3b23 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 8 Feb 2024 16:40:11 -0600 Subject: [PATCH 2/3] Fix failing tests Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/processor/PublishDir.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 02d33fefdd..f2f151ab83 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -366,19 +366,19 @@ class PublishDir { } catch( Throwable e ) { log.warn "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details", e - if( NF.strictMode || failOnError){ + if( NF.strictMode || failOnError ) { session?.abort(e) } } } - protected void processPath( Path source, Path target ) { + protected void processPath(Path source, Path target) { // publish each file in the directory tree if( Files.isDirectory(source) ) Files.walkFileTree(source, new SimpleFileVisitor() { FileVisitResult visitFile(Path sourceFile, BasicFileAttributes attrs) { - final targetFile = target.resolve(source.relativize(sourceFile)) + final targetFile = target.resolve(source.relativize(sourceFile).toString()) processFile(sourceFile, targetFile) FileVisitResult.CONTINUE } From ad3c4cbb8a9a6c3dea58895985b72be08c317b22 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Sun, 19 May 2024 01:26:59 -0500 Subject: [PATCH 3/3] Switch directory walk and retry Signed-off-by: Ben Sherman --- .../nextflow/processor/PublishDir.groovy | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 8cc4162c69..ecc67d9ae3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -368,7 +368,21 @@ class PublishDir { protected void safeProcessPath(Path source, Path target) { try { - retryableProcessPath(source, target) + // publish each file in the directory tree + if( Files.isDirectory(source) ) { + Files.walkFileTree(source, new SimpleFileVisitor() { + FileVisitResult visitFile(Path sourceFile, BasicFileAttributes attrs) { + final targetFile = target.resolve(source.relativize(sourceFile).toString()) + retryableProcessFile(sourceFile, targetFile) + FileVisitResult.CONTINUE + } + }) + } + + // otherwise publish file directly + else { + retryableProcessFile(source, target) + } } catch( Throwable e ) { final msg = "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details" @@ -382,7 +396,7 @@ class PublishDir { } } - protected void retryableProcessPath(Path source, Path target) { + protected void retryableProcessFile(Path source, Path target) { final listener = new EventListener() { @Override void accept(ExecutionAttemptedEvent event) throws Throwable { @@ -398,26 +412,7 @@ class PublishDir { .build() Failsafe .with( retryPolicy ) - .get { it-> processPath(source, target) } - } - - protected void processPath(Path source, Path target) { - - // publish each file in the directory tree - if( Files.isDirectory(source) ) { - Files.walkFileTree(source, new SimpleFileVisitor() { - FileVisitResult visitFile(Path sourceFile, BasicFileAttributes attrs) { - final targetFile = target.resolve(source.relativize(sourceFile).toString()) - processFile(sourceFile, targetFile) - FileVisitResult.CONTINUE - } - }) - } - - // otherwise publish file directly - else { - processFile(source, target) - } + .get { it-> processFile(source, target) } } protected void processFile( Path source, Path destination ) {