001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PipedInputStream;
024import java.io.PipedOutputStream;
025import java.time.Duration;
026import java.util.Objects;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.commons.io.build.AbstractStreamBuilder;
032import org.apache.commons.io.output.QueueOutputStream;
033
034/**
035 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream.
036 * <p>
037 * To build an instance, use {@link Builder}.
038 * </p>
039 * <p>
040 * Example usage:
041 * </p>
042 * <pre>
043 * QueueInputStream inputStream = new QueueInputStream();
044 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
045 *
046 * outputStream.write("hello world".getBytes(UTF_8));
047 * inputStream.read();
048 * </pre>
049 * <p>
050 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
051 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
052 * </p>
053 * <p>
054 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
055 * {@link IOException}.
056 * </p>
057 *
058 * @see Builder
059 * @see QueueOutputStream
060 * @since 2.9.0
061 */
062public class QueueInputStream extends InputStream {
063
064    // @formatter:off
065    /**
066     * Builds a new {@link QueueInputStream}.
067     *
068     * <p>
069     * For example:
070     * </p>
071     * <pre>{@code
072     * QueueInputStream s = QueueInputStream.builder()
073     *   .setBlockingQueue(new LinkedBlockingQueue<>())
074     *   .setTimeout(Duration.ZERO)
075     *   .get();}
076     * </pre>
077     *
078     * @see #get()
079     * @since 2.12.0
080     */
081    // @formatter:on
082    public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
083
084        private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
085        private Duration timeout = Duration.ZERO;
086
087        /**
088         * Builds a new {@link QueueInputStream}.
089         * <p>
090         * This builder use the following aspects:
091         * </p>
092         * <ul>
093         * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
094         * <li>timeout</li>
095         * </ul>
096         *
097         * @return a new instance.
098         */
099        @Override
100        public QueueInputStream get() {
101            return new QueueInputStream(blockingQueue, timeout);
102        }
103
104        /**
105         * Sets backing queue for the stream.
106         *
107         * @param blockingQueue backing queue for the stream.
108         * @return {@code this} instance.
109         */
110        public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
111            this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
112            return this;
113        }
114
115        /**
116         * Sets the polling timeout.
117         *
118         * @param timeout the polling timeout.
119         * @return {@code this} instance.
120         */
121        public Builder setTimeout(final Duration timeout) {
122            if (timeout != null && timeout.toNanos() < 0) {
123                throw new IllegalArgumentException("timeout must not be negative");
124            }
125            this.timeout = timeout != null ? timeout : Duration.ZERO;
126            return this;
127        }
128
129    }
130
131    /**
132     * Constructs a new {@link Builder}.
133     *
134     * @return a new {@link Builder}.
135     * @since 2.12.0
136     */
137    public static Builder builder() {
138        return new Builder();
139    }
140
141    private final BlockingQueue<Integer> blockingQueue;
142
143    private final long timeoutNanos;
144
145    /**
146     * Constructs a new instance with no limit to its internal queue size and zero timeout.
147     */
148    public QueueInputStream() {
149        this(new LinkedBlockingQueue<>());
150    }
151
152    /**
153     * Constructs a new instance with given queue and zero timeout.
154     *
155     * @param blockingQueue backing queue for the stream.
156     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
157     */
158    @Deprecated
159    public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
160        this(blockingQueue, Duration.ZERO);
161    }
162
163    /**
164     * Constructs a new instance with given queue and timeout.
165     *
166     * @param blockingQueue backing queue for the stream.
167     * @param timeout       how long to wait before giving up when polling the queue.
168     */
169    private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) {
170        this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
171        this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos();
172    }
173
174    /**
175     * Gets the blocking queue.
176     *
177     * @return the blocking queue.
178     */
179    BlockingQueue<Integer> getBlockingQueue() {
180        return blockingQueue;
181    }
182
183    /**
184     * Gets the timeout duration.
185     *
186     * @return the timeout duration.
187     */
188    Duration getTimeout() {
189        return Duration.ofNanos(timeoutNanos);
190    }
191
192    /**
193     * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
194     *
195     * @return QueueOutputStream connected to this stream.
196     */
197    public QueueOutputStream newQueueOutputStream() {
198        return new QueueOutputStream(blockingQueue);
199    }
200
201    /**
202     * Reads and returns a single byte.
203     *
204     * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
205     * @throws IllegalStateException if thread is interrupted while waiting.
206     */
207    @Override
208    public int read() {
209        try {
210            final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
211            return value == null ? EOF : 0xFF & value;
212        } catch (final InterruptedException e) {
213            Thread.currentThread().interrupt();
214            // throw runtime unchecked exception to maintain signature backward-compatibility of
215            // this read method, which does not declare IOException
216            throw new IllegalStateException(e);
217        }
218    }
219
220}