001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.FilterInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.commons.io.IOUtils;
026import org.apache.commons.io.build.AbstractStreamBuilder;
027import org.apache.commons.io.function.Erase;
028import org.apache.commons.io.function.IOConsumer;
029import org.apache.commons.io.function.IOIntConsumer;
030
031/**
032 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
033 * <p>
034 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
035 * such as read(byte[]) to read(byte[], int, int).
036 * </p>
037 * <p>
038 * In addition, this class allows you to:
039 * </p>
040 * <ul>
041 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
042 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
043 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
044 * <li>{@link #unwrap()} itself</li>
045 * </ul>
046 */
047public abstract class ProxyInputStream extends FilterInputStream {
048
049    /**
050     * Abstracts builder properties for subclasses.
051     *
052     * @param <T> The InputStream type.
053     * @param <B> The builder type.
054     * @since 2.18.0
055     */
056    protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> {
057
058        private IOIntConsumer afterRead;
059
060        /**
061         * Constructs a builder of {@code T}.
062         */
063        protected AbstractBuilder() {
064            // empty
065        }
066
067        /**
068         * Gets the {@link ProxyInputStream#afterRead(int)} consumer.
069         *
070         * @return the {@link ProxyInputStream#afterRead(int)} consumer.
071         */
072        public IOIntConsumer getAfterRead() {
073            return afterRead;
074        }
075
076        /**
077         * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP.
078         * <p>
079         * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer.
080         * </p>
081         * <p>
082         * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is
083         * not called.
084         * </p>
085         * <p>
086         * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can
087         * supplement it.
088         * </p>
089         *
090         * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior.
091         * @return this instance.
092         */
093        public B setAfterRead(final IOIntConsumer afterRead) {
094            this.afterRead = afterRead;
095            return asThis();
096        }
097
098    }
099
100    /**
101     * Tracks whether {@link #close()} has been called or not.
102     */
103    private boolean closed;
104
105    /**
106     * Handles exceptions.
107     */
108    private final IOConsumer<IOException> exceptionHandler;
109
110    private final IOIntConsumer afterRead;
111
112    /**
113     * Constructs a new ProxyInputStream.
114     *
115     * @param builder  How to build an instance.
116     * @throws IOException if an I/O error occurs.
117     * @since 2.18.0
118     */
119    @SuppressWarnings("resource")
120    protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException {
121        // the delegate is stored in a protected superclass instance variable named 'in'.
122        this(builder.getInputStream(), builder);
123    }
124
125    /**
126     * Constructs a new ProxyInputStream.
127     *
128     * @param proxy  the InputStream to proxy.
129     */
130    public ProxyInputStream(final InputStream proxy) {
131        // the delegate is stored in a protected superclass variable named 'in'.
132        super(proxy);
133        this.exceptionHandler = Erase::rethrow;
134        this.afterRead = IOIntConsumer.NOOP;
135    }
136
137    /**
138     * Constructs a new ProxyInputStream.
139     *
140     * @param proxy  the InputStream to proxy.
141     * @param builder  How to build an instance.
142     * @since 2.18.0
143     */
144    protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) {
145        // the delegate is stored in a protected superclass instance variable named 'in'.
146        super(proxy);
147        this.exceptionHandler = Erase::rethrow;
148        this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP;
149    }
150
151    /**
152     * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or
153     * {@link IOUtils#EOF EOF} if the end of stream was reached.
154     * <p>
155     * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
156     * </p>
157     * <p>
158     * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods.
159     * </p>
160     * <p>
161     * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add
162     * post-processing steps also to them.
163     * </p>
164     *
165     * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
166     * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}.
167     * @since 2.0
168     */
169    protected void afterRead(final int n) throws IOException {
170        afterRead.accept(n);
171    }
172
173    /**
174     * Invokes the delegate's {@link InputStream#available()} method.
175     *
176     * @return the number of available bytes, 0 if the stream is closed.
177     * @throws IOException if an I/O error occurs.
178     */
179    @Override
180    public int available() throws IOException {
181        if (in != null && !isClosed()) {
182            try {
183                return in.available();
184            } catch (final IOException e) {
185                handleIOException(e);
186            }
187        }
188        return 0;
189    }
190
191    /**
192     * Invoked by the {@code read} methods before the call is proxied. The number
193     * of bytes that the caller wanted to read (1 for the {@link #read()}
194     * method, buffer length for {@link #read(byte[])}, etc.) is given as
195     * an argument.
196     * <p>
197     * Subclasses can override this method to add common pre-processing
198     * functionality without having to override all the read methods.
199     * The default implementation does nothing.
200     * </p>
201     * <p>
202     * Note this method is <em>not</em> called from {@link #skip(long)} or
203     * {@link #reset()}. You need to explicitly override those methods if
204     * you want to add pre-processing steps also to them.
205     * </p>
206     *
207     * @param n number of bytes that the caller asked to be read.
208     * @throws IOException if the pre-processing fails in a subclass.
209     * @since 2.0
210     */
211    @SuppressWarnings("unused") // Possibly thrown from subclasses.
212    protected void beforeRead(final int n) throws IOException {
213        // no-op default
214    }
215
216    /**
217     * Checks if this instance is closed and throws an IOException if so.
218     *
219     * @throws IOException if this instance is closed.
220     */
221    void checkOpen() throws IOException {
222        Input.checkOpen(!isClosed());
223    }
224
225    /**
226     * Invokes the delegate's {@link InputStream#close()} method.
227     *
228     * @throws IOException if an I/O error occurs.
229     */
230    @Override
231    public void close() throws IOException {
232        IOUtils.close(in, this::handleIOException);
233        closed = true;
234    }
235
236    /**
237     * Handles any IOExceptions thrown; by default, throws the given exception.
238     * <p>
239     * This method provides a point to implement custom exception
240     * handling. The default behavior is to re-throw the exception.
241     * </p>
242     *
243     * @param e The IOException thrown.
244     * @throws IOException if an I/O error occurs.
245     * @since 2.0
246     */
247    protected void handleIOException(final IOException e) throws IOException {
248        exceptionHandler.accept(e);
249    }
250
251    /**
252     * Tests whether this instance is closed.
253     *
254     * @return whether this instance is closed.
255     */
256    boolean isClosed() {
257        return closed;
258    }
259
260    /**
261     * Invokes the delegate's {@link InputStream#mark(int)} method.
262     *
263     * @param readLimit read ahead limit.
264     */
265    @Override
266    public synchronized void mark(final int readLimit) {
267        if (in != null) {
268            in.mark(readLimit);
269        }
270    }
271
272    /**
273     * Invokes the delegate's {@link InputStream#markSupported()} method.
274     *
275     * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
276     * @see #mark(int)
277     * @see #reset()
278     */
279    @Override
280    public boolean markSupported() {
281        return in != null && in.markSupported();
282    }
283
284    /**
285     * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
286     *
287     * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
288     * @throws IOException if an I/O error occurs.
289     */
290    @Override
291    public int read() throws IOException {
292        try {
293            beforeRead(1);
294            final int b = in.read();
295            afterRead(b != EOF ? 1 : EOF);
296            return b;
297        } catch (final IOException e) {
298            handleIOException(e);
299            return EOF;
300        }
301    }
302
303    /**
304     * Invokes the delegate's {@link InputStream#read(byte[])} method.
305     *
306     * @param b the buffer to read the bytes into.
307     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
308     * @throws IOException
309     *                     <ul>
310     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
311     *                     <li>if the input stream has been closed, or</li>
312     *                     <li>if some other I/O error occurs.</li>
313     *                     </ul>
314     */
315    @Override
316    public int read(final byte[] b) throws IOException {
317        try {
318            beforeRead(IOUtils.length(b));
319            final int n = in.read(b);
320            afterRead(n);
321            return n;
322        } catch (final IOException e) {
323            handleIOException(e);
324            return EOF;
325        }
326    }
327
328    /**
329     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
330     *
331     * @param b   the buffer to read the bytes into.
332     * @param off The start offset.
333     * @param len The number of bytes to read.
334     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
335     * @throws IOException
336     *                     <ul>
337     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
338     *                     <li>if the input stream has been closed, or</li>
339     *                     <li>if some other I/O error occurs.</li>
340     *                     </ul>
341     */
342    @Override
343    public int read(final byte[] b, final int off, final int len) throws IOException {
344        try {
345            beforeRead(len);
346            final int n = in.read(b, off, len);
347            afterRead(n);
348            return n;
349        } catch (final IOException e) {
350            handleIOException(e);
351            return EOF;
352        }
353    }
354
355    /**
356     * Invokes the delegate's {@link InputStream#reset()} method.
357     *
358     * @throws IOException if this stream has not been marked or if the mark has been invalidated.
359     */
360    @Override
361    public synchronized void reset() throws IOException {
362        try {
363            in.reset();
364        } catch (final IOException e) {
365            handleIOException(e);
366        }
367    }
368
369    /**
370     * Sets the underlying input stream.
371     *
372     * @param in The input stream to set in {@link java.io.FilterInputStream#in}.
373     * @return this instance.
374     * @since 2.19.0
375     */
376    public ProxyInputStream setReference(final InputStream in) {
377        this.in = in;
378        return this;
379    }
380
381    /**
382     * Invokes the delegate's {@link InputStream#skip(long)} method.
383     *
384     * @param n the number of bytes to skip.
385     * @return the actual number of bytes skipped.
386     * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
387     */
388    @Override
389    public long skip(final long n) throws IOException {
390        try {
391            return in.skip(n);
392        } catch (final IOException e) {
393            handleIOException(e);
394            return 0;
395        }
396    }
397
398    /**
399     * Unwraps this instance by returning the underlying {@link InputStream}.
400     * <p>
401     * Use with caution; useful to query the underlying {@link InputStream}.
402     * </p>
403     *
404     * @return the underlying {@link InputStream}.
405     * @since 2.16.0
406     */
407    public InputStream unwrap() {
408        return in;
409    }
410
411}