001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.io.input;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InterruptedIOException;
023import java.time.Duration;
024import java.time.temporal.ChronoUnit;
025import java.util.Objects;
026import java.util.concurrent.TimeUnit;
027
028/**
029 * Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream,
030 * and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a
031 * short interval, the average tends towards the specified maximum, overall.
032 * <p>
033 * To build an instance, call {@link #builder()}.
034 * </p>
035 * <p>
036 * Inspired by Apache HBase's class of the same name.
037 * </p>
038 *
039 * @see Builder
040 * @since 2.16.0
041 */
042public final class ThrottledInputStream extends CountingInputStream {
043
044    // @formatter:off
045    /**
046     * Builds a new {@link ThrottledInputStream}.
047     *
048     * <h2>Using NIO</h2>
049     * <pre>{@code
050     * ThrottledInputStream in = ThrottledInputStream.builder()
051     *   .setPath(Paths.get("MyFile.xml"))
052     *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
053     *   .get();
054     * }
055     * </pre>
056     * <h2>Using IO</h2>
057     * <pre>{@code
058     * ThrottledInputStream in = ThrottledInputStream.builder()
059     *   .setFile(new File("MyFile.xml"))
060     *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
061     *   .get();
062     * }
063     * </pre>
064     * <pre>{@code
065     * ThrottledInputStream in = ThrottledInputStream.builder()
066     *   .setInputStream(inputStream)
067     *   .setMaxBytes(100_000, ChronoUnit.SECONDS)
068     *   .get();
069     * }
070     * </pre>
071     *
072     * @see #get()
073     */
074    // @formatter:on
075    public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> {
076
077        /**
078         * Effectively not throttled.
079         */
080        private double maxBytesPerSecond = Double.MAX_VALUE;
081
082        /**
083         * Constructs a new builder of {@link ThrottledInputStream}.
084         */
085        public Builder() {
086            // empty
087        }
088
089        /**
090         * Builds a new {@link ThrottledInputStream}.
091         * <p>
092         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
093         * </p>
094         * <p>
095         * This builder uses the following aspects:
096         * </p>
097         * <ul>
098         * <li>{@link #getInputStream()} gets the target aspect.</li>
099         * <li>maxBytesPerSecond</li>
100         * </ul>
101         *
102         * @return a new instance.
103         * @throws IllegalStateException         if the {@code origin} is {@code null}.
104         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
105         * @throws IOException                   if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
106         * @see #getInputStream()
107         * @see #getUnchecked()
108         */
109        @Override
110        public ThrottledInputStream get() throws IOException {
111            return new ThrottledInputStream(this);
112        }
113
114        // package private for testing.
115        double getMaxBytesPerSecond() {
116            return maxBytesPerSecond;
117        }
118
119        /**
120         * Sets the maximum bytes per time period unit.
121         * <p>
122         * For example, to throttle reading to 100K per second, use:
123         * </p>
124         * <pre>
125         * builder.setMaxBytes(100_000, ChronoUnit.SECONDS)
126         * </pre>
127         * <p>
128         * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
129         * </p>
130         *
131         * @param value the maximum bytes
132         * @param chronoUnit a duration scale goal.
133         * @return this instance.
134         * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
135         * @since 2.19.0
136         */
137        public Builder setMaxBytes(final long value, final ChronoUnit chronoUnit) {
138            setMaxBytes(value, chronoUnit.getDuration());
139            return asThis();
140        }
141
142        /**
143         * Sets the maximum bytes per duration.
144         * <p>
145         * For example, to throttle reading to 100K per second, use:
146         * </p>
147         * <pre>
148         * builder.setMaxBytes(100_000, Duration.ofSeconds(1))
149         * </pre>
150         * <p>
151         * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
152         * </p>
153         *
154         * @param value the maximum bytes
155         * @param duration a duration goal.
156         * @return this instance.
157         * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
158         */
159        // Consider making public in the future
160        Builder setMaxBytes(final long value, final Duration duration) {
161            setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value);
162            return asThis();
163        }
164
165        /**
166         * Sets the maximum bytes per second.
167         *
168         * @param maxBytesPerSecond the maximum bytes per second.
169         * @return this instance.
170         * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
171         */
172        private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) {
173            if (maxBytesPerSecond <= 0) {
174                throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0.");
175            }
176            this.maxBytesPerSecond = maxBytesPerSecond;
177            return asThis();
178        }
179
180        /**
181         * Sets the maximum bytes per second.
182         *
183         * @param maxBytesPerSecond the maximum bytes per second.
184         * @throws IllegalArgumentException Thrown if maxBytesPerSecond &lt;= 0.
185         */
186        public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
187            setMaxBytesPerSecond((double) maxBytesPerSecond);
188            // TODO 3.0
189            // return asThis();
190        }
191
192    }
193
194    /**
195     * Constructs a new {@link Builder}.
196     *
197     * @return a new {@link Builder}.
198     */
199    public static Builder builder() {
200        return new Builder();
201    }
202
203    // package private for testing
204    static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) {
205        if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
206            return 0;
207        }
208        // We use this class to load the single source file, so the bytesRead
209        // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
210        // We can get the precise sleep time by using the double value.
211        final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis);
212        if (millis <= 0) {
213            return 0;
214        }
215        return millis;
216    }
217
218    private final double maxBytesPerSecond;
219    private final long startTime = System.currentTimeMillis();
220    private Duration totalSleepDuration = Duration.ZERO;
221
222    private ThrottledInputStream(final Builder builder) throws IOException {
223        super(builder);
224        if (builder.maxBytesPerSecond <= 0) {
225            throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid.");
226        }
227        this.maxBytesPerSecond = builder.maxBytesPerSecond;
228    }
229
230    @Override
231    protected void beforeRead(final int n) throws IOException {
232        throttle();
233    }
234
235    /**
236     * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
237     *
238     * @return Read rate, in bytes/sec.
239     */
240    private long getBytesPerSecond() {
241        final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
242        if (elapsedSeconds == 0) {
243            return getByteCount();
244        }
245        return getByteCount() / elapsedSeconds;
246    }
247
248    // package private for testing.
249    double getMaxBytesPerSecond() {
250        return maxBytesPerSecond;
251    }
252
253    private long getSleepMillis() {
254        return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond);
255    }
256
257    /**
258     * Gets the total duration spent in sleep.
259     *
260     * @return Duration spent in sleep.
261     */
262    // package private for testing
263    Duration getTotalSleepDuration() {
264        return totalSleepDuration;
265    }
266
267    private void throttle() throws InterruptedIOException {
268        final long sleepMillis = getSleepMillis();
269        if (sleepMillis > 0) {
270            totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
271            try {
272                TimeUnit.MILLISECONDS.sleep(sleepMillis);
273            } catch (final InterruptedException e) {
274                throw new InterruptedIOException("Thread aborted");
275            }
276        }
277    }
278
279    /** {@inheritDoc} */
280    @Override
281    public String toString() {
282        return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
283                + ", totalSleepDuration=" + totalSleepDuration + ']';
284    }
285}