Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,32 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.tls.TLSContext#Builder.fs2$io$net$tls$TLSContextCompanionPlatform$BuilderPlatform$$$outer"
),
// Process stream redirection: #3170
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.stdin"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.stdout"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.stderr"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.process.ProcessBuilder.redirectErrorStream"
),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.withStdin"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.withStdout"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.process.ProcessBuilder.withStderr"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.process.ProcessBuilder.withRedirectErrorStream"
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.process.ProcessBuilder.inheritStdio"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.io.process.ProcessBuilder#ProcessBuilderImpl.copy"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.io.process.ProcessBuilder#ProcessBuilderImpl.this"
),
ProblemFilters.exclude[MissingTypesProblem]("fs2.io.process.ProcessBuilder$ProcessBuilderImpl$"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.io.process.ProcessBuilder#ProcessBuilderImpl.apply"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private[io] object child_process {
var cwd: js.UndefOr[String] = js.undefined

var env: js.UndefOr[js.Dictionary[String]] = js.undefined

var stdio: js.UndefOr[js.Any] = js.undefined
}

@js.native
Expand Down
7 changes: 7 additions & 0 deletions io/js/src/main/scala/fs2/io/internal/facade/fs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ private[io] object fs {
@JSImport("fs", "createWriteStream")
def createWriteStream(path: String, options: WriteStreamOptions): fs2.io.Writable =
js.native
@js.native
@JSImport("fs", "openSync")
def openSync(path: String, flags: String): Int = js.native

@js.native
@JSImport("fs", "closeSync")
def closeSync(fd: Int): Unit = js.native

@js.native
@JSImport("fs", "read")
Expand Down
143 changes: 97 additions & 46 deletions io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,65 +33,116 @@ import scala.scalajs.js.JSConverters._

private[process] trait ProcessesCompanionPlatform {
def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] {
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Resource {
F.async_[(Process[F], F[Unit])] { cb =>
val childProcess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
}
)
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = {

val fs2Process = new UnsealedProcess[F] {
def open(redirect: Redirect, flags: String): Resource[F, js.Any] =
redirect match {
case Redirect.Pipe => Resource.pure("pipe")
case Redirect.Inherit => Resource.pure("inherit")
case Redirect.Discard => Resource.pure("ignore")
case Redirect.FromPath(path) =>
Resource
.make(F.delay(facade.fs.openSync(path.toString, flags)))(fd =>
F.delay(facade.fs.closeSync(fd))
)
.map(_.asInstanceOf[js.Any])
case Redirect.ToPath(path, append) =>
val f = if (append) "a" else "w"
Resource
.make(F.delay(facade.fs.openSync(path.toString, f)))(fd =>
F.delay(facade.fs.closeSync(fd))
)
.map(_.asInstanceOf[js.Any])
}

def isAlive: F[Boolean] = F.delay {
(childProcess.exitCode eq null) && (childProcess.signalCode eq null)
}
(
open(process.stdin, "r"),
open(process.stdout, "w"),
open(process.stderr, "w")
).tupled.flatMap { case (stdinO, stdoutO, stderrO) =>
Resource {
F.async_[(Process[F], F[Unit])] { cb =>
val childProcess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
stdio = js.Array(
stdinO,
stdoutO,
if (process.redirectErrorStream) stdoutO else stderrO
)
}
)

def exitValue: F[Int] = F.asyncCheckAttempt[Int] { cb =>
F.delay {
(childProcess.exitCode: Any) match {
case i: Int => Right(i)
case _ =>
val f: js.Function1[Any, Unit] = {
case i: Int => cb(Right(i))
case _ => // do nothing
}
childProcess.once("exit", f)
Left(Some(F.delay(childProcess.removeListener("exit", f))))
val fs2Process = new UnsealedProcess[F] {

def isAlive: F[Boolean] = F.delay {
(childProcess.exitCode eq null) && (childProcess.signalCode eq null)
}

def exitValue: F[Int] = F.asyncCheckAttempt[Int] { cb =>
F.delay {
(childProcess.exitCode: Any) match {
case i: Int => Right(i)
case _ =>
val f: js.Function1[Any, Unit] = {
case i: Int => cb(Right(i))
case _ => // do nothing
}
childProcess.once("exit", f)
Left(Some(F.delay(childProcess.removeListener("exit", f))))
}
}
}
}

def stdin = writeWritable(F.delay(childProcess.stdin))
def stdin = childProcess.stdin match {
case null => _.drain
case s => writeWritable(F.delay(s))
}

def stdout = unsafeReadReadable(childProcess.stdout)
def stdout = {
val out = childProcess.stdout match {
case null => Stream.empty
case s => unsafeReadReadable(s)
}
if (process.redirectErrorStream) {
val err = childProcess.stderr match {
case null => Stream.empty
case s => unsafeReadReadable(s)
}
out.merge(err)
} else out
}

def stderr = unsafeReadReadable(childProcess.stderr)
}
def stderr = childProcess.stderr match {
case null => Stream.empty
case s => if (process.redirectErrorStream) Stream.empty else unsafeReadReadable(s)
}
}

val finalize = F.asyncCheckAttempt[Unit] { cb =>
F.delay {
if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) {
Either.unit
} else {
childProcess.kill()
childProcess.once("exit", () => cb(Either.unit))
Left(None)
val finalize = F.asyncCheckAttempt[Unit] { cb =>
F.delay {
if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) {
Either.unit
} else {
childProcess.kill()
childProcess.once("exit", () => cb(Either.unit))
Left(None)
}
}
}
}

childProcess.once("spawn", () => cb(Right(fs2Process -> finalize)))
childProcess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e))))
childProcess.once("spawn", () => cb(Right(fs2Process -> finalize)))
childProcess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e))))
}
}
}
}
}
}
66 changes: 48 additions & 18 deletions io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ private[process] trait ProcessesCompanionPlatform {
env.put(k, v)
}

def toJavaRedirect(redirect: Redirect): lang.ProcessBuilder.Redirect =
redirect match {
case Redirect.Pipe => lang.ProcessBuilder.Redirect.PIPE
case Redirect.Inherit => lang.ProcessBuilder.Redirect.INHERIT
case Redirect.Discard =>
val devNull =
if (System.getProperty("os.name").toLowerCase.contains("windows")) "NUL"
else "/dev/null"
lang.ProcessBuilder.Redirect.to(new java.io.File(devNull))
case Redirect.FromPath(path) =>
lang.ProcessBuilder.Redirect.from(path.toNioPath.toFile)
case Redirect.ToPath(path, append) =>
if (append) lang.ProcessBuilder.Redirect.appendTo(path.toNioPath.toFile)
else lang.ProcessBuilder.Redirect.to(path.toNioPath.toFile)
}

builder.redirectInput(toJavaRedirect(process.stdin))
builder.redirectOutput(toJavaRedirect(process.stdout))
builder.redirectError(toJavaRedirect(process.stderr))
builder.redirectErrorStream(process.redirectErrorStream)

builder.start()
}
} { process =>
Expand All @@ -64,31 +85,40 @@ private[process] trait ProcessesCompanionPlatform {
F.unit
)
}
.map { process =>
.map { jProcess =>
new UnsealedProcess[F] {
def isAlive = F.delay(process.isAlive())
def isAlive = F.delay(jProcess.isAlive())

def exitValue = isAlive.ifM(
evalOnVirtualThreadIfAvailable(F.interruptible(process.waitFor())),
F.delay(process.exitValue())
evalOnVirtualThreadIfAvailable(F.interruptible(jProcess.waitFor())),
F.delay(jProcess.exitValue())
)

def stdin = writeOutputStreamCancelable(
F.delay(process.getOutputStream()),
F.blocking(process.destroy())
)
def stdin =
if (process.stdin == Redirect.Pipe)
writeOutputStreamCancelable(
F.delay(jProcess.getOutputStream()),
F.blocking(jProcess.destroy())
)
else _.drain

def stdout = readInputStreamCancelable(
F.delay(process.getInputStream()),
F.blocking(process.destroy()),
8192
)
def stdout =
if (process.stdout == Redirect.Pipe)
readInputStreamCancelable(
F.delay(jProcess.getInputStream()),
F.blocking(jProcess.destroy()),
8192
)
else Stream.empty

def stderr = readInputStreamCancelable(
F.delay(process.getErrorStream()),
F.blocking(process.destroy()),
8192
)
def stderr =
if (process.stderr == Redirect.Pipe && !process.redirectErrorStream)
readInputStreamCancelable(
F.delay(jProcess.getErrorStream()),
F.blocking(jProcess.destroy()),
8192
)
else Stream.empty

}
}
Expand Down
Loading