001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.FilterInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.commons.io.IOUtils;
026import org.apache.commons.io.build.AbstractStreamBuilder;
027import org.apache.commons.io.function.Erase;
028import org.apache.commons.io.function.IOConsumer;
029import org.apache.commons.io.function.IOIntConsumer;
030
031/**
032 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
033 * <p>
034 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
035 * such as read(byte[]) to read(byte[], int, int).
036 * </p>
037 * <p>
038 * In addition, this class allows you to:
039 * </p>
040 * <ul>
041 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
042 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
043 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
044 * <li>{@link #unwrap()} itself</li>
045 * </ul>
046 */
047public abstract class ProxyInputStream extends FilterInputStream {
048
049    /**
050     * Abstracts builder properties for subclasses.
051     *
052     * @param <T> The InputStream type.
053     * @param <B> The builder type.
054     * @since 2.18.0
055     */
056    protected static abstract class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {
057
058        private IOIntConsumer afterRead;
059
060        /**
061         * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
062         *
063         * @return the {@link ProxyInputStream#afterRead(int)} consumer.
064         */
065        public IOIntConsumer getAfterRead() {
066            return afterRead;
067        }
068
069        /**
070         * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
071         * <p>
072         * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
073         * </p>
074         * <p>
075         * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
076         * not called.
077         * </p>
078         * <p>
079         * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
080         * supplement it.
081         * </p>
082         *
083         * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
084         * @return this instance.
085         */
086        public B setAfterRead(final IOIntConsumer afterRead) {
087            this.afterRead = afterRead;
088            return asThis();
089        }
090
091    }
092
093    /**
094     * Tracks whether {@link #close()} has been called or not.
095     */
096    private boolean closed;
097
098    /**
099     * Handles exceptions.
100     */
101    private final IOConsumer<IOException> exceptionHandler;
102
103    private final IOIntConsumer afterRead;
104
105    /**
106     * Constructs a new ProxyInputStream.
107     *
108     * @param builder  How to build an instance.
109     * @throws IOException if an I/O error occurs.
110     * @since 2.18.0
111     */
112    @SuppressWarnings("resource")
113    protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
114        // the delegate is stored in a protected superclass instance variable named 'in'.
115        this(builder.getInputStream(), builder);
116    }
117
118    /**
119     * Constructs a new ProxyInputStream.
120     *
121     * @param proxy  the InputStream to proxy.
122     */
123    public ProxyInputStream(final InputStream proxy) {
124        // the delegate is stored in a protected superclass variable named 'in'.
125        super(proxy);
126        this.exceptionHandler = Erase::rethrow;
127        this.afterRead = IOIntConsumer.NOOP;
128    }
129
130    /**
131     * Constructs a new ProxyInputStream.
132     *
133     * @param proxy  the InputStream to proxy.
134     * @param builder  How to build an instance.
135     * @since 2.18.0
136     */
137    protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
138        // the delegate is stored in a protected superclass instance variable named 'in'.
139        super(proxy);
140        this.exceptionHandler = Erase::rethrow;
141        this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
142    }
143
144    /**
145     * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
146     * {@link IOUtils#EOF EOF} if the end of stream was reached.
147     * <p>
148     * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
149     * </p>
150     * <p>
151     * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
152     * </p>
153     * <p>
154     * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
155     * post-processing steps also to them.
156     * </p>
157     *
158     * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
159     * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
160     * @since 2.0
161     */
162    protected void afterRead(final int n) throws IOException {
163        afterRead.accept(n);
164    }
165
166    /**
167     * Invokes the delegate's {@link InputStream#available()} method.
168     *
169     * @return the number of available bytes, 0 if the stream is closed.
170     * @throws IOException if an I/O error occurs.
171     */
172    @Override
173    public int available() throws IOException {
174        if (in != null && !isClosed()) {
175            try {
176                return in.available();
177            } catch (final IOException e) {
178                handleIOException(e);
179            }
180        }
181        return 0;
182    }
183
184    /**
185     * Invoked by the {@code read} methods before the call is proxied. The number
186     * of bytes that the caller wanted to read (1 for the {@link #read()}
187     * method, buffer length for {@link #read(byte[])}, etc.) is given as
188     * an argument.
189     * <p>
190     * Subclasses can override this method to add common pre-processing
191     * functionality without having to override all the read methods.
192     * The default implementation does nothing.
193     * </p>
194     * <p>
195     * Note this method is <em>not</em> called from {@link #skip(long)} or
196     * {@link #reset()}. You need to explicitly override those methods if
197     * you want to add pre-processing steps also to them.
198     * </p>
199     *
200     * @since 2.0
201     * @param n number of bytes that the caller asked to be read.
202     * @throws IOException if the pre-processing fails in a subclass.
203     */
204    @SuppressWarnings("unused") // Possibly thrown from subclasses.
205    protected void beforeRead(final int n) throws IOException {
206        // no-op default
207    }
208
209    /**
210     * Checks if this instance is closed and throws an IOException if so.
211     *
212     * @throws IOException if this instance is closed.
213     */
214    void checkOpen() throws IOException {
215        Input.checkOpen(!isClosed());
216    }
217
218    /**
219     * Invokes the delegate's {@link InputStream#close()} method.
220     *
221     * @throws IOException if an I/O error occurs.
222     */
223    @Override
224    public void close() throws IOException {
225        IOUtils.close(in, this::handleIOException);
226        closed = true;
227    }
228
229    /**
230     * Handles any IOExceptions thrown; by default, throws the given exception.
231     * <p>
232     * This method provides a point to implement custom exception
233     * handling. The default behavior is to re-throw the exception.
234     * </p>
235     *
236     * @param e The IOException thrown.
237     * @throws IOException if an I/O error occurs.
238     * @since 2.0
239     */
240    protected void handleIOException(final IOException e) throws IOException {
241        exceptionHandler.accept(e);
242    }
243
244    /**
245     * Tests whether this instance is closed.
246     *
247     * @return whether this instance is closed.
248     */
249    boolean isClosed() {
250        return closed;
251    }
252
253    /**
254     * Invokes the delegate's {@link InputStream#mark(int)} method.
255     *
256     * @param readLimit read ahead limit.
257     */
258    @Override
259    public synchronized void mark(final int readLimit) {
260        if (in != null) {
261            in.mark(readLimit);
262        }
263    }
264
265    /**
266     * Invokes the delegate's {@link InputStream#markSupported()} method.
267     *
268     * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
269     * @see #mark(int)
270     * @see #reset()
271     */
272    @Override
273    public boolean markSupported() {
274        return in != null && in.markSupported();
275    }
276
277    /**
278     * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
279     *
280     * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
281     * @throws IOException if an I/O error occurs.
282     */
283    @Override
284    public int read() throws IOException {
285        try {
286            beforeRead(1);
287            final int b = in.read();
288            afterRead(b != EOF ? 1 : EOF);
289            return b;
290        } catch (final IOException e) {
291            handleIOException(e);
292            return EOF;
293        }
294    }
295
296    /**
297     * Invokes the delegate's {@link InputStream#read(byte[])} method.
298     *
299     * @param b the buffer to read the bytes into.
300     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
301     * @throws IOException
302     *                     <ul>
303     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
304     *                     <li>if the input stream has been closed, or</li>
305     *                     <li>if some other I/O error occurs.</li>
306     *                     </ul>
307     */
308    @Override
309    public int read(final byte[] b) throws IOException {
310        try {
311            beforeRead(IOUtils.length(b));
312            final int n = in.read(b);
313            afterRead(n);
314            return n;
315        } catch (final IOException e) {
316            handleIOException(e);
317            return EOF;
318        }
319    }
320
321    /**
322     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
323     *
324     * @param b   the buffer to read the bytes into.
325     * @param off The start offset.
326     * @param len The number of bytes to read.
327     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
328     * @throws IOException
329     *                     <ul>
330     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
331     *                     <li>if the input stream has been closed, or</li>
332     *                     <li>if some other I/O error occurs.</li>
333     *                     </ul>
334     */
335    @Override
336    public int read(final byte[] b, final int off, final int len) throws IOException {
337        try {
338            beforeRead(len);
339            final int n = in.read(b, off, len);
340            afterRead(n);
341            return n;
342        } catch (final IOException e) {
343            handleIOException(e);
344            return EOF;
345        }
346    }
347
348    /**
349     * Invokes the delegate's {@link InputStream#reset()} method.
350     *
351     * @throws IOException if this stream has not been marked or if the mark has been invalidated.
352     */
353    @Override
354    public synchronized void reset() throws IOException {
355        try {
356            in.reset();
357        } catch (final IOException e) {
358            handleIOException(e);
359        }
360    }
361
362    /**
363     * Package-private for testing.
364     *
365     * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
366     */
367    void setIn(final InputStream in) {
368        this.in = in;
369    }
370
371    /**
372     * Invokes the delegate's {@link InputStream#skip(long)} method.
373     *
374     * @param n the number of bytes to skip.
375     * @return the actual number of bytes skipped.
376     * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
377     */
378    @Override
379    public long skip(final long n) throws IOException {
380        try {
381            return in.skip(n);
382        } catch (final IOException e) {
383            handleIOException(e);
384            return 0;
385        }
386    }
387
388    /**
389     * Unwraps this instance by returning the underlying {@link InputStream}.
390     * <p>
391     * Use with caution; useful to query the underlying {@link InputStream}.
392     * </p>
393     *
394     * @return the underlying {@link InputStream}.
395     * @since 2.16.0
396     */
397    public InputStream unwrap() {
398        return in;
399    }
400
401}