FramedSnappyCompressorInputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.commons.compress.compressors.snappy;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;
import org.apache.commons.codec.digest.PureJavaCrc32C;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.utils.ByteUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.compress.utils.InputStreamStatistics;
import org.apache.commons.io.input.BoundedInputStream;
/**
* CompressorInputStream for the framing Snappy format.
*
* <p>
* Based on the "spec" in the version "Last revised: 2013-10-25"
* </p>
*
* @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
* @since 1.7
*/
public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics {
/**
* package private for tests only.
*/
static final long MASK_OFFSET = 0xa282ead8L;
private static final int STREAM_IDENTIFIER_TYPE = 0xff;
static final int COMPRESSED_CHUNK_TYPE = 0;
private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
private static final int PADDING_CHUNK_TYPE = 0xfe;
private static final int MIN_UNSKIPPABLE_TYPE = 2;
private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
private static final int MAX_SKIPPABLE_TYPE = 0xfd;
// used by FramedSnappyCompressorOutputStream as well
static final byte[] SZ_SIGNATURE = { // NOSONAR
(byte) STREAM_IDENTIFIER_TYPE, // tag
6, 0, 0, // length
's', 'N', 'a', 'P', 'p', 'Y' };
/**
* Checks if the signature matches what is expected for a .sz file.
*
* <p>
* .sz files start with a chunk with tag 0xff and content sNaPpY.
* </p>
*
* @param signature the bytes to check
* @param length the number of bytes to check
* @return true if this is a .sz stream, false otherwise
*/
public static boolean matches(final byte[] signature, final int length) {
if (length < SZ_SIGNATURE.length) {
return false;
}
byte[] shortenedSig = signature;
if (signature.length > SZ_SIGNATURE.length) {
shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
}
return Arrays.equals(shortenedSig, SZ_SIGNATURE);
}
static long unmask(long x) {
// ugly, maybe we should just have used ints and deal with the
// overflow
x -= MASK_OFFSET;
x &= 0xffffFFFFL;
return (x >> 17 | x << 15) & 0xffffFFFFL;
}
private long unreadBytes;
private final BoundedInputStream countingStream;
/** The underlying stream to read compressed data from */
private final PushbackInputStream inputStream;
/** The dialect to expect */
private final FramedSnappyDialect dialect;
private SnappyCompressorInputStream currentCompressedChunk;
// used in no-arg read method
private final byte[] oneByte = new byte[1];
private boolean endReached, inUncompressedChunk;
private int uncompressedBytesRemaining;
private long expectedChecksum = -1;
private final int blockSize;
private final PureJavaCrc32C checksum = new PureJavaCrc32C();
private final ByteUtils.ByteSupplier supplier = this::readOneByte;
/**
* Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream using the
* {@link FramedSnappyDialect#STANDARD} dialect.
*
* @param in the InputStream from which to read the compressed data
* @throws IOException if reading fails
*/
public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
this(in, FramedSnappyDialect.STANDARD);
}
/**
* Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
*
* @param in the InputStream from which to read the compressed data
* @param dialect the dialect used by the compressed stream
* @throws IOException if reading fails
*/
public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException {
this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
}
/**
* Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream.
*
* @param in the InputStream from which to read the compressed data
* @param blockSize the block size to use for the compressed stream
* @param dialect the dialect used by the compressed stream
* @throws IOException if reading fails
* @throws IllegalArgumentException if blockSize is not bigger than 0
* @since 1.14
*/
public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException {
if (blockSize <= 0) {
throw new IllegalArgumentException("blockSize must be bigger than 0");
}
countingStream = BoundedInputStream.builder().setInputStream(in).get();
this.inputStream = new PushbackInputStream(countingStream, 1);
this.blockSize = blockSize;
this.dialect = dialect;
if (dialect.hasStreamIdentifier()) {
readStreamIdentifier();
}
}
/** {@inheritDoc} */
@Override
public int available() throws IOException {
if (inUncompressedChunk) {
return Math.min(uncompressedBytesRemaining, inputStream.available());
}
if (currentCompressedChunk != null) {
return currentCompressedChunk.available();
}
return 0;
}
/** {@inheritDoc} */
@Override
public void close() throws IOException {
try {
if (currentCompressedChunk != null) {
currentCompressedChunk.close();
currentCompressedChunk = null;
}
} finally {
inputStream.close();
}
}
/**
* @since 1.17
*/
@Override
public long getCompressedCount() {
return countingStream.getCount() - unreadBytes;
}
/** {@inheritDoc} */
@Override
public int read() throws IOException {
return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
}
/** {@inheritDoc} */
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (len == 0) {
return 0;
}
int read = readOnce(b, off, len);
if (read == -1) {
readNextBlock();
if (endReached) {
return -1;
}
read = readOnce(b, off, len);
}
return read;
}
private long readCrc() throws IOException {
final byte[] b = new byte[4];
final int read = IOUtils.readFully(inputStream, b);
count(read);
if (read != 4) {
throw new IOException("Premature end of stream");
}
return ByteUtils.fromLittleEndian(b);
}
private void readNextBlock() throws IOException {
verifyLastChecksumAndReset();
inUncompressedChunk = false;
final int type = readOneByte();
if (type == -1) {
endReached = true;
} else if (type == STREAM_IDENTIFIER_TYPE) {
inputStream.unread(type);
unreadBytes++;
pushedBackBytes(1);
readStreamIdentifier();
readNextBlock();
} else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
skipBlock();
readNextBlock();
} else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected.");
} else if (type == UNCOMPRESSED_CHUNK_TYPE) {
inUncompressedChunk = true;
uncompressedBytesRemaining = readSize() - 4 /* CRC */;
if (uncompressedBytesRemaining < 0) {
throw new IOException("Found illegal chunk with negative size");
}
expectedChecksum = unmask(readCrc());
} else if (type == COMPRESSED_CHUNK_TYPE) {
final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
final long size = readSize() - (expectChecksum ? 4L : 0L);
if (size < 0) {
throw new IOException("Found illegal chunk with negative size");
}
if (expectChecksum) {
expectedChecksum = unmask(readCrc());
} else {
expectedChecksum = -1;
}
// @formatter:off
currentCompressedChunk = new SnappyCompressorInputStream(BoundedInputStream.builder()
.setInputStream(inputStream)
.setMaxCount(size)
.setPropagateClose(false)
.get(),
blockSize);
// @formatter:on
// constructor reads uncompressed size
count(currentCompressedChunk.getBytesRead());
} else {
// impossible as all potential byte values have been covered
throw new IOException("Unknown chunk type " + type + " detected.");
}
}
/**
* Read from the current chunk into the given array.
*
* @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached).
*/
private int readOnce(final byte[] b, final int off, final int len) throws IOException {
int read = -1;
if (inUncompressedChunk) {
final int amount = Math.min(uncompressedBytesRemaining, len);
if (amount == 0) {
return -1;
}
read = inputStream.read(b, off, amount);
if (read != -1) {
uncompressedBytesRemaining -= read;
count(read);
}
} else if (currentCompressedChunk != null) {
final long before = currentCompressedChunk.getBytesRead();
read = currentCompressedChunk.read(b, off, len);
if (read == -1) {
currentCompressedChunk.close();
currentCompressedChunk = null;
} else {
count(currentCompressedChunk.getBytesRead() - before);
}
}
if (read > 0) {
checksum.update(b, off, read);
}
return read;
}
private int readOneByte() throws IOException {
final int b = inputStream.read();
if (b != -1) {
count(1);
return b & 0xFF;
}
return -1;
}
private int readSize() throws IOException {
return (int) ByteUtils.fromLittleEndian(supplier, 3);
}
private void readStreamIdentifier() throws IOException {
final byte[] b = new byte[10];
final int read = IOUtils.readFully(inputStream, b);
count(read);
if (10 != read || !matches(b, 10)) {
throw new IOException("Not a framed Snappy stream");
}
}
private void skipBlock() throws IOException {
final int size = readSize();
if (size < 0) {
throw new IOException("Found illegal chunk with negative size");
}
final long read = org.apache.commons.io.IOUtils.skip(inputStream, size);
count(read);
if (read != size) {
throw new IOException("Premature end of stream");
}
}
private void verifyLastChecksumAndReset() throws IOException {
if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
throw new IOException("Checksum verification failed");
}
expectedChecksum = -1;
checksum.reset();
}
}