001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.BufferedInputStream;
019import java.io.File;
020import java.io.IOException;
021import java.io.InputStream;
022import java.nio.ByteBuffer;
023import java.nio.channels.FileChannel;
024import java.nio.file.Path;
025import java.nio.file.StandardOpenOption;
026import java.util.Objects;
027
028import org.apache.commons.io.IOUtils;
029import org.apache.commons.io.build.AbstractStreamBuilder;
030
031/**
032 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
033 * using {@link BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
034 * reading a file using NIO, but does not support buffering.
035 * <p>
036 * To build an instance, use {@link Builder}.
037 * </p>
038 * <p>
039 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
040 * </p>
041 *
042 * @see Builder
043 * @since 2.9.0
044 */
045public final class BufferedFileChannelInputStream extends InputStream {
046
047    // @formatter:off
048    /**
049     * Builds a new {@link BufferedFileChannelInputStream}.
050     *
051     * <p>
052     * Using File IO:
053     * </p>
054     * <pre>{@code
055     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
056     *   .setFile(file)
057     *   .setBufferSize(4096)
058     *   .get();}
059     * </pre>
060     * <p>
061     * Using NIO Path:
062     * </p>
063     * <pre>{@code
064     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
065     *   .setPath(path)
066     *   .setBufferSize(4096)
067     *   .get();}
068     * </pre>
069     *
070     * @see #get()
071     * @since 2.12.0
072     */
073    // @formatter:on
074    public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
075
076        private FileChannel fileChannel;
077
078        /**
079         * Constructs a new builder of {@link BufferedFileChannelInputStream}.
080         */
081        public Builder() {
082            // empty
083        }
084
085        /**
086         * Builds a new {@link BufferedFileChannelInputStream}.
087         * <p>
088         * You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
089         * </p>
090         * <p>
091         * This builder uses the following aspects:
092         * </p>
093         * <ul>
094         * <li>{@link FileChannel} takes precedence is set. </li>
095         * <li>{@link #getPath()} if the file channel is not set.</li>
096         * <li>{@link #getBufferSize()}</li>
097         * </ul>
098         *
099         * @return a new instance.
100         * @throws IllegalStateException         if the {@code origin} is {@code null}.
101         * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
102         * @throws IOException                   if an I/O error occurs converting to an {@link Path} using {@link #getPath()}.
103         * @see #getPath()
104         * @see #getBufferSize()
105         * @see #getUnchecked()
106         */
107        @Override
108        public BufferedFileChannelInputStream get() throws IOException {
109            return fileChannel != null ? new BufferedFileChannelInputStream(fileChannel, getBufferSize())
110                    : new BufferedFileChannelInputStream(getPath(), getBufferSize());
111        }
112
113        /**
114         * Sets the file channel.
115         * <p>
116         * This setting takes precedence over all others.
117         * </p>
118         *
119         * @param fileChannel the file channel.
120         * @return this instance.
121         * @since 2.18.0
122         */
123        public Builder setFileChannel(final FileChannel fileChannel) {
124            this.fileChannel = fileChannel;
125            return this;
126        }
127
128    }
129
130    /**
131     * Constructs a new {@link Builder}.
132     *
133     * @return a new {@link Builder}.
134     * @since 2.12.0
135     */
136    public static Builder builder() {
137        return new Builder();
138    }
139
140    private final ByteBuffer byteBuffer;
141
142    private final FileChannel fileChannel;
143
144    /**
145     * Constructs a new instance for the given File.
146     *
147     * @param file The file to stream.
148     * @throws IOException If an I/O error occurs
149     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
150     */
151    @Deprecated
152    public BufferedFileChannelInputStream(final File file) throws IOException {
153        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
154    }
155
156    /**
157     * Constructs a new instance for the given File and buffer size.
158     *
159     * @param file       The file to stream.
160     * @param bufferSize buffer size.
161     * @throws IOException If an I/O error occurs
162     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
163     */
164    @Deprecated
165    public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
166        this(file.toPath(), bufferSize);
167    }
168
169    private BufferedFileChannelInputStream(final FileChannel fileChannel, final int bufferSize) {
170        this.fileChannel = Objects.requireNonNull(fileChannel, "path");
171        byteBuffer = ByteBuffer.allocateDirect(bufferSize);
172        byteBuffer.flip();
173    }
174
175    /**
176     * Constructs a new instance for the given Path.
177     *
178     * @param path The path to stream.
179     * @throws IOException If an I/O error occurs
180     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
181     */
182    @Deprecated
183    public BufferedFileChannelInputStream(final Path path) throws IOException {
184        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
185    }
186
187    /**
188     * Constructs a new instance for the given Path and buffer size.
189     *
190     * @param path       The path to stream.
191     * @param bufferSize buffer size.
192     * @throws IOException If an I/O error occurs
193     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
194     */
195    @SuppressWarnings("resource")
196    @Deprecated
197    public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
198        this(FileChannel.open(path, StandardOpenOption.READ), bufferSize);
199    }
200
201    @Override
202    public synchronized int available() throws IOException {
203        if (!fileChannel.isOpen()) {
204            return 0;
205        }
206        if (!refill()) {
207            return 0;
208        }
209        return byteBuffer.remaining();
210    }
211
212    /**
213     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
214     * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
215     * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
216     * API to manually dispose of these kinds of buffers.
217     *
218     * @param buffer the buffer to clean.
219     */
220    private void clean(final ByteBuffer buffer) {
221        if (buffer.isDirect()) {
222            cleanDirectBuffer(buffer);
223        }
224    }
225
226    /**
227     * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
228     * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
229     * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
230     * reflection.
231     *
232     * @param buffer the buffer to clean. must be a DirectBuffer.
233     */
234    private void cleanDirectBuffer(final ByteBuffer buffer) {
235        if (ByteBufferCleaner.isSupported()) {
236            ByteBufferCleaner.clean(buffer);
237        }
238    }
239
240    @Override
241    public synchronized void close() throws IOException {
242        try {
243            fileChannel.close();
244        } finally {
245            clean(byteBuffer);
246        }
247    }
248
249    @Override
250    public synchronized int read() throws IOException {
251        if (!refill()) {
252            return EOF;
253        }
254        return byteBuffer.get() & 0xFF;
255    }
256
257    @Override
258    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
259        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
260            throw new IndexOutOfBoundsException();
261        }
262        if (!refill()) {
263            return EOF;
264        }
265        len = Math.min(len, byteBuffer.remaining());
266        byteBuffer.get(b, offset, len);
267        return len;
268    }
269
270    /**
271     * Checks whether data is left to be read from the input stream.
272     *
273     * @return true if data is left, false otherwise
274     * @throws IOException if an I/O error occurs.
275     */
276    private boolean refill() throws IOException {
277        Input.checkOpen(fileChannel.isOpen());
278        if (!byteBuffer.hasRemaining()) {
279            byteBuffer.clear();
280            int nRead = 0;
281            while (nRead == 0) {
282                nRead = fileChannel.read(byteBuffer);
283            }
284            byteBuffer.flip();
285            return nRead >= 0;
286        }
287        return true;
288    }
289
290    @Override
291    public synchronized long skip(final long n) throws IOException {
292        if (n <= 0L) {
293            return 0L;
294        }
295        if (byteBuffer.remaining() >= n) {
296            // The buffered content is enough to skip
297            byteBuffer.position(byteBuffer.position() + (int) n);
298            return n;
299        }
300        final long skippedFromBuffer = byteBuffer.remaining();
301        final long toSkipFromFileChannel = n - skippedFromBuffer;
302        // Discard everything we have read in the buffer.
303        byteBuffer.position(0);
304        byteBuffer.flip();
305        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
306    }
307
308    private long skipFromFileChannel(final long n) throws IOException {
309        final long currentFilePosition = fileChannel.position();
310        final long size = fileChannel.size();
311        if (n > size - currentFilePosition) {
312            fileChannel.position(size);
313            return size - currentFilePosition;
314        }
315        fileChannel.position(currentFilePosition + n);
316        return n;
317    }
318
319}