001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PipedInputStream; 024import java.io.PipedOutputStream; 025import java.time.Duration; 026import java.util.Objects; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.TimeUnit; 030 031import org.apache.commons.io.build.AbstractStreamBuilder; 032import org.apache.commons.io.output.QueueOutputStream; 033 034/** 035 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream. 036 * <p> 037 * To build an instance, use {@link Builder}. 038 * </p> 039 * <p> 040 * Example usage: 041 * </p> 042 * <pre> 043 * QueueInputStream inputStream = new QueueInputStream(); 044 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 045 * 046 * outputStream.write("hello world".getBytes(UTF_8)); 047 * inputStream.read(); 048 * </pre> 049 * <p> 050 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 051 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 052 * </p> 053 * <p> 054 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 055 * {@link IOException}. 056 * </p> 057 * 058 * @see Builder 059 * @see QueueOutputStream 060 * @since 2.9.0 061 */ 062public class QueueInputStream extends InputStream { 063 064 // @formatter:off 065 /** 066 * Builds a new {@link QueueInputStream}. 067 * 068 * <p> 069 * For example: 070 * </p> 071 * <pre>{@code 072 * QueueInputStream s = QueueInputStream.builder() 073 * .setBlockingQueue(new LinkedBlockingQueue<>()) 074 * .setTimeout(Duration.ZERO) 075 * .get();} 076 * </pre> 077 * 078 * @see #get() 079 * @since 2.12.0 080 */ 081 // @formatter:on 082 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 083 084 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 085 private Duration timeout = Duration.ZERO; 086 087 /** 088 * Builds a new {@link QueueInputStream}. 089 * <p> 090 * This builder use the following aspects: 091 * </p> 092 * <ul> 093 * <li>{@link #setBlockingQueue(BlockingQueue)}</li> 094 * <li>timeout</li> 095 * </ul> 096 * 097 * @return a new instance. 098 */ 099 @Override 100 public QueueInputStream get() { 101 return new QueueInputStream(blockingQueue, timeout); 102 } 103 104 /** 105 * Sets backing queue for the stream. 106 * 107 * @param blockingQueue backing queue for the stream. 108 * @return {@code this} instance. 109 */ 110 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 111 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 112 return this; 113 } 114 115 /** 116 * Sets the polling timeout. 117 * 118 * @param timeout the polling timeout. 119 * @return {@code this} instance. 120 */ 121 public Builder setTimeout(final Duration timeout) { 122 if (timeout != null && timeout.toNanos() < 0) { 123 throw new IllegalArgumentException("timeout must not be negative"); 124 } 125 this.timeout = timeout != null ? timeout : Duration.ZERO; 126 return this; 127 } 128 129 } 130 131 /** 132 * Constructs a new {@link Builder}. 133 * 134 * @return a new {@link Builder}. 135 * @since 2.12.0 136 */ 137 public static Builder builder() { 138 return new Builder(); 139 } 140 141 private final BlockingQueue<Integer> blockingQueue; 142 143 private final long timeoutNanos; 144 145 /** 146 * Constructs a new instance with no limit to its internal queue size and zero timeout. 147 */ 148 public QueueInputStream() { 149 this(new LinkedBlockingQueue<>()); 150 } 151 152 /** 153 * Constructs a new instance with given queue and zero timeout. 154 * 155 * @param blockingQueue backing queue for the stream. 156 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}. 157 */ 158 @Deprecated 159 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 160 this(blockingQueue, Duration.ZERO); 161 } 162 163 /** 164 * Constructs a new instance with given queue and timeout. 165 * 166 * @param blockingQueue backing queue for the stream. 167 * @param timeout how long to wait before giving up when polling the queue. 168 */ 169 private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) { 170 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); 171 this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos(); 172 } 173 174 /** 175 * Gets the blocking queue. 176 * 177 * @return the blocking queue. 178 */ 179 BlockingQueue<Integer> getBlockingQueue() { 180 return blockingQueue; 181 } 182 183 /** 184 * Gets the timeout duration. 185 * 186 * @return the timeout duration. 187 */ 188 Duration getTimeout() { 189 return Duration.ofNanos(timeoutNanos); 190 } 191 192 /** 193 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 194 * 195 * @return QueueOutputStream connected to this stream. 196 */ 197 public QueueOutputStream newQueueOutputStream() { 198 return new QueueOutputStream(blockingQueue); 199 } 200 201 /** 202 * Reads and returns a single byte. 203 * 204 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 205 * @throws IllegalStateException if thread is interrupted while waiting. 206 */ 207 @Override 208 public int read() { 209 try { 210 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 211 return value == null ? EOF : 0xFF & value; 212 } catch (final InterruptedException e) { 213 Thread.currentThread().interrupt(); 214 // throw runtime unchecked exception to maintain signature backward-compatibility of 215 // this read method, which does not declare IOException 216 throw new IllegalStateException(e); 217 } 218 } 219 220}