001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.commons.io.input; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InterruptedIOException; 023import java.time.Duration; 024import java.time.temporal.ChronoUnit; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.commons.io.build.AbstractStreamBuilder; 028 029/** 030 * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by 031 * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found 032 * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the 033 * specified maximum, overall.) 034 * <p> 035 * To build an instance, see {@link Builder} 036 * </p> 037 * <p> 038 * Inspired by Apache HBase's class of the same name. 039 * </p> 040 * 041 * @see Builder 042 * @since 2.16.0 043 */ 044public final class ThrottledInputStream extends CountingInputStream { 045 046 // @formatter:off 047 /** 048 * Builds a new {@link ThrottledInputStream}. 049 * 050 * <h2>Using NIO</h2> 051 * <pre>{@code 052 * ThrottledInputStream in = ThrottledInputStream.builder() 053 * .setPath(Paths.get("MyFile.xml")) 054 * .setMaxBytesPerSecond(100_000) 055 * .get(); 056 * } 057 * </pre> 058 * <h2>Using IO</h2> 059 * <pre>{@code 060 * ThrottledInputStream in = ThrottledInputStream.builder() 061 * .setFile(new File("MyFile.xml")) 062 * .setMaxBytesPerSecond(100_000) 063 * .get(); 064 * } 065 * </pre> 066 * <pre>{@code 067 * ThrottledInputStream in = ThrottledInputStream.builder() 068 * .setInputStream(inputStream) 069 * .setMaxBytesPerSecond(100_000) 070 * .get(); 071 * } 072 * </pre> 073 * 074 * @see #get() 075 */ 076 // @formatter:on 077 public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> { 078 079 /** 080 * Effectively not throttled. 081 */ 082 private long maxBytesPerSecond = Long.MAX_VALUE; 083 084 /** 085 * Builds a new {@link ThrottledInputStream}. 086 * <p> 087 * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception. 088 * </p> 089 * <p> 090 * This builder use the following aspects: 091 * </p> 092 * <ul> 093 * <li>{@link #getInputStream()}</li> 094 * <li>maxBytesPerSecond</li> 095 * </ul> 096 * 097 * @return a new instance. 098 * @throws IllegalStateException if the {@code origin} is {@code null}. 099 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 100 * @throws IOException if an I/O error occurs. 101 * @see #getInputStream() 102 */ 103 @SuppressWarnings("resource") 104 @Override 105 public ThrottledInputStream get() throws IOException { 106 return new ThrottledInputStream(getInputStream(), maxBytesPerSecond); 107 } 108 109 /** 110 * Sets the maximum bytes per second. 111 * 112 * @param maxBytesPerSecond the maximum bytes per second. 113 */ 114 public void setMaxBytesPerSecond(final long maxBytesPerSecond) { 115 this.maxBytesPerSecond = maxBytesPerSecond; 116 } 117 118 } 119 120 /** 121 * Constructs a new {@link Builder}. 122 * 123 * @return a new {@link Builder}. 124 */ 125 public static Builder builder() { 126 return new Builder(); 127 } 128 129 static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) { 130 if (elapsedMillis < 0) { 131 throw new IllegalArgumentException("The elapsed time should be greater or equal to zero"); 132 } 133 if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) { 134 return 0; 135 } 136 // We use this class to load the single source file, so the bytesRead 137 // and maxBytesPerSec aren't greater than Double.MAX_VALUE. 138 // We can get the precise sleep time by using the double value. 139 final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis); 140 if (millis <= 0) { 141 return 0; 142 } 143 return millis; 144 } 145 146 private final long maxBytesPerSecond; 147 private final long startTime = System.currentTimeMillis(); 148 private Duration totalSleepDuration = Duration.ZERO; 149 150 private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) { 151 super(proxy); 152 if (maxBytesPerSecond <= 0) { 153 throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " is invalid."); 154 } 155 this.maxBytesPerSecond = maxBytesPerSecond; 156 } 157 158 @Override 159 protected void beforeRead(final int n) throws IOException { 160 throttle(); 161 } 162 163 /** 164 * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart. 165 * 166 * @return Read rate, in bytes/sec. 167 */ 168 private long getBytesPerSecond() { 169 final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000; 170 if (elapsedSeconds == 0) { 171 return getByteCount(); 172 } 173 return getByteCount() / elapsedSeconds; 174 } 175 176 private long getSleepMillis() { 177 return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime); 178 } 179 180 /** 181 * Gets the total duration spent in sleep. 182 * 183 * @return Duration spent in sleep. 184 */ 185 Duration getTotalSleepDuration() { 186 return totalSleepDuration; 187 } 188 189 private void throttle() throws InterruptedIOException { 190 final long sleepMillis = getSleepMillis(); 191 if (sleepMillis > 0) { 192 totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS); 193 try { 194 TimeUnit.MILLISECONDS.sleep(sleepMillis); 195 } catch (final InterruptedException e) { 196 throw new InterruptedIOException("Thread aborted"); 197 } 198 } 199 } 200 201 /** {@inheritDoc} */ 202 @Override 203 public String toString() { 204 return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond() 205 + ", totalSleepDuration=" + totalSleepDuration + ']'; 206 } 207}