001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.io.input;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InterruptedIOException;
023import java.time.Duration;
024import java.time.temporal.ChronoUnit;
025import java.util.concurrent.TimeUnit;
026
027/**
028 * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by
029 * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found
030 * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the
031 * specified maximum, overall.)
032 * <p>
033 * To build an instance, see {@link Builder}
034 * </p>
035 * <p>
036 * Inspired by Apache HBase's class of the same name.
037 * </p>
038 *
039 * @see Builder
040 * @since 2.16.0
041 */
042public final class ThrottledInputStream extends CountingInputStream {
043
044    // @formatter:off
045    /**
046     * Builds a new {@link ThrottledInputStream}.
047     *
048     * <h2>Using NIO</h2>
049     * <pre>{@code
050     * ThrottledInputStream in = ThrottledInputStream.builder()
051     *   .setPath(Paths.get("MyFile.xml"))
052     *   .setMaxBytesPerSecond(100_000)
053     *   .get();
054     * }
055     * </pre>
056     * <h2>Using IO</h2>
057     * <pre>{@code
058     * ThrottledInputStream in = ThrottledInputStream.builder()
059     *   .setFile(new File("MyFile.xml"))
060     *   .setMaxBytesPerSecond(100_000)
061     *   .get();
062     * }
063     * </pre>
064     * <pre>{@code
065     * ThrottledInputStream in = ThrottledInputStream.builder()
066     *   .setInputStream(inputStream)
067     *   .setMaxBytesPerSecond(100_000)
068     *   .get();
069     * }
070     * </pre>
071     *
072     * @see #get()
073     */
074    // @formatter:on
075    public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> {
076
077        /**
078         * Effectively not throttled.
079         */
080        private long maxBytesPerSecond = Long.MAX_VALUE;
081
082        /**
083         * Builds a new {@link ThrottledInputStream}.
084         * <p>
085         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
086         * </p>
087         * <p>
088         * This builder use the following aspects:
089         * </p>
090         * <ul>
091         * <li>{@link #getInputStream()}</li>
092         * <li>maxBytesPerSecond</li>
093         * </ul>
094         *
095         * @return a new instance.
096         * @throws IllegalStateException         if the {@code origin} is {@code null}.
097         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
098         * @throws IOException                   if an I/O error occurs.
099         * @see #getInputStream()
100         */
101        @SuppressWarnings("resource")
102        @Override
103        public ThrottledInputStream get() throws IOException {
104            return new ThrottledInputStream(this);
105        }
106
107        /**
108         * Sets the maximum bytes per second.
109         *
110         * @param maxBytesPerSecond the maximum bytes per second.
111         */
112        public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
113            this.maxBytesPerSecond = maxBytesPerSecond;
114        }
115
116    }
117
118    /**
119     * Constructs a new {@link Builder}.
120     *
121     * @return a new {@link Builder}.
122     */
123    public static Builder builder() {
124        return new Builder();
125    }
126
127    static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
128        if (elapsedMillis < 0) {
129            throw new IllegalArgumentException("The elapsed time should be greater or equal to zero");
130        }
131        if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
132            return 0;
133        }
134        // We use this class to load the single source file, so the bytesRead
135        // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
136        // We can get the precise sleep time by using the double value.
137        final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
138        if (millis <= 0) {
139            return 0;
140        }
141        return millis;
142    }
143
144    private final long maxBytesPerSecond;
145    private final long startTime = System.currentTimeMillis();
146    private Duration totalSleepDuration = Duration.ZERO;
147
148    private ThrottledInputStream(final Builder builder) throws IOException {
149        super(builder);
150        if (builder.maxBytesPerSecond <= 0) {
151            throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid.");
152        }
153        this.maxBytesPerSecond = builder.maxBytesPerSecond;
154    }
155
156    @Override
157    protected void beforeRead(final int n) throws IOException {
158        throttle();
159    }
160
161    /**
162     * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
163     *
164     * @return Read rate, in bytes/sec.
165     */
166    private long getBytesPerSecond() {
167        final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
168        if (elapsedSeconds == 0) {
169            return getByteCount();
170        }
171        return getByteCount() / elapsedSeconds;
172    }
173
174    private long getSleepMillis() {
175        return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime);
176    }
177
178    /**
179     * Gets the total duration spent in sleep.
180     *
181     * @return Duration spent in sleep.
182     */
183    Duration getTotalSleepDuration() {
184        return totalSleepDuration;
185    }
186
187    private void throttle() throws InterruptedIOException {
188        final long sleepMillis = getSleepMillis();
189        if (sleepMillis > 0) {
190            totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
191            try {
192                TimeUnit.MILLISECONDS.sleep(sleepMillis);
193            } catch (final InterruptedException e) {
194                throw new InterruptedIOException("Thread aborted");
195            }
196        }
197    }
198
199    /** {@inheritDoc} */
200    @Override
201    public String toString() {
202        return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
203                + ", totalSleepDuration=" + totalSleepDuration + ']';
204    }
205}