001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.compress.compressors.snappy; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024 025import org.apache.commons.codec.digest.PureJavaCrc32C; 026import org.apache.commons.compress.compressors.CompressorOutputStream; 027import org.apache.commons.compress.compressors.lz77support.Parameters; 028import org.apache.commons.compress.utils.ByteUtils; 029 030/** 031 * CompressorOutputStream for the framing Snappy format. 032 * 033 * <p> 034 * Based on the "spec" in the version "Last revised: 2013-10-25" 035 * </p> 036 * 037 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a> 038 * @since 1.14 039 * @NotThreadSafe 040 */ 041public class FramedSnappyCompressorOutputStream extends CompressorOutputStream<OutputStream> { 042 // see spec: 043 // > However, we place an additional restriction that the uncompressed data 044 // > in a chunk must be no longer than 65,536 bytes. This allows consumers to 045 // > easily use small fixed-size buffers. 046 private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16; 047 048 static long mask(long x) { 049 // ugly, maybe we should just have used ints and deal with the 050 // overflow 051 x = x >> 15 | x << 17; 052 x += FramedSnappyCompressorInputStream.MASK_OFFSET; 053 x &= 0xffffFFFFL; 054 return x; 055 } 056 057 private final Parameters params; 058 private final PureJavaCrc32C checksum = new PureJavaCrc32C(); 059 // used in one-arg write method 060 private final byte[] oneByte = new byte[1]; 061 private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE]; 062 063 private int currentIndex; 064 065 private final ByteUtils.ByteConsumer consumer; 066 067 /** 068 * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream. 069 * 070 * @param out the OutputStream to which to write the compressed data 071 * @throws IOException if writing the signature fails 072 */ 073 public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException { 074 this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE).build()); 075 } 076 077 /** 078 * Constructs a new output stream that compresses snappy-framed-compressed data to the specified output stream. 079 * 080 * @param out the OutputStream to which to write the compressed data 081 * @param params parameters used to fine-tune compression, in particular to balance compression ratio vs compression speed. 082 * @throws IOException if writing the signature fails 083 */ 084 public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException { 085 super(out); 086 this.params = params; 087 consumer = new ByteUtils.OutputStreamByteConsumer(out); 088 out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE); 089 } 090 091 @Override 092 public void close() throws IOException { 093 try { 094 finish(); 095 } finally { 096 out.close(); 097 } 098 } 099 100 /** 101 * Compresses all remaining data and writes it to the stream, doesn't close the underlying stream. 102 * 103 * @throws IOException if an error occurs 104 */ 105 public void finish() throws IOException { 106 flushBuffer(); 107 } 108 109 private void flushBuffer() throws IOException { 110 if (currentIndex == 0) { 111 return; 112 } 113 out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE); 114 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 115 try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) { 116 o.write(buffer, 0, currentIndex); 117 } 118 final byte[] b = baos.toByteArray(); 119 writeLittleEndian(3, b.length + 4L /* CRC */); 120 writeCrc(); 121 out.write(b); 122 currentIndex = 0; 123 } 124 125 @Override 126 public void write(final byte[] data, int off, int len) throws IOException { 127 int blockDataRemaining = buffer.length - currentIndex; 128 while (len > 0) { 129 final int copyLen = Math.min(len, blockDataRemaining); 130 System.arraycopy(data, off, buffer, currentIndex, copyLen); 131 off += copyLen; 132 blockDataRemaining -= copyLen; 133 len -= copyLen; 134 currentIndex += copyLen; 135 if (blockDataRemaining == 0) { 136 flushBuffer(); 137 blockDataRemaining = buffer.length; 138 } 139 } 140 } 141 142 @Override 143 public void write(final int b) throws IOException { 144 oneByte[0] = (byte) (b & 0xff); 145 write(oneByte); 146 } 147 148 private void writeCrc() throws IOException { 149 checksum.update(buffer, 0, currentIndex); 150 writeLittleEndian(4, mask(checksum.getValue())); 151 checksum.reset(); 152 } 153 154 private void writeLittleEndian(final int numBytes, final long num) throws IOException { 155 ByteUtils.toLittleEndian(consumer, num, numBytes); 156 } 157}