001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.FilterInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024 025import org.apache.commons.io.IOUtils; 026import org.apache.commons.io.build.AbstractStreamBuilder; 027import org.apache.commons.io.function.Erase; 028import org.apache.commons.io.function.IOConsumer; 029import org.apache.commons.io.function.IOIntConsumer; 030 031/** 032 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called. 033 * <p> 034 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called, 035 * such as read(byte[]) to read(byte[], int, int). 036 * </p> 037 * <p> 038 * In addition, this class allows you to: 039 * </p> 040 * <ul> 041 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li> 042 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li> 043 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li> 044 * <li>{@link #unwrap()} itself</li> 045 * </ul> 046 */ 047public abstract class ProxyInputStream extends FilterInputStream { 048 049 /** 050 * Abstracts builder properties for subclasses. 051 * 052 * @param <T> The InputStream type. 053 * @param <B> The builder type. 054 * @since 2.18.0 055 */ 056 protected static abstract class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> { 057 058 private IOIntConsumer afterRead; 059 060 /** 061 * Gets the {@link ProxyInputStream#afterRead(int)} consumer. 062 * 063 * @return the {@link ProxyInputStream#afterRead(int)} consumer. 064 */ 065 public IOIntConsumer getAfterRead() { 066 return afterRead; 067 } 068 069 /** 070 * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP. 071 * <p> 072 * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer. 073 * </p> 074 * <p> 075 * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is 076 * not called. 077 * </p> 078 * <p> 079 * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can 080 * supplement it. 081 * </p> 082 * 083 * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior. 084 * @return this instance. 085 */ 086 public B setAfterRead(final IOIntConsumer afterRead) { 087 this.afterRead = afterRead; 088 return asThis(); 089 } 090 091 } 092 093 /** 094 * Tracks whether {@link #close()} has been called or not. 095 */ 096 private boolean closed; 097 098 /** 099 * Handles exceptions. 100 */ 101 private final IOConsumer<IOException> exceptionHandler; 102 103 private final IOIntConsumer afterRead; 104 105 /** 106 * Constructs a new ProxyInputStream. 107 * 108 * @param builder How to build an instance. 109 * @throws IOException if an I/O error occurs. 110 * @since 2.18.0 111 */ 112 @SuppressWarnings("resource") 113 protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException { 114 // the delegate is stored in a protected superclass instance variable named 'in'. 115 this(builder.getInputStream(), builder); 116 } 117 118 /** 119 * Constructs a new ProxyInputStream. 120 * 121 * @param proxy the InputStream to proxy. 122 */ 123 public ProxyInputStream(final InputStream proxy) { 124 // the delegate is stored in a protected superclass variable named 'in'. 125 super(proxy); 126 this.exceptionHandler = Erase::rethrow; 127 this.afterRead = IOIntConsumer.NOOP; 128 } 129 130 /** 131 * Constructs a new ProxyInputStream. 132 * 133 * @param proxy the InputStream to proxy. 134 * @param builder How to build an instance. 135 * @since 2.18.0 136 */ 137 protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) { 138 // the delegate is stored in a protected superclass instance variable named 'in'. 139 super(proxy); 140 this.exceptionHandler = Erase::rethrow; 141 this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP; 142 } 143 144 /** 145 * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or 146 * {@link IOUtils#EOF EOF} if the end of stream was reached. 147 * <p> 148 * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}. 149 * </p> 150 * <p> 151 * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods. 152 * </p> 153 * <p> 154 * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add 155 * post-processing steps also to them. 156 * </p> 157 * 158 * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached. 159 * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}. 160 * @since 2.0 161 */ 162 protected void afterRead(final int n) throws IOException { 163 afterRead.accept(n); 164 } 165 166 /** 167 * Invokes the delegate's {@link InputStream#available()} method. 168 * 169 * @return the number of available bytes, 0 if the stream is closed. 170 * @throws IOException if an I/O error occurs. 171 */ 172 @Override 173 public int available() throws IOException { 174 if (in != null && !isClosed()) { 175 try { 176 return in.available(); 177 } catch (final IOException e) { 178 handleIOException(e); 179 } 180 } 181 return 0; 182 } 183 184 /** 185 * Invoked by the {@code read} methods before the call is proxied. The number 186 * of bytes that the caller wanted to read (1 for the {@link #read()} 187 * method, buffer length for {@link #read(byte[])}, etc.) is given as 188 * an argument. 189 * <p> 190 * Subclasses can override this method to add common pre-processing 191 * functionality without having to override all the read methods. 192 * The default implementation does nothing. 193 * </p> 194 * <p> 195 * Note this method is <em>not</em> called from {@link #skip(long)} or 196 * {@link #reset()}. You need to explicitly override those methods if 197 * you want to add pre-processing steps also to them. 198 * </p> 199 * 200 * @since 2.0 201 * @param n number of bytes that the caller asked to be read. 202 * @throws IOException if the pre-processing fails in a subclass. 203 */ 204 @SuppressWarnings("unused") // Possibly thrown from subclasses. 205 protected void beforeRead(final int n) throws IOException { 206 // no-op default 207 } 208 209 /** 210 * Checks if this instance is closed and throws an IOException if so. 211 * 212 * @throws IOException if this instance is closed. 213 */ 214 void checkOpen() throws IOException { 215 Input.checkOpen(!isClosed()); 216 } 217 218 /** 219 * Invokes the delegate's {@link InputStream#close()} method. 220 * 221 * @throws IOException if an I/O error occurs. 222 */ 223 @Override 224 public void close() throws IOException { 225 IOUtils.close(in, this::handleIOException); 226 closed = true; 227 } 228 229 /** 230 * Handles any IOExceptions thrown; by default, throws the given exception. 231 * <p> 232 * This method provides a point to implement custom exception 233 * handling. The default behavior is to re-throw the exception. 234 * </p> 235 * 236 * @param e The IOException thrown. 237 * @throws IOException if an I/O error occurs. 238 * @since 2.0 239 */ 240 protected void handleIOException(final IOException e) throws IOException { 241 exceptionHandler.accept(e); 242 } 243 244 /** 245 * Tests whether this instance is closed. 246 * 247 * @return whether this instance is closed. 248 */ 249 boolean isClosed() { 250 return closed; 251 } 252 253 /** 254 * Invokes the delegate's {@link InputStream#mark(int)} method. 255 * 256 * @param readLimit read ahead limit. 257 */ 258 @Override 259 public synchronized void mark(final int readLimit) { 260 if (in != null) { 261 in.mark(readLimit); 262 } 263 } 264 265 /** 266 * Invokes the delegate's {@link InputStream#markSupported()} method. 267 * 268 * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise. 269 * @see #mark(int) 270 * @see #reset() 271 */ 272 @Override 273 public boolean markSupported() { 274 return in != null && in.markSupported(); 275 } 276 277 /** 278 * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed. 279 * 280 * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream. 281 * @throws IOException if an I/O error occurs. 282 */ 283 @Override 284 public int read() throws IOException { 285 try { 286 beforeRead(1); 287 final int b = in.read(); 288 afterRead(b != EOF ? 1 : EOF); 289 return b; 290 } catch (final IOException e) { 291 handleIOException(e); 292 return EOF; 293 } 294 } 295 296 /** 297 * Invokes the delegate's {@link InputStream#read(byte[])} method. 298 * 299 * @param b the buffer to read the bytes into. 300 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream. 301 * @throws IOException 302 * <ul> 303 * <li>If the first byte cannot be read for any reason other than the end of the file, 304 * <li>if the input stream has been closed, or</li> 305 * <li>if some other I/O error occurs.</li> 306 * </ul> 307 */ 308 @Override 309 public int read(final byte[] b) throws IOException { 310 try { 311 beforeRead(IOUtils.length(b)); 312 final int n = in.read(b); 313 afterRead(n); 314 return n; 315 } catch (final IOException e) { 316 handleIOException(e); 317 return EOF; 318 } 319 } 320 321 /** 322 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method. 323 * 324 * @param b the buffer to read the bytes into. 325 * @param off The start offset. 326 * @param len The number of bytes to read. 327 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream. 328 * @throws IOException 329 * <ul> 330 * <li>If the first byte cannot be read for any reason other than the end of the file, 331 * <li>if the input stream has been closed, or</li> 332 * <li>if some other I/O error occurs.</li> 333 * </ul> 334 */ 335 @Override 336 public int read(final byte[] b, final int off, final int len) throws IOException { 337 try { 338 beforeRead(len); 339 final int n = in.read(b, off, len); 340 afterRead(n); 341 return n; 342 } catch (final IOException e) { 343 handleIOException(e); 344 return EOF; 345 } 346 } 347 348 /** 349 * Invokes the delegate's {@link InputStream#reset()} method. 350 * 351 * @throws IOException if this stream has not been marked or if the mark has been invalidated. 352 */ 353 @Override 354 public synchronized void reset() throws IOException { 355 try { 356 in.reset(); 357 } catch (final IOException e) { 358 handleIOException(e); 359 } 360 } 361 362 /** 363 * Package-private for testing. 364 * 365 * @param in The input stream to set in {@link java.io.FilterInputStream#in}. 366 */ 367 void setIn(final InputStream in) { 368 this.in = in; 369 } 370 371 /** 372 * Invokes the delegate's {@link InputStream#skip(long)} method. 373 * 374 * @param n the number of bytes to skip. 375 * @return the actual number of bytes skipped. 376 * @throws IOException if the stream does not support seek, or if some other I/O error occurs. 377 */ 378 @Override 379 public long skip(final long n) throws IOException { 380 try { 381 return in.skip(n); 382 } catch (final IOException e) { 383 handleIOException(e); 384 return 0; 385 } 386 } 387 388 /** 389 * Unwraps this instance by returning the underlying {@link InputStream}. 390 * <p> 391 * Use with caution; useful to query the underlying {@link InputStream}. 392 * </p> 393 * 394 * @return the underlying {@link InputStream}. 395 * @since 2.16.0 396 */ 397 public InputStream unwrap() { 398 return in; 399 } 400 401}