diff --git a/build.sbt b/build.sbt index 0c445431a8..4ea993463e 100644 --- a/build.sbt +++ b/build.sbt @@ -361,6 +361,32 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( ), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]( "fs2.io.net.tls.TLSContext#Builder.fs2$io$net$tls$TLSContextCompanionPlatform$BuilderPlatform$$$outer" + ), + // Process stream redirection: #3170 + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.stdin"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.stdout"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.stderr"), + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "fs2.io.process.ProcessBuilder.redirectErrorStream" + ), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.withStdin"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.withStdout"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.withStderr"), + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "fs2.io.process.ProcessBuilder.withRedirectErrorStream" + ), + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "fs2.io.process.ProcessBuilder.inheritStdio" + ), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "fs2.io.process.ProcessBuilder#ProcessBuilderImpl.copy" + ), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "fs2.io.process.ProcessBuilder#ProcessBuilderImpl.this" + ), + ProblemFilters.exclude[MissingTypesProblem]("fs2.io.process.ProcessBuilder$ProcessBuilderImpl$"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "fs2.io.process.ProcessBuilder#ProcessBuilderImpl.apply" ) ) diff --git a/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala b/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala index a719b0345f..b627774485 100644 --- a/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala +++ b/io/js/src/main/scala/fs2/io/internal/facade/child_process.scala @@ -45,7 +45,7 @@ private[io] object child_process { var cwd: js.UndefOr[String] = js.undefined var env: js.UndefOr[js.Dictionary[String]] = js.undefined - + var stdio: js.UndefOr[js.Any] = js.undefined } @js.native diff --git a/io/js/src/main/scala/fs2/io/internal/facade/fs.scala b/io/js/src/main/scala/fs2/io/internal/facade/fs.scala index 73e9b0e1d2..3521e863c2 100644 --- a/io/js/src/main/scala/fs2/io/internal/facade/fs.scala +++ b/io/js/src/main/scala/fs2/io/internal/facade/fs.scala @@ -48,6 +48,13 @@ private[io] object fs { @JSImport("fs", "createWriteStream") def createWriteStream(path: String, options: WriteStreamOptions): fs2.io.Writable = js.native + @js.native + @JSImport("fs", "openSync") + def openSync(path: String, flags: String): Int = js.native + + @js.native + @JSImport("fs", "closeSync") + def closeSync(fd: Int): Unit = js.native @js.native @JSImport("fs", "read") diff --git a/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala index ce21b17c8d..876a17537e 100644 --- a/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -33,65 +33,116 @@ import scala.scalajs.js.JSConverters._ private[process] trait ProcessesCompanionPlatform { def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { - def spawn(process: ProcessBuilder): Resource[F, Process[F]] = - Resource { - F.async_[(Process[F], F[Unit])] { cb => - val childProcess = facade.child_process.spawn( - process.command, - process.args.toJSArray, - new facade.child_process.SpawnOptions { - cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString) - env = - if (process.inheritEnv) - (facade.process.env ++ process.extraEnv).toJSDictionary - else - process.extraEnv.toJSDictionary - } - ) + def spawn(process: ProcessBuilder): Resource[F, Process[F]] = { - val fs2Process = new UnsealedProcess[F] { + def open(redirect: Redirect, flags: String): Resource[F, js.Any] = + redirect match { + case Redirect.Pipe => Resource.pure("pipe") + case Redirect.Inherit => Resource.pure("inherit") + case Redirect.Discard => Resource.pure("ignore") + case Redirect.FromPath(path) => + Resource + .make(F.delay(facade.fs.openSync(path.toString, flags)))(fd => + F.delay(facade.fs.closeSync(fd)) + ) + .map(_.asInstanceOf[js.Any]) + case Redirect.ToPath(path, append) => + val f = if (append) "a" else "w" + Resource + .make(F.delay(facade.fs.openSync(path.toString, f)))(fd => + F.delay(facade.fs.closeSync(fd)) + ) + .map(_.asInstanceOf[js.Any]) + } - def isAlive: F[Boolean] = F.delay { - (childProcess.exitCode eq null) && (childProcess.signalCode eq null) - } + ( + open(process.stdin, "r"), + open(process.stdout, "w"), + open(process.stderr, "w") + ).tupled.flatMap { case (stdinO, stdoutO, stderrO) => + Resource { + F.async_[(Process[F], F[Unit])] { cb => + val childProcess = facade.child_process.spawn( + process.command, + process.args.toJSArray, + new facade.child_process.SpawnOptions { + cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString) + env = + if (process.inheritEnv) + (facade.process.env ++ process.extraEnv).toJSDictionary + else + process.extraEnv.toJSDictionary + stdio = js.Array( + stdinO, + stdoutO, + if (process.redirectErrorStream) stdoutO else stderrO + ) + } + ) - def exitValue: F[Int] = F.asyncCheckAttempt[Int] { cb => - F.delay { - (childProcess.exitCode: Any) match { - case i: Int => Right(i) - case _ => - val f: js.Function1[Any, Unit] = { - case i: Int => cb(Right(i)) - case _ => // do nothing - } - childProcess.once("exit", f) - Left(Some(F.delay(childProcess.removeListener("exit", f)))) + val fs2Process = new UnsealedProcess[F] { + + def isAlive: F[Boolean] = F.delay { + (childProcess.exitCode eq null) && (childProcess.signalCode eq null) + } + + def exitValue: F[Int] = F.asyncCheckAttempt[Int] { cb => + F.delay { + (childProcess.exitCode: Any) match { + case i: Int => Right(i) + case _ => + val f: js.Function1[Any, Unit] = { + case i: Int => cb(Right(i)) + case _ => // do nothing + } + childProcess.once("exit", f) + Left(Some(F.delay(childProcess.removeListener("exit", f)))) + } } } - } - def stdin = writeWritable(F.delay(childProcess.stdin)) + def stdin = childProcess.stdin match { + case null => _.drain + case s => writeWritable(F.delay(s)) + } - def stdout = unsafeReadReadable(childProcess.stdout) + def stdout = { + val out = childProcess.stdout match { + case null => Stream.empty + case s => unsafeReadReadable(s) + } + if (process.redirectErrorStream) { + val err = childProcess.stderr match { + case null => Stream.empty + case s => unsafeReadReadable(s) + } + out.merge(err) + } else out + } - def stderr = unsafeReadReadable(childProcess.stderr) - } + def stderr = childProcess.stderr match { + case null => Stream.empty + case s => if (process.redirectErrorStream) Stream.empty else unsafeReadReadable(s) + } + } - val finalize = F.asyncCheckAttempt[Unit] { cb => - F.delay { - if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) { - Either.unit - } else { - childProcess.kill() - childProcess.once("exit", () => cb(Either.unit)) - Left(None) + val finalize = F.asyncCheckAttempt[Unit] { cb => + F.delay { + if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) { + Either.unit + } else { + childProcess.kill() + childProcess.once("exit", () => cb(Either.unit)) + Left(None) + } } } - } - childProcess.once("spawn", () => cb(Right(fs2Process -> finalize))) - childProcess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e)))) + childProcess.once("spawn", () => cb(Right(fs2Process -> finalize))) + childProcess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e)))) + } } } + } } } diff --git a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 0c9f5452bf..98d5c9b183 100644 --- a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -49,6 +49,27 @@ private[process] trait ProcessesCompanionPlatform { env.put(k, v) } + def toJavaRedirect(redirect: Redirect): lang.ProcessBuilder.Redirect = + redirect match { + case Redirect.Pipe => lang.ProcessBuilder.Redirect.PIPE + case Redirect.Inherit => lang.ProcessBuilder.Redirect.INHERIT + case Redirect.Discard => + val devNull = + if (System.getProperty("os.name").toLowerCase.contains("windows")) "NUL" + else "/dev/null" + lang.ProcessBuilder.Redirect.to(new java.io.File(devNull)) + case Redirect.FromPath(path) => + lang.ProcessBuilder.Redirect.from(path.toNioPath.toFile) + case Redirect.ToPath(path, append) => + if (append) lang.ProcessBuilder.Redirect.appendTo(path.toNioPath.toFile) + else lang.ProcessBuilder.Redirect.to(path.toNioPath.toFile) + } + + builder.redirectInput(toJavaRedirect(process.stdin)) + builder.redirectOutput(toJavaRedirect(process.stdout)) + builder.redirectError(toJavaRedirect(process.stderr)) + builder.redirectErrorStream(process.redirectErrorStream) + builder.start() } } { process => @@ -64,31 +85,40 @@ private[process] trait ProcessesCompanionPlatform { F.unit ) } - .map { process => + .map { jProcess => new UnsealedProcess[F] { - def isAlive = F.delay(process.isAlive()) + def isAlive = F.delay(jProcess.isAlive()) def exitValue = isAlive.ifM( - evalOnVirtualThreadIfAvailable(F.interruptible(process.waitFor())), - F.delay(process.exitValue()) + evalOnVirtualThreadIfAvailable(F.interruptible(jProcess.waitFor())), + F.delay(jProcess.exitValue()) ) - def stdin = writeOutputStreamCancelable( - F.delay(process.getOutputStream()), - F.blocking(process.destroy()) - ) + def stdin = + if (process.stdin == Redirect.Pipe) + writeOutputStreamCancelable( + F.delay(jProcess.getOutputStream()), + F.blocking(jProcess.destroy()) + ) + else _.drain - def stdout = readInputStreamCancelable( - F.delay(process.getInputStream()), - F.blocking(process.destroy()), - 8192 - ) + def stdout = + if (process.stdout == Redirect.Pipe) + readInputStreamCancelable( + F.delay(jProcess.getInputStream()), + F.blocking(jProcess.destroy()), + 8192 + ) + else Stream.empty - def stderr = readInputStreamCancelable( - F.delay(process.getErrorStream()), - F.blocking(process.destroy()), - 8192 - ) + def stderr = + if (process.stderr == Redirect.Pipe && !process.redirectErrorStream) + readInputStreamCancelable( + F.delay(jProcess.getErrorStream()), + F.blocking(jProcess.destroy()), + 8192 + ) + else Stream.empty } } diff --git a/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala b/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala index 7ab5a19eeb..2fe41aa42d 100644 --- a/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala +++ b/io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala @@ -25,6 +25,20 @@ package process import cats.effect.kernel.Resource import fs2.io.file.Path +sealed trait Redirect + +object Redirect { + case object Pipe extends Redirect + case object Inherit extends Redirect + case object Discard extends Redirect + case class FromPath(path: Path) extends Redirect + case class ToPath(path: Path, append: Boolean) extends Redirect + + def fromPath(path: Path): Redirect = FromPath(path) + def toPath(path: Path, append: Boolean = false): Redirect = ToPath(path, append) + def discard: Redirect = Discard +} + sealed abstract class ProcessBuilder private { /** Command to run. */ @@ -49,6 +63,18 @@ sealed abstract class ProcessBuilder private { */ def workingDirectory: Option[Path] + /** Redirection for `stdin`. Defaults to [[Redirect.Pipe]]. */ + def stdin: Redirect = Redirect.Pipe + + /** Redirection for `stdout`. Defaults to [[Redirect.Pipe]]. */ + def stdout: Redirect = Redirect.Pipe + + /** Redirection for `stderr`. Defaults to [[Redirect.Pipe]]. */ + def stderr: Redirect = Redirect.Pipe + + /** Whether to merge `stderr` into `stdout`. Defaults to `false`. */ + def redirectErrorStream: Boolean = false + /** @see [[command]] */ def withCommand(command: String): ProcessBuilder @@ -67,6 +93,34 @@ sealed abstract class ProcessBuilder private { /** @see [[workingDirectory]] */ def withCurrentWorkingDirectory: ProcessBuilder + /** @see [[stdin]] */ + def withStdin(stdin: Redirect): ProcessBuilder = { + val _ = stdin + this + } + + /** @see [[stdout]] */ + def withStdout(stdout: Redirect): ProcessBuilder = { + val _ = stdout + this + } + + /** @see [[stderr]] */ + def withStderr(stderr: Redirect): ProcessBuilder = { + val _ = stderr + this + } + + /** @see [[redirectErrorStream]] */ + def withRedirectErrorStream(redirectErrorStream: Boolean): ProcessBuilder = { + val _ = redirectErrorStream + this + } + + /** Sets `stdin`, `stdout`, and `stderr` to [[Redirect.Inherit]]. */ + def inheritStdio: ProcessBuilder = + withStdin(Redirect.Inherit).withStdout(Redirect.Inherit).withStderr(Redirect.Inherit) + /** Starts the process and returns a handle for interacting with it. * Closing the resource will kill the process if it has not already terminated. */ @@ -77,7 +131,17 @@ sealed abstract class ProcessBuilder private { object ProcessBuilder { def apply(command: String, args: List[String]): ProcessBuilder = - ProcessBuilderImpl(command, args, true, Map.empty, None) + ProcessBuilderImpl( + command, + args, + true, + Map.empty, + None, + Redirect.Pipe, + Redirect.Pipe, + Redirect.Pipe, + false + ) def apply(command: String, args: String*): ProcessBuilder = apply(command, args.toList) @@ -87,7 +151,11 @@ object ProcessBuilder { args: List[String], inheritEnv: Boolean, extraEnv: Map[String, String], - workingDirectory: Option[Path] + workingDirectory: Option[Path], + override val stdin: Redirect, + override val stdout: Redirect, + override val stderr: Redirect, + override val redirectErrorStream: Boolean ) extends ProcessBuilder { def withCommand(command: String): ProcessBuilder = copy(command = command) @@ -101,6 +169,15 @@ object ProcessBuilder { def withWorkingDirectory(workingDirectory: Path): ProcessBuilder = copy(workingDirectory = Some(workingDirectory)) def withCurrentWorkingDirectory: ProcessBuilder = copy(workingDirectory = None) + + override def withStdin(stdin: Redirect): ProcessBuilder = copy(stdin = stdin) + override def withStdout(stdout: Redirect): ProcessBuilder = copy(stdout = stdout) + override def withStderr(stderr: Redirect): ProcessBuilder = copy(stderr = stderr) + override def withRedirectErrorStream(redirectErrorStream: Boolean): ProcessBuilder = + copy(redirectErrorStream = redirectErrorStream) + + override def inheritStdio: ProcessBuilder = + copy(stdin = Redirect.Inherit, stdout = Redirect.Inherit, stderr = Redirect.Inherit) } } diff --git a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala index 118aa0edd1..c9061cfbdb 100644 --- a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala +++ b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala @@ -188,4 +188,95 @@ class ProcessSuite extends Fs2Suite { } } + test("redirectErrorStream") { + ProcessBuilder( + "node", + "-e", + "console.log('stdout'); console.error('stderr')" + ).withRedirectErrorStream(true) + .spawn[IO] + .use { p => + p.stdout + .through(fs2.text.utf8.decode) + .compile + .string + .assertEquals("stdout\nstderr\n") + } + } + + test("Redirect.ToPath") { + Files[IO].tempFile.use { path => + ProcessBuilder("echo", "hello").withStdout(Redirect.toPath(path)).spawn[IO].use { p => + p.exitValue *> + Files[IO] + .readAll(path) + .through(fs2.text.utf8.decode) + .compile + .string + .assertEquals("hello\n") *> + p.stdout.compile.toVector.assertEquals(Vector.empty) *> + p.stderr.compile.toVector.assertEquals(Vector.empty) + } + } + } + + test("Redirect.ToPath append") { + Files[IO].tempFile.use { path => + val msg1 = "hello\n" + val msg2 = "world\n" + Stream + .emit(msg1) + .through(fs2.text.utf8.encode) + .through(Files[IO].writeAll(path)) + .compile + .drain *> + ProcessBuilder("echo", "world") + .withStdout(Redirect.toPath(path, append = true)) + .spawn[IO] + .use { p => + p.exitValue *> + Files[IO] + .readAll(path) + .through(fs2.text.utf8.decode) + .compile + .string + .assertEquals(msg1 + msg2) + } + } + } + + test("Redirect.FromPath") { + Files[IO].tempFile.use { path => + val msg = "hello from file" + Stream + .emit(msg) + .through(fs2.text.utf8.encode) + .through(Files[IO].writeAll(path)) + .compile + .drain *> + ProcessBuilder("cat").withStdin(Redirect.fromPath(path)).spawn[IO].use { p => + p.stdout.through(fs2.text.utf8.decode).compile.string.assertEquals(msg) <* p.exitValue + } + } + } + + test("Redirect.Discard") { + ProcessBuilder("echo", "hello").withStdout(Redirect.discard).spawn[IO].use { p => + p.exitValue.assertEquals(0) *> + p.stdout.compile.toVector.assertEquals(Vector.empty) + } + } + + test("redirectErrorStream empty stderr") { + ProcessBuilder( + "node", + "-e", + "console.log('stdout'); console.error('stderr')" + ).withRedirectErrorStream(true) + .spawn[IO] + .use { p => + p.stderr.compile.toVector.assertEquals(Vector.empty) + } + } + }