001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024import java.util.Arrays;
025
026import org.apache.commons.codec.digest.PureJavaCrc32C;
027import org.apache.commons.compress.compressors.CompressorInputStream;
028import org.apache.commons.compress.utils.ByteUtils;
029import org.apache.commons.compress.utils.IOUtils;
030import org.apache.commons.compress.utils.InputStreamStatistics;
031import org.apache.commons.io.input.BoundedInputStream;
032
033/**
034 * CompressorInputStream for the framing Snappy format.
035 *
036 * <p>
037 * Based on the "spec" in the version "Last revised: 2013-10-25"
038 * </p>
039 *
040 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
041 * @since 1.7
042 */
043public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {
044
045    /**
046     * package private for tests only.
047     */
048    static final long MASK_OFFSET = 0xa282ead8L;
049
050    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
051    static final int COMPRESSED_CHUNK_TYPE = 0;
052    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
053    private static final int PADDING_CHUNK_TYPE = 0xfe;
054    private static final int MIN_UNSKIPPABLE_TYPE = 2;
055    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
056    private static final int MAX_SKIPPABLE_TYPE = 0xfd;
057
058    // used by FramedSnappyCompressorOutputStream as well
059    static final byte[] SZ_SIGNATURE = { // NOSONAR
060            (byte) STREAM_IDENTIFIER_TYPE, // tag
061            6, 0, 0, // length
062            's', 'N', 'a', 'P', 'p', 'Y' };
063
064    /**
065     * Checks if the signature matches what is expected for a .sz file.
066     *
067     * <p>
068     * .sz files start with a chunk with tag 0xff and content sNaPpY.
069     * </p>
070     *
071     * @param signature the bytes to check
072     * @param length    the number of bytes to check
073     * @return true if this is a .sz stream, false otherwise
074     */
075    public static boolean matches(final byte[] signature, final int length) {
076
077        if (length < SZ_SIGNATURE.length) {
078            return false;
079        }
080
081        byte[] shortenedSig = signature;
082        if (signature.length > SZ_SIGNATURE.length) {
083            shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
084        }
085
086        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
087    }
088
089    static long unmask(long x) {
090        // ugly, maybe we should just have used ints and deal with the
091        // overflow
092        x -= MASK_OFFSET;
093        x &= 0xffffFFFFL;
094        return (x >> 17 | x << 15) & 0xffffFFFFL;
095    }
096
097    private long unreadBytes;
098
099    private final BoundedInputStream countingStream;
100
101    /** The underlying stream to read compressed data from */
102    private final PushbackInputStream inputStream;
103
104    /** The dialect to expect */
105    private final FramedSnappyDialect dialect;
106
107    private SnappyCompressorInputStream currentCompressedChunk;
108
109    // used in no-arg read method
110    private final byte[] oneByte = new byte[1];
111    private boolean endReached, inUncompressedChunk;
112    private int uncompressedBytesRemaining;
113    private long expectedChecksum = -1;
114
115    private final int blockSize;
116
117    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
118
119    private final ByteUtils.ByteSupplier supplier = this::readOneByte;
120
121    /**
122     * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream using the
123     * {@link FramedSnappyDialect#STANDARD} dialect.
124     *
125     * @param in the InputStream from which to read the compressed data
126     * @throws IOException if reading fails
127     */
128    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
129        this(in, FramedSnappyDialect.STANDARD);
130    }
131
132    /**
133     * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
134     *
135     * @param in      the InputStream from which to read the compressed data
136     * @param dialect the dialect used by the compressed stream
137     * @throws IOException if reading fails
138     */
139    public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
140        this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
141    }
142
143    /**
144     * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
145     *
146     * @param in        the InputStream from which to read the compressed data
147     * @param blockSize the block size to use for the compressed stream
148     * @param dialect   the dialect used by the compressed stream
149     * @throws IOException              if reading fails
150     * @throws IllegalArgumentException if blockSize is not bigger than 0
151     * @since 1.14
152     */
153    public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
154        if (blockSize <= 0) {
155            throw new IllegalArgumentException("blockSize must be bigger than 0");
156        }
157        countingStream = BoundedInputStream.builder().setInputStream(in).get();
158        this.inputStream = new PushbackInputStream(countingStream, 1);
159        this.blockSize = blockSize;
160        this.dialect = dialect;
161        if (dialect.hasStreamIdentifier()) {
162            readStreamIdentifier();
163        }
164    }
165
166    /** {@inheritDoc} */
167    @Override
168    public int available() throws IOException {
169        if (inUncompressedChunk) {
170            return Math.min(uncompressedBytesRemaining, inputStream.available());
171        }
172        if (currentCompressedChunk != null) {
173            return currentCompressedChunk.available();
174        }
175        return 0;
176    }
177
178    /** {@inheritDoc} */
179    @Override
180    public void close() throws IOException {
181        try {
182            if (currentCompressedChunk != null) {
183                currentCompressedChunk.close();
184                currentCompressedChunk = null;
185            }
186        } finally {
187            inputStream.close();
188        }
189    }
190
191    /**
192     * @since 1.17
193     */
194    @Override
195    public long getCompressedCount() {
196        return countingStream.getCount() - unreadBytes;
197    }
198
199    /** {@inheritDoc} */
200    @Override
201    public int read() throws IOException {
202        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
203    }
204
205    /** {@inheritDoc} */
206    @Override
207    public int read(final byte[] b, final int off, final int len) throws IOException {
208        if (len == 0) {
209            return 0;
210        }
211        int read = readOnce(b, off, len);
212        if (read == -1) {
213            readNextBlock();
214            if (endReached) {
215                return -1;
216            }
217            read = readOnce(b, off, len);
218        }
219        return read;
220    }
221
222    private long readCrc() throws IOException {
223        final byte[] b = new byte[4];
224        final int read = IOUtils.readFully(inputStream, b);
225        count(read);
226        if (read != 4) {
227            throw new IOException("Premature end of stream");
228        }
229        return ByteUtils.fromLittleEndian(b);
230    }
231
232    private void readNextBlock() throws IOException {
233        verifyLastChecksumAndReset();
234        inUncompressedChunk = false;
235        final int type = readOneByte();
236        if (type == -1) {
237            endReached = true;
238        } else if (type == STREAM_IDENTIFIER_TYPE) {
239            inputStream.unread(type);
240            unreadBytes++;
241            pushedBackBytes(1);
242            readStreamIdentifier();
243            readNextBlock();
244        } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
245            skipBlock();
246            readNextBlock();
247        } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
248            throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected.");
249        } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
250            inUncompressedChunk = true;
251            uncompressedBytesRemaining = readSize() - 4 /* CRC */;
252            if (uncompressedBytesRemaining < 0) {
253                throw new IOException("Found illegal chunk with negative size");
254            }
255            expectedChecksum = unmask(readCrc());
256        } else if (type == COMPRESSED_CHUNK_TYPE) {
257            final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
258            final long size = readSize() - (expectChecksum ? 4L : 0L);
259            if (size < 0) {
260                throw new IOException("Found illegal chunk with negative size");
261            }
262            if (expectChecksum) {
263                expectedChecksum = unmask(readCrc());
264            } else {
265                expectedChecksum = -1;
266            }
267            // @formatter:off
268            currentCompressedChunk = new SnappyCompressorInputStream(BoundedInputStream.builder()
269                    .setInputStream(inputStream)
270                    .setMaxCount(size)
271                    .setPropagateClose(false)
272                    .get(),
273                    blockSize);
274            // @formatter:on
275            // constructor reads uncompressed size
276            count(currentCompressedChunk.getBytesRead());
277        } else {
278            // impossible as all potential byte values have been covered
279            throw new IOException("Unknown chunk type " + type + " detected.");
280        }
281    }
282
283    /**
284     * Read from the current chunk into the given array.
285     *
286     * @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached).
287     */
288    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
289        int read = -1;
290        if (inUncompressedChunk) {
291            final int amount = Math.min(uncompressedBytesRemaining, len);
292            if (amount == 0) {
293                return -1;
294            }
295            read = inputStream.read(b, off, amount);
296            if (read != -1) {
297                uncompressedBytesRemaining -= read;
298                count(read);
299            }
300        } else if (currentCompressedChunk != null) {
301            final long before = currentCompressedChunk.getBytesRead();
302            read = currentCompressedChunk.read(b, off, len);
303            if (read == -1) {
304                currentCompressedChunk.close();
305                currentCompressedChunk = null;
306            } else {
307                count(currentCompressedChunk.getBytesRead() - before);
308            }
309        }
310        if (read > 0) {
311            checksum.update(b, off, read);
312        }
313        return read;
314    }
315
316    private int readOneByte() throws IOException {
317        final int b = inputStream.read();
318        if (b != -1) {
319            count(1);
320            return b & 0xFF;
321        }
322        return -1;
323    }
324
325    private int readSize() throws IOException {
326        return (int) ByteUtils.fromLittleEndian(supplier, 3);
327    }
328
329    private void readStreamIdentifier() throws IOException {
330        final byte[] b = new byte[10];
331        final int read = IOUtils.readFully(inputStream, b);
332        count(read);
333        if (10 != read || !matches(b, 10)) {
334            throw new IOException("Not a framed Snappy stream");
335        }
336    }
337
338    private void skipBlock() throws IOException {
339        final int size = readSize();
340        if (size < 0) {
341            throw new IOException("Found illegal chunk with negative size");
342        }
343        final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
344        count(read);
345        if (read != size) {
346            throw new IOException("Premature end of stream");
347        }
348    }
349
350    private void verifyLastChecksumAndReset() throws IOException {
351        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
352            throw new IOException("Checksum verification failed");
353        }
354        expectedChecksum = -1;
355        checksum.reset();
356    }
357
358}