/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.commons.exec;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.apache.commons.exec.util.DebugUtils;

/**
 * Copies standard output and error of sub-processes to standard output and error of the parent process. If output or error stream are set to null, any feedback
 * from that stream will be lost.
 */
public class PumpStreamHandler implements ExecuteStreamHandler {

    /** Extra time added to the configured stop timeout when joining a pumper thread, so that an exceeded timeout can be detected. */
    private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);

    /** The thread pumping the process' standard output, or {@code null} if none was created. */
    private Thread outputThread;

    /** The thread pumping the process' standard error, or {@code null} if none was created. */
    private Thread errorThread;

    /** The thread pumping data into the process' standard input, or {@code null} if none was created. */
    private Thread inputThread;

    /** The stream receiving the process' standard output, may be {@code null}. */
    private final OutputStream outputStream;

    /** The stream receiving the process' standard error, may be {@code null}. */
    private final OutputStream errorOutputStream;

    /** The stream whose content is fed to the process' standard input, may be {@code null}. */
    private final InputStream inputStream;

    /** The pumper used when the input stream is {@code System.in}; only set by {@link #createSystemInPump(InputStream, OutputStream)}. */
    private InputStreamPumper inputStreamPumper;

    /** The timeout Duration the implementation waits when stopping the pumper threads. */
    private Duration stopTimeout = Duration.ZERO;

    /** The last exception being caught. */
    private IOException caught;

    /**
     * The thread factory.
     */
    private final ThreadFactory threadFactory;

    /**
     * Constructs a new {@link PumpStreamHandler} that writes to {@code System.out} and {@code System.err}.
     */
    public PumpStreamHandler() {
        this(System.out, System.err);
    }

    /**
     * Constructs a new {@link PumpStreamHandler} using the same stream for output and error.
     *
     * @param allOutputStream the output/error {@link OutputStream}.
     */
    public PumpStreamHandler(final OutputStream allOutputStream) {
        this(allOutputStream, allOutputStream);
    }

    /**
     * Constructs a new {@link PumpStreamHandler} without an input stream.
     *
     * @param outputStream      the output {@link OutputStream}.
     * @param errorOutputStream the error {@link OutputStream}.
     */
    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
        this(outputStream, errorOutputStream, null);
    }

    /**
     * Constructs a new {@link PumpStreamHandler} using {@link Executors#defaultThreadFactory()}.
     *
     * @param outputStream      the output {@link OutputStream}.
     * @param errorOutputStream the error {@link OutputStream}.
     * @param inputStream       the input {@link InputStream}.
     */
    public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
        this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
    }

    /**
     * Constructs a new {@link PumpStreamHandler}.
     *
     * @param threadFactory     the {@link ThreadFactory} passed on when pumper threads are created.
     * @param outputStream      the output {@link OutputStream}.
     * @param errorOutputStream the error {@link OutputStream}.
     * @param inputStream       the input {@link InputStream}.
     */
    private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
            final InputStream inputStream) {
        this.threadFactory = threadFactory;
        this.outputStream = outputStream;
        this.errorOutputStream = errorOutputStream;
        this.inputStream = inputStream;
    }

    /**
     * Create the pump to handle error output.
     *
     * @param is the {@link InputStream}.
     * @param os the {@link OutputStream}.
     */
    protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
        errorThread = createPump(is, os);
    }

    /**
     * Create the pump to handle process output.
     *
     * @param is the {@link InputStream}.
     * @param os the {@link OutputStream}.
     */
    protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
        outputThread = createPump(is, os);
    }

    /**
     * Creates a stream pumper to copy the given input stream to the given output stream. When the 'os' is an PipedOutputStream we are closing 'os' afterwards
     * to avoid an IOException ("Write end dead").
     *
     * @param is the input stream to copy from.
     * @param os the output stream to copy into.
     * @return the stream pumper thread.
     */
    protected Thread createPump(final InputStream is, final OutputStream os) {
        return createPump(is, os, os instanceof PipedOutputStream);
    }

    /**
     * Creates a stream pumper to copy the given input stream to the given output stream.
     *
     * @param is                 the input stream to copy from.
     * @param os                 the output stream to copy into.
     * @param closeWhenExhausted close the output stream when the input stream is exhausted.
     * @return the stream pumper thread.
     */
    protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
        return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
    }

    /**
     * Creates a stream pumper to copy the given input stream to the given output stream. The pumper is remembered so that {@link #stop()} can ask it to stop
     * processing.
     *
     * @param is the System.in input stream to copy from.
     * @param os the output stream to copy into.
     * @return the stream pumper thread.
     */
    private Thread createSystemInPump(final InputStream is, final OutputStream os) {
        inputStreamPumper = new InputStreamPumper(is, os);
        return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
    }

    /**
     * Gets the error stream.
     *
     * @return {@link OutputStream}.
     */
    protected OutputStream getErr() {
        return errorOutputStream;
    }

    /**
     * Gets the output stream.
     *
     * @return {@link OutputStream}.
     */
    protected OutputStream getOut() {
        return outputStream;
    }

    /**
     * Gets the timeout used when stopping the pumper threads.
     *
     * @return the stop timeout, never {@code null}; {@link Duration#ZERO} means wait forever.
     */
    Duration getStopTimeout() {
        return stopTimeout;
    }

    /**
     * Sets the {@link InputStream} from which to read the standard error of the process. No pump is created when the error stream of this handler is
     * {@code null}.
     *
     * @param is the {@link InputStream}.
     */
    @Override
    public void setProcessErrorStream(final InputStream is) {
        if (errorOutputStream != null) {
            createProcessErrorPump(is, errorOutputStream);
        }
    }

    /**
     * Sets the {@link OutputStream} by means of which input can be sent to the process. When this handler has no input stream, the given stream is closed
     * right away.
     *
     * @param os the {@link OutputStream}.
     */
    @Override
    public void setProcessInputStream(final OutputStream os) {
        if (inputStream == null) {
            // Nothing to feed to the process: close its input.
            try {
                os.close();
            } catch (final IOException e) {
                final String msg = "Got exception while closing output stream";
                DebugUtils.handleException(msg, e);
            }
            return;
        }
        // System.in gets a dedicated pumper that stop() can ask to stop processing.
        inputThread = inputStream == System.in ? createSystemInPump(inputStream, os) : createPump(inputStream, os, true);
    }

    /**
     * Sets the {@link InputStream} from which to read the standard output of the process. No pump is created when the output stream of this handler is
     * {@code null}.
     *
     * @param is the {@link InputStream}.
     */
    @Override
    public void setProcessOutputStream(final InputStream is) {
        if (outputStream != null) {
            createProcessOutputPump(is, outputStream);
        }
    }

    /**
     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
     *
     * @param timeout timeout or zero to wait forever (default); {@code null} is treated as zero.
     * @since 1.4.0
     */
    public void setStopTimeout(final Duration timeout) {
        this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
    }

    /**
     * Sets maximum time to wait until output streams are exhausted when {@link #stop()} was called.
     *
     * @param timeout timeout in milliseconds or zero to wait forever (default).
     * @deprecated Use {@link #setStopTimeout(Duration)}.
     */
    @Deprecated
    public void setStopTimeout(final long timeout) {
        this.stopTimeout = Duration.ofMillis(timeout);
    }

    /**
     * Starts the {@link Thread}s.
     */
    @Override
    public void start() {
        start(outputThread);
        start(errorThread);
        start(inputThread);
    }

    /**
     * Starts the given {@link Thread}.
     *
     * @param thread the thread to start, ignored when {@code null}.
     */
    private void start(final Thread thread) {
        if (thread != null) {
            thread.start();
        }
    }

    /**
     * Stops pumping the streams. When a timeout is specified it is not guaranteed that the pumper threads are cleanly terminated.
     *
     * @throws IOException if the stop timeout was exceeded while waiting for a pumper thread. The recorded exception is cleared once thrown, so a later call
     *                     on the same handler does not report it again.
     */
    @Override
    public void stop() throws IOException {
        if (inputStreamPumper != null) {
            inputStreamPumper.stopProcessing();
        }
        stop(outputThread, stopTimeout);
        stop(errorThread, stopTimeout);
        stop(inputThread, stopTimeout);

        // Skip the error stream when it is the same object as the output stream; it is flushed below.
        if (errorOutputStream != null && errorOutputStream != outputStream) {
            try {
                errorOutputStream.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the error stream : " + e.getMessage();
                DebugUtils.handleException(msg, e);
            }
        }

        if (outputStream != null) {
            try {
                outputStream.flush();
            } catch (final IOException e) {
                final String msg = "Got exception while flushing the output stream";
                DebugUtils.handleException(msg, e);
            }
        }

        // Hand the recorded failure to the caller exactly once; leaving it set would make every
        // later stop() on this instance rethrow a stale exception.
        final IOException pending = caught;
        caught = null;
        if (pending != null) {
            throw pending;
        }
    }

    /**
     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
     * was exceeded an IOException is created to be thrown to the caller.
     * <p>
     * If the calling thread is interrupted while waiting, the pumper thread is interrupted and the interrupt status of the calling thread is restored.
     * </p>
     *
     * @param thread  the thread to be stopped, ignored when {@code null}.
     * @param timeout the time to wait to join; {@link Duration#ZERO} waits without limit.
     */
    private void stop(final Thread thread, final Duration timeout) {
        if (thread == null) {
            return;
        }
        try {
            if (timeout.equals(Duration.ZERO)) {
                thread.join();
            } else {
                final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
                final Instant deadline = Instant.now().plus(timeToWait);
                thread.join(timeToWait.toMillis());
                // A thread that is still alive after the timed join has used up the whole wait. The
                // wall-clock comparison alone can miss that by a fraction of a millisecond.
                if (thread.isAlive() || Instant.now().isAfter(deadline)) {
                    caught = new ExecuteException("The stop timeout of " + timeout.toMillis() + " ms was exceeded", Executor.INVALID_EXITVALUE);
                }
            }
        } catch (final InterruptedException e) {
            thread.interrupt();
            // Do not swallow the interruption of the calling thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops a pumper thread. The implementation actually waits longer than specified in 'timeout' to detect if the timeout was indeed exceeded. If the timeout
     * was exceeded an IOException is created to be thrown to the caller.
     *
     * @param thread        the thread to be stopped.
     * @param timeoutMillis the time in ms to wait to join.
     */
    protected void stopThread(final Thread thread, final long timeoutMillis) {
        stop(thread, Duration.ofMillis(timeoutMillis));
    }
}