/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.function.IOBiConsumer;

//@formatter:off
/**
 * Reads bytes up to a maximum count and stops once reached.
 * <p>
 * To build an instance, see {@link AbstractBuilder}.
 * </p>
 * <p>
 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
 * </p>
 * <p>
 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
 * </p>
 * <h2>Using a ServletInputStream</h2>
 * <p>
 * A {@code ServletInputStream} can block if you try to read content that isn't there
 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize a {@link BoundedInputStream} with the
 * {@code Content-Length} sent in the {@code ServletInputStream}'s header; this stops it from blocking, provided it's been sent with a correct content
 * length in the first place.
 * </p>
 * <h2>Using NIO</h2>
 * <pre>{@code
 * BoundedInputStream s = BoundedInputStream.builder()
 *   .setPath(Paths.get("MyFile.xml"))
 *   .setMaxCount(1024)
 *   .setPropagateClose(false)
 *   .get();
 * }
 * </pre>
 * <h2>Using IO</h2>
 * <pre>{@code
 * BoundedInputStream s = BoundedInputStream.builder()
 *   .setFile(new File("MyFile.xml"))
 *   .setMaxCount(1024)
 *   .setPropagateClose(false)
 *   .get();
 * }
 * </pre>
 * <h2>Counting Bytes</h2>
 * <p>
 * You can set the running count when building, which is most useful when starting from another stream:
 * </p>
 * <pre>{@code
 * InputStream in = ...;
 * BoundedInputStream s = BoundedInputStream.builder()
 *   .setInputStream(in)
 *   .setCount(12)
 *   .setMaxCount(1024)
 *   .setPropagateClose(false)
 *   .get();
 * }
 * </pre>
 * <h2>Listening for the max count reached</h2>
 * <pre>{@code
 * BoundedInputStream s = BoundedInputStream.builder()
 *   .setPath(Paths.get("MyFile.xml"))
 *   .setMaxCount(1024)
 *   .setOnMaxCount((max, count) -> System.out.printf("Max count %,d reached with a last read count of %,d%n", max, count))
 *   .get();
 * }
 * </pre>
 * @see Builder
 * @since 2.0
 */
//@formatter:on
public class BoundedInputStream extends ProxyInputStream {

    /**
     * For subclassing builders from {@link BoundedInputStream} subclasses.
     *
     * @param <T> The subclass.
     */
    static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {

        /** The current count of bytes counted. */
        private long count;

        /** The max count of bytes to read. */
        private long maxCount = EOF;

        /** The consumer invoked when the max count is reached, a NOOP by default. */
        private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();

        /** Flag if {@link BoundedInputStream#close()} should be propagated, {@code true} by default. */
        private boolean propagateClose = true;

        long getCount() {
            return count;
        }

        long getMaxCount() {
            return maxCount;
        }

        IOBiConsumer<Long, Long> getOnMaxCount() {
            return onMaxCount;
        }

        boolean isPropagateClose() {
            return propagateClose;
        }

        /**
         * Sets the current number of bytes counted.
         * <p>
         * Useful when building from another stream to carry forward a read count.
         * </p>
         * <p>
         * Default is {@code 0}, negative means 0.
         * </p>
         *
         * @param count The current number of bytes counted.
         * @return {@code this} instance.
         */
        public T setCount(final long count) {
            this.count = Math.max(0, count);
            return asThis();
        }

        /**
         * Sets the maximum number of bytes to return.
         * <p>
         * Default is {@value IOUtils#EOF}, negative means unbound.
         * </p>
         *
         * @param maxCount The maximum number of bytes to return.
         * @return {@code this} instance.
         */
        public T setMaxCount(final long maxCount) {
            this.maxCount = Math.max(EOF, maxCount);
            return asThis();
        }

        /**
         * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
         * <p>
         * The first Long is the max count of bytes to read. The second Long is the count of bytes read.
         * </p>
         * <p>
         * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
         * method.
         * </p>
         *
         * @param onMaxCount the {@link BoundedInputStream#onMaxLength(long, long)} behavior.
         * @return {@code this} instance.
         * @since 2.18.0
         */
        public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
            this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop();
            return asThis();
        }

        /**
         * Sets whether the {@link BoundedInputStream#close()} method should propagate to the underlying {@link InputStream}.
         * <p>
         * Default is {@code true}.
         * </p>
         *
         * @param propagateClose {@code true} if calling {@link BoundedInputStream#close()} propagates to the {@code close()} method of the underlying stream or
         *                       {@code false} if it does not.
         * @return {@code this} instance.
         */
        public T setPropagateClose(final boolean propagateClose) {
            this.propagateClose = propagateClose;
            return asThis();
        }

    }

    //@formatter:off
    /**
     * Builds a new {@link BoundedInputStream}.
     * <p>
     * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
     * </p>
     * <p>
     * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
     * </p>
     * <h2>Using a ServletInputStream</h2>
     * <p>
     * A {@code ServletInputStream} can block if you try to read content that isn't there
     * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize a {@link BoundedInputStream} with the
     * {@code Content-Length} sent in the {@code ServletInputStream}'s header; this stops it from blocking, provided it's been sent with a correct content
     * length in the first place.
     * </p>
     * <h2>Using NIO</h2>
     * <pre>{@code
     * BoundedInputStream s = BoundedInputStream.builder()
     *   .setPath(Paths.get("MyFile.xml"))
     *   .setMaxCount(1024)
     *   .setPropagateClose(false)
     *   .get();
     * }
     * </pre>
     * <h2>Using IO</h2>
     * <pre>{@code
     * BoundedInputStream s = BoundedInputStream.builder()
     *   .setFile(new File("MyFile.xml"))
     *   .setMaxCount(1024)
     *   .setPropagateClose(false)
     *   .get();
     * }
     * </pre>
     * <h2>Counting Bytes</h2>
     * <p>
     * You can set the running count when building, which is most useful when starting from another stream:
     * </p>
     * <pre>{@code
     * InputStream in = ...;
     * BoundedInputStream s = BoundedInputStream.builder()
     *   .setInputStream(in)
     *   .setCount(12)
     *   .setMaxCount(1024)
     *   .setPropagateClose(false)
     *   .get();
     * }
     * </pre>
     *
     * @see #get()
     * @since 2.16.0
     */
    //@formatter:on
    public static class Builder extends AbstractBuilder<Builder> {

        /**
         * Builds a new {@link BoundedInputStream}.
         * <p>
         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()}</li>
         * <li>{@link #getAfterRead()}</li>
         * <li>{@link #getCount()}</li>
         * <li>{@link #getMaxCount()}</li>
         * <li>{@link #isPropagateClose()}</li>
         * <li>{@link #getOnMaxCount()}</li>
         * </ul>
         *
         * @return a new instance.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs.
         * @see #getInputStream()
         */
        @Override
        public BoundedInputStream get() throws IOException {
            return new BoundedInputStream(this);
        }

    }

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     * @since 2.16.0
     */
    public static Builder builder() {
        return new Builder();
    }

    /** The current count of bytes counted. */
    private long count;

    /** The current mark. */
    private long mark;

    /** The max count of bytes to read. */
    private final long maxCount;

    /** The consumer invoked by the default {@link #onMaxLength(long, long)} implementation. */
    private final IOBiConsumer<Long, Long> onMaxCount;

    /**
     * Flag if close should be propagated.
     *
     * TODO Make final in 3.0.
     */
    private boolean propagateClose = true;

    BoundedInputStream(final Builder builder) throws IOException {
        super(builder);
        this.count = builder.getCount();
        this.maxCount = builder.getMaxCount();
        this.propagateClose = builder.isPropagateClose();
        this.onMaxCount = builder.getOnMaxCount();
    }

    /**
     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is unlimited.
     *
     * @param in The wrapped input stream.
     * @deprecated Use {@link Builder#get()}.
     */
    @Deprecated
    public BoundedInputStream(final InputStream in) {
        this(in, EOF);
    }

    BoundedInputStream(final InputStream inputStream, final Builder builder) {
        super(inputStream, builder);
        this.count = builder.getCount();
        this.maxCount = builder.getMaxCount();
        this.propagateClose = builder.isPropagateClose();
        this.onMaxCount = builder.getOnMaxCount();
    }

    /**
     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
     *
     * @param inputStream The wrapped input stream.
     * @param maxCount    The maximum number of bytes to return.
     * @deprecated Use {@link Builder#get()}.
     */
    @Deprecated
    public BoundedInputStream(final InputStream inputStream, final long maxCount) {
        // Some badly designed methods - e.g. the Servlet API - overload length
        // such that "-1" means stream finished
        this(inputStream, builder().setMaxCount(maxCount));
    }

    /**
     * Adds the number of read bytes to the count.
     *
     * @param n number of bytes read, or -1 if no more bytes are available
     * @throws IOException Not thrown here but subclasses may throw.
     * @since 2.0
     */
    @Override
    protected synchronized void afterRead(final int n) throws IOException {
        if (n != EOF) {
            count += n;
        }
        super.afterRead(n);
    }

    /**
     * Returns {@code 0} after invoking {@link #onMaxLength(long, long)} if the max count has been reached, otherwise invokes the delegate's
     * {@link InputStream#available()} method.
     *
     * @return {@code 0} if the max count has been reached, otherwise the delegate's estimate.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public int available() throws IOException {
        if (isMaxCount()) {
            onMaxLength(maxCount, getCount());
            return 0;
        }
        return in.available();
    }

    /**
     * Invokes the delegate's {@link InputStream#close()} method if {@link #isPropagateClose()} is {@code true}.
     *
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public void close() throws IOException {
        if (propagateClose) {
            super.close();
        }
    }

    /**
     * Gets the count of bytes read.
     *
     * @return The count of bytes read.
     * @since 2.12.0
     */
    public synchronized long getCount() {
        return count;
    }

    /**
     * Gets the max count of bytes to read.
     *
     * @return The max count of bytes to read.
     * @since 2.16.0
     */
    public long getMaxCount() {
        return maxCount;
    }

    /**
     * Gets the max count of bytes to read.
     *
     * @return The max count of bytes to read.
     * @since 2.12.0
     * @deprecated Use {@link #getMaxCount()}.
     */
    @Deprecated
    public long getMaxLength() {
        return maxCount;
    }

    /**
     * Gets how many bytes remain to read.
     * <p>
     * The result is never negative. For an unbound stream (a negative max count), this method returns {@code 0}.
     * </p>
     *
     * @return how many bytes remain to read.
     * @since 2.16.0
     */
    public long getRemaining() {
        return Math.max(0, getMaxCount() - getCount());
    }

    /**
     * Tests whether this stream is bound and its count has reached (or passed) the max count.
     *
     * @return {@code true} if the max count is non-negative and the current count is at least the max count.
     */
    private boolean isMaxCount() {
        return maxCount >= 0 && getCount() >= maxCount;
    }

    /**
     * Tests whether the {@link #close()} method should propagate to the underlying {@link InputStream}.
     *
     * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it does not.
     */
    public boolean isPropagateClose() {
        return propagateClose;
    }

    /**
     * Invokes the delegate's {@link InputStream#mark(int)} method and remembers the current count so {@link #reset()} can restore it.
     *
     * @param readLimit read ahead limit
     */
    @Override
    public synchronized void mark(final int readLimit) {
        in.mark(readLimit);
        mark = count;
    }

    /**
     * Invokes the delegate's {@link InputStream#markSupported()} method.
     *
     * @return true if mark is supported, otherwise false
     */
    @Override
    public boolean markSupported() {
        return in.markSupported();
    }

    /**
     * A caller has caused a request that would cross the {@code maxLength} boundary.
     * <p>
     * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
     * </p>
     *
     * @param max   The max count of bytes to read.
     * @param count The count of bytes read.
     * @throws IOException Subclasses may throw.
     * @since 2.12.0
     */
    @SuppressWarnings("unused")
    // TODO Rename to onMaxCount for 3.0
    protected void onMaxLength(final long max, final long count) throws IOException {
        onMaxCount.accept(max, count);
    }

    /**
     * Invokes the delegate's {@link InputStream#read()} method if the current position is less than the limit.
     *
     * @return the byte read or -1 if the end of stream or the limit has been reached.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public int read() throws IOException {
        if (isMaxCount()) {
            onMaxLength(maxCount, getCount());
            return EOF;
        }
        return super.read();
    }

    /**
     * Invokes the delegate's {@link InputStream#read(byte[])} method.
     *
     * @param b the buffer to read the bytes into
     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method, requesting no more bytes than remain before the limit.
     *
     * @param b   the buffer to read the bytes into
     * @param off The start offset
     * @param len The number of bytes to read
     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        if (isMaxCount()) {
            onMaxLength(maxCount, getCount());
            return EOF;
        }
        // The cast is safe: toReadLen never returns more than len, which is an int.
        return super.read(b, off, (int) toReadLen(len));
    }

    /**
     * Invokes the delegate's {@link InputStream#reset()} method and restores the count recorded by the last {@link #mark(int)}.
     *
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public synchronized void reset() throws IOException {
        in.reset();
        count = mark;
    }

    /**
     * Sets whether the {@link #close()} method should propagate to the underlying {@link InputStream}.
     *
     * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
     *                       does not.
     * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
     */
    @Deprecated
    public void setPropagateClose(final boolean propagateClose) {
        this.propagateClose = propagateClose;
    }

    /**
     * Invokes the delegate's {@link InputStream#skip(long)} method, requesting no more bytes than remain before the limit, and adds the skipped bytes to
     * the count.
     *
     * @param n the number of bytes to skip
     * @return the actual number of bytes skipped
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public synchronized long skip(final long n) throws IOException {
        final long skip = super.skip(toReadLen(n));
        count += skip;
        return skip;
    }

    /**
     * Limits a requested length to the number of bytes that remain before the max count.
     * <p>
     * An unbound stream (negative max count) returns {@code len} unchanged.
     * </p>
     *
     * @param len the requested length.
     * @return {@code len} if this stream is unbound, otherwise the smaller of {@code len} and the non-negative number of bytes remaining.
     */
    private long toReadLen(final long len) {
        if (maxCount < 0) {
            return len;
        }
        // Clamp the remaining budget at zero: a builder may seed a count that is already past the max count, and
        // a negative budget must not be forwarded to the delegate (some streams treat a negative skip as a backwards seek).
        final long remaining = Math.max(0, maxCount - getCount());
        return Math.min(len, remaining);
    }

    /**
     * Invokes the delegate's {@link InputStream#toString()} method.
     *
     * @return the delegate's {@link InputStream#toString()}
     */
    @Override
    public String toString() {
        return in.toString();
    }
}