001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.FilterInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024 025import org.apache.commons.io.IOUtils; 026import org.apache.commons.io.build.AbstractStreamBuilder; 027import org.apache.commons.io.function.Erase; 028import org.apache.commons.io.function.IOConsumer; 029import org.apache.commons.io.function.IOIntConsumer; 030 031/** 032 * A proxy stream which acts as a {@link FilterInputStream}, by passing all method calls on to the proxied stream, not changing which methods are called. 033 * <p> 034 * It is an alternative base class to {@link FilterInputStream} to increase reusability, because {@link FilterInputStream} changes the methods being called, 035 * such as read(byte[]) to read(byte[], int, int). 036 * </p> 037 * <p> 038 * In addition, this class allows you to: 039 * </p> 040 * <ul> 041 * <li>notify a subclass that <em>n</em> bytes are about to be read through {@link #beforeRead(int)}</li> 042 * <li>notify a subclass that <em>n</em> bytes were read through {@link #afterRead(int)}</li> 043 * <li>notify a subclass that an exception was caught through {@link #handleIOException(IOException)}</li> 044 * <li>{@link #unwrap()} itself</li> 045 * </ul> 046 */ 047public abstract class ProxyInputStream extends FilterInputStream { 048 049 /** 050 * Abstracts builder properties for subclasses. 051 * 052 * @param <T> The InputStream type. 053 * @param <B> The builder type. 054 * @since 2.18.0 055 */ 056 protected abstract static class AbstractBuilder<T, B extends AbstractStreamBuilder<T, B>> extends AbstractStreamBuilder<T, B> { 057 058 private IOIntConsumer afterRead; 059 060 /** 061 * Constructs a builder of {@code T}. 062 */ 063 protected AbstractBuilder() { 064 // empty 065 } 066 067 /** 068 * Gets the {@link ProxyInputStream#afterRead(int)} consumer. 069 * 070 * @return the {@link ProxyInputStream#afterRead(int)} consumer. 071 */ 072 public IOIntConsumer getAfterRead() { 073 return afterRead; 074 } 075 076 /** 077 * Sets the {@link ProxyInputStream#afterRead(int)} behavior, null resets to a NOOP. 078 * <p> 079 * Setting this value causes the {@link ProxyInputStream#afterRead(int) afterRead} method to delegate to the given consumer. 080 * </p> 081 * <p> 082 * If a subclass overrides {@link ProxyInputStream#afterRead(int) afterRead} and does not call {@code super.afterRead(int)}, then the given consumer is 083 * not called. 084 * </p> 085 * <p> 086 * This does <em>not</em> override a {@code ProxyInputStream} subclass' implementation of the {@link ProxyInputStream#afterRead(int)} method, it can 087 * supplement it. 088 * </p> 089 * 090 * @param afterRead the {@link ProxyInputStream#afterRead(int)} behavior. 091 * @return this instance. 092 */ 093 public B setAfterRead(final IOIntConsumer afterRead) { 094 this.afterRead = afterRead; 095 return asThis(); 096 } 097 098 } 099 100 /** 101 * Tracks whether {@link #close()} has been called or not. 102 */ 103 private boolean closed; 104 105 /** 106 * Handles exceptions. 107 */ 108 private final IOConsumer<IOException> exceptionHandler; 109 110 private final IOIntConsumer afterRead; 111 112 /** 113 * Constructs a new ProxyInputStream. 114 * 115 * @param builder How to build an instance. 116 * @throws IOException if an I/O error occurs. 117 * @since 2.18.0 118 */ 119 @SuppressWarnings("resource") 120 protected ProxyInputStream(final AbstractBuilder<?, ?> builder) throws IOException { 121 // the delegate is stored in a protected superclass instance variable named 'in'. 122 this(builder.getInputStream(), builder); 123 } 124 125 /** 126 * Constructs a new ProxyInputStream. 127 * 128 * @param proxy the InputStream to proxy. 129 */ 130 public ProxyInputStream(final InputStream proxy) { 131 // the delegate is stored in a protected superclass variable named 'in'. 132 super(proxy); 133 this.exceptionHandler = Erase::rethrow; 134 this.afterRead = IOIntConsumer.NOOP; 135 } 136 137 /** 138 * Constructs a new ProxyInputStream. 139 * 140 * @param proxy the InputStream to proxy. 141 * @param builder How to build an instance. 142 * @since 2.18.0 143 */ 144 protected ProxyInputStream(final InputStream proxy, final AbstractBuilder<?, ?> builder) { 145 // the delegate is stored in a protected superclass instance variable named 'in'. 146 super(proxy); 147 this.exceptionHandler = Erase::rethrow; 148 this.afterRead = builder.getAfterRead() != null ? builder.getAfterRead() : IOIntConsumer.NOOP; 149 } 150 151 /** 152 * Called by the {@code read} methods after the proxied call has returned successfully. The argument is the number of bytes returned to the caller or 153 * {@link IOUtils#EOF EOF} if the end of stream was reached. 154 * <p> 155 * The default delegates to the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}. 156 * </p> 157 * <p> 158 * Alternatively, a subclasses can override this method to add post-processing functionality without having to override all the read methods. 159 * </p> 160 * <p> 161 * Note this method is <em>not</em> called from {@link #skip(long)} or {@link #reset()}. You need to explicitly override those methods if you want to add 162 * post-processing steps also to them. 163 * </p> 164 * 165 * @param n number of bytes read, or {@link IOUtils#EOF EOF} if the end of stream was reached. 166 * @throws IOException Thrown by a subclass or the consumer given to {@link AbstractBuilder#setAfterRead(IOIntConsumer)}. 167 * @since 2.0 168 */ 169 protected void afterRead(final int n) throws IOException { 170 afterRead.accept(n); 171 } 172 173 /** 174 * Invokes the delegate's {@link InputStream#available()} method. 175 * 176 * @return the number of available bytes, 0 if the stream is closed. 177 * @throws IOException if an I/O error occurs. 178 */ 179 @Override 180 public int available() throws IOException { 181 if (in != null && !isClosed()) { 182 try { 183 return in.available(); 184 } catch (final IOException e) { 185 handleIOException(e); 186 } 187 } 188 return 0; 189 } 190 191 /** 192 * Invoked by the {@code read} methods before the call is proxied. The number 193 * of bytes that the caller wanted to read (1 for the {@link #read()} 194 * method, buffer length for {@link #read(byte[])}, etc.) is given as 195 * an argument. 196 * <p> 197 * Subclasses can override this method to add common pre-processing 198 * functionality without having to override all the read methods. 199 * The default implementation does nothing. 200 * </p> 201 * <p> 202 * Note this method is <em>not</em> called from {@link #skip(long)} or 203 * {@link #reset()}. You need to explicitly override those methods if 204 * you want to add pre-processing steps also to them. 205 * </p> 206 * 207 * @param n number of bytes that the caller asked to be read. 208 * @throws IOException if the pre-processing fails in a subclass. 209 * @since 2.0 210 */ 211 @SuppressWarnings("unused") // Possibly thrown from subclasses. 212 protected void beforeRead(final int n) throws IOException { 213 // no-op default 214 } 215 216 /** 217 * Checks if this instance is closed and throws an IOException if so. 218 * 219 * @throws IOException if this instance is closed. 220 */ 221 void checkOpen() throws IOException { 222 Input.checkOpen(!isClosed()); 223 } 224 225 /** 226 * Invokes the delegate's {@link InputStream#close()} method. 227 * 228 * @throws IOException if an I/O error occurs. 229 */ 230 @Override 231 public void close() throws IOException { 232 IOUtils.close(in, this::handleIOException); 233 closed = true; 234 } 235 236 /** 237 * Handles any IOExceptions thrown; by default, throws the given exception. 238 * <p> 239 * This method provides a point to implement custom exception 240 * handling. The default behavior is to re-throw the exception. 241 * </p> 242 * 243 * @param e The IOException thrown. 244 * @throws IOException if an I/O error occurs. 245 * @since 2.0 246 */ 247 protected void handleIOException(final IOException e) throws IOException { 248 exceptionHandler.accept(e); 249 } 250 251 /** 252 * Tests whether this instance is closed. 253 * 254 * @return whether this instance is closed. 255 */ 256 boolean isClosed() { 257 return closed; 258 } 259 260 /** 261 * Invokes the delegate's {@link InputStream#mark(int)} method. 262 * 263 * @param readLimit read ahead limit. 264 */ 265 @Override 266 public synchronized void mark(final int readLimit) { 267 if (in != null) { 268 in.mark(readLimit); 269 } 270 } 271 272 /** 273 * Invokes the delegate's {@link InputStream#markSupported()} method. 274 * 275 * @return {@code true} if this stream instance supports the mark and reset methods; {@code false} otherwise. 276 * @see #mark(int) 277 * @see #reset() 278 */ 279 @Override 280 public boolean markSupported() { 281 return in != null && in.markSupported(); 282 } 283 284 /** 285 * Invokes the delegate's {@link InputStream#read()} method unless the stream is closed. 286 * 287 * @return the byte read or {@link IOUtils#EOF EOF} if we reached the end of stream. 288 * @throws IOException if an I/O error occurs. 289 */ 290 @Override 291 public int read() throws IOException { 292 try { 293 beforeRead(1); 294 final int b = in.read(); 295 afterRead(b != EOF ? 1 : EOF); 296 return b; 297 } catch (final IOException e) { 298 handleIOException(e); 299 return EOF; 300 } 301 } 302 303 /** 304 * Invokes the delegate's {@link InputStream#read(byte[])} method. 305 * 306 * @param b the buffer to read the bytes into. 307 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream. 308 * @throws IOException 309 * <ul> 310 * <li>If the first byte cannot be read for any reason other than the end of the file, 311 * <li>if the input stream has been closed, or</li> 312 * <li>if some other I/O error occurs.</li> 313 * </ul> 314 */ 315 @Override 316 public int read(final byte[] b) throws IOException { 317 try { 318 beforeRead(IOUtils.length(b)); 319 final int n = in.read(b); 320 afterRead(n); 321 return n; 322 } catch (final IOException e) { 323 handleIOException(e); 324 return EOF; 325 } 326 } 327 328 /** 329 * Invokes the delegate's {@link InputStream#read(byte[], int, int)} method. 330 * 331 * @param b the buffer to read the bytes into. 332 * @param off The start offset. 333 * @param len The number of bytes to read. 334 * @return the number of bytes read or {@link IOUtils#EOF EOF} if we reached the end of stream. 335 * @throws IOException 336 * <ul> 337 * <li>If the first byte cannot be read for any reason other than the end of the file, 338 * <li>if the input stream has been closed, or</li> 339 * <li>if some other I/O error occurs.</li> 340 * </ul> 341 */ 342 @Override 343 public int read(final byte[] b, final int off, final int len) throws IOException { 344 try { 345 beforeRead(len); 346 final int n = in.read(b, off, len); 347 afterRead(n); 348 return n; 349 } catch (final IOException e) { 350 handleIOException(e); 351 return EOF; 352 } 353 } 354 355 /** 356 * Invokes the delegate's {@link InputStream#reset()} method. 357 * 358 * @throws IOException if this stream has not been marked or if the mark has been invalidated. 359 */ 360 @Override 361 public synchronized void reset() throws IOException { 362 try { 363 in.reset(); 364 } catch (final IOException e) { 365 handleIOException(e); 366 } 367 } 368 369 /** 370 * Sets the underlying input stream. 371 * 372 * @param in The input stream to set in {@link java.io.FilterInputStream#in}. 373 * @return this instance. 374 * @since 2.19.0 375 */ 376 public ProxyInputStream setReference(final InputStream in) { 377 this.in = in; 378 return this; 379 } 380 381 /** 382 * Invokes the delegate's {@link InputStream#skip(long)} method. 383 * 384 * @param n the number of bytes to skip. 385 * @return the actual number of bytes skipped. 386 * @throws IOException if the stream does not support seek, or if some other I/O error occurs. 387 */ 388 @Override 389 public long skip(final long n) throws IOException { 390 try { 391 return in.skip(n); 392 } catch (final IOException e) { 393 handleIOException(e); 394 return 0; 395 } 396 } 397 398 /** 399 * Unwraps this instance by returning the underlying {@link InputStream}. 400 * <p> 401 * Use with caution; useful to query the underlying {@link InputStream}. 402 * </p> 403 * 404 * @return the underlying {@link InputStream}. 405 * @since 2.16.0 406 */ 407 public InputStream unwrap() { 408 return in; 409 } 410 411}