001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017package org.apache.commons.compress.archivers.zip;
018
019import java.io.Closeable;
020import java.io.File;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InputStream;
024import java.nio.file.Path;
025import java.util.Iterator;
026import java.util.Queue;
027import java.util.concurrent.ConcurrentLinkedQueue;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.zip.Deflater;
030
031import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
032import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
033import org.apache.commons.io.input.BoundedInputStream;
034
035/**
036 * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
037 * <p>
038 * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever.
039 * </p>
040 * <p>
041 * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
042 * </p>
043 * <p>
044 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
045 * {@link ZipArchiveEntry}.
046 * </p>
047 *
048 * @since 1.10
049 */
050public class ScatterZipOutputStream implements Closeable {
051
052    private static final class CompressedEntry {
053        final ZipArchiveEntryRequest zipArchiveEntryRequest;
054        final long crc;
055        final long compressedSize;
056        final long size;
057
058        CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
059            this.zipArchiveEntryRequest = zipArchiveEntryRequest;
060            this.crc = crc;
061            this.compressedSize = compressedSize;
062            this.size = size;
063        }
064
065        /**
066         * Updates the original {@link ZipArchiveEntry} with sizes/crc. Do not use this method from threads that did not create the instance itself!
067         *
068         * @return the zipArchiveEntry that is the basis for this request.
069         */
070
071        public ZipArchiveEntry transferToArchiveEntry() {
072            final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
073            entry.setCompressedSize(compressedSize);
074            entry.setSize(size);
075            entry.setCrc(crc);
076            entry.setMethod(zipArchiveEntryRequest.getMethod());
077            return entry;
078        }
079    }
080
081    public static class ZipEntryWriter implements Closeable {
082        private final Iterator<CompressedEntry> itemsIterator;
083        private final InputStream itemsIteratorData;
084
085        public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException {
086            scatter.backingStore.closeForWriting();
087            itemsIterator = scatter.items.iterator();
088            itemsIteratorData = scatter.backingStore.getInputStream();
089        }
090
091        @Override
092        public void close() throws IOException {
093            if (itemsIteratorData != null) {
094                itemsIteratorData.close();
095            }
096        }
097
098        public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
099            final CompressedEntry compressedEntry = itemsIterator.next();
100            // @formatter:off
101            try (BoundedInputStream rawStream = BoundedInputStream.builder()
102                    .setInputStream(itemsIteratorData)
103                    .setMaxCount(compressedEntry.compressedSize)
104                    .setPropagateClose(false)
105                    .get()) {
106                target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
107            }
108            // @formatter:on
109        }
110    }
111
112    /**
113     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
114     *
115     * @param file The file to offload compressed data into.
116     * @return A ScatterZipOutputStream that is ready for use.
117     * @throws FileNotFoundException if the file cannot be found
118     */
119    public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
120        return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
121    }
122
123    /**
124     * Creates a {@link ScatterZipOutputStream} that is backed by a file
125     *
126     * @param file             The file to offload compressed data into.
127     * @param compressionLevel The compression level to use, @see #Deflater
128     * @return A ScatterZipOutputStream that is ready for use.
129     * @throws FileNotFoundException if the file cannot be found
130     */
131    public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
132        return pathBased(file.toPath(), compressionLevel);
133    }
134
135    /**
136     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
137     *
138     * @param path The path to offload compressed data into.
139     * @return A ScatterZipOutputStream that is ready for use.
140     * @throws FileNotFoundException if the path cannot be found
141     * @since 1.22
142     */
143    public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
144        return pathBased(path, Deflater.DEFAULT_COMPRESSION);
145    }
146
147    /**
148     * Creates a {@link ScatterZipOutputStream} that is backed by a file
149     *
150     * @param path             The path to offload compressed data into.
151     * @param compressionLevel The compression level to use, @see #Deflater
152     * @return A ScatterZipOutputStream that is ready for use.
153     * @throws FileNotFoundException if the path cannot be found
154     * @since 1.22
155     */
156    public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
157        final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
158        // lifecycle is bound to the ScatterZipOutputStream returned
159        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); // NOSONAR
160        return new ScatterZipOutputStream(bs, sc);
161    }
162
163    private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
164
165    private final ScatterGatherBackingStore backingStore;
166
167    private final StreamCompressor streamCompressor;
168
169    private final AtomicBoolean isClosed = new AtomicBoolean();
170
171    private ZipEntryWriter zipEntryWriter;
172
173    public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore, final StreamCompressor streamCompressor) {
174        this.backingStore = backingStore;
175        this.streamCompressor = streamCompressor;
176    }
177
178    /**
179     * Adds an archive entry to this scatter stream.
180     *
181     * @param zipArchiveEntryRequest The entry to write.
182     * @throws IOException If writing fails
183     */
184    public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
185        try (InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
186            streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
187        }
188        items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), streamCompressor.getBytesWrittenForLastEntry(),
189                streamCompressor.getBytesRead()));
190    }
191
192    /**
193     * Closes this stream, freeing all resources involved in the creation of this stream.
194     *
195     * @throws IOException If closing fails
196     */
197    @Override
198    public void close() throws IOException {
199        if (!isClosed.compareAndSet(false, true)) {
200            return;
201        }
202        try {
203            if (zipEntryWriter != null) {
204                zipEntryWriter.close();
205            }
206            backingStore.close();
207        } finally {
208            streamCompressor.close();
209        }
210    }
211
212    /**
213     * Writes the contents of this scatter stream to a target archive.
214     *
215     * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
216     * @throws IOException If writing fails
217     * @see #zipEntryWriter()
218     */
219    public void writeTo(final ZipArchiveOutputStream target) throws IOException {
220        backingStore.closeForWriting();
221        try (InputStream data = backingStore.getInputStream()) {
222            for (final CompressedEntry compressedEntry : items) {
223                // @formatter:off
224                try (BoundedInputStream rawStream = BoundedInputStream.builder()
225                        .setInputStream(data)
226                        .setMaxCount(compressedEntry.compressedSize)
227                        .setPropagateClose(false)
228                        .get()) {
229                    target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
230                }
231                // @formatter:on
232            }
233        }
234    }
235
236    /**
237     * Gets a ZIP entry writer for this scatter stream.
238     *
239     * @throws IOException If getting scatter stream input stream
240     * @return the ZipEntryWriter created on first call of the method
241     */
242    public ZipEntryWriter zipEntryWriter() throws IOException {
243        if (zipEntryWriter == null) {
244            zipEntryWriter = new ZipEntryWriter(this);
245        }
246        return zipEntryWriter;
247    }
248}