001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.commons.io.input; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InterruptedIOException; 023import java.time.Duration; 024import java.time.temporal.ChronoUnit; 025import java.util.Objects; 026import java.util.concurrent.TimeUnit; 027 028/** 029 * Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream, 030 * and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a 031 * short interval, the average tends towards the specified maximum, overall. 032 * <p> 033 * To build an instance, call {@link #builder()}. 034 * </p> 035 * <p> 036 * Inspired by Apache HBase's class of the same name. 037 * </p> 038 * 039 * @see Builder 040 * @since 2.16.0 041 */ 042public final class ThrottledInputStream extends CountingInputStream { 043 044 // @formatter:off 045 /** 046 * Builds a new {@link ThrottledInputStream}. 047 * 048 * <h2>Using NIO</h2> 049 * <pre>{@code 050 * ThrottledInputStream in = ThrottledInputStream.builder() 051 * .setPath(Paths.get("MyFile.xml")) 052 * .setMaxBytes(100_000, ChronoUnit.SECONDS) 053 * .get(); 054 * } 055 * </pre> 056 * <h2>Using IO</h2> 057 * <pre>{@code 058 * ThrottledInputStream in = ThrottledInputStream.builder() 059 * .setFile(new File("MyFile.xml")) 060 * .setMaxBytes(100_000, ChronoUnit.SECONDS) 061 * .get(); 062 * } 063 * </pre> 064 * <pre>{@code 065 * ThrottledInputStream in = ThrottledInputStream.builder() 066 * .setInputStream(inputStream) 067 * .setMaxBytes(100_000, ChronoUnit.SECONDS) 068 * .get(); 069 * } 070 * </pre> 071 * 072 * @see #get() 073 */ 074 // @formatter:on 075 public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> { 076 077 /** 078 * Effectively not throttled. 079 */ 080 private double maxBytesPerSecond = Double.MAX_VALUE; 081 082 /** 083 * Constructs a new builder of {@link ThrottledInputStream}. 084 */ 085 public Builder() { 086 // empty 087 } 088 089 /** 090 * Builds a new {@link ThrottledInputStream}. 091 * <p> 092 * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception. 093 * </p> 094 * <p> 095 * This builder uses the following aspects: 096 * </p> 097 * <ul> 098 * <li>{@link #getInputStream()} gets the target aspect.</li> 099 * <li>maxBytesPerSecond</li> 100 * </ul> 101 * 102 * @return a new instance. 103 * @throws IllegalStateException if the {@code origin} is {@code null}. 104 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 105 * @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}. 106 * @see #getInputStream() 107 * @see #getUnchecked() 108 */ 109 @Override 110 public ThrottledInputStream get() throws IOException { 111 return new ThrottledInputStream(this); 112 } 113 114 // package private for testing. 115 double getMaxBytesPerSecond() { 116 return maxBytesPerSecond; 117 } 118 119 /** 120 * Sets the maximum bytes per time period unit. 121 * <p> 122 * For example, to throttle reading to 100K per second, use: 123 * </p> 124 * <pre> 125 * builder.setMaxBytes(100_000, ChronoUnit.SECONDS) 126 * </pre> 127 * <p> 128 * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on. 129 * </p> 130 * 131 * @param value the maximum bytes 132 * @param chronoUnit a duration scale goal. 133 * @return this instance. 134 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 135 * @since 2.19.0 136 */ 137 public Builder setMaxBytes(final long value, final ChronoUnit chronoUnit) { 138 setMaxBytes(value, chronoUnit.getDuration()); 139 return asThis(); 140 } 141 142 /** 143 * Sets the maximum bytes per duration. 144 * <p> 145 * For example, to throttle reading to 100K per second, use: 146 * </p> 147 * <pre> 148 * builder.setMaxBytes(100_000, Duration.ofSeconds(1)) 149 * </pre> 150 * <p> 151 * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on. 152 * </p> 153 * 154 * @param value the maximum bytes 155 * @param duration a duration goal. 156 * @return this instance. 157 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 158 */ 159 // Consider making public in the future 160 Builder setMaxBytes(final long value, final Duration duration) { 161 setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value); 162 return asThis(); 163 } 164 165 /** 166 * Sets the maximum bytes per second. 167 * 168 * @param maxBytesPerSecond the maximum bytes per second. 169 * @return this instance. 170 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 171 */ 172 private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) { 173 if (maxBytesPerSecond <= 0) { 174 throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0."); 175 } 176 this.maxBytesPerSecond = maxBytesPerSecond; 177 return asThis(); 178 } 179 180 /** 181 * Sets the maximum bytes per second. 182 * 183 * @param maxBytesPerSecond the maximum bytes per second. 184 * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. 185 */ 186 public void setMaxBytesPerSecond(final long maxBytesPerSecond) { 187 setMaxBytesPerSecond((double) maxBytesPerSecond); 188 // TODO 3.0 189 // return asThis(); 190 } 191 192 } 193 194 /** 195 * Constructs a new {@link Builder}. 196 * 197 * @return a new {@link Builder}. 198 */ 199 public static Builder builder() { 200 return new Builder(); 201 } 202 203 // package private for testing 204 static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) { 205 if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) { 206 return 0; 207 } 208 // We use this class to load the single source file, so the bytesRead 209 // and maxBytesPerSec aren't greater than Double.MAX_VALUE. 210 // We can get the precise sleep time by using the double value. 211 final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis); 212 if (millis <= 0) { 213 return 0; 214 } 215 return millis; 216 } 217 218 private final double maxBytesPerSecond; 219 private final long startTime = System.currentTimeMillis(); 220 private Duration totalSleepDuration = Duration.ZERO; 221 222 private ThrottledInputStream(final Builder builder) throws IOException { 223 super(builder); 224 if (builder.maxBytesPerSecond <= 0) { 225 throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid."); 226 } 227 this.maxBytesPerSecond = builder.maxBytesPerSecond; 228 } 229 230 @Override 231 protected void beforeRead(final int n) throws IOException { 232 throttle(); 233 } 234 235 /** 236 * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart. 237 * 238 * @return Read rate, in bytes/sec. 239 */ 240 private long getBytesPerSecond() { 241 final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000; 242 if (elapsedSeconds == 0) { 243 return getByteCount(); 244 } 245 return getByteCount() / elapsedSeconds; 246 } 247 248 // package private for testing. 249 double getMaxBytesPerSecond() { 250 return maxBytesPerSecond; 251 } 252 253 private long getSleepMillis() { 254 return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond); 255 } 256 257 /** 258 * Gets the total duration spent in sleep. 259 * 260 * @return Duration spent in sleep. 261 */ 262 // package private for testing 263 Duration getTotalSleepDuration() { 264 return totalSleepDuration; 265 } 266 267 private void throttle() throws InterruptedIOException { 268 final long sleepMillis = getSleepMillis(); 269 if (sleepMillis > 0) { 270 totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS); 271 try { 272 TimeUnit.MILLISECONDS.sleep(sleepMillis); 273 } catch (final InterruptedException e) { 274 throw new InterruptedIOException("Thread aborted"); 275 } 276 } 277 } 278 279 /** {@inheritDoc} */ 280 @Override 281 public String toString() { 282 return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond() 283 + ", totalSleepDuration=" + totalSleepDuration + ']'; 284 } 285}