001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026 027import org.apache.commons.io.IOUtils; 028import org.apache.commons.io.function.IOConsumer; 029 030/** 031 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the 032 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}. 033 * <p> 034 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly. 035 * </p> 036 * <p> 037 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually 038 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must 039 * be used. 040 * </p> 041 * 042 * @see MessageDigestInputStream 043 */ 044public class ObservableInputStream extends ProxyInputStream { 045 046 /** 047 * Abstracts observer callback for {@link ObservableInputStream}s. 048 */ 049 public static abstract class Observer { 050 051 /** 052 * Called to indicate that the {@link ObservableInputStream} has been closed. 053 * 054 * @throws IOException if an I/O error occurs. 055 */ 056 @SuppressWarnings("unused") // Possibly thrown from subclasses. 057 public void closed() throws IOException { 058 // noop 059 } 060 061 /** 062 * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have 063 * been called, and are about to invoke data. 064 * 065 * @param buffer The byte array, which has been passed to the read call, and where data has been stored. 066 * @param offset The offset within the byte array, where data has been stored. 067 * @param length The number of bytes, which have been stored in the byte array. 068 * @throws IOException if an I/O error occurs. 069 */ 070 @SuppressWarnings("unused") // Possibly thrown from subclasses. 071 public void data(final byte[] buffer, final int offset, final int length) throws IOException { 072 // noop 073 } 074 075 /** 076 * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream}, 077 * and will return a value. 078 * 079 * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case, 080 * {@link #finished()} will be invoked instead. 081 * @throws IOException if an I/O error occurs. 082 */ 083 @SuppressWarnings("unused") // Possibly thrown from subclasses. 084 public void data(final int value) throws IOException { 085 // noop 086 } 087 088 /** 089 * Called to indicate that an error occurred on the underlying stream. 090 * 091 * @param exception the exception to throw 092 * @throws IOException if an I/O error occurs. 093 */ 094 public void error(final IOException exception) throws IOException { 095 throw exception; 096 } 097 098 /** 099 * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times, 100 * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF. 101 * 102 * @throws IOException if an I/O error occurs. 103 */ 104 @SuppressWarnings("unused") // Possibly thrown from subclasses. 105 public void finished() throws IOException { 106 // noop 107 } 108 } 109 110 private final List<Observer> observers; 111 112 /** 113 * Constructs a new ObservableInputStream for the given InputStream. 114 * 115 * @param inputStream the input stream to observe. 116 */ 117 public ObservableInputStream(final InputStream inputStream) { 118 this(inputStream, new ArrayList<>()); 119 } 120 121 /** 122 * Constructs a new ObservableInputStream for the given InputStream. 123 * 124 * @param inputStream the input stream to observe. 125 * @param observers List of observer callbacks. 126 */ 127 private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) { 128 super(inputStream); 129 this.observers = observers; 130 } 131 132 /** 133 * Constructs a new ObservableInputStream for the given InputStream. 134 * 135 * @param inputStream the input stream to observe. 136 * @param observers List of observer callbacks. 137 * @since 2.9.0 138 */ 139 public ObservableInputStream(final InputStream inputStream, final Observer... observers) { 140 this(inputStream, Arrays.asList(observers)); 141 } 142 143 /** 144 * Adds an Observer. 145 * 146 * @param observer the observer to add. 147 */ 148 public void add(final Observer observer) { 149 observers.add(observer); 150 } 151 152 @Override 153 public void close() throws IOException { 154 IOException ioe = null; 155 try { 156 super.close(); 157 } catch (final IOException e) { 158 ioe = e; 159 } 160 if (ioe == null) { 161 noteClosed(); 162 } else { 163 noteError(ioe); 164 } 165 } 166 167 /** 168 * Reads all data from the underlying {@link InputStream}, while notifying the observers. 169 * 170 * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception. 171 */ 172 public void consume() throws IOException { 173 IOUtils.consume(this); 174 } 175 176 private void forEachObserver(final IOConsumer<Observer> action) throws IOException { 177 IOConsumer.forAll(action, observers); 178 } 179 180 /** 181 * Gets a copy of currently registered observers. 182 * 183 * @return a copy of the list of currently registered observers. 184 * @since 2.9.0 185 */ 186 public List<Observer> getObservers() { 187 return new ArrayList<>(observers); 188 } 189 190 /** 191 * Notifies the observers by invoking {@link Observer#finished()}. 192 * 193 * @throws IOException Some observer has thrown an exception, which is being passed down. 194 */ 195 protected void noteClosed() throws IOException { 196 forEachObserver(Observer::closed); 197 } 198 199 /** 200 * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments. 201 * 202 * @param value Passed to the observers. 203 * @throws IOException Some observer has thrown an exception, which is being passed down. 204 */ 205 protected void noteDataByte(final int value) throws IOException { 206 forEachObserver(observer -> observer.data(value)); 207 } 208 209 /** 210 * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments. 211 * 212 * @param buffer Passed to the observers. 213 * @param offset Passed to the observers. 214 * @param length Passed to the observers. 215 * @throws IOException Some observer has thrown an exception, which is being passed down. 216 */ 217 protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException { 218 forEachObserver(observer -> observer.data(buffer, offset, length)); 219 } 220 221 /** 222 * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument. 223 * 224 * @param exception Passed to the observers. 225 * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same 226 * exception, which has been passed as an argument. 227 */ 228 protected void noteError(final IOException exception) throws IOException { 229 forEachObserver(observer -> observer.error(exception)); 230 } 231 232 /** 233 * Notifies the observers by invoking {@link Observer#finished()}. 234 * 235 * @throws IOException Some observer has thrown an exception, which is being passed down. 236 */ 237 protected void noteFinished() throws IOException { 238 forEachObserver(Observer::finished); 239 } 240 241 private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException { 242 if (ioe != null) { 243 noteError(ioe); 244 throw ioe; 245 } 246 if (result == EOF) { 247 noteFinished(); 248 } else if (result > 0) { 249 noteDataBytes(buffer, offset, result); 250 } 251 } 252 253 @Override 254 public int read() throws IOException { 255 int result = 0; 256 IOException ioe = null; 257 try { 258 result = super.read(); 259 } catch (final IOException ex) { 260 ioe = ex; 261 } 262 if (ioe != null) { 263 noteError(ioe); 264 throw ioe; 265 } 266 if (result == EOF) { 267 noteFinished(); 268 } else { 269 noteDataByte(result); 270 } 271 return result; 272 } 273 274 @Override 275 public int read(final byte[] buffer) throws IOException { 276 int result = 0; 277 IOException ioe = null; 278 try { 279 result = super.read(buffer); 280 } catch (final IOException ex) { 281 ioe = ex; 282 } 283 notify(buffer, 0, result, ioe); 284 return result; 285 } 286 287 @Override 288 public int read(final byte[] buffer, final int offset, final int length) throws IOException { 289 int result = 0; 290 IOException ioe = null; 291 try { 292 result = super.read(buffer, offset, length); 293 } catch (final IOException ex) { 294 ioe = ex; 295 } 296 notify(buffer, offset, result, ioe); 297 return result; 298 } 299 300 /** 301 * Removes an Observer. 302 * 303 * @param observer the observer to remove 304 */ 305 public void remove(final Observer observer) { 306 observers.remove(observer); 307 } 308 309 /** 310 * Removes all Observers. 311 */ 312 public void removeAllObservers() { 313 observers.clear(); 314 } 315 316}