ObservableInputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.function.IOConsumer;

/**
 * The {@link ObservableInputStream} allows an InputStream to be consumed by other receivers in addition to the
 * thread that is reading it. These other consumers are implemented as instances of {@link Observer}.
 * <p>
 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly.
 * </p>
 * <p>
 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually
 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must
 * be used.
 * </p>
 *
 * @see MessageDigestInputStream
 */
public class ObservableInputStream extends ProxyInputStream {

    /**
     * For subclassing builders from {@link ObservableInputStream} subclasses.
     *
     * @param <T> The subclass.
     * @since 2.18.0
     */
    public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {

        /** Observer callbacks for the built stream; stays {@code null} unless {@link #setObservers(List)} is called. */
        private List<Observer> observers;

        /**
         * Sets the list of observer callbacks.
         *
         * @param observers The list of observer callbacks, may be {@code null} for none.
         */
        public void setObservers(final List<Observer> observers) {
            this.observers = observers;
        }

    }


    /**
     * Builds instances of {@link ObservableInputStream}.
     *
     * @since 2.18.0
     */
    public static class Builder extends AbstractBuilder<Builder> {

        /**
         * Builds a new {@link ObservableInputStream} from this builder's current state.
         *
         * @return a new instance.
         * @throws IOException if the stream cannot be constructed from this builder.
         */
        @Override
        public ObservableInputStream get() throws IOException {
            return new ObservableInputStream(this);
        }

    }

    /**
     * Abstracts observer callback for {@link ObservableInputStream}s.
     * <p>
     * Every callback is a no-op by default, except {@link #error(IOException)}, which rethrows the given exception.
     * </p>
     */
    public abstract static class Observer {

        /**
         * Called to indicate that the {@link ObservableInputStream} has been closed.
         *
         * @throws IOException if an I/O error occurs.
         */
        @SuppressWarnings("unused") // Possibly thrown from subclasses.
        public void closed() throws IOException {
            // noop
        }

        /**
         * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have
         * been called, and have stored data in the given buffer.
         *
         * @param buffer The byte array, which has been passed to the read call, and where data has been stored.
         * @param offset The offset within the byte array, where data has been stored.
         * @param length The number of bytes, which have been stored in the byte array.
         * @throws IOException if an I/O error occurs.
         */
        @SuppressWarnings("unused") // Possibly thrown from subclasses.
        public void data(final byte[] buffer, final int offset, final int length) throws IOException {
            // noop
        }

        /**
         * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream},
         * and will return a value.
         *
         * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case,
         *        {@link #finished()} will be invoked instead.
         * @throws IOException if an I/O error occurs.
         */
        @SuppressWarnings("unused") // Possibly thrown from subclasses.
        public void data(final int value) throws IOException {
            // noop
        }

        /**
         * Called to indicate that an error occurred on the underlying stream. The default implementation rethrows
         * the given exception.
         *
         * @param exception the exception to throw
         * @throws IOException if an I/O error occurs.
         */
        public void error(final IOException exception) throws IOException {
            throw exception;
        }

        /**
         * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times,
         * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF.
         *
         * @throws IOException if an I/O error occurs.
         */
        @SuppressWarnings("unused") // Possibly thrown from subclasses.
        public void finished() throws IOException {
            // noop
        }
    }

    /**
     * Creates a mutable snapshot of the given observers so that {@link #add(Observer)}, {@link #remove(Observer)} and
     * {@link #removeAllObservers()} work regardless of whether the caller supplied {@code null} or a fixed-size list.
     *
     * @param source the observers to copy, may be {@code null}.
     * @return a new, mutable list; empty if {@code source} is {@code null}.
     */
    private static List<Observer> mutableCopy(final List<Observer> source) {
        return source == null ? new ArrayList<>() : new ArrayList<>(source);
    }

    /** The registered observers; never {@code null} and always mutable. */
    private final List<Observer> observers;

    /**
     * Constructs a new ObservableInputStream from a builder.
     * <p>
     * The builder's observers are copied; a builder without observers yields a stream with an empty observer list.
     * </p>
     *
     * @param builder the builder providing the underlying stream and the observers.
     * @throws IOException if the superclass constructor fails.
     */
    ObservableInputStream(final AbstractBuilder<?> builder) throws IOException {
        super(builder);
        this.observers = mutableCopy(builder.observers);
    }

    /**
     * Constructs a new ObservableInputStream for the given InputStream.
     *
     * @param inputStream the input stream to observe.
     */
    public ObservableInputStream(final InputStream inputStream) {
        this(inputStream, new ArrayList<>());
    }

    /**
     * Constructs a new ObservableInputStream for the given InputStream.
     *
     * @param inputStream the input stream to observe.
     * @param observers List of observer callbacks; stored as is, so callers must pass a mutable list.
     */
    private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) {
        super(inputStream);
        this.observers = observers;
    }

    /**
     * Constructs a new ObservableInputStream for the given InputStream.
     *
     * @param inputStream the input stream to observe.
     * @param observers List of observer callbacks.
     * @since 2.9.0
     */
    public ObservableInputStream(final InputStream inputStream, final Observer... observers) {
        // Arrays.asList returns a fixed-size view; copy it so observers can be added and removed later.
        this(inputStream, new ArrayList<>(Arrays.asList(observers)));
    }

    /**
     * Adds an Observer.
     *
     * @param observer the observer to add.
     */
    public void add(final Observer observer) {
        observers.add(observer);
    }

    /**
     * Closes the underlying stream and notifies the observers: {@link Observer#closed()} on success,
     * {@link Observer#error(IOException)} on failure.
     * <p>
     * Note that a failure to close is reported to callers only through the observers' error callbacks; this method
     * does not rethrow it by itself.
     * </p>
     *
     * @throws IOException if an observer callback throws.
     */
    @Override
    public void close() throws IOException {
        try {
            super.close();
        } catch (final IOException e) {
            noteError(e);
            return;
        }
        noteClosed();
    }

    /**
     * Reads all data from the underlying {@link InputStream}, while notifying the observers.
     *
     * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception.
     */
    public void consume() throws IOException {
        IOUtils.consume(this);
    }

    /**
     * Applies the given action to the registered observers.
     *
     * @param action the callback to invoke on each observer.
     * @throws IOException if the action fails for an observer.
     */
    private void forEachObserver(final IOConsumer<Observer> action) throws IOException {
        IOConsumer.forAll(action, observers);
    }

    /**
     * Gets a copy of currently registered observers.
     *
     * @return a copy of the list of currently registered observers.
     * @since 2.9.0
     */
    public List<Observer> getObservers() {
        return new ArrayList<>(observers);
    }

    /**
     * Notifies the observers by invoking {@link Observer#closed()}.
     *
     * @throws IOException Some observer has thrown an exception, which is being passed down.
     */
    protected void noteClosed() throws IOException {
        forEachObserver(Observer::closed);
    }

    /**
     * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments.
     *
     * @param value Passed to the observers.
     * @throws IOException Some observer has thrown an exception, which is being passed down.
     */
    protected void noteDataByte(final int value) throws IOException {
        forEachObserver(observer -> observer.data(value));
    }

    /**
     * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments.
     *
     * @param buffer Passed to the observers.
     * @param offset Passed to the observers.
     * @param length Passed to the observers.
     * @throws IOException Some observer has thrown an exception, which is being passed down.
     */
    protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException {
        forEachObserver(observer -> observer.data(buffer, offset, length));
    }

    /**
     * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument.
     *
     * @param exception Passed to the observers.
     * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same
     *         exception, which has been passed as an argument.
     */
    protected void noteError(final IOException exception) throws IOException {
        forEachObserver(observer -> observer.error(exception));
    }

    /**
     * Notifies the observers by invoking {@link Observer#finished()}.
     *
     * @throws IOException Some observer has thrown an exception, which is being passed down.
     */
    protected void noteFinished() throws IOException {
        forEachObserver(Observer::finished);
    }

    /**
     * Reports the outcome of a successful bulk read to the observers: EOF triggers {@link #noteFinished()}, a
     * positive count triggers {@link #noteDataBytes(byte[], int, int)}, and a zero-length read is not reported.
     *
     * @param buffer the buffer that was passed to the read call.
     * @param offset the offset in {@code buffer} at which data was stored.
     * @param result the value returned by the read call.
     * @throws IOException Some observer has thrown an exception, which is being passed down.
     */
    private void noteReadResult(final byte[] buffer, final int offset, final int result) throws IOException {
        if (result == EOF) {
            noteFinished();
        } else if (result > 0) {
            noteDataBytes(buffer, offset, result);
        }
    }

    /**
     * Reads a single byte and notifies the observers of the byte read, of EOF, or of the error.
     *
     * @return the byte read, or -1 (EOF).
     * @throws IOException if the underlying read fails or an observer callback throws.
     */
    @Override
    public int read() throws IOException {
        final int result;
        try {
            result = super.read();
        } catch (final IOException ex) {
            // Observers see the failure first; the exception is rethrown even if no observer does so.
            noteError(ex);
            throw ex;
        }
        if (result == EOF) {
            noteFinished();
        } else {
            noteDataByte(result);
        }
        return result;
    }

    /**
     * Reads bytes into the given buffer and notifies the observers of the data read, of EOF, or of the error.
     *
     * @param buffer the buffer to read into.
     * @return the number of bytes read, or -1 (EOF).
     * @throws IOException if the underlying read fails or an observer callback throws.
     */
    @Override
    public int read(final byte[] buffer) throws IOException {
        final int result;
        try {
            result = super.read(buffer);
        } catch (final IOException ex) {
            noteError(ex);
            throw ex;
        }
        noteReadResult(buffer, 0, result);
        return result;
    }

    /**
     * Reads bytes into a region of the given buffer and notifies the observers of the data read, of EOF, or of the
     * error.
     *
     * @param buffer the buffer to read into.
     * @param offset the offset in {@code buffer} at which to start storing data.
     * @param length the maximum number of bytes to read.
     * @return the number of bytes read, or -1 (EOF).
     * @throws IOException if the underlying read fails or an observer callback throws.
     */
    @Override
    public int read(final byte[] buffer, final int offset, final int length) throws IOException {
        final int result;
        try {
            result = super.read(buffer, offset, length);
        } catch (final IOException ex) {
            noteError(ex);
            throw ex;
        }
        noteReadResult(buffer, offset, result);
        return result;
    }

    /**
     * Removes an Observer.
     *
     * @param observer the observer to remove
     */
    public void remove(final Observer observer) {
        observers.remove(observer);
    }

    /**
     * Removes all Observers.
     */
    public void removeAllObservers() {
        observers.clear();
    }

}