001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.List;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.commons.io.function.IOConsumer;
029
030/**
031 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the
032 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}.
033 * <p>
034 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
035 * </p>
036 * <p>
037 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
038 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
039 * be used.
040 * </p>
041 *
042 * @see MessageDigestInputStream
043 */
044public class ObservableInputStream extends ProxyInputStream {
045
046    /**
047     * For subclassing builders from {@link BoundedInputStream} subclassses.
048     *
049     * @param <T> The subclass.
050     * @since 2.18.0
051     */
052    public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {
053
054        private List<Observer> observers;
055
056        /**
057         * Constructs a new instance for subclasses.
058         */
059        public AbstractBuilder() {
060            // empty
061        }
062
063        /**
064         * Sets the list of observer callbacks.
065         *
066         * @param observers The list of observer callbacks.
067         */
068        public void setObservers(final List<Observer> observers) {
069            this.observers = observers;
070        }
071
072    }
073
074    /**
075     * Builds instances of {@link ObservableInputStream}.
076     *
077     * @since 2.18.0
078     */
079    public static class Builder extends AbstractBuilder<Builder> {
080
081        /**
082         * Constructs a new builder of {@link ObservableInputStream}.
083         */
084        public Builder() {
085            // empty
086        }
087
088        @Override
089        public ObservableInputStream get() throws IOException {
090            return new ObservableInputStream(this);
091        }
092
093    }
094
095    /**
096     * Abstracts observer callback for {@link ObservableInputStream}s.
097     */
098    public abstract static class Observer {
099
100        /**
101         * Constructs a new instance for subclasses.
102         */
103        public Observer() {
104            // empty
105        }
106
107        /**
108         * Called to indicate that the {@link ObservableInputStream} has been closed.
109         *
110         * @throws IOException if an I/O error occurs.
111         */
112        @SuppressWarnings("unused") // Possibly thrown from subclasses.
113        public void closed() throws IOException {
114            // noop
115        }
116
117        /**
118         * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
119         * been called, and are about to invoke data.
120         *
121         * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
122         * @param offset The offset within the byte array, where data has been stored.
123         * @param length The number of bytes, which have been stored in the byte array.
124         * @throws IOException if an I/O error occurs.
125         */
126        @SuppressWarnings("unused") // Possibly thrown from subclasses.
127        public void data(final byte[] buffer, final int offset, final int length) throws IOException {
128            // noop
129        }
130
131        /**
132         * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
133         * and will return a value.
134         *
135         * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
136         *        {@link #finished()} will be invoked instead.
137         * @throws IOException if an I/O error occurs.
138         */
139        @SuppressWarnings("unused") // Possibly thrown from subclasses.
140        public void data(final int value) throws IOException {
141            // noop
142        }
143
144        /**
145         * Called to indicate that an error occurred on the underlying stream.
146         *
147         * @param exception the exception to throw
148         * @throws IOException if an I/O error occurs.
149         */
150        public void error(final IOException exception) throws IOException {
151            throw exception;
152        }
153
154        /**
155         * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
156         * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
157         *
158         * @throws IOException if an I/O error occurs.
159         */
160        @SuppressWarnings("unused") // Possibly thrown from subclasses.
161        public void finished() throws IOException {
162            // noop
163        }
164    }
165
166    private final List<Observer> observers;
167
168    ObservableInputStream(final AbstractBuilder builder) throws IOException {
169        super(builder);
170        this.observers = builder.observers;
171    }
172
173    /**
174     * Constructs a new ObservableInputStream for the given InputStream.
175     *
176     * @param inputStream the input stream to observe.
177     */
178    public ObservableInputStream(final InputStream inputStream) {
179        this(inputStream, new ArrayList<>());
180    }
181
182    /**
183     * Constructs a new ObservableInputStream for the given InputStream.
184     *
185     * @param inputStream the input stream to observe.
186     * @param observers List of observer callbacks.
187     */
188    private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
189        super(inputStream);
190        this.observers = observers;
191    }
192
193    /**
194     * Constructs a new ObservableInputStream for the given InputStream.
195     *
196     * @param inputStream the input stream to observe.
197     * @param observers List of observer callbacks.
198     * @since 2.9.0
199     */
200    public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
201        this(inputStream, Arrays.asList(observers));
202    }
203
204    /**
205     * Adds an Observer.
206     *
207     * @param observer the observer to add.
208     */
209    public void add(final Observer observer) {
210        observers.add(observer);
211    }
212
213    @Override
214    public void close() throws IOException {
215        IOException ioe = null;
216        try {
217            super.close();
218        } catch (final IOException e) {
219            ioe = e;
220        }
221        if (ioe == null) {
222            noteClosed();
223        } else {
224            noteError(ioe);
225        }
226    }
227
228    /**
229     * Reads all data from the underlying {@link InputStream}, while notifying the observers.
230     *
231     * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
232     */
233    public void consume() throws IOException {
234        IOUtils.consume(this);
235    }
236
237    private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
238        IOConsumer.forAll(action, observers);
239    }
240
241    /**
242     * Gets a copy of currently registered observers.
243     *
244     * @return a copy of the list of currently registered observers.
245     * @since 2.9.0
246     */
247    public List<Observer> getObservers() {
248        return new ArrayList<>(observers);
249    }
250
251    /**
252     * Notifies the observers by invoking {@link Observer#finished()}.
253     *
254     * @throws IOException Some observer has thrown an exception, which is being passed down.
255     */
256    protected void noteClosed() throws IOException {
257        forEachObserver(Observer::closed);
258    }
259
260    /**
261     * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
262     *
263     * @param value Passed to the observers.
264     * @throws IOException Some observer has thrown an exception, which is being passed down.
265     */
266    protected void noteDataByte(final int value) throws IOException {
267        forEachObserver(observer -> observer.data(value));
268    }
269
270    /**
271     * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
272     *
273     * @param buffer Passed to the observers.
274     * @param offset Passed to the observers.
275     * @param length Passed to the observers.
276     * @throws IOException Some observer has thrown an exception, which is being passed down.
277     */
278    protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
279        forEachObserver(observer -> observer.data(buffer, offset, length));
280    }
281
282    /**
283     * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
284     *
285     * @param exception Passed to the observers.
286     * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
287     *         exception, which has been passed as an argument.
288     */
289    protected void noteError(final IOException exception) throws IOException {
290        forEachObserver(observer -> observer.error(exception));
291    }
292
293    /**
294     * Notifies the observers by invoking {@link Observer#finished()}.
295     *
296     * @throws IOException Some observer has thrown an exception, which is being passed down.
297     */
298    protected void noteFinished() throws IOException {
299        forEachObserver(Observer::finished);
300    }
301
302    private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException {
303        if (ioe != null) {
304            noteError(ioe);
305            throw ioe;
306        }
307        if (result == EOF) {
308            noteFinished();
309        } else if (result > 0) {
310            noteDataBytes(buffer, offset, result);
311        }
312    }
313
314    @Override
315    public int read() throws IOException {
316        int result = 0;
317        IOException ioe = null;
318        try {
319            result = super.read();
320        } catch (final IOException ex) {
321            ioe = ex;
322        }
323        if (ioe != null) {
324            noteError(ioe);
325            throw ioe;
326        }
327        if (result == EOF) {
328            noteFinished();
329        } else {
330            noteDataByte(result);
331        }
332        return result;
333    }
334
335    @Override
336    public int read(final byte[] buffer) throws IOException {
337        int result = 0;
338        IOException ioe = null;
339        try {
340            result = super.read(buffer);
341        } catch (final IOException ex) {
342            ioe = ex;
343        }
344        notify(buffer, 0, result, ioe);
345        return result;
346    }
347
348    @Override
349    public int read(final byte[] buffer, final int offset, final int length) throws IOException {
350        int result = 0;
351        IOException ioe = null;
352        try {
353            result = super.read(buffer, offset, length);
354        } catch (final IOException ex) {
355            ioe = ex;
356        }
357        notify(buffer, offset, result, ioe);
358        return result;
359    }
360
361    /**
362     * Removes an Observer.
363     *
364     * @param observer the observer to remove
365     */
366    public void remove(final Observer observer) {
367        observers.remove(observer);
368    }
369
370    /**
371     * Removes all Observers.
372     */
373    public void removeAllObservers() {
374        observers.clear();
375    }
376
377}