Redirecting input, output and error

Like customization, redirection is implemented with capability traits. The ProcessIO type returned by the Process constructor implements all three redirection capability traits:

  • RedirectableInput marks that the standard input of the process is not bound yet
  • RedirectableOutput marks that the standard output of the process is not bound yet
  • RedirectableError marks that the standard error output of the process is not bound yet

Each of the three channels can only be redirected once. The result type of each redirection method no longer implements the corresponding capability.

Let’s see an example of this (redirection methods are described below on this page):

import zio._
import zio.stream._
import zio.prelude._

val proc1 = Process("echo", List("Hello world"))
// proc1: Process.ProcessImpl = ProcessImpl(
//   command = "echo",
//   arguments = List("Hello world"),
//   workingDirectory = None,
//   environmentVariables = Map(),
//   removedEnvironmentVariables = Set(),
//   outputRedirection = StdOut(),
//   runOutputStream = io.github.vigoo.prox.ProcessModule$Process$$$Lambda$10720/0x00000008026f7040@3281308b,
//   errorRedirection = StdOut(),
//   runErrorStream = io.github.vigoo.prox.ProcessModule$Process$$$Lambda$10721/0x0000000802701040@65fd9c21,
//   inputRedirection = StdIn()
// )
val proc2 = proc1 ># ZPipeline.utf8Decode
// proc2: Process.ProcessImplO[String] = ProcessImplO(
//   command = "echo",
//   arguments = List("Hello world"),
//   workingDirectory = None,
//   environmentVariables = Map(),
//   removedEnvironmentVariables = Set(),
//   outputRedirection = OutputStreamThroughPipe(
//     pipe = io.github.vigoo.prox.ProxZStream$$Lambda$10979/0x00000008031db840@30088784,
//     runner = io.github.vigoo.prox.RedirectionModule$RedirectableOutput$$Lambda$10898/0x0000000803107040@202f727d,
//     chunkSize = 8192
//   ),
//   runOutputStream = io.github.vigoo.prox.ProcessModule$Process$ProcessImpl$$Lambda$10724/0x00000008026f7840@1b110b9b,
//   errorRedirection = StdOut(),
//   runErrorStream = io.github.vigoo.prox.ProcessModule$Process$$$Lambda$10721/0x0000000802701040@65fd9c21,
//   inputRedirection = StdIn()
// )

It is no longer possible to redirect the output of proc2:

val proc3 = proc2 >? (ZPipeline.utf8Decode >>> ZPipeline.splitLines) 
// error: value >? is not a member of io.github.vigoo.prox.zstream.Process.ProcessImplO[String]
// did you mean !>??
// val proc3 = proc2 >? (ZPipeline.utf8Decode >>> ZPipeline.splitLines) 
//             ^^^^^^^^
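
The compile error above follows directly from the capability-trait idea. The following is a simplified, library-independent sketch of that idea, not prox's actual definitions: OutputCapability, FreshProcess and OutputBoundProcess are hypothetical names introduced only for illustration.

```scala
// Hypothetical capability trait: only types extending it can redirect their output.
trait OutputCapability[+P] {
  def redirectOutput(target: String): P
}

// The result of a redirection. It deliberately does NOT extend OutputCapability,
// so a second output redirection is rejected at compile time.
final case class OutputBoundProcess(command: String, output: String)

// A freshly constructed process still has the capability.
final case class FreshProcess(command: String) extends OutputCapability[OutputBoundProcess] {
  def redirectOutput(target: String): OutputBoundProcess =
    OutputBoundProcess(command, target)
}

val fresh = FreshProcess("echo")
val bound = fresh.redirectOutput("out.txt")
// bound.redirectOutput("other.txt") // would not compile: no such member on OutputBoundProcess
```

Because the restriction lives in the types, a mistaken double redirection is caught by the compiler rather than at run time.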

Many redirection methods have an operator version, but all of them have alphanumeric variants as well.

Input redirection

Input redirection is enabled by the RedirectableInput trait. The following operations are supported:

| operator | alternative | parameter type | what it does |
| --- | --- | --- | --- |
| < | fromFile | java.nio.file.Path | Natively attach a source file to STDIN |
| < | fromStream | ZStream[Any, ProxError, Byte] | Attach a ZIO byte stream to STDIN |
| !< | fromStream | ZStream[Any, ProxError, Byte] | Attach a ZIO byte stream to STDIN and flush after each chunk |
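
The only difference between the two stream variants is whether each chunk is flushed. Why that matters can be shown with a small model built on plain java.io. This is not prox's implementation: it assumes the child's STDIN sits behind a buffered stream, and visibleAfterEachChunk is a hypothetical helper.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream}

// Writes the chunks into a buffered stream and records, after each chunk,
// how many bytes have actually reached the underlying target.
def visibleAfterEachChunk(chunks: List[Array[Byte]], flushChunks: Boolean): List[Int] = {
  val target = new ByteArrayOutputStream()           // stands in for the child process
  val stdin = new BufferedOutputStream(target, 8192) // stands in for its buffered STDIN
  val seen = chunks.map { chunk =>
    stdin.write(chunk)
    if (flushChunks) stdin.flush()
    target.size()
  }
  stdin.close()
  seen
}

val chunks = List("first\n", "second\n").map(_.getBytes("UTF-8"))
val withoutFlush = visibleAfterEachChunk(chunks, flushChunks = false) // List(0, 0)
val withFlush = visibleAfterEachChunk(chunks, flushChunks = true)     // List(6, 13)
```

Without flushing, a child that answers each input line interactively would see nothing until the buffer fills or the stream ends, which is the situation the flushing variant is meant to avoid.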

Output redirection

Output redirection is enabled by the RedirectableOutput trait. The following operations are supported:

| operator | alternative | parameter type | result type | what it does |
| --- | --- | --- | --- | --- |
| > | toFile | java.nio.file.Path | Unit | Natively attach STDOUT to a file |
| >> | appendToFile | java.nio.file.Path | Unit | Natively attach STDOUT to a file in append mode |
| > | toSink | TransformAndSink[Byte, _] | Unit | Drains the STDOUT through the given sink |
| ># | toFoldMonoid | [O: Identity](ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | O | Sends STDOUT through the stream and folds the result using its monoid instance |
| >? | toVector | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Vector[O] | Sends STDOUT through the stream and collects the results |
|  | drainOutput | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Unit | Drains the STDOUT through the given stream |
|  | foldOutput | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]), R, (R, O) => R | R | Sends STDOUT through the stream and folds the result using a custom fold function |

All the variants that accept a stream transformation (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) can also be used by passing a ZPipeline directly.

TransformAndSink encapsulates a stream transformation and a unit sink. It is possible to use a sink directly if transformation is not needed.

case class TransformAndSink[A, B](transform: ZStream[Any, ProxError, A] => ZStream[Any, ProxError, B],
                                  sink: ZSink[Any, ProxError, B, Any, Unit])
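
To make the difference between the monoid fold, the vector collection and the custom fold concrete, here is a library-independent model in which a plain List plays the role of the stream. SimpleMonoid, Transform and the three *Model functions are hypothetical stand-ins, not prox or ZIO APIs; they only mirror the result types listed in the table above.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical minimal monoid, standing in for the type class the fold variant requires.
trait SimpleMonoid[A] {
  def empty: A
  def combine(left: A, right: A): A
}

implicit val stringMonoid: SimpleMonoid[String] = new SimpleMonoid[String] {
  val empty = ""
  def combine(left: String, right: String): String = left + right
}

// Stand-in for a stream transformation from bytes to elements of type O.
type Transform[O] = List[Byte] => List[O]

// Models >#: combine all elements with the monoid instance of O.
def toFoldMonoidModel[O](stdout: List[Byte], transform: Transform[O])(implicit m: SimpleMonoid[O]): O =
  transform(stdout).foldLeft(m.empty)(m.combine)

// Models >?: keep every element.
def toVectorModel[O](stdout: List[Byte], transform: Transform[O]): Vector[O] =
  transform(stdout).toVector

// Models foldOutput: the caller supplies the initial value and the fold function.
def foldOutputModel[O, R](stdout: List[Byte], transform: Transform[O], init: R)(f: (R, O) => R): R =
  transform(stdout).foldLeft(init)(f)

val splitLines: Transform[String] = bytes => new String(bytes.toArray, UTF_8).split("\n").toList
val stdout = "a\nb\n".getBytes(UTF_8).toList

val folded = toFoldMonoidModel(stdout, splitLines)                    // "ab"
val collected = toVectorModel(stdout, splitLines)                     // Vector("a", "b")
val counted = foldOutputModel(stdout, splitLines, 0)((n, _) => n + 1) // 2
```

The monoid variant suits outputs that combine naturally, such as decoded text, while the vector and custom-fold variants suit cases where the individual elements or a derived summary are needed.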

Error redirection

Error redirection is enabled by the RedirectableError trait. The following operations are supported:

| operator | alternative | parameter type | result type | what it does |
| --- | --- | --- | --- | --- |
| !> | errorToFile | java.nio.file.Path | Unit | Natively attach STDERR to a file |
| !>> | appendErrorToFile | java.nio.file.Path | Unit | Natively attach STDERR to a file in append mode |
| !> | errorToSink | TransformAndSink[Byte, _] | Unit | Drains the STDERR through the given sink |
| !># | errorToFoldMonoid | [O: Monoid](ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | O | Sends STDERR through the pipe and folds the result using its monoid instance |
| !>? | errorToVector | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Vector[O] | Sends STDERR through the pipe and collects the results |
|  | drainError | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Unit | Drains the STDERR through the given pipe |
|  | foldError | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]), R, (R, O) => R | R | Sends STDERR through the pipe and folds the result using a custom fold function |

Redirection for process groups

Process groups are two or more processes connected through pipes. This connection is implemented internally using the redirection capabilities described above. This means that all but the first process have their inputs bound, and all but the last have their outputs bound. Redirecting the input and output of a process group is thus a well-defined operation: it redirects the input of the first process and the output of the last process.

For this reason the class created via process piping implements the RedirectableInput and RedirectableOutput traits described above.

For the sake of simplicity, the library no longer supports fully customizable per-process error redirection for process groups. Instead it offers a reduced but still quite expressive version, described by the RedirectableErrors trait.

The methods in this trait define error redirection for all processes in the group at once:

| operator | alternative | parameter type | result type | what it does |
| --- | --- | --- | --- | --- |
| !> | errorsToSink | TransformAndSink[Byte, _] | Unit | Drains the STDERR through the given sink |
| !># | errorsToFoldMonoid | [O: Monoid](ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | O | Sends STDERR through the stream and folds the result using its monoid instance |
| !>? | errorsToVector | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Vector[O] | Sends STDERR through the stream and collects the results |
|  | drainErrors | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Unit | Drains the STDERR through the given stream |
|  | foldErrors | (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]), R, (R, O) => R | R | Sends STDERR through the stream and folds the result using a custom fold function |

Redirection to a file is not possible through this interface, as only a single path could be provided. The results of these redirections are accessible through the ProcessGroupResult interface, as described in the running processes section.

By using the RedirectableErrors.customizedPerProcess interface (having the type RedirectableErrors.CustomizedPerProcess) it is possible to customize the redirection targets per process while keeping their types uniform:

| operator | alternative | parameter type | result type | what it does |
| --- | --- | --- | --- | --- |
|  | errorsToFile | Process => java.nio.file.Path | Unit | Natively attach STDERR to a file |
|  | appendErrorsToFile | Process => java.nio.file.Path | Unit | Natively attach STDERR to a file in append mode |
|  | errorsToSink | Process => TransformAndSink[Byte, _] | Unit | Drains the STDERR through the given sink |
|  | errorsToFoldMonoid | Process => [O: Monoid](ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | O | Sends STDERR through the stream and folds the result using its monoid instance |
|  | errorsToVector | Process => (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Vector[O] | Sends STDERR through the stream and collects the results |
|  | drainErrors | Process => (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]) | Unit | Drains the STDERR through the given stream |
|  | foldErrors | Process => (ZStream[Any, ProxError, Byte] => ZStream[Any, ProxError, O]), R, (R, O) => R | R | Sends STDERR through the stream and folds the result using a custom fold function |

Let’s see an example of how this works!

First we define a queue where we want to send error lines from all the involved processes, then we define the two processes separately, connect them with a pipe and customize the error redirection where we prefix the parsed lines based on which process they came from:


for {
  errors <- Queue.unbounded[String]
  parseLines = (s: ZStream[Any, ProxError, Byte]) => s.via(ZPipeline.utf8Decode.mapError(UnknownProxError.apply) >>> ZPipeline.splitLines)
 
  p1 = Process("proc1")
  p2 = Process("proc2")
  group = (p1 | p2).customizedPerProcess.errorsToSink {
    case p if p == p1 => TransformAndSink(parseLines.andThen(_.map(s => "P1: " + s)), ZSink.foreach(errors.offer))
    case p if p == p2 => TransformAndSink(parseLines.andThen(_.map(s => "P2: " + s)), ZSink.foreach(errors.offer))
  }
} yield ()
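
The same per-process idea covers file targets, which the group-wide interface cannot express because it could only accept a single path. The sketch below is a library-independent model of that idea: ModelProcess and errorLogFor are hypothetical names, and the file naming scheme is an assumption chosen for illustration.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical stand-in for a process in a group; only the command matters here.
final case class ModelProcess(command: String)

// A single function picks the target for every process,
// so each process gets its own file while the result type stays uniform.
def errorLogFor(process: ModelProcess): Path =
  Paths.get(s"${process.command}-errors.log")

val group = List(ModelProcess("proc1"), ModelProcess("proc2"))
val targets: Map[String, Path] = group.map(p => p.command -> errorLogFor(p)).toMap
```

A function of this shape, from a process to a java.nio.file.Path, matches the parameter type listed for errorsToFile and appendErrorsToFile in the per-process table.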

Creating reusable functions

The Process object contains several useful type aliases for writing functions that work with any process, specifying only which redirection channels must still be unbound.

UnboundProcess represents a fully unbound process, one on which no redirection has been done yet. It is defined as follows:

type UnboundProcess = Process[Unit, Unit]
    with RedirectableInput[UnboundOEProcess]
    with RedirectableOutput[UnboundIEProcess[*]]
    with RedirectableError[UnboundIOProcess[*]]

where UnboundIOProcess[E] for example represents a process which has its error output already bound.

These type aliases can be used to define functions performing redirection on arbitrary processes, for example:

def logErrors[P <: Process.UnboundEProcess[_]](proc: P) = {
   val target = TransformAndSink(
     ZPipeline.utf8Decode.mapError(UnknownProxError.apply) >>> ZPipeline.splitLines,
     ZSink.foreach((line: String) => ZIO.debug(line))) 
   proc !> target 
}

val proc4 = logErrors(Process("something"))
// proc4: Process[_, Unit] = ProcessImplE(
//   command = "something",
//   arguments = List(),
//   workingDirectory = None,
//   environmentVariables = Map(),
//   removedEnvironmentVariables = Set(),
//   outputRedirection = StdOut(),
//   runOutputStream = io.github.vigoo.prox.ProcessModule$Process$$$Lambda$10720/0x00000008026f7040@1fdd241b,
//   errorRedirection = OutputStreamToSink(
//     sink = TransformAndSink(
//       transform = io.github.vigoo.prox.ProxZStream$TransformAndSink$$$Lambda$10991/0x0000000803237040@440a1cdc,
//       sink = zio.stream.ZSink@6e1f2307
//     ),
//     chunkSize = 8192
//   ),
//   runErrorStream = io.github.vigoo.prox.ProcessModule$Process$ProcessImpl$$Lambda$10902/0x0000000803103840@1c3ed5df,
//   inputRedirection = StdIn()
// )