001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PipedInputStream; 024import java.io.PipedOutputStream; 025import java.time.Duration; 026import java.util.Objects; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.TimeUnit; 030 031import org.apache.commons.io.build.AbstractStreamBuilder; 032import org.apache.commons.io.output.QueueOutputStream; 033 034/** 035 * Simple alternative to JDK {@link PipedInputStream}; queue input stream provides what's written in queue output stream. 036 * <p> 037 * To build an instance, use {@link Builder}. 038 * </p> 039 * <p> 040 * Example usage: 041 * </p> 042 * <pre> 043 * QueueInputStream inputStream = new QueueInputStream(); 044 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 045 * 046 * outputStream.write("hello world".getBytes(UTF_8)); 047 * inputStream.read(); 048 * </pre> 049 * <p> 050 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 051 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 052 * </p> 053 * <p> 054 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 055 * {@link IOException}. 056 * </p> 057 * 058 * @see Builder 059 * @see QueueOutputStream 060 * @since 2.9.0 061 */ 062public class QueueInputStream extends InputStream { 063 064 // @formatter:off 065 /** 066 * Builds a new {@link QueueInputStream}. 067 * 068 * <p> 069 * For example: 070 * </p> 071 * <pre>{@code 072 * QueueInputStream s = QueueInputStream.builder() 073 * .setBlockingQueue(new LinkedBlockingQueue<>()) 074 * .setTimeout(Duration.ZERO) 075 * .get();} 076 * </pre> 077 * 078 * @see #get() 079 * @since 2.12.0 080 */ 081 // @formatter:on 082 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 083 084 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 085 private Duration timeout = Duration.ZERO; 086 087 /** 088 * Constructs a new builder of {@link QueueInputStream}. 089 */ 090 public Builder() { 091 // empty 092 } 093 094 /** 095 * Builds a new {@link QueueInputStream}. 096 * <p> 097 * This builder uses the following aspects: 098 * </p> 099 * <ul> 100 * <li>{@link #setBlockingQueue(BlockingQueue)}</li> 101 * <li>timeout</li> 102 * </ul> 103 * 104 * @return a new instance. 105 * @see #getUnchecked() 106 */ 107 @Override 108 public QueueInputStream get() { 109 return new QueueInputStream(this); 110 } 111 112 /** 113 * Sets backing queue for the stream. 114 * 115 * @param blockingQueue backing queue for the stream, null resets to a new blocking queue instance. 116 * @return {@code this} instance. 117 */ 118 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 119 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 120 return this; 121 } 122 123 /** 124 * Sets the polling timeout. 125 * 126 * @param timeout the polling timeout. 127 * @return {@code this} instance. 128 */ 129 public Builder setTimeout(final Duration timeout) { 130 if (timeout != null && timeout.toNanos() < 0) { 131 throw new IllegalArgumentException("timeout must not be negative"); 132 } 133 this.timeout = timeout != null ? timeout : Duration.ZERO; 134 return this; 135 } 136 137 } 138 139 /** 140 * Constructs a new {@link Builder}. 141 * 142 * @return a new {@link Builder}. 143 * @since 2.12.0 144 */ 145 public static Builder builder() { 146 return new Builder(); 147 } 148 149 private final BlockingQueue<Integer> blockingQueue; 150 151 private final long timeoutNanos; 152 153 /** 154 * Constructs a new instance with no limit to its internal queue size and zero timeout. 155 */ 156 public QueueInputStream() { 157 this(new LinkedBlockingQueue<>()); 158 } 159 160 /** 161 * Constructs a new instance with given queue and zero timeout. 162 * 163 * @param blockingQueue backing queue for the stream, null maps to a new blocking queue instance. 164 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}. 165 */ 166 @Deprecated 167 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 168 this(builder().setBlockingQueue(blockingQueue)); 169 } 170 171 /** 172 * Constructs a new instance. 173 * 174 * @param builder The builder. 175 */ 176 private QueueInputStream(final Builder builder) { 177 this.blockingQueue = Objects.requireNonNull(builder.blockingQueue, "blockingQueue"); 178 this.timeoutNanos = Objects.requireNonNull(builder.timeout, "timeout").toNanos(); 179 } 180 181 /** 182 * Gets the blocking queue. 183 * 184 * @return the blocking queue. 185 */ 186 BlockingQueue<Integer> getBlockingQueue() { 187 return blockingQueue; 188 } 189 190 /** 191 * Gets the timeout duration. 192 * 193 * @return the timeout duration. 194 */ 195 Duration getTimeout() { 196 return Duration.ofNanos(timeoutNanos); 197 } 198 199 /** 200 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 201 * 202 * @return QueueOutputStream connected to this stream. 203 */ 204 public QueueOutputStream newQueueOutputStream() { 205 return new QueueOutputStream(blockingQueue); 206 } 207 208 /** 209 * Reads and returns a single byte. 210 * 211 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 212 * @throws IllegalStateException if thread is interrupted while waiting. 213 */ 214 @Override 215 public int read() { 216 try { 217 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 218 return value == null ? EOF : 0xFF & value; 219 } catch (final InterruptedException e) { 220 Thread.currentThread().interrupt(); 221 // throw runtime unchecked exception to maintain signature backward-compatibility of 222 // this read method, which does not declare IOException 223 throw new IllegalStateException(e); 224 } 225 } 226 227}