1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.exec;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.io.PipedOutputStream;
24 import java.time.Duration;
25 import java.time.Instant;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ThreadFactory;
28
29 import org.apache.commons.exec.util.DebugUtils;
30
31
32
33
34
35 public class PumpStreamHandler implements ExecuteStreamHandler {
36
37 private static final Duration STOP_TIMEOUT_ADDITION = Duration.ofSeconds(2);
38
39 private Thread outputThread;
40
41 private Thread errorThread;
42
43 private Thread inputThread;
44
45 private final OutputStream outputStream;
46
47 private final OutputStream errorOutputStream;
48
49 private final InputStream inputStream;
50
51 private InputStreamPumper inputStreamPumper;
52
53
54 private Duration stopTimeout = Duration.ZERO;
55
56
57 private IOException caught;
58
59
60
61
62 private final ThreadFactory threadFactory;
63
64
65
66
67 public PumpStreamHandler() {
68 this(System.out, System.err);
69 }
70
71
72
73
74
75
76 public PumpStreamHandler(final OutputStream allOutputStream) {
77 this(allOutputStream, allOutputStream);
78 }
79
80
81
82
83
84
85
86 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream) {
87 this(outputStream, errorOutputStream, null);
88 }
89
90
91
92
93
94
95
96
97 public PumpStreamHandler(final OutputStream outputStream, final OutputStream errorOutputStream, final InputStream inputStream) {
98 this(Executors.defaultThreadFactory(), outputStream, errorOutputStream, inputStream);
99 }
100
101
102
103
104
105
106
107
108 private PumpStreamHandler(final ThreadFactory threadFactory, final OutputStream outputStream, final OutputStream errorOutputStream,
109 final InputStream inputStream) {
110 this.threadFactory = threadFactory;
111 this.outputStream = outputStream;
112 this.errorOutputStream = errorOutputStream;
113 this.inputStream = inputStream;
114 }
115
116
117
118
119
120
121
122 protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
123 errorThread = createPump(is, os);
124 }
125
126
127
128
129
130
131
132 protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
133 outputThread = createPump(is, os);
134 }
135
136
137
138
139
140
141
142
143
144 protected Thread createPump(final InputStream is, final OutputStream os) {
145 return createPump(is, os, os instanceof PipedOutputStream);
146 }
147
148
149
150
151
152
153
154
155
156 protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
157 return ThreadUtil.newThread(threadFactory, new StreamPumper(is, os, closeWhenExhausted), "CommonsExecStreamPumper-", true);
158 }
159
160
161
162
163
164
165
166
167 private Thread createSystemInPump(final InputStream is, final OutputStream os) {
168 inputStreamPumper = new InputStreamPumper(is, os);
169 return ThreadUtil.newThread(threadFactory, inputStreamPumper, "CommonsExecStreamPumper-", true);
170 }
171
172
173
174
175
176
177 protected OutputStream getErr() {
178 return errorOutputStream;
179 }
180
181
182
183
184
185
186 protected OutputStream getOut() {
187 return outputStream;
188 }
189
190 Duration getStopTimeout() {
191 return stopTimeout;
192 }
193
194
195
196
197
198
199 @Override
200 public void setProcessErrorStream(final InputStream is) {
201 if (errorOutputStream != null) {
202 createProcessErrorPump(is, errorOutputStream);
203 }
204 }
205
206
207
208
209
210
211 @Override
212 public void setProcessInputStream(final OutputStream os) {
213 if (inputStream != null) {
214 if (inputStream == System.in) {
215 inputThread = createSystemInPump(inputStream, os);
216 } else {
217 inputThread = createPump(inputStream, os, true);
218 }
219 } else {
220 try {
221 os.close();
222 } catch (final IOException e) {
223 final String msg = "Got exception while closing output stream";
224 DebugUtils.handleException(msg, e);
225 }
226 }
227 }
228
229
230
231
232
233
234 @Override
235 public void setProcessOutputStream(final InputStream is) {
236 if (outputStream != null) {
237 createProcessOutputPump(is, outputStream);
238 }
239 }
240
241
242
243
244
245
246
247 public void setStopTimeout(final Duration timeout) {
248 this.stopTimeout = timeout != null ? timeout : Duration.ZERO;
249 }
250
251
252
253
254
255
256
257 @Deprecated
258 public void setStopTimeout(final long timeout) {
259 this.stopTimeout = Duration.ofMillis(timeout);
260 }
261
262
263
264
265 @Override
266 public void start() {
267 start(outputThread);
268 start(errorThread);
269 start(inputThread);
270 }
271
272
273
274
275 private void start(final Thread thread) {
276 if (thread != null) {
277 thread.start();
278 }
279 }
280
281
282
283
284 @Override
285 public void stop() throws IOException {
286 if (inputStreamPumper != null) {
287 inputStreamPumper.stopProcessing();
288 }
289 stop(outputThread, stopTimeout);
290 stop(errorThread, stopTimeout);
291 stop(inputThread, stopTimeout);
292
293 if (errorOutputStream != null && errorOutputStream != outputStream) {
294 try {
295 errorOutputStream.flush();
296 } catch (final IOException e) {
297 final String msg = "Got exception while flushing the error stream : " + e.getMessage();
298 DebugUtils.handleException(msg, e);
299 }
300 }
301
302 if (outputStream != null) {
303 try {
304 outputStream.flush();
305 } catch (final IOException e) {
306 final String msg = "Got exception while flushing the output stream";
307 DebugUtils.handleException(msg, e);
308 }
309 }
310
311 if (caught != null) {
312 throw caught;
313 }
314 }
315
316
317
318
319
320
321
322
323 private void stop(final Thread thread, final Duration timeout) {
324 if (thread != null) {
325 try {
326 if (timeout.equals(Duration.ZERO)) {
327 thread.join();
328 } else {
329 final Duration timeToWait = timeout.plus(STOP_TIMEOUT_ADDITION);
330 final Instant startTime = Instant.now();
331 thread.join(timeToWait.toMillis());
332 if (Instant.now().isAfter(startTime.plus(timeToWait))) {
333 caught = new ExecuteException("The stop timeout of " + timeout + " ms was exceeded", Executor.INVALID_EXITVALUE);
334 }
335 }
336 } catch (final InterruptedException e) {
337 thread.interrupt();
338 }
339 }
340 }
341
342
343
344
345
346
347
348
349 protected void stopThread(final Thread thread, final long timeoutMillis) {
350 stop(thread, Duration.ofMillis(timeoutMillis));
351 }
352 }