/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

// import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.build.AbstractStreamBuilder;

/**
 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
 * <p>
 * To build an instance, use {@link Builder}.
 * </p>
 * <p>
 * Instances are intended to be consumed by a single reader thread: the fast paths of {@link #read()}, {@link #read(byte[], int, int)} and {@link #skip(long)}
 * access the active buffer without holding the internal lock.
 * </p>
 * <p>
 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
 * </p>
 *
 * @see Builder
 * @since 2.9.0
 */
public class ReadAheadInputStream extends FilterInputStream {

    // @formatter:off
    /**
     * Builds a new {@link ReadAheadInputStream}.
     *
     * <p>
     * For example:
     * </p>
     * <pre>{@code
     * ReadAheadInputStream s = ReadAheadInputStream.builder()
     *   .setPath(path)
     *   .setExecutorService(Executors.newSingleThreadExecutor())
     *   .get();}
     * </pre>
     * <p>
     * An executor service set through {@link #setExecutorService(ExecutorService)} is not shut down when the stream is closed. If none is set, the stream
     * creates its own single-thread daemon executor and shuts it down on close.
     * </p>
     *
     * @see #get()
     * @since 2.12.0
     */
    // @formatter:on
    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

        private ExecutorService executorService;

        /**
         * Builds a new {@link ReadAheadInputStream}.
         * <p>
         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()}</li>
         * <li>{@link #getBufferSize()}</li>
         * <li>{@link ExecutorService}</li>
         * </ul>
         *
         * @return a new instance.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs.
         * @see #getInputStream()
         * @see #getBufferSize()
         */
        @SuppressWarnings("resource")
        @Override
        public ReadAheadInputStream get() throws IOException {
            // The stream only shuts down an executor service it created itself (executorService == null).
            return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
                    executorService == null);
        }

        /**
         * Sets the executor service for the read-ahead thread.
         *
         * @param executorService the executor service for the read-ahead thread.
         * @return {@code this} instance.
         */
        public Builder setExecutorService(final ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

    }

    /** Per-thread scratch array used by {@link #read()} to delegate to {@link #read(byte[], int, int)}. */
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Constructs a new daemon thread.
     *
     * @param r the thread's runnable.
     * @return a new daemon thread.
     */
    private static Thread newDaemonThread(final Runnable r) {
        final Thread thread = new Thread(r, "commons-io-read-ahead");
        thread.setDaemon(true);
        return thread;
    }

    /**
     * Constructs a new daemon executor service.
     *
     * @return a new daemon executor service.
     */
    private static ExecutorService newExecutorService() {
        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
    }

    /**
     * Converts an {@link InterruptedException} into an {@link InterruptedIOException} that keeps the original as its cause, and restores the current thread's
     * interrupt status so that callers further up the stack can still observe the interruption.
     *
     * @param e the interruption to convert.
     * @return a new {@link InterruptedIOException} caused by {@code e}.
     */
    private static InterruptedIOException newInterruptedIOException(final InterruptedException e) {
        Thread.currentThread().interrupt();
        final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
        iio.initCause(e);
        return iio;
    }

    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if async read is in progress
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task running.
    private boolean isReading;

    // Whether there is a reader waiting for data.
    private final AtomicBoolean isWaiting = new AtomicBoolean();

    private final ExecutorService executorService;

    private final boolean shutdownExecutorService;

    private final Condition asyncReadComplete = stateChangeLock.newCondition();

    /**
     * Constructs an instance with the specified buffer size. The instance creates its own single-thread daemon executor service for the read-ahead task and
     * shuts it down on {@link #close()}.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @throws NullPointerException     if {@code inputStream} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service. The given executor service is not shut down on {@link #close()}.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService   An executor service for the read-ahead thread.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service.
     *
     * @param inputStream             The underlying input stream.
     * @param bufferSizeInBytes       The buffer size.
     * @param executorService         An executor service for the read-ahead thread.
     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
     */
    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
            final boolean shutdownExecutorService) {
        super(Objects.requireNonNull(inputStream, "inputStream"));
        if (bufferSizeInBytes <= 0) {
            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
        }
        this.executorService = Objects.requireNonNull(executorService, "executorService");
        this.shutdownExecutorService = shutdownExecutorService;
        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        // Flip the freshly allocated buffers so that both start out with nothing remaining to read.
        this.activeBuffer.flip();
        this.readAheadBuffer.flip();
    }

    /**
     * Returns the number of bytes currently held in the active and read ahead buffers, capped at {@link Integer#MAX_VALUE}.
     *
     * @return the number of buffered bytes that can be read.
     * @throws IOException declared for signature compatibility.
     */
    @Override
    public int available() throws IOException {
        stateChangeLock.lock();
        // Make sure we have no integer overflow.
        try {
            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Rethrows the failure recorded by the read-ahead task, if any. An {@link IOException} is rethrown as is, anything else is wrapped in one.
     *
     * @throws IOException if the read-ahead task was aborted by an exception.
     */
    private void checkReadException() throws IOException {
        if (readAborted) {
            if (readException instanceof IOException) {
                throw (IOException) readException;
            }
            throw new IOException(readException);
        }
    }

    /**
     * Closes this stream. Subsequent calls have no effect.
     * <p>
     * If this instance created its own executor service, it is shut down and awaited. The underlying input stream is closed by this method when no read-ahead
     * task is running; otherwise the running task closes it once it has finished reading.
     * </p>
     *
     * @throws InterruptedIOException if the thread is interrupted while waiting for the executor service to terminate.
     * @throws IOException            if closing the underlying input stream fails.
     */
    @Override
    public void close() throws IOException {
        boolean isSafeToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            if (isClosed) {
                return;
            }
            isClosed = true;
            if (!isReading) {
                // Nobody is reading, so we can close the underlying input stream in this method.
                isSafeToCloseUnderlyingInputStream = true;
                // Flip this to make sure the read ahead task will not close the underlying input stream.
                isUnderlyingInputStreamBeingClosed = true;
            }
        } finally {
            stateChangeLock.unlock();
        }

        try {
            // Only shut down an executor service this instance owns.
            if (shutdownExecutorService) {
                executorService.shutdownNow();
                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            }
        } catch (final InterruptedException e) {
            throw newInterruptedIOException(e);
        } finally {
            // Close the underlying stream regardless of who owns the executor service: the read ahead task will not
            // do it once isUnderlyingInputStreamBeingClosed is set, so skipping it here would leak the stream.
            if (isSafeToCloseUnderlyingInputStream) {
                super.close();
            }
        }
    }

    /**
     * Marks the read-ahead task as finished and, when {@link #close()} was called while the task was reading, closes the underlying input stream on its
     * behalf. Called by the read-ahead task.
     */
    private void closeUnderlyingInputStreamIfNecessary() {
        boolean needToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            isReading = false;
            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
                // close method cannot close underlyingInputStream because we were reading.
                needToCloseUnderlyingInputStream = true;
            }
        } finally {
            stateChangeLock.unlock();
        }
        if (needToCloseUnderlyingInputStream) {
            try {
                super.close();
            } catch (final IOException ignored) {
                // Best effort: the stream has already been closed by the user, the failure is not reported.
                // TODO Rethrow as UncheckedIOException?
            }
        }
    }

    /**
     * Tests whether both buffers are drained and the underlying input stream has reached its end.
     *
     * @return {@code true} if no more data can be returned.
     */
    private boolean isEndOfStream() {
        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte read as an unsigned value, or {@code -1} at the end of the stream.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public int read() throws IOException {
        if (activeBuffer.hasRemaining()) {
            // short path - just get one byte.
            return activeBuffer.get() & 0xFF;
        }
        final byte[] oneByteArray = BYTE_ARRAY_1.get();
        oneByteArray[0] = 0;
        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes from the active buffer, swapping in the read ahead buffer first when the active buffer is exhausted.
     *
     * @param b      the destination array.
     * @param offset the start offset in {@code b}.
     * @param len    the maximum number of bytes to read.
     * @return the number of bytes read, {@code 0} if {@code len} is 0, or {@code -1} at the end of the stream.
     * @throws IndexOutOfBoundsException if {@code offset} or {@code len} is negative, or {@code len} exceeds {@code b.length - offset}.
     * @throws IOException               if an I/O error occurs, including one raised earlier by the read-ahead task.
     */
    @Override
    public int read(final byte[] b, final int offset, int len) throws IOException {
        if (offset < 0 || len < 0 || len > b.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }

        if (!activeBuffer.hasRemaining()) {
            // No remaining in active buffer - lock and switch to write ahead buffer.
            stateChangeLock.lock();
            try {
                waitForAsyncReadComplete();
                if (!readAheadBuffer.hasRemaining()) {
                    // The first read.
                    readAsync();
                    waitForAsyncReadComplete();
                    if (isEndOfStream()) {
                        return EOF;
                    }
                }
                // Swap the newly read ahead buffer in place of empty active buffer.
                swapBuffers();
                // After swapping buffers, trigger another async read for read ahead buffer.
                readAsync();
            } finally {
                stateChangeLock.unlock();
            }
        }
        len = Math.min(len, activeBuffer.remaining());
        activeBuffer.get(b, offset, len);

        return len;
    }

    /**
     * Read data from underlyingInputStream to readAheadBuffer asynchronously.
     * <p>
     * Does nothing if the end of the stream was reached or a read is already in progress.
     * </p>
     *
     * @throws IOException if an I/O error occurs.
     */
    private void readAsync() throws IOException {
        stateChangeLock.lock();
        final byte[] arr;
        try {
            arr = readAheadBuffer.array();
            if (endOfStream || readInProgress) {
                return;
            }
            checkReadException();
            // Empty the read ahead buffer; its limit is set to the number of bytes read once the task completes.
            readAheadBuffer.position(0);
            readAheadBuffer.flip();
            readInProgress = true;
        } finally {
            stateChangeLock.unlock();
        }
        executorService.execute(() -> {
            stateChangeLock.lock();
            try {
                if (isClosed) {
                    readInProgress = false;
                    return;
                }
                // Flip this so that the close method will not close the underlying input stream when we
                // are reading.
                isReading = true;
            } finally {
                stateChangeLock.unlock();
            }

            // Please note that it is safe to release the lock and read into the read ahead buffer
            // because either of following two conditions will hold:
            //
            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
            //
            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
            // for this async read to complete.
            //
            // So there is no race condition in both the situations.
            int read = 0;
            int off = 0, len = arr.length;
            Throwable exception = null;
            try {
                // try to fill the read ahead buffer.
                // if a reader is waiting, possibly return early.
                do {
                    read = in.read(arr, off, len);
                    if (read <= 0) {
                        break;
                    }
                    off += read;
                    len -= read;
                } while (len > 0 && !isWaiting.get());
            } catch (final Throwable ex) {
                exception = ex;
                if (ex instanceof Error) {
                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
                    // The user can see Error in UncaughtExceptionHandler.
                    throw (Error) ex;
                }
            } finally {
                stateChangeLock.lock();
                try {
                    readAheadBuffer.limit(off);
                    if (read < 0 || exception instanceof EOFException) {
                        endOfStream = true;
                    } else if (exception != null) {
                        readAborted = true;
                        readException = exception;
                    }
                    readInProgress = false;
                    signalAsyncReadComplete();
                } finally {
                    stateChangeLock.unlock();
                }
                closeUnderlyingInputStreamIfNecessary();
            }
        });
    }

    /**
     * Wakes up any reader waiting for the read-ahead task to complete.
     */
    private void signalAsyncReadComplete() {
        stateChangeLock.lock();
        try {
            asyncReadComplete.signalAll();
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Skips up to {@code n} bytes, consuming buffered data first and then skipping on the underlying input stream.
     *
     * @param n the number of bytes to skip.
     * @return the actual number of bytes skipped, {@code 0} if {@code n} is not positive.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        if (n <= activeBuffer.remaining()) {
            // Only skipping from active buffer is sufficient
            activeBuffer.position((int) n + activeBuffer.position());
            return n;
        }
        stateChangeLock.lock();
        final long skipped;
        try {
            skipped = skipInternal(n);
        } finally {
            stateChangeLock.unlock();
        }
        return skipped;
    }

    /**
     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
     * calling this function.
     *
     * @param n the number of bytes to be skipped.
     * @return the actual number of bytes skipped.
     * @throws IllegalStateException if the calling thread does not hold the stateChangeLock.
     * @throws IOException           if an I/O error occurs.
     */
    private long skipInternal(final long n) throws IOException {
        // isLocked() would also be true when a different thread holds the lock; check our own ownership.
        if (!stateChangeLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("Expected stateChangeLock to be locked");
        }
        waitForAsyncReadComplete();
        if (isEndOfStream()) {
            return 0;
        }
        if (available() >= n) {
            // we can skip from the internal buffers
            int toSkip = (int) n;
            // We need to skip from both active buffer and read ahead buffer
            toSkip -= activeBuffer.remaining();
            if (toSkip <= 0) { // skipping from activeBuffer already handled.
                throw new IllegalStateException("Expected toSkip > 0, actual: " + toSkip);
            }
            activeBuffer.position(0);
            activeBuffer.flip();
            readAheadBuffer.position(toSkip + readAheadBuffer.position());
            swapBuffers();
            // Trigger async read to emptied read ahead buffer.
            readAsync();
            return n;
        }
        // Not enough buffered data: drop both buffers and skip the remainder on the underlying stream.
        final int skippedBytes = available();
        final long toSkip = n - skippedBytes;
        activeBuffer.position(0);
        activeBuffer.flip();
        readAheadBuffer.position(0);
        readAheadBuffer.flip();
        final long skippedFromInputStream = in.skip(toSkip);
        readAsync();
        return skippedBytes + skippedFromInputStream;
    }

    /**
     * Flips the active and read ahead buffer
     */
    private void swapBuffers() {
        final ByteBuffer temp = activeBuffer;
        activeBuffer = readAheadBuffer;
        readAheadBuffer = temp;
    }

    /**
     * Blocks until no asynchronous read is in progress, then rethrows any failure recorded by the read-ahead task.
     *
     * @throws InterruptedIOException if the thread is interrupted while waiting; the thread's interrupt status is restored.
     * @throws IOException            if the read-ahead task was aborted by an exception.
     */
    private void waitForAsyncReadComplete() throws IOException {
        stateChangeLock.lock();
        try {
            // Tell the read-ahead task a reader is blocked so that it may return before the buffer is full.
            isWaiting.set(true);
            // There is only one reader, and one writer, so the writer should signal only once,
            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
            while (readInProgress) {
                asyncReadComplete.await();
            }
        } catch (final InterruptedException e) {
            throw newInterruptedIOException(e);
        } finally {
            try {
                isWaiting.set(false);
            } finally {
                stateChangeLock.unlock();
            }
        }
        checkReadException();
    }
}