PumpStreamHandler.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.exec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.exec.util.DebugUtils;
/**
* Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
* from that stream will be lost.
*/
public class PumpStreamHandler implements ExecuteStreamHandler {
private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
private Thread outputThread;
private Thread errorThread;
private Thread inputThread;
private final OutputStream outputStream;
private final OutputStream errorOutputStream;
private final InputStream inputStream;
private InputStreamPumper inputStreamPumper;
/** The timeout Duration the implementation waits when stopping the pumper threads. */
private Duration stopTimeout = Duration.ZERO;
/** The last exception being caught. */
private IOException caught;
/**
* The thread factory.
*/
private final ThreadFactory threadFactory;
/**
* Constructs a new {@link PumpStreamHandler}.
*/
public PumpStreamHandler() {
this(System.out, System.err);
}
/**
* Constructs a new {@link PumpStreamHandler}.
*
* @param allOutputStream the output/error {@link OutputStream}.
*/
public PumpStreamHandler(final OutputStream allOutputStream) {
this(allOutputStream, allOutputStream);
}
/**
* Constructs a new {@link PumpStreamHandler}.
*
* @param outputStream the output {@link OutputStream}.
* @param errorOutputStream the error {@link OutputStream}.
*/
public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
this(outputStream, errorOutputStream, null);
}
/**
* Constructs a new {@link PumpStreamHandler}.
*
* @param outputStream the output {@link OutputStream}.
* @param errorOutputStream the error {@link OutputStream}.
* @param inputStream the input {@link InputStream}.
*/
public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
}
/**
* Constructs a new {@link PumpStreamHandler}.
*
* @param outputStream the output {@link OutputStream}.
* @param errorOutputStream the error {@link OutputStream}.
* @param inputStream the input {@link InputStream}.
*/
private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
final InputStream inputStream) {
this.threadFactory = threadFactory;
this.outputStream = outputStream;
this.errorOutputStream = errorOutputStream;
this.inputStream = inputStream;
}
/**
* Create the pump to handle error output.
*
* @param is the {@link InputStream}.
* @param os the {@link OutputStream}.
*/
protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
errorThread = createPump(is, os);
}
/**
* Create the pump to handle process output.
*
* @param is the {@link InputStream}.
* @param os the {@link OutputStream}.
*/
protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
outputThread = createPump(is, os);
}
/**
* Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
* to avoid an IOException ("Write end dead").
*
* @param is the input stream to copy from.
* @param os the output stream to copy into.
* @return the stream pumper thread.
*/
protected Thread createPump(final InputStream is, final OutputStream os) {
return createPump(is, os, os instanceof PipedOutputStream);
}
/**
* Creates a stream pumper to copy the given input stream to the given output stream.
*
* @param is the input stream to copy from.
* @param os the output stream to copy into.
* @param closeWhenExhausted close the output stream when the input stream is exhausted.
* @return the stream pumper thread.
*/
protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
}
/**
* Creates a stream pumper to copy the given input stream to the given output stream.
*
* @param is the System.in input stream to copy from.
* @param os the output stream to copy into.
* @return the stream pumper thread.
*/
private Thread createSystemInPump(final InputStream is, final OutputStream os) {
inputStreamPumper = new InputStreamPumper(is, os);
return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
}
/**
* Gets the error stream.
*
* @return {@link OutputStream}.
*/
protected OutputStream getErr() {
return errorOutputStream;
}
/**
* Gets the output stream.
*
* @return {@link OutputStream}.
*/
protected OutputStream getOut() {
return outputStream;
}
Duration getStopTimeout() {
return stopTimeout;
}
/**
* Sets the {@link InputStream} from which to read the standard error of the process.
*
* @param is the {@link InputStream}.
*/
@Override
public void setProcessErrorStream(final InputStream is) {
if (errorOutputStream != null) {
createProcessErrorPump(is, errorOutputStream);
}
}
/**
* Sets the {@link OutputStream} by means of which input can be sent to the process.
*
* @param os the {@link OutputStream}.
*/
@Override
public void setProcessInputStream(final OutputStream os) {
if (inputStream != null) {
if (inputStream == System.in) {
inputThread = createSystemInPump(inputStream, os);
} else {
inputThread = createPump(inputStream, os, true);
}
} else {
try {
os.close();
} catch (final IOException e) {
final String msg = "Got exception while closing output stream";
DebugUtils.handleException(msg, e);
}
}
}
/**
* Sets the {@link InputStream} from which to read the standard output of the process.
*
* @param is the {@link InputStream}.
*/
@Override
public void setProcessOutputStream(final InputStream is) {
if (outputStream != null) {
createProcessOutputPump(is, outputStream);
}
}
/**
* Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
*
* @param timeout timeout or zero to wait forever (default).
* @since 1.4.0
*/
public void setStopTimeout(final Duration timeout) {
this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
}
/**
* Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
*
* @param timeout timeout in milliseconds or zero to wait forever (default).
* @deprecated Use {@link #setStopTimeout(Duration)}.
*/
@Deprecated
public void setStopTimeout(final long timeout) {
this.stopTimeout = Duration.ofMillis(timeout);
}
/**
* Starts the {@link Thread}s.
*/
@Override
public void start() {
start(outputThread);
start(errorThread);
start(inputThread);
}
/**
* Starts the given {@link Thread}.
*/
private void start(final Thread thread) {
if (thread != null) {
thread.start();
}
}
/**
* Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
*/
@Override
public void stop() throws IOException {
if (inputStreamPumper != null) {
inputStreamPumper.stopProcessing();
}
stop(outputThread, stopTimeout);
stop(errorThread, stopTimeout);
stop(inputThread, stopTimeout);
if (errorOutputStream != null && errorOutputStream != outputStream) {
try {
errorOutputStream.flush();
} catch (final IOException e) {
final String msg = "Got exception while flushing the error stream : " + e.getMessage();
DebugUtils.handleException(msg, e);
}
}
if (outputStream != null) {
try {
outputStream.flush();
} catch (final IOException e) {
final String msg = "Got exception while flushing the output stream";
DebugUtils.handleException(msg, e);
}
}
if (caught != null) {
throw caught;
}
}
/**
* Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
* was exceeded an IOException is created to be thrown to the caller.
*
* @param thread the thread to be stopped.
* @param timeout the time in ms to wait to join.
*/
private void stop(final Thread thread, final Duration timeout) {
if (thread != null) {
try {
if (timeout.equals(Duration.ZERO)) {
thread.join();
} else {
final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
final Instant startTime = Instant.now();
thread.join(timeToWait.toMillis());
if (Instant.now().isAfter(startTime.plus(timeToWait))) {
caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
}
}
} catch (final InterruptedException e) {
thread.interrupt();
}
}
}
/**
* Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
* was exceeded an IOException is created to be thrown to the caller.
*
* @param thread the thread to be stopped.
* @param timeoutMillis the time in ms to wait to join.
*/
protected void stopThread(final Thread thread, final long timeoutMillis) {
stop(thread, Duration.ofMillis(timeoutMillis));
}
}