001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.BufferedInputStream;
019import java.io.File;
020import java.io.IOException;
021import java.io.InputStream;
022import java.nio.ByteBuffer;
023import java.nio.channels.FileChannel;
024import java.nio.file.Path;
025import java.nio.file.StandardOpenOption;
026import java.util.Objects;
027
028import org.apache.commons.io.IOUtils;
029import org.apache.commons.io.build.AbstractStreamBuilder;
030
031/**
 * {@link InputStream} implementation which uses a direct buffer to read a file, avoiding the extra copy of data between Java and native memory that happens
 * when using {@link BufferedInputStream}. Unfortunately, the JDK does not offer an equivalent: {@code sun.nio.ch.ChannelInputStream} supports
 * reading a file using NIO, but does not support buffering.
035 * <p>
036 * To build an instance, use {@link Builder}.
037 * </p>
038 * <p>
039 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
040 * </p>
041 *
042 * @see Builder
043 * @since 2.9.0
044 */
045public final class BufferedFileChannelInputStream extends InputStream {
046
047    // @formatter:off
048    /**
049     * Builds a new {@link BufferedFileChannelInputStream}.
050     *
051     * <p>
052     * Using File IO:
053     * </p>
054     * <pre>{@code
055     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
056     *   .setFile(file)
057     *   .setBufferSize(4096)
058     *   .get();}
059     * </pre>
060     * <p>
061     * Using NIO Path:
062     * </p>
063     * <pre>{@code
064     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
065     *   .setPath(path)
066     *   .setBufferSize(4096)
067     *   .get();}
068     * </pre>
069     *
070     * @see #get()
071     * @since 2.12.0
072     */
073    // @formatter:on
074    public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
075
076        private FileChannel fileChannel;
077
078        /**
079         * Builds a new {@link BufferedFileChannelInputStream}.
080         * <p>
081         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
082         * </p>
083         * <p>
084         * This builder use the following aspects:
085         * </p>
086         * <ul>
087         * <li>{@link #getInputStream()}</li>
088         * <li>{@link #getBufferSize()}</li>
089         * </ul>
090         *
091         * @return a new instance.
092         * @throws IllegalStateException         if the {@code origin} is {@code null}.
093         * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
094         * @throws IOException If an I/O error occurs
095         * @see #getPath()
096         * @see #getBufferSize()
097         */
098        @Override
099        public BufferedFileChannelInputStream get() throws IOException {
100            return fileChannel != null ? new BufferedFileChannelInputStream(fileChannel, getBufferSize())
101                    : new BufferedFileChannelInputStream(getPath(), getBufferSize());
102        }
103
104        /**
105         * Sets the file channel.
106         * <p>
107         * This setting takes precedence over all others.
108         * </p>
109         *
110         * @param fileChannel the file channel.
111         * @return this instance.
112         * @since 2.18.0
113         */
114        public Builder setFileChannel(final FileChannel fileChannel) {
115            this.fileChannel = fileChannel;
116            return this;
117        }
118
119    }
120
121    /**
122     * Constructs a new {@link Builder}.
123     *
124     * @return a new {@link Builder}.
125     * @since 2.12.0
126     */
127    public static Builder builder() {
128        return new Builder();
129    }
130
131    private final ByteBuffer byteBuffer;
132
133    private final FileChannel fileChannel;
134
135    /**
136     * Constructs a new instance for the given File.
137     *
138     * @param file The file to stream.
139     * @throws IOException If an I/O error occurs
140     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
141     */
142    @Deprecated
143    public BufferedFileChannelInputStream(final File file) throws IOException {
144        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
145    }
146
147    /**
148     * Constructs a new instance for the given File and buffer size.
149     *
150     * @param file       The file to stream.
151     * @param bufferSize buffer size.
152     * @throws IOException If an I/O error occurs
153     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
154     */
155    @Deprecated
156    public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
157        this(file.toPath(), bufferSize);
158    }
159
160    private BufferedFileChannelInputStream(final FileChannel fileChannel, final int bufferSize) {
161        this.fileChannel = Objects.requireNonNull(fileChannel, "path");
162        byteBuffer = ByteBuffer.allocateDirect(bufferSize);
163        byteBuffer.flip();
164    }
165
166    /**
167     * Constructs a new instance for the given Path.
168     *
169     * @param path The path to stream.
170     * @throws IOException If an I/O error occurs
171     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
172     */
173    @Deprecated
174    public BufferedFileChannelInputStream(final Path path) throws IOException {
175        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
176    }
177
178    /**
179     * Constructs a new instance for the given Path and buffer size.
180     *
181     * @param path       The path to stream.
182     * @param bufferSize buffer size.
183     * @throws IOException If an I/O error occurs
184     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
185     */
186    @SuppressWarnings("resource")
187    @Deprecated
188    public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
189        this(FileChannel.open(path, StandardOpenOption.READ), bufferSize);
190    }
191
192    @Override
193    public synchronized int available() throws IOException {
194        if (!fileChannel.isOpen()) {
195            return 0;
196        }
197        if (!refill()) {
198            return 0;
199        }
200        return byteBuffer.remaining();
201    }
202
203    /**
204     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
205     * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
206     * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
207     * API to manually dispose of these kinds of buffers.
208     *
209     * @param buffer the buffer to clean.
210     */
211    private void clean(final ByteBuffer buffer) {
212        if (buffer.isDirect()) {
213            cleanDirectBuffer(buffer);
214        }
215    }
216
217    /**
218     * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
219     * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
220     * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
221     * reflection.
222     *
223     * @param buffer the buffer to clean. must be a DirectBuffer.
224     */
225    private void cleanDirectBuffer(final ByteBuffer buffer) {
226        if (ByteBufferCleaner.isSupported()) {
227            ByteBufferCleaner.clean(buffer);
228        }
229    }
230
231    @Override
232    public synchronized void close() throws IOException {
233        try {
234            fileChannel.close();
235        } finally {
236            clean(byteBuffer);
237        }
238    }
239
240    @Override
241    public synchronized int read() throws IOException {
242        if (!refill()) {
243            return EOF;
244        }
245        return byteBuffer.get() & 0xFF;
246    }
247
248    @Override
249    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
250        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
251            throw new IndexOutOfBoundsException();
252        }
253        if (!refill()) {
254            return EOF;
255        }
256        len = Math.min(len, byteBuffer.remaining());
257        byteBuffer.get(b, offset, len);
258        return len;
259    }
260
261    /**
262     * Checks whether data is left to be read from the input stream.
263     *
264     * @return true if data is left, false otherwise
265     * @throws IOException if an I/O error occurs.
266     */
267    private boolean refill() throws IOException {
268        Input.checkOpen(fileChannel.isOpen());
269        if (!byteBuffer.hasRemaining()) {
270            byteBuffer.clear();
271            int nRead = 0;
272            while (nRead == 0) {
273                nRead = fileChannel.read(byteBuffer);
274            }
275            byteBuffer.flip();
276            return nRead >= 0;
277        }
278        return true;
279    }
280
281    @Override
282    public synchronized long skip(final long n) throws IOException {
283        if (n <= 0L) {
284            return 0L;
285        }
286        if (byteBuffer.remaining() >= n) {
287            // The buffered content is enough to skip
288            byteBuffer.position(byteBuffer.position() + (int) n);
289            return n;
290        }
291        final long skippedFromBuffer = byteBuffer.remaining();
292        final long toSkipFromFileChannel = n - skippedFromBuffer;
293        // Discard everything we have read in the buffer.
294        byteBuffer.position(0);
295        byteBuffer.flip();
296        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
297    }
298
299    private long skipFromFileChannel(final long n) throws IOException {
300        final long currentFilePosition = fileChannel.position();
301        final long size = fileChannel.size();
302        if (n > size - currentFilePosition) {
303            fileChannel.position(size);
304            return size - currentFilePosition;
305        }
306        fileChannel.position(currentFilePosition + n);
307        return n;
308    }
309
310}