001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026 027import org.apache.commons.io.IOUtils; 028import org.apache.commons.io.function.IOConsumer; 029 030/** 031 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the 032 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}. 033 * <p> 034 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly. 035 * </p> 036 * <p> 037 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually 038 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must 039 * be used. 040 * </p> 041 * 042 * @see MessageDigestInputStream 043 */ 044public class ObservableInputStream extends ProxyInputStream { 045 046 /** 047 * For subclassing builders from {@link BoundedInputStream} subclassses. 048 * 049 * @param <T> The subclass. 050 * @since 2.18.0 051 */ 052 public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> { 053 054 private List<Observer> observers; 055 056 /** 057 * Constructs a new instance for subclasses. 058 */ 059 public AbstractBuilder() { 060 // empty 061 } 062 063 /** 064 * Sets the list of observer callbacks. 065 * 066 * @param observers The list of observer callbacks. 067 */ 068 public void setObservers(final List<Observer> observers) { 069 this.observers = observers; 070 } 071 072 } 073 074 /** 075 * Builds instances of {@link ObservableInputStream}. 076 * 077 * @since 2.18.0 078 */ 079 public static class Builder extends AbstractBuilder<Builder> { 080 081 /** 082 * Constructs a new builder of {@link ObservableInputStream}. 083 */ 084 public Builder() { 085 // empty 086 } 087 088 @Override 089 public ObservableInputStream get() throws IOException { 090 return new ObservableInputStream(this); 091 } 092 093 } 094 095 /** 096 * Abstracts observer callback for {@link ObservableInputStream}s. 097 */ 098 public abstract static class Observer { 099 100 /** 101 * Constructs a new instance for subclasses. 102 */ 103 public Observer() { 104 // empty 105 } 106 107 /** 108 * Called to indicate that the {@link ObservableInputStream} has been closed. 109 * 110 * @throws IOException if an I/O error occurs. 111 */ 112 @SuppressWarnings("unused") // Possibly thrown from subclasses. 113 public void closed() throws IOException { 114 // noop 115 } 116 117 /** 118 * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have 119 * been called, and are about to invoke data. 120 * 121 * @param buffer The byte array, which has been passed to the read call, and where data has been stored. 122 * @param offset The offset within the byte array, where data has been stored. 123 * @param length The number of bytes, which have been stored in the byte array. 124 * @throws IOException if an I/O error occurs. 125 */ 126 @SuppressWarnings("unused") // Possibly thrown from subclasses. 127 public void data(final byte[] buffer, final int offset, final int length) throws IOException { 128 // noop 129 } 130 131 /** 132 * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream}, 133 * and will return a value. 134 * 135 * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case, 136 * {@link #finished()} will be invoked instead. 137 * @throws IOException if an I/O error occurs. 138 */ 139 @SuppressWarnings("unused") // Possibly thrown from subclasses. 140 public void data(final int value) throws IOException { 141 // noop 142 } 143 144 /** 145 * Called to indicate that an error occurred on the underlying stream. 146 * 147 * @param exception the exception to throw 148 * @throws IOException if an I/O error occurs. 149 */ 150 public void error(final IOException exception) throws IOException { 151 throw exception; 152 } 153 154 /** 155 * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times, 156 * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF. 157 * 158 * @throws IOException if an I/O error occurs. 159 */ 160 @SuppressWarnings("unused") // Possibly thrown from subclasses. 161 public void finished() throws IOException { 162 // noop 163 } 164 } 165 166 private final List<Observer> observers; 167 168 ObservableInputStream(final AbstractBuilder builder) throws IOException { 169 super(builder); 170 this.observers = builder.observers; 171 } 172 173 /** 174 * Constructs a new ObservableInputStream for the given InputStream. 175 * 176 * @param inputStream the input stream to observe. 177 */ 178 public ObservableInputStream(final InputStream inputStream) { 179 this(inputStream, new ArrayList<>()); 180 } 181 182 /** 183 * Constructs a new ObservableInputStream for the given InputStream. 184 * 185 * @param inputStream the input stream to observe. 186 * @param observers List of observer callbacks. 187 */ 188 private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) { 189 super(inputStream); 190 this.observers = observers; 191 } 192 193 /** 194 * Constructs a new ObservableInputStream for the given InputStream. 195 * 196 * @param inputStream the input stream to observe. 197 * @param observers List of observer callbacks. 198 * @since 2.9.0 199 */ 200 public ObservableInputStream(final InputStream inputStream, final Observer... observers) { 201 this(inputStream, Arrays.asList(observers)); 202 } 203 204 /** 205 * Adds an Observer. 206 * 207 * @param observer the observer to add. 208 */ 209 public void add(final Observer observer) { 210 observers.add(observer); 211 } 212 213 @Override 214 public void close() throws IOException { 215 IOException ioe = null; 216 try { 217 super.close(); 218 } catch (final IOException e) { 219 ioe = e; 220 } 221 if (ioe == null) { 222 noteClosed(); 223 } else { 224 noteError(ioe); 225 } 226 } 227 228 /** 229 * Reads all data from the underlying {@link InputStream}, while notifying the observers. 230 * 231 * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception. 232 */ 233 public void consume() throws IOException { 234 IOUtils.consume(this); 235 } 236 237 private void forEachObserver(final IOConsumer<Observer> action) throws IOException { 238 IOConsumer.forAll(action, observers); 239 } 240 241 /** 242 * Gets a copy of currently registered observers. 243 * 244 * @return a copy of the list of currently registered observers. 245 * @since 2.9.0 246 */ 247 public List<Observer> getObservers() { 248 return new ArrayList<>(observers); 249 } 250 251 /** 252 * Notifies the observers by invoking {@link Observer#finished()}. 253 * 254 * @throws IOException Some observer has thrown an exception, which is being passed down. 255 */ 256 protected void noteClosed() throws IOException { 257 forEachObserver(Observer::closed); 258 } 259 260 /** 261 * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments. 262 * 263 * @param value Passed to the observers. 264 * @throws IOException Some observer has thrown an exception, which is being passed down. 265 */ 266 protected void noteDataByte(final int value) throws IOException { 267 forEachObserver(observer -> observer.data(value)); 268 } 269 270 /** 271 * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments. 272 * 273 * @param buffer Passed to the observers. 274 * @param offset Passed to the observers. 275 * @param length Passed to the observers. 276 * @throws IOException Some observer has thrown an exception, which is being passed down. 277 */ 278 protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException { 279 forEachObserver(observer -> observer.data(buffer, offset, length)); 280 } 281 282 /** 283 * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument. 284 * 285 * @param exception Passed to the observers. 286 * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same 287 * exception, which has been passed as an argument. 288 */ 289 protected void noteError(final IOException exception) throws IOException { 290 forEachObserver(observer -> observer.error(exception)); 291 } 292 293 /** 294 * Notifies the observers by invoking {@link Observer#finished()}. 295 * 296 * @throws IOException Some observer has thrown an exception, which is being passed down. 297 */ 298 protected void noteFinished() throws IOException { 299 forEachObserver(Observer::finished); 300 } 301 302 private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException { 303 if (ioe != null) { 304 noteError(ioe); 305 throw ioe; 306 } 307 if (result == EOF) { 308 noteFinished(); 309 } else if (result > 0) { 310 noteDataBytes(buffer, offset, result); 311 } 312 } 313 314 @Override 315 public int read() throws IOException { 316 int result = 0; 317 IOException ioe = null; 318 try { 319 result = super.read(); 320 } catch (final IOException ex) { 321 ioe = ex; 322 } 323 if (ioe != null) { 324 noteError(ioe); 325 throw ioe; 326 } 327 if (result == EOF) { 328 noteFinished(); 329 } else { 330 noteDataByte(result); 331 } 332 return result; 333 } 334 335 @Override 336 public int read(final byte[] buffer) throws IOException { 337 int result = 0; 338 IOException ioe = null; 339 try { 340 result = super.read(buffer); 341 } catch (final IOException ex) { 342 ioe = ex; 343 } 344 notify(buffer, 0, result, ioe); 345 return result; 346 } 347 348 @Override 349 public int read(final byte[] buffer, final int offset, final int length) throws IOException { 350 int result = 0; 351 IOException ioe = null; 352 try { 353 result = super.read(buffer, offset, length); 354 } catch (final IOException ex) { 355 ioe = ex; 356 } 357 notify(buffer, offset, result, ioe); 358 return result; 359 } 360 361 /** 362 * Removes an Observer. 363 * 364 * @param observer the observer to remove 365 */ 366 public void remove(final Observer observer) { 367 observers.remove(observer); 368 } 369 370 /** 371 * Removes all Observers. 372 */ 373 public void removeAllObservers() { 374 observers.clear(); 375 } 376 377}