001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.io.input;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InterruptedIOException;
023import java.time.Duration;
024import java.time.temporal.ChronoUnit;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.commons.io.build.AbstractStreamBuilder;
028
029/**
030 * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by
031 * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found
032 * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the
033 * specified maximum, overall.)
034 * <p>
035 * To build an instance, see {@link Builder}
036 * </p>
037 * <p>
038 * Inspired by Apache HBase's class of the same name.
039 * </p>
040 *
041 * @see Builder
042 * @since 2.16.0
043 */
044public final class ThrottledInputStream extends CountingInputStream {
045
046    // @formatter:off
047    /**
048     * Builds a new {@link ThrottledInputStream}.
049     *
050     * <h2>Using NIO</h2>
051     * <pre>{@code
052     * ThrottledInputStream in = ThrottledInputStream.builder()
053     *   .setPath(Paths.get("MyFile.xml"))
054     *   .setMaxBytesPerSecond(100_000)
055     *   .get();
056     * }
057     * </pre>
058     * <h2>Using IO</h2>
059     * <pre>{@code
060     * ThrottledInputStream in = ThrottledInputStream.builder()
061     *   .setFile(new File("MyFile.xml"))
062     *   .setMaxBytesPerSecond(100_000)
063     *   .get();
064     * }
065     * </pre>
066     * <pre>{@code
067     * ThrottledInputStream in = ThrottledInputStream.builder()
068     *   .setInputStream(inputStream)
069     *   .setMaxBytesPerSecond(100_000)
070     *   .get();
071     * }
072     * </pre>
073     *
074     * @see #get()
075     */
076    // @formatter:on
077    public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> {
078
079        /**
080         * Effectively not throttled.
081         */
082        private long maxBytesPerSecond = Long.MAX_VALUE;
083
084        /**
085         * Builds a new {@link ThrottledInputStream}.
086         * <p>
087         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
088         * </p>
089         * <p>
090         * This builder use the following aspects:
091         * </p>
092         * <ul>
093         * <li>{@link #getInputStream()}</li>
094         * <li>maxBytesPerSecond</li>
095         * </ul>
096         *
097         * @return a new instance.
098         * @throws IllegalStateException         if the {@code origin} is {@code null}.
099         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
100         * @throws IOException                   if an I/O error occurs.
101         * @see #getInputStream()
102         */
103        @SuppressWarnings("resource")
104        @Override
105        public ThrottledInputStream get() throws IOException {
106            return new ThrottledInputStream(getInputStream(), maxBytesPerSecond);
107        }
108
109        /**
110         * Sets the maximum bytes per second.
111         *
112         * @param maxBytesPerSecond the maximum bytes per second.
113         */
114        public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
115            this.maxBytesPerSecond = maxBytesPerSecond;
116        }
117
118    }
119
120    /**
121     * Constructs a new {@link Builder}.
122     *
123     * @return a new {@link Builder}.
124     */
125    public static Builder builder() {
126        return new Builder();
127    }
128
129    static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
130        if (elapsedMillis < 0) {
131            throw new IllegalArgumentException("The elapsed time should be greater or equal to zero");
132        }
133        if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
134            return 0;
135        }
136        // We use this class to load the single source file, so the bytesRead
137        // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
138        // We can get the precise sleep time by using the double value.
139        final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
140        if (millis <= 0) {
141            return 0;
142        }
143        return millis;
144    }
145
146    private final long maxBytesPerSecond;
147    private final long startTime = System.currentTimeMillis();
148    private Duration totalSleepDuration = Duration.ZERO;
149
150    private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) {
151        super(proxy);
152        if (maxBytesPerSecond <= 0) {
153            throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " is invalid.");
154        }
155        this.maxBytesPerSecond = maxBytesPerSecond;
156    }
157
158    @Override
159    protected void beforeRead(final int n) throws IOException {
160        throttle();
161    }
162
163    /**
164     * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
165     *
166     * @return Read rate, in bytes/sec.
167     */
168    private long getBytesPerSecond() {
169        final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
170        if (elapsedSeconds == 0) {
171            return getByteCount();
172        }
173        return getByteCount() / elapsedSeconds;
174    }
175
176    private long getSleepMillis() {
177        return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime);
178    }
179
180    /**
181     * Gets the total duration spent in sleep.
182     *
183     * @return Duration spent in sleep.
184     */
185    Duration getTotalSleepDuration() {
186        return totalSleepDuration;
187    }
188
189    private void throttle() throws InterruptedIOException {
190        final long sleepMillis = getSleepMillis();
191        if (sleepMillis > 0) {
192            totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
193            try {
194                TimeUnit.MILLISECONDS.sleep(sleepMillis);
195            } catch (final InterruptedException e) {
196                throw new InterruptedIOException("Thread aborted");
197            }
198        }
199    }
200
201    /** {@inheritDoc} */
202    @Override
203    public String toString() {
204        return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
205                + ", totalSleepDuration=" + totalSleepDuration + ']';
206    }
207}