001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.BufferedInputStream;
019import java.io.File;
020import java.io.IOException;
021import java.io.InputStream;
022import java.nio.ByteBuffer;
023import java.nio.channels.FileChannel;
024import java.nio.file.Path;
025import java.nio.file.StandardOpenOption;
026import java.util.Objects;
027
028import org.apache.commons.io.IOUtils;
029import org.apache.commons.io.build.AbstractStreamBuilder;
030
031/**
032 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
033 * using {@link BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
034 * reading a file using NIO, but does not support buffering.
035 * <p>
036 * To build an instance, use {@link Builder}.
037 * </p>
038 * <p>
039 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
040 * </p>
041 *
042 * @see Builder
043 * @since 2.9.0
044 */
045public final class BufferedFileChannelInputStream extends InputStream {
046
047    // @formatter:off
048    /**
049     * Builds a new {@link BufferedFileChannelInputStream}.
050     *
051     * <p>
052     * Using File IO:
053     * </p>
054     * <pre>{@code
055     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
056     *   .setFile(file)
057     *   .setBufferSize(4096)
058     *   .get();}
059     * </pre>
060     * <p>
061     * Using NIO Path:
062     * </p>
063     * <pre>{@code
064     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
065     *   .setPath(path)
066     *   .setBufferSize(4096)
067     *   .get();}
068     * </pre>
069     *
070     * @see #get()
071     * @since 2.12.0
072     */
073    // @formatter:on
074    public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
075
076        /**
077         * Builds a new {@link BufferedFileChannelInputStream}.
078         * <p>
079         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
080         * </p>
081         * <p>
082         * This builder use the following aspects:
083         * </p>
084         * <ul>
085         * <li>{@link #getInputStream()}</li>
086         * <li>{@link #getBufferSize()}</li>
087         * </ul>
088         *
089         * @return a new instance.
090         * @throws IllegalStateException         if the {@code origin} is {@code null}.
091         * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
092         * @throws IOException If an I/O error occurs
093         * @see #getPath()
094         * @see #getBufferSize()
095         */
096        @Override
097        public BufferedFileChannelInputStream get() throws IOException {
098            return new BufferedFileChannelInputStream(getPath(), getBufferSize());
099        }
100
101    }
102
103    /**
104     * Constructs a new {@link Builder}.
105     *
106     * @return a new {@link Builder}.
107     * @since 2.12.0
108     */
109    public static Builder builder() {
110        return new Builder();
111    }
112
113    private final ByteBuffer byteBuffer;
114
115    private final FileChannel fileChannel;
116
117    /**
118     * Constructs a new instance for the given File.
119     *
120     * @param file The file to stream.
121     * @throws IOException If an I/O error occurs
122     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
123     */
124    @Deprecated
125    public BufferedFileChannelInputStream(final File file) throws IOException {
126        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
127    }
128
129    /**
130     * Constructs a new instance for the given File and buffer size.
131     *
132     * @param file       The file to stream.
133     * @param bufferSize buffer size.
134     * @throws IOException If an I/O error occurs
135     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
136     */
137    @Deprecated
138    public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
139        this(file.toPath(), bufferSize);
140    }
141
142    /**
143     * Constructs a new instance for the given Path.
144     *
145     * @param path The path to stream.
146     * @throws IOException If an I/O error occurs
147     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
148     */
149    @Deprecated
150    public BufferedFileChannelInputStream(final Path path) throws IOException {
151        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
152    }
153
154    /**
155     * Constructs a new instance for the given Path and buffer size.
156     *
157     * @param path       The path to stream.
158     * @param bufferSize buffer size.
159     * @throws IOException If an I/O error occurs
160     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
161     */
162    @Deprecated
163    public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
164        Objects.requireNonNull(path, "path");
165        fileChannel = FileChannel.open(path, StandardOpenOption.READ);
166        byteBuffer = ByteBuffer.allocateDirect(bufferSize);
167        byteBuffer.flip();
168    }
169
170    @Override
171    public synchronized int available() throws IOException {
172        if (!fileChannel.isOpen()) {
173            return 0;
174        }
175        if (!refill()) {
176            return 0;
177        }
178        return byteBuffer.remaining();
179    }
180
181    /**
182     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
183     * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
184     * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
185     * API to manually dispose of these kinds of buffers.
186     *
187     * @param buffer the buffer to clean.
188     */
189    private void clean(final ByteBuffer buffer) {
190        if (buffer.isDirect()) {
191            cleanDirectBuffer(buffer);
192        }
193    }
194
195    /**
196     * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
197     * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
198     * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
199     * reflection.
200     *
201     * @param buffer the buffer to clean. must be a DirectBuffer.
202     */
203    private void cleanDirectBuffer(final ByteBuffer buffer) {
204        if (ByteBufferCleaner.isSupported()) {
205            ByteBufferCleaner.clean(buffer);
206        }
207    }
208
209    @Override
210    public synchronized void close() throws IOException {
211        try {
212            fileChannel.close();
213        } finally {
214            clean(byteBuffer);
215        }
216    }
217
218    @Override
219    public synchronized int read() throws IOException {
220        if (!refill()) {
221            return EOF;
222        }
223        return byteBuffer.get() & 0xFF;
224    }
225
226    @Override
227    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
228        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
229            throw new IndexOutOfBoundsException();
230        }
231        if (!refill()) {
232            return EOF;
233        }
234        len = Math.min(len, byteBuffer.remaining());
235        byteBuffer.get(b, offset, len);
236        return len;
237    }
238
239    /**
240     * Checks whether data is left to be read from the input stream.
241     *
242     * @return true if data is left, false otherwise
243     * @throws IOException if an I/O error occurs.
244     */
245    private boolean refill() throws IOException {
246        Input.checkOpen(fileChannel.isOpen());
247        if (!byteBuffer.hasRemaining()) {
248            byteBuffer.clear();
249            int nRead = 0;
250            while (nRead == 0) {
251                nRead = fileChannel.read(byteBuffer);
252            }
253            byteBuffer.flip();
254            return nRead >= 0;
255        }
256        return true;
257    }
258
259    @Override
260    public synchronized long skip(final long n) throws IOException {
261        if (n <= 0L) {
262            return 0L;
263        }
264        if (byteBuffer.remaining() >= n) {
265            // The buffered content is enough to skip
266            byteBuffer.position(byteBuffer.position() + (int) n);
267            return n;
268        }
269        final long skippedFromBuffer = byteBuffer.remaining();
270        final long toSkipFromFileChannel = n - skippedFromBuffer;
271        // Discard everything we have read in the buffer.
272        byteBuffer.position(0);
273        byteBuffer.flip();
274        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
275    }
276
277    private long skipFromFileChannel(final long n) throws IOException {
278        final long currentFilePosition = fileChannel.position();
279        final long size = fileChannel.size();
280        if (n > size - currentFilePosition) {
281            fileChannel.position(size);
282            return size - currentFilePosition;
283        }
284        fileChannel.position(currentFilePosition + n);
285        return n;
286    }
287
288}