001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PipedInputStream;
024import java.io.PipedOutputStream;
025import java.time.Duration;
026import java.util.Objects;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.commons.io.build.AbstractStreamBuilder;
032import org.apache.commons.io.output.QueueOutputStream;
033
034/**
035 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
036 * <p>
037 * To build an instance, use {@link Builder}.
038 * </p>
039 * <p>
040 * Example usage:
041 * </p>
042 * <pre>
043 * QueueInputStream inputStream = new QueueInputStream();
044 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
045 *
046 * outputStream.write("hello world".getBytes(UTF_8));
047 * inputStream.read();
048 * </pre>
049 * <p>
050 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
051 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
052 * </p>
053 * <p>
054 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
055 * {@link IOException}.
056 * </p>
057 *
058 * @see Builder
059 * @see QueueOutputStream
060 * @since 2.9.0
061 */
062public class QueueInputStream extends InputStream {
063
064    // @formatter:off
065    /**
066     * Builds a new {@link QueueInputStream}.
067     *
068     * <p>
069     * For example:
070     * </p>
071     * <pre>{@code
072     * QueueInputStream s = QueueInputStream.builder()
073     *   .setBlockingQueue(new LinkedBlockingQueue<>())
074     *   .setTimeout(Duration.ZERO)
075     *   .get();}
076     * </pre>
077     *
078     * @see #get()
079     * @since 2.12.0
080     */
081    // @formatter:on
082    public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
083
084        private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
085        private Duration timeout = Duration.ZERO;
086
087        /**
088         * Constructs a new builder of {@link QueueInputStream}.
089         */
090        public Builder() {
091            // empty
092        }
093
094        /**
095         * Builds a new {@link QueueInputStream}.
096         * <p>
097         * This builder uses the following aspects:
098         * </p>
099         * <ul>
100         * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
101         * <li>timeout</li>
102         * </ul>
103         *
104         * @return a new instance.
105         * @see #getUnchecked()
106         */
107        @Override
108        public QueueInputStream get() {
109            return new QueueInputStream(this);
110        }
111
112        /**
113         * Sets backing queue for the stream.
114         *
115         * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance.
116         * @return {@code this} instance.
117         */
118        public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
119            this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
120            return this;
121        }
122
123        /**
124         * Sets the polling timeout.
125         *
126         * @param timeout the polling timeout.
127         * @return {@code this} instance.
128         */
129        public Builder setTimeout(final Duration timeout) {
130            if (timeout != null && timeout.toNanos() < 0) {
131                throw new IllegalArgumentException("timeout must not be negative");
132            }
133            this.timeout = timeout != null ? timeout : Duration.ZERO;
134            return this;
135        }
136
137    }
138
139    /**
140     * Constructs a new {@link Builder}.
141     *
142     * @return a new {@link Builder}.
143     * @since 2.12.0
144     */
145    public static Builder builder() {
146        return new Builder();
147    }
148
149    private final BlockingQueue<Integer> blockingQueue;
150
151    private final long timeoutNanos;
152
153    /**
154     * Constructs a new instance with no limit to its internal queue size and zero timeout.
155     */
156    public QueueInputStream() {
157        this(new LinkedBlockingQueue<>());
158    }
159
160    /**
161     * Constructs a new instance with given queue and zero timeout.
162     *
163     * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance.
164     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
165     */
166    @Deprecated
167    public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
168        this(builder().setBlockingQueue(blockingQueue));
169    }
170
171    /**
172     * Constructs a new instance.
173     *
174     * @param builder The builder.
175     */
176    private QueueInputStream(final Builder builder) {
177        this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue");
178        this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos();
179    }
180
181    /**
182     * Gets the blocking queue.
183     *
184     * @return the blocking queue.
185     */
186    BlockingQueue<Integer> getBlockingQueue() {
187        return blockingQueue;
188    }
189
190    /**
191     * Gets the timeout duration.
192     *
193     * @return the timeout duration.
194     */
195    Duration getTimeout() {
196        return Duration.ofNanos(timeoutNanos);
197    }
198
199    /**
200     * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
201     *
202     * @return QueueOutputStream connected to this stream.
203     */
204    public QueueOutputStream newQueueOutputStream() {
205        return new QueueOutputStream(blockingQueue);
206    }
207
208    /**
209     * Reads and returns a single byte.
210     *
211     * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
212     * @throws IllegalStateException if thread is interrupted while waiting.
213     */
214    @Override
215    public int read() {
216        try {
217            final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
218            return value == null ? EOF : 0xFF & value;
219        } catch (final InterruptedException e) {
220            Thread.currentThread().interrupt();
221            // throw runtime unchecked exception to maintain signature backward-compatibility of
222            // this read method, which does not declare IOException
223            throw new IllegalStateException(e);
224        }
225    }
226
227}