/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.commons.io.input;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

/**
 * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by
 * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found
 * to exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the
 * specified maximum, overall.)
 * <p>
 * Elapsed time is measured with {@link System#nanoTime()}, so adjustments of the wall clock do not affect the throttling.
 * </p>
 * <p>
 * To build an instance, see {@link Builder}
 * </p>
 * <p>
 * Inspired by Apache HBase's class of the same name.
 * </p>
 *
 * @see Builder
 * @since 2.16.0
 */
public final class ThrottledInputStream extends CountingInputStream {

    // @formatter:off
    /**
     * Builds a new {@link ThrottledInputStream}.
     * <p>
     * {@link #setMaxBytesPerSecond(long)} returns {@code void}, so it cannot be part of a fluent call chain; call it on a builder variable instead.
     * </p>
     *
     * <h2>Using NIO</h2>
     * <pre>{@code
     * ThrottledInputStream.Builder builder = ThrottledInputStream.builder()
     *   .setPath(Paths.get("MyFile.xml"));
     * builder.setMaxBytesPerSecond(100_000);
     * ThrottledInputStream in = builder.get();
     * }
     * </pre>
     * <h2>Using IO</h2>
     * <pre>{@code
     * ThrottledInputStream.Builder builder = ThrottledInputStream.builder()
     *   .setFile(new File("MyFile.xml"));
     * builder.setMaxBytesPerSecond(100_000);
     * ThrottledInputStream in = builder.get();
     * }
     * </pre>
     * <pre>{@code
     * ThrottledInputStream.Builder builder = ThrottledInputStream.builder()
     *   .setInputStream(inputStream);
     * builder.setMaxBytesPerSecond(100_000);
     * ThrottledInputStream in = builder.get();
     * }
     * </pre>
     *
     * @see #get()
     */
    // @formatter:on
    public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> {

        /**
         * Effectively not throttled.
         */
        private long maxBytesPerSecond = Long.MAX_VALUE;

        /**
         * Builds a new {@link ThrottledInputStream}.
         * <p>
         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()}</li>
         * <li>maxBytesPerSecond</li>
         * </ul>
         *
         * @return a new instance.
         * @throws IllegalArgumentException      if the configured maximum bytes per second is not greater than zero.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs.
         * @see #getInputStream()
         */
        @SuppressWarnings("resource")
        @Override
        public ThrottledInputStream get() throws IOException {
            // Reject an invalid rate before the stream (and its superclass state) is constructed.
            // NOTE(review): presumably the superclass constructor resolves the origin; validating first avoids touching it for a bad rate - confirm.
            requireValidBandwidth(maxBytesPerSecond);
            return new ThrottledInputStream(this);
        }

        /**
         * Sets the maximum bytes per second.
         * <p>
         * The value is validated when {@link #get()} is called; a value that is not greater than zero makes {@link #get()} throw an
         * {@link IllegalArgumentException}.
         * </p>
         *
         * @param maxBytesPerSecond the maximum bytes per second.
         */
        public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
            this.maxBytesPerSecond = maxBytesPerSecond;
        }

    }

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Checks that a bandwidth limit is usable.
     *
     * @param maxBytesPerSecond the candidate limit, in bytes per second.
     * @return the given value, when it is greater than zero.
     * @throws IllegalArgumentException if the value is not greater than zero.
     */
    private static long requireValidBandwidth(final long maxBytesPerSecond) {
        if (maxBytesPerSecond <= 0) {
            throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " is invalid.");
        }
        return maxBytesPerSecond;
    }

    /**
     * Computes how long to sleep so that the average transfer rate since the start does not exceed the maximum.
     * <p>
     * The result is the time that {@code bytesRead} bytes should have taken at {@code maxBytesPerSec}, minus the time that actually elapsed, truncated to
     * whole milliseconds and never negative.
     * </p>
     *
     * @param bytesRead      the number of bytes read so far.
     * @param maxBytesPerSec the maximum rate in bytes per second.
     * @param elapsedMillis  the time elapsed since the start, in milliseconds.
     * @return the number of milliseconds to sleep; zero when no sleep is needed or when {@code bytesRead}, {@code maxBytesPerSec}, or
     *         {@code elapsedMillis} is zero (or, for the first two, negative).
     * @throws IllegalArgumentException if {@code elapsedMillis} is negative.
     */
    static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
        if (elapsedMillis < 0) {
            throw new IllegalArgumentException("The elapsed time should be greater or equal to zero");
        }
        if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
            return 0;
        }
        // Compute in double: the intermediate product cannot overflow as it could in long arithmetic,
        // and the narrowing cast back to long saturates instead of wrapping.
        final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
        if (millis <= 0) {
            return 0;
        }
        return millis;
    }

    /** The maximum average read rate, in bytes per second; always greater than zero. */
    private final long maxBytesPerSecond;

    /** The {@link System#nanoTime()} reading taken at construction; monotonic, so immune to wall-clock changes. */
    private final long startNanos = System.nanoTime();

    /** The accumulated time this stream has asked the reading thread to sleep. */
    private Duration totalSleepDuration = Duration.ZERO;

    /**
     * Constructs a new instance from a builder.
     *
     * @param builder the builder supplying the origin and the maximum bytes per second.
     * @throws IllegalArgumentException if the builder's maximum bytes per second is not greater than zero.
     * @throws IOException              if an I/O error occurs.
     */
    private ThrottledInputStream(final Builder builder) throws IOException {
        super(builder);
        this.maxBytesPerSecond = requireValidBandwidth(builder.maxBytesPerSecond);
    }

    /**
     * Throttles before data is read.
     * <p>
     * NOTE(review): presumably the superclass invokes this hook before each read - confirm against the superclass.
     * </p>
     *
     * @param n not used by this implementation.
     * @throws IOException if the thread is interrupted while sleeping (as an {@link InterruptedIOException}).
     */
    @Override
    protected void beforeRead(final int n) throws IOException {
        throttle();
    }

    /**
     * Gets the time elapsed since this stream was created, measured on the monotonic clock.
     *
     * @return the elapsed time in milliseconds.
     */
    private long elapsedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /**
     * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart, where the elapsed time is truncated to whole
     * seconds; during the first second the byte count itself is returned.
     *
     * @return Read rate, in bytes/sec.
     */
    private long getBytesPerSecond() {
        final long elapsedSeconds = elapsedMillis() / 1000;
        if (elapsedSeconds == 0) {
            return getByteCount();
        }
        return getByteCount() / elapsedSeconds;
    }

    /**
     * Gets the time to sleep now, based on the current byte count, the configured maximum rate, and the elapsed time.
     *
     * @return the number of milliseconds to sleep, possibly zero.
     */
    private long getSleepMillis() {
        return toSleepMillis(getByteCount(), maxBytesPerSecond, elapsedMillis());
    }

    /**
     * Gets the total duration spent in sleep.
     *
     * @return Duration spent in sleep.
     */
    Duration getTotalSleepDuration() {
        return totalSleepDuration;
    }

    /**
     * Sleeps for as long as needed to bring the average read rate down to the configured maximum.
     *
     * @throws InterruptedIOException if the thread is interrupted while sleeping; the thread's interrupt status is restored and the
     *                                {@link InterruptedException} is attached as the cause.
     */
    private void throttle() throws InterruptedIOException {
        final long sleepMillis = getSleepMillis();
        if (sleepMillis > 0) {
            // Recorded before sleeping, so an interrupted sleep is still counted in full.
            totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
            } catch (final InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                final InterruptedIOException ioe = new InterruptedIOException("Thread aborted");
                ioe.initCause(e);
                throw ioe;
            }
        }
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
                + ", totalSleepDuration=" + totalSleepDuration + ']';
    }
}