BoundedInputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.io.input;
import static org.apache.commons.io.IOUtils.EOF;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.function.IOBiConsumer;
//@formatter:off
/**
* Reads bytes up to a maximum count and stops once reached.
* <p>
* To build an instance, see {@link AbstractBuilder}.
* </p>
* <p>
* By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
* </p>
* <p>
* You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
* </p>
* <h2>Using a ServletInputStream</h2>
* <p>
* A {@code ServletInputStream} can block if you try to read content that isn't there
* because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize a {@link BoundedInputStream} with the
* {@code Content-Length} sent in the {@code ServletInputStream}'s header; this stops it from blocking, provided it's been sent with a correct content
* length in the first place.
* </p>
* <h2>Using NIO</h2>
* <pre>{@code
* BoundedInputStream s = BoundedInputStream.builder()
* .setPath(Paths.get("MyFile.xml"))
* .setMaxCount(1024)
* .setPropagateClose(false)
* .get();
* }
* </pre>
* <h2>Using IO</h2>
* <pre>{@code
* BoundedInputStream s = BoundedInputStream.builder()
* .setFile(new File("MyFile.xml"))
* .setMaxCount(1024)
* .setPropagateClose(false)
* .get();
* }
* </pre>
* <h2>Counting Bytes</h2>
* <p>You can set the running count when building, which is most useful when starting from another stream:</p>
* <pre>{@code
* InputStream in = ...;
* BoundedInputStream s = BoundedInputStream.builder()
* .setInputStream(in)
* .setCount(12)
* .setMaxCount(1024)
* .setPropagateClose(false)
* .get();
* }
* </pre>
* <h2>Listening for the max count reached</h2>
* <pre>{@code
* BoundedInputStream s = BoundedInputStream.builder()
* .setPath(Paths.get("MyFile.xml"))
* .setMaxCount(1024)
* .setOnMaxCount((max, count) -> System.out.printf("Max count %,d reached with a last read count of %,d%n", max, count))
* .get();
* }
* </pre>
* @see Builder
* @since 2.0
*/
//@formatter:on
public class BoundedInputStream extends ProxyInputStream {
/**
 * Base builder for {@link BoundedInputStream}, also meant for builders of {@link BoundedInputStream} subclasses.
 *
 * @param <T> The builder subclass.
 */
static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {

    /** The number of bytes already counted when the stream is built. */
    private long count;

    /** The maximum number of bytes to read; {@link IOUtils#EOF} means unbound. */
    private long maxCount = EOF;

    /** The consumer called with the max count and the current count; a NOOP unless one is set. */
    private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();

    /** Whether {@link BoundedInputStream#close()} propagates to the underlying stream, {@code true} by default. */
    private boolean propagateClose = true;

    long getCount() {
        return count;
    }

    long getMaxCount() {
        return maxCount;
    }

    IOBiConsumer<Long, Long> getOnMaxCount() {
        return onMaxCount;
    }

    boolean isPropagateClose() {
        return propagateClose;
    }

    /**
     * Sets the number of bytes already counted.
     * <p>
     * This lets a stream built on top of another stream carry an existing read count forward.
     * </p>
     * <p>
     * The default is {@code 0}; a negative value is treated as {@code 0}.
     * </p>
     *
     * @param count The current number of bytes counted.
     * @return {@code this} instance.
     */
    public T setCount(final long count) {
        this.count = count < 0 ? 0 : count;
        return asThis();
    }

    /**
     * Sets the maximum number of bytes to return.
     * <p>
     * The default is {@value IOUtils#EOF}; any negative value means unbound.
     * </p>
     *
     * @param maxCount The maximum number of bytes to return.
     * @return {@code this} instance.
     */
    public T setMaxCount(final long maxCount) {
        this.maxCount = maxCount < EOF ? EOF : maxCount;
        return asThis();
    }

    /**
     * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP.
     * <p>
     * The first Long is the max count of bytes to read. The second Long is the count of bytes read.
     * </p>
     * <p>
     * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)}
     * method.
     * </p>
     *
     * @param onMaxCount the consumer that {@link BoundedInputStream#onMaxLength(long, long)} delegates to, or {@code null} for a NOOP.
     * @return {@code this} instance.
     * @since 2.18.0
     */
    public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
        this.onMaxCount = onMaxCount == null ? IOBiConsumer.noop() : onMaxCount;
        return asThis();
    }

    /**
     * Sets whether {@link BoundedInputStream#close()} should propagate to the underlying {@link InputStream}.
     * <p>
     * The default is {@code true}.
     * </p>
     *
     * @param propagateClose {@code true} if closing the built stream also closes the underlying stream, {@code false} if it does not.
     * @return {@code this} instance.
     */
    public T setPropagateClose(final boolean propagateClose) {
        this.propagateClose = propagateClose;
        return asThis();
    }
}
//@formatter:off
/**
 * Builds a new {@link BoundedInputStream}.
 * <p>
 * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
 * </p>
 * <p>
 * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
 * </p>
 * <h2>Using a ServletInputStream</h2>
 * <p>
 * A {@code ServletInputStream} can block if you try to read content that isn't there
 * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize a {@link BoundedInputStream} with the
 * {@code Content-Length} sent in the {@code ServletInputStream}'s header; this stops it from blocking, provided it's been sent with a correct content
 * length in the first place.
 * </p>
 * <h2>Using NIO</h2>
 * <pre>{@code
 * BoundedInputStream s = BoundedInputStream.builder()
 *   .setPath(Paths.get("MyFile.xml"))
 *   .setMaxCount(1024)
 *   .setPropagateClose(false)
 *   .get();
 * }
 * </pre>
 * <h2>Using IO</h2>
 * <pre>{@code
 * BoundedInputStream s = BoundedInputStream.builder()
 *   .setFile(new File("MyFile.xml"))
 *   .setMaxCount(1024)
 *   .setPropagateClose(false)
 *   .get();
 * }
 * </pre>
 * <h2>Counting Bytes</h2>
 * <p>You can set the running count when building, which is most useful when starting from another stream:</p>
 * <pre>{@code
 * InputStream in = ...;
 * BoundedInputStream s = BoundedInputStream.builder()
 *   .setInputStream(in)
 *   .setCount(12)
 *   .setMaxCount(1024)
 *   .setPropagateClose(false)
 *   .get();
 * }
 * </pre>
 *
 * @see #get()
 * @since 2.16.0
 */
//@formatter:on
public static class Builder extends AbstractBuilder<Builder> {

    /**
     * Creates a {@link BoundedInputStream} configured from this builder.
     * <p>
     * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
     * </p>
     * <p>
     * This builder uses the following aspects:
     * </p>
     * <ul>
     * <li>{@link #getInputStream()}</li>
     * <li>{@link #getAfterRead()}</li>
     * <li>{@link #getCount()}</li>
     * <li>{@link #getMaxCount()}</li>
     * <li>{@link #isPropagateClose()}</li>
     * <li>{@link #getOnMaxCount()}</li>
     * </ul>
     *
     * @return a new instance.
     * @throws IllegalStateException         if the {@code origin} is {@code null}.
     * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
     * @throws IOException                   if an I/O error occurs.
     * @see #getInputStream()
     */
    @Override
    public BoundedInputStream get() throws IOException {
        return new BoundedInputStream(this);
    }
}
/**
 * Creates a fresh {@link Builder} for configuring and building a {@link BoundedInputStream}.
 *
 * @return a new {@link Builder}.
 * @since 2.16.0
 */
public static Builder builder() {
    return new Builder();
}
/** The count of bytes read and skipped so far; starts at the value supplied by the builder. */
private long count;
/** The value of {@link #count} captured by the last {@link #mark(int)} call and restored by {@link #reset()}. */
private long mark;
/** The max count of bytes to read; a negative value means unbound. */
private final long maxCount;
/** The consumer that {@link #onMaxLength(long, long)} delegates to, called with the max count and the current count. */
private final IOBiConsumer<Long, Long> onMaxCount;
/**
* Flag if {@link #close()} should be propagated to the underlying stream, {@code true} by default.
*
* TODO Make final in 3.0.
*/
private boolean propagateClose = true;
/**
 * Constructs a new instance from a builder, which is also handed to the superclass constructor.
 *
 * @param builder The builder providing the initial count, max count, close propagation flag, and max-count consumer.
 * @throws IOException if the superclass constructor fails with an I/O error.
 */
BoundedInputStream(final Builder builder) throws IOException {
    super(builder);
    this.maxCount = builder.getMaxCount();
    this.count = builder.getCount();
    this.onMaxCount = builder.getOnMaxCount();
    this.propagateClose = builder.isPropagateClose();
}
/**
 * Constructs a new unbound {@link BoundedInputStream} around the given input stream.
 * <p>
 * Equivalent to calling {@link #BoundedInputStream(InputStream, long)} with a max count of {@link IOUtils#EOF}.
 * </p>
 *
 * @param in The wrapped input stream.
 * @deprecated Use {@link AbstractBuilder#get()}.
 */
@Deprecated
public BoundedInputStream(final InputStream in) {
    this(in, EOF);
}
/**
 * Constructs a new instance around an explicit input stream, taking every other setting from the builder.
 *
 * @param inputStream The wrapped input stream, passed to the superclass constructor together with the builder.
 * @param builder     The builder providing the initial count, max count, close propagation flag, and max-count consumer.
 */
BoundedInputStream(final InputStream inputStream, final Builder builder) {
    super(inputStream, builder);
    this.maxCount = builder.getMaxCount();
    this.count = builder.getCount();
    this.onMaxCount = builder.getOnMaxCount();
    this.propagateClose = builder.isPropagateClose();
}
/**
 * Constructs a new {@link BoundedInputStream} around the given input stream, limited to at most {@code maxCount} bytes.
 *
 * @param inputStream The wrapped input stream.
 * @param maxCount    The maximum number of bytes to return; a negative value means unbound.
 * @deprecated Use {@link AbstractBuilder#get()}.
 */
@Deprecated
public BoundedInputStream(final InputStream inputStream, final long maxCount) {
    // Negative values are accepted on purpose: some badly designed methods - e.g. the Servlet API - overload
    // length such that "-1" means stream finished. The builder maps any negative max count to "unbound".
    this(inputStream, builder().setMaxCount(maxCount));
}
/**
 * Updates the running count after a read and then calls the superclass hook.
 *
 * @param n number of bytes read, or -1 if no more bytes are available; -1 leaves the count untouched.
 * @throws IOException Not thrown here but subclasses may throw.
 * @since 2.0
 */
@Override
protected synchronized void afterRead(final int n) throws IOException {
    final boolean endOfStream = n == EOF;
    if (!endOfStream) {
        count += n;
    }
    super.afterRead(n);
}
/**
 * Estimates how many bytes can be read from this stream without blocking.
 * <p>
 * If the max count has already been reached, {@link #onMaxLength(long, long)} is invoked and {@code 0} is returned. Otherwise the delegate's estimate
 * is returned, capped at the number of bytes that may still be read before the max count is reached, so that this method never advertises bytes that
 * {@code read} would refuse to deliver. An unbound stream returns the delegate's estimate unchanged.
 * </p>
 *
 * @return the delegate's estimate, limited to the bytes remaining before the max count; {@code 0} once the max count is reached.
 * @throws IOException if an I/O error occurs.
 */
@Override
public int available() throws IOException {
    if (isMaxCount()) {
        onMaxLength(maxCount, getCount());
        return 0;
    }
    final int available = in.available();
    if (maxCount < 0) {
        // Unbound: there is no limit to cap against.
        return available;
    }
    // Clamp at 0 in case the count moved past the bound after the isMaxCount() check above.
    final long remaining = Math.max(0, maxCount - getCount());
    // The result is never larger than 'available', so narrowing back to int is safe.
    return (int) Math.min(available, remaining);
}
/**
 * Closes this stream; the call reaches the superclass {@code close()} only when {@link #isPropagateClose()} is {@code true}, otherwise it does nothing.
 *
 * @throws IOException if an I/O error occurs.
 */
@Override
public void close() throws IOException {
    if (!propagateClose) {
        return;
    }
    super.close();
}
/**
 * Gets the running count of bytes, which covers bytes read and bytes skipped.
 *
 * @return The count of bytes read and skipped.
 * @since 2.12.0
 */
public synchronized long getCount() {
    return this.count;
}
/**
 * Gets the limit on the number of bytes this stream will read.
 *
 * @return The max count of bytes to read; a negative value means unbound.
 * @since 2.16.0
 */
public long getMaxCount() {
    return this.maxCount;
}
/**
 * Gets the limit on the number of bytes this stream will read.
 *
 * @return The max count of bytes to read; a negative value means unbound.
 * @since 2.12.0
 * @deprecated Use {@link #getMaxCount()}.
 */
@Deprecated
public long getMaxLength() {
    return this.maxCount;
}
/**
 * Gets how many bytes remain to read before the max count is reached.
 * <p>
 * The result is {@link #getMaxCount()} minus {@link #getCount()}, never less than {@code 0}. Note that an unbound stream (negative max count) with a
 * non-negative count therefore reports {@code 0}.
 * </p>
 *
 * @return how many bytes remain to read, at least {@code 0}.
 * @since 2.16.0
 */
public long getRemaining() {
    final long remaining = getMaxCount() - getCount();
    return remaining > 0 ? remaining : 0;
}
/**
 * Tests whether this stream is bounded and its running count has reached (or passed) the max count.
 *
 * @return {@code true} if no more bytes may be read because of the bound; always {@code false} for an unbound stream.
 */
private boolean isMaxCount() {
    if (maxCount < 0) {
        // Unbound: the limit can never be reached.
        return false;
    }
    return getCount() >= maxCount;
}
/**
 * Tests whether the {@link #close()} method should propagate to the underlying {@link InputStream}.
 *
 * @return {@code true} if calling {@link #close()} also closes the underlying stream, {@code false} if it does not.
 */
public boolean isPropagateClose() {
    return this.propagateClose;
}
/**
 * Invokes the delegate's {@link InputStream#mark(int)} method and remembers the current count so that {@link #reset()} can restore it.
 *
 * @param readLimit read ahead limit
 */
@Override
public synchronized void mark(final int readLimit) {
    in.mark(readLimit);
    // Snapshot the count only after the delegate accepted the mark.
    this.mark = this.count;
}
/**
 * Reports whether the delegate supports {@link InputStream#mark(int)} and {@link InputStream#reset()}, by asking the delegate's
 * {@link InputStream#markSupported()} method.
 *
 * @return true if mark is supported, otherwise false
 */
@Override
public boolean markSupported() {
    return in.markSupported();
}
/**
 * Called when a caller makes a request after the max count has been reached.
 * <p>
 * This implementation delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}.
 * </p>
 *
 * @param max   The max count of bytes to read.
 * @param count The count of bytes read.
 * @throws IOException Subclasses may throw.
 * @since 2.12.0
 */
@SuppressWarnings("unused")
// TODO Rename to onMaxCount for 3.0
protected void onMaxLength(final long max, final long count) throws IOException {
    this.onMaxCount.accept(max, count);
}
/**
 * Reads a single byte through the superclass if the max count has not been reached yet.
 * <p>
 * Once the max count is reached, {@link #onMaxLength(long, long)} is invoked and -1 is returned without reading.
 * </p>
 *
 * @return the byte read or -1 if the end of stream or the limit has been reached.
 * @throws IOException if an I/O error occurs.
 */
@Override
public int read() throws IOException {
    if (!isMaxCount()) {
        return super.read();
    }
    onMaxLength(maxCount, getCount());
    return EOF;
}
/**
 * Reads bytes into the whole of the given buffer by delegating to {@link #read(byte[], int, int)}, so the max count applies.
 *
 * @param b the buffer to read the bytes into
 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
 * @throws IOException if an I/O error occurs.
 */
@Override
public int read(final byte[] b) throws IOException {
    final int length = b.length;
    return read(b, 0, length);
}
/**
 * Reads up to {@code len} bytes through the superclass, never asking for more than the bytes remaining before the max count.
 * <p>
 * Once the max count is reached, {@link #onMaxLength(long, long)} is invoked and -1 is returned without reading.
 * </p>
 *
 * @param b   the buffer to read the bytes into
 * @param off The start offset
 * @param len The number of bytes to read
 * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
 * @throws IOException if an I/O error occurs.
 */
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
    if (!isMaxCount()) {
        // toReadLen never returns more than 'len', so the narrowing cast cannot overflow.
        final int boundedLen = (int) toReadLen(len);
        return super.read(b, off, boundedLen);
    }
    onMaxLength(maxCount, getCount());
    return EOF;
}
/**
 * Invokes the delegate's {@link InputStream#reset()} method and restores the count captured by the last {@link #mark(int)} call.
 *
 * @throws IOException if an I/O error occurs; the count is left unchanged in that case.
 */
@Override
public synchronized void reset() throws IOException {
    in.reset();
    // Only reached when the delegate reset succeeded.
    this.count = this.mark;
}
/**
 * Sets whether the {@link #close()} method should propagate to the underlying {@link InputStream}.
 *
 * @param propagateClose {@code true} if calling {@link #close()} also closes the underlying stream, {@code false} if it does not.
 * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
 */
@Deprecated
public void setPropagateClose(final boolean propagateClose) {
    this.propagateClose = propagateClose;
}
/**
 * Skips bytes through the superclass, never asking to skip more than the bytes remaining before the max count, and adds the skipped bytes to the
 * running count.
 * <p>
 * If the running count already exceeds the max count (for example, a builder count set above the max count), the remaining allowance is treated as
 * {@code 0}, so a negative length is never passed to the delegate just because the bound was overshot. A negative {@code n} supplied by the caller is
 * still passed through as is.
 * </p>
 *
 * @param n the number of bytes to skip
 * @return the actual number of bytes skipped
 * @throws IOException if an I/O error occurs.
 */
@Override
public synchronized long skip(final long n) throws IOException {
    long toSkip = n;
    if (maxCount >= 0) {
        // Clamp at 0: some streams interpret a negative length as a request to skip backwards.
        final long remaining = Math.max(0, maxCount - getCount());
        toSkip = Math.min(n, remaining);
    }
    final long skipped = super.skip(toSkip);
    count += skipped;
    return skipped;
}
/**
 * Limits a requested length to the number of bytes remaining before the max count.
 *
 * @param len the requested number of bytes.
 * @return {@code len} for an unbound stream, otherwise the smaller of {@code len} and the max count minus the running count.
 */
private long toReadLen(final long len) {
    if (maxCount < 0) {
        return len;
    }
    return Math.min(len, maxCount - getCount());
}
/**
 * Returns the string form of the wrapped stream by invoking the delegate's {@link InputStream#toString()} method.
 *
 * @return the delegate's {@link InputStream#toString()}
 */
@Override
public String toString() {
    return in.toString();
}
}