001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.compress.compressors.snappy; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PushbackInputStream; 024import java.util.Arrays; 025 026import org.apache.commons.codec.digest.PureJavaCrc32C; 027import org.apache.commons.compress.compressors.CompressorInputStream; 028import org.apache.commons.compress.utils.ByteUtils; 029import org.apache.commons.compress.utils.IOUtils; 030import org.apache.commons.compress.utils.InputStreamStatistics; 031import org.apache.commons.io.input.BoundedInputStream; 032 033/** 034 * CompressorInputStream for the framing Snappy format. 035 * 036 * <p> 037 * Based on the "spec" in the version "Last revised: 2013-10-25" 038 * </p> 039 * 040 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a> 041 * @since 1.7 042 */ 043public class FramedSnappyCompressorInputStream extends CompressorInputStream implements InputStreamStatistics { 044 045 /** 046 * package private for tests only. 047 */ 048 static final long MASK_OFFSET = 0xa282ead8L; 049 050 private static final int STREAM_IDENTIFIER_TYPE = 0xff; 051 static final int COMPRESSED_CHUNK_TYPE = 0; 052 private static final int UNCOMPRESSED_CHUNK_TYPE = 1; 053 private static final int PADDING_CHUNK_TYPE = 0xfe; 054 private static final int MIN_UNSKIPPABLE_TYPE = 2; 055 private static final int MAX_UNSKIPPABLE_TYPE = 0x7f; 056 private static final int MAX_SKIPPABLE_TYPE = 0xfd; 057 058 // used by FramedSnappyCompressorOutputStream as well 059 static final byte[] SZ_SIGNATURE = { // NOSONAR 060 (byte) STREAM_IDENTIFIER_TYPE, // tag 061 6, 0, 0, // length 062 's', 'N', 'a', 'P', 'p', 'Y' }; 063 064 /** 065 * Checks if the signature matches what is expected for a .sz file. 066 * 067 * <p> 068 * .sz files start with a chunk with tag 0xff and content sNaPpY. 069 * </p> 070 * 071 * @param signature the bytes to check 072 * @param length the number of bytes to check 073 * @return true if this is a .sz stream, false otherwise 074 */ 075 public static boolean matches(final byte[] signature, final int length) { 076 077 if (length < SZ_SIGNATURE.length) { 078 return false; 079 } 080 081 byte[] shortenedSig = signature; 082 if (signature.length > SZ_SIGNATURE.length) { 083 shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length); 084 } 085 086 return Arrays.equals(shortenedSig, SZ_SIGNATURE); 087 } 088 089 static long unmask(long x) { 090 // ugly, maybe we should just have used ints and deal with the 091 // overflow 092 x -= MASK_OFFSET; 093 x &= 0xffffFFFFL; 094 return (x >> 17 | x << 15) & 0xffffFFFFL; 095 } 096 097 private long unreadBytes; 098 099 private final BoundedInputStream countingStream; 100 101 /** The underlying stream to read compressed data from */ 102 private final PushbackInputStream inputStream; 103 104 /** The dialect to expect */ 105 private final FramedSnappyDialect dialect; 106 107 private SnappyCompressorInputStream currentCompressedChunk; 108 109 // used in no-arg read method 110 private final byte[] oneByte = new byte[1]; 111 private boolean endReached, inUncompressedChunk; 112 private int uncompressedBytesRemaining; 113 private long expectedChecksum = -1; 114 115 private final int blockSize; 116 117 private final PureJavaCrc32C checksum = new PureJavaCrc32C(); 118 119 private final ByteUtils.ByteSupplier supplier = this::readOneByte; 120 121 /** 122 * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream using the 123 * {@link FramedSnappyDialect#STANDARD} dialect. 124 * 125 * @param in the InputStream from which to read the compressed data 126 * @throws IOException if reading fails 127 */ 128 public FramedSnappyCompressorInputStream(final InputStream in) throws IOException { 129 this(in, FramedSnappyDialect.STANDARD); 130 } 131 132 /** 133 * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream. 134 * 135 * @param in the InputStream from which to read the compressed data 136 * @param dialect the dialect used by the compressed stream 137 * @throws IOException if reading fails 138 */ 139 public FramedSnappyCompressorInputStream(final InputStream in, final FramedSnappyDialect dialect) throws IOException { 140 this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect); 141 } 142 143 /** 144 * Constructs a new input stream that decompresses snappy-framed-compressed data from the specified input stream. 145 * 146 * @param in the InputStream from which to read the compressed data 147 * @param blockSize the block size to use for the compressed stream 148 * @param dialect the dialect used by the compressed stream 149 * @throws IOException if reading fails 150 * @throws IllegalArgumentException if blockSize is not bigger than 0 151 * @since 1.14 152 */ 153 public FramedSnappyCompressorInputStream(final InputStream in, final int blockSize, final FramedSnappyDialect dialect) throws IOException { 154 if (blockSize <= 0) { 155 throw new IllegalArgumentException("blockSize must be bigger than 0"); 156 } 157 countingStream = BoundedInputStream.builder().setInputStream(in).get(); 158 this.inputStream = new PushbackInputStream(countingStream, 1); 159 this.blockSize = blockSize; 160 this.dialect = dialect; 161 if (dialect.hasStreamIdentifier()) { 162 readStreamIdentifier(); 163 } 164 } 165 166 /** {@inheritDoc} */ 167 @Override 168 public int available() throws IOException { 169 if (inUncompressedChunk) { 170 return Math.min(uncompressedBytesRemaining, inputStream.available()); 171 } 172 if (currentCompressedChunk != null) { 173 return currentCompressedChunk.available(); 174 } 175 return 0; 176 } 177 178 /** {@inheritDoc} */ 179 @Override 180 public void close() throws IOException { 181 try { 182 if (currentCompressedChunk != null) { 183 currentCompressedChunk.close(); 184 currentCompressedChunk = null; 185 } 186 } finally { 187 inputStream.close(); 188 } 189 } 190 191 /** 192 * @since 1.17 193 */ 194 @Override 195 public long getCompressedCount() { 196 return countingStream.getCount() - unreadBytes; 197 } 198 199 /** {@inheritDoc} */ 200 @Override 201 public int read() throws IOException { 202 return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF; 203 } 204 205 /** {@inheritDoc} */ 206 @Override 207 public int read(final byte[] b, final int off, final int len) throws IOException { 208 if (len == 0) { 209 return 0; 210 } 211 int read = readOnce(b, off, len); 212 if (read == -1) { 213 readNextBlock(); 214 if (endReached) { 215 return -1; 216 } 217 read = readOnce(b, off, len); 218 } 219 return read; 220 } 221 222 private long readCrc() throws IOException { 223 final byte[] b = new byte[4]; 224 final int read = IOUtils.readFully(inputStream, b); 225 count(read); 226 if (read != 4) { 227 throw new IOException("Premature end of stream"); 228 } 229 return ByteUtils.fromLittleEndian(b); 230 } 231 232 private void readNextBlock() throws IOException { 233 verifyLastChecksumAndReset(); 234 inUncompressedChunk = false; 235 final int type = readOneByte(); 236 if (type == -1) { 237 endReached = true; 238 } else if (type == STREAM_IDENTIFIER_TYPE) { 239 inputStream.unread(type); 240 unreadBytes++; 241 pushedBackBytes(1); 242 readStreamIdentifier(); 243 readNextBlock(); 244 } else if (type == PADDING_CHUNK_TYPE || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) { 245 skipBlock(); 246 readNextBlock(); 247 } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) { 248 throw new IOException("Unskippable chunk with type " + type + " (hex " + Integer.toHexString(type) + ")" + " detected."); 249 } else if (type == UNCOMPRESSED_CHUNK_TYPE) { 250 inUncompressedChunk = true; 251 uncompressedBytesRemaining = readSize() - 4 /* CRC */; 252 if (uncompressedBytesRemaining < 0) { 253 throw new IOException("Found illegal chunk with negative size"); 254 } 255 expectedChecksum = unmask(readCrc()); 256 } else if (type == COMPRESSED_CHUNK_TYPE) { 257 final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks(); 258 final long size = readSize() - (expectChecksum ? 4L : 0L); 259 if (size < 0) { 260 throw new IOException("Found illegal chunk with negative size"); 261 } 262 if (expectChecksum) { 263 expectedChecksum = unmask(readCrc()); 264 } else { 265 expectedChecksum = -1; 266 } 267 // @formatter:off 268 currentCompressedChunk = new SnappyCompressorInputStream(BoundedInputStream.builder() 269 .setInputStream(inputStream) 270 .setMaxCount(size) 271 .setPropagateClose(false) 272 .get(), 273 blockSize); 274 // @formatter:on 275 // constructor reads uncompressed size 276 count(currentCompressedChunk.getBytesRead()); 277 } else { 278 // impossible as all potential byte values have been covered 279 throw new IOException("Unknown chunk type " + type + " detected."); 280 } 281 } 282 283 /** 284 * Read from the current chunk into the given array. 285 * 286 * @return -1 if there is no current chunk or the number of bytes read from the current chunk (which may be -1 if the end of the chunk is reached). 287 */ 288 private int readOnce(final byte[] b, final int off, final int len) throws IOException { 289 int read = -1; 290 if (inUncompressedChunk) { 291 final int amount = Math.min(uncompressedBytesRemaining, len); 292 if (amount == 0) { 293 return -1; 294 } 295 read = inputStream.read(b, off, amount); 296 if (read != -1) { 297 uncompressedBytesRemaining -= read; 298 count(read); 299 } 300 } else if (currentCompressedChunk != null) { 301 final long before = currentCompressedChunk.getBytesRead(); 302 read = currentCompressedChunk.read(b, off, len); 303 if (read == -1) { 304 currentCompressedChunk.close(); 305 currentCompressedChunk = null; 306 } else { 307 count(currentCompressedChunk.getBytesRead() - before); 308 } 309 } 310 if (read > 0) { 311 checksum.update(b, off, read); 312 } 313 return read; 314 } 315 316 private int readOneByte() throws IOException { 317 final int b = inputStream.read(); 318 if (b != -1) { 319 count(1); 320 return b & 0xFF; 321 } 322 return -1; 323 } 324 325 private int readSize() throws IOException { 326 return (int) ByteUtils.fromLittleEndian(supplier, 3); 327 } 328 329 private void readStreamIdentifier() throws IOException { 330 final byte[] b = new byte[10]; 331 final int read = IOUtils.readFully(inputStream, b); 332 count(read); 333 if (10 != read || !matches(b, 10)) { 334 throw new IOException("Not a framed Snappy stream"); 335 } 336 } 337 338 private void skipBlock() throws IOException { 339 final int size = readSize(); 340 if (size < 0) { 341 throw new IOException("Found illegal chunk with negative size"); 342 } 343 final long read = org.apache.commons.io.IOUtils.skip(inputStream, size); 344 count(read); 345 if (read != size) { 346 throw new IOException("Premature end of stream"); 347 } 348 } 349 350 private void verifyLastChecksumAndReset() throws IOException { 351 if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) { 352 throw new IOException("Checksum verification failed"); 353 } 354 expectedChecksum = -1; 355 checksum.reset(); 356 } 357 358}