001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.FilterInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.commons.io.IOUtils;
026import org.apache.commons.io.function.Erase;
027import org.apache.commons.io.function.IOConsumer;
028
029/**
030 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called.
031 * <p>
032 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called,
033 * such as read(byte[]) to read(byte[], int, int).
034 * </p>
035 * <p>
036 * In addition, this class allows you to:
037 * </p>
038 * <ul>
039 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li>
040 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li>
041 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li>
042 * <li>{@link #unwrap()} itself</li>
043 * </ul>
044 */
045public abstract class ProxyInputStream extends FilterInputStream {
046
047    /**
048     * Tracks whether {@link #close()} has been called or not.
049     */
050    private boolean closed;
051
052    /**
053     * Handles exceptions.
054     */
055    private final IOConsumer<IOException> exceptionHandler;
056
057    /**
058     * Constructs a new ProxyInputStream.
059     *
060     * @param proxy  the InputStream to proxy.
061     */
062    public ProxyInputStream(final InputStream proxy) {
063        // the proxy is stored in a protected superclass variable named 'in'.
064        this(proxy, Erase::rethrow);
065    }
066
067    /**
068     * Constructs a new ProxyInputStream for testing.
069     *
070     * @param proxy  the InputStream to proxy.
071     * @param exceptionHandler the exception handler.
072     */
073    ProxyInputStream(final InputStream proxy, final IOConsumer<IOException> exceptionHandler) {
074        // the proxy is stored in a protected superclass instance variable named 'in'.
075        super(proxy);
076        this.exceptionHandler = exceptionHandler;
077    }
078
079    /**
080     * Invoked by the {@code read} methods after the proxied call has returned
081     * successfully. The number of bytes returned to the caller (or {@link IOUtils#EOF EOF} if
082     * the end of stream was reached) is given as an argument.
083     * <p>
084     * Subclasses can override this method to add common post-processing
085     * functionality without having to override all the read methods.
086     * The default implementation does nothing.
087     * </p>
088     * <p>
089     * Note this method is <em>not</em> called from {@link #skip(long)} or
090     * {@link #reset()}. You need to explicitly override those methods if
091     * you want to add post-processing steps also to them.
092     * </p>
093     *
094     * @since 2.0
095     * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached.
096     * @throws IOException if the post-processing fails in a subclass.
097     */
098    @SuppressWarnings("unused") // Possibly thrown from subclasses.
099    protected void afterRead(final int n) throws IOException {
100        // no-op default
101    }
102
103    /**
104     * Invokes the delegate's {@link InputStream#available()} method.
105     *
106     * @return the number of available bytes, 0 if the stream is closed.
107     * @throws IOException if an I/O error occurs.
108     */
109    @Override
110    public int available() throws IOException {
111        if (in != null && !isClosed()) {
112            try {
113                return in.available();
114            } catch (final IOException e) {
115                handleIOException(e);
116            }
117        }
118        return 0;
119    }
120
121    /**
122     * Invoked by the {@code read} methods before the call is proxied. The number
123     * of bytes that the caller wanted to read (1 for the {@link #read()}
124     * method, buffer length for {@link #read(byte[])}, etc.) is given as
125     * an argument.
126     * <p>
127     * Subclasses can override this method to add common pre-processing
128     * functionality without having to override all the read methods.
129     * The default implementation does nothing.
130     * </p>
131     * <p>
132     * Note this method is <em>not</em> called from {@link #skip(long)} or
133     * {@link #reset()}. You need to explicitly override those methods if
134     * you want to add pre-processing steps also to them.
135     * </p>
136     *
137     * @since 2.0
138     * @param n number of bytes that the caller asked to be read.
139     * @throws IOException if the pre-processing fails in a subclass.
140     */
141    @SuppressWarnings("unused") // Possibly thrown from subclasses.
142    protected void beforeRead(final int n) throws IOException {
143        // no-op default
144    }
145
146    /**
147     * Checks if this instance is closed and throws an IOException if so.
148     *
149     * @throws IOException if this instance is closed.
150     */
151    void checkOpen() throws IOException {
152        Input.checkOpen(!isClosed());
153    }
154
155    /**
156     * Invokes the delegate's {@link InputStream#close()} method.
157     *
158     * @throws IOException if an I/O error occurs.
159     */
160    @Override
161    public void close() throws IOException {
162        IOUtils.close(in, this::handleIOException);
163        closed = true;
164    }
165
166    /**
167     * Handles any IOExceptions thrown; by default, throws the given exception.
168     * <p>
169     * This method provides a point to implement custom exception
170     * handling. The default behavior is to re-throw the exception.
171     * </p>
172     *
173     * @param e The IOException thrown.
174     * @throws IOException if an I/O error occurs.
175     * @since 2.0
176     */
177    protected void handleIOException(final IOException e) throws IOException {
178        exceptionHandler.accept(e);
179    }
180
181    /**
182     * Tests whether this instance is closed.
183     *
184     * @return whether this instance is closed.
185     */
186    boolean isClosed() {
187        return closed;
188    }
189
190    /**
191     * Invokes the delegate's {@link InputStream#mark(int)} method.
192     *
193     * @param readLimit read ahead limit.
194     */
195    @Override
196    public synchronized void mark(final int readLimit) {
197        if (in != null) {
198            in.mark(readLimit);
199        }
200    }
201
202    /**
203     * Invokes the delegate's {@link InputStream#markSupported()} method.
204     *
205     * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise.
206     * @see #mark(int)
207     * @see #reset()
208     */
209    @Override
210    public boolean markSupported() {
211        return in != null && in.markSupported();
212    }
213
214    /**
215     * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed.
216     *
217     * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream.
218     * @throws IOException if an I/O error occurs.
219     */
220    @Override
221    public int read() throws IOException {
222        try {
223            beforeRead(1);
224            final int b = in.read();
225            afterRead(b != EOF ? 1 : EOF);
226            return b;
227        } catch (final IOException e) {
228            handleIOException(e);
229            return EOF;
230        }
231    }
232
233    /**
234     * Invokes the delegate's {@link InputStream#read(byte[])} method.
235     *
236     * @param b the buffer to read the bytes into.
237     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
238     * @throws IOException
239     *                     <ul>
240     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
241     *                     <li>if the input stream has been closed, or</li>
242     *                     <li>if some other I/O error occurs.</li>
243     *                     </ul>
244     */
245    @Override
246    public int read(final byte[] b) throws IOException {
247        try {
248            beforeRead(IOUtils.length(b));
249            final int n = in.read(b);
250            afterRead(n);
251            return n;
252        } catch (final IOException e) {
253            handleIOException(e);
254            return EOF;
255        }
256    }
257
258    /**
259     * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method.
260     *
261     * @param b   the buffer to read the bytes into.
262     * @param off The start offset.
263     * @param len The number of bytes to read.
264     * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream.
265     * @throws IOException
266     *                     <ul>
267     *                     <li>If the first byte cannot be read for any reason other than the end of the file,
268     *                     <li>if the input stream has been closed, or</li>
269     *                     <li>if some other I/O error occurs.</li>
270     *                     </ul>
271     */
272    @Override
273    public int read(final byte[] b, final int off, final int len) throws IOException {
274        try {
275            beforeRead(len);
276            final int n = in.read(b, off, len);
277            afterRead(n);
278            return n;
279        } catch (final IOException e) {
280            handleIOException(e);
281            return EOF;
282        }
283    }
284
285    /**
286     * Invokes the delegate's {@link InputStream#reset()} method.
287     *
288     * @throws IOException if this stream has not been marked or if the mark has been invalidated.
289     */
290    @Override
291    public synchronized void reset() throws IOException {
292        try {
293            in.reset();
294        } catch (final IOException e) {
295            handleIOException(e);
296        }
297    }
298
299    /**
300     * Package-private for testing.
301     *
302     * @param in The input stream to set.
303     */
304    void setIn(final InputStream in) {
305        this.in = in;
306    }
307
308    /**
309     * Invokes the delegate's {@link InputStream#skip(long)} method.
310     *
311     * @param n the number of bytes to skip.
312     * @return the actual number of bytes skipped.
313     * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
314     */
315    @Override
316    public long skip(final long n) throws IOException {
317        try {
318            return in.skip(n);
319        } catch (final IOException e) {
320            handleIOException(e);
321            return 0;
322        }
323    }
324
325    /**
326     * Unwraps this instance by returning the underlying {@link InputStream}.
327     * <p>
328     * Use with caution; useful to query the underlying {@link InputStream}.
329     * </p>
330     *
331     * @return the underlying {@link InputStream}.
332     * @since 2.16.0
333     */
334    public InputStream unwrap() {
335        return in;
336    }
337
338}