1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.compress.archivers.zip;
18
19 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
20
21 import java.io.IOException;
22 import java.io.UncheckedIOException;
23 import java.util.Deque;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ConcurrentLinkedDeque;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import java.util.zip.Deflater;
32
33 import org.apache.commons.compress.parallel.InputStreamSupplier;
34 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
35 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class ParallelScatterZipCreator {
52
53 private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
54 private final ExecutorService executorService;
55 private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
56
57 private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
58 private final long startedAt = System.currentTimeMillis();
59 private long compressionDoneAt;
60 private long scatterDoneAt;
61
62 private final int compressionLevel;
63
64 private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
65 @Override
66 protected ScatterZipOutputStream initialValue() {
67 try {
68 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
69 streams.add(scatterStream);
70 return scatterStream;
71 } catch (final IOException e) {
72 throw new UncheckedIOException(e);
73 }
74 }
75 };
76
77
78
79
80
81 public ParallelScatterZipCreator() {
82 this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
83 }
84
85
86
87
88
89
90 public ParallelScatterZipCreator(final ExecutorService executorService) {
91 this(executorService, new DefaultBackingStoreSupplier(null));
92 }
93
94
95
96
97
98
99
100 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
101 this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
102 }
103
104
105
106
107
108
109
110
111
112
113 public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier,
114 final int compressionLevel) throws IllegalArgumentException {
115 if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
116 throw new IllegalArgumentException("Compression level is expected between -1~9");
117 }
118
119 this.backingStoreSupplier = backingStoreSupplier;
120 this.executorService = executorService;
121 this.compressionLevel = compressionLevel;
122 }
123
124
125
126
127
128
129
130
131
132
133
134 public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
135 submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
136 }
137
138
139
140
141
142
143
144
145
146
147 public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
148 submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
149 }
150
151 private void closeAll() {
152 for (final ScatterZipOutputStream scatterStream : streams) {
153 try {
154 scatterStream.close();
155 } catch (final IOException ignored) {
156
157 }
158 }
159 }
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
180 final int method = zipArchiveEntry.getMethod();
181 if (method == ZipMethod.UNKNOWN_CODE) {
182 throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
183 }
184 final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
185 return () -> {
186 final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
187 scatterStream.addArchiveEntry(zipArchiveEntryRequest);
188 return scatterStream;
189 };
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
210 return () -> {
211 final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
212 scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
213 return scatterStream;
214 };
215 }
216
217 @SuppressWarnings("resource")
218 private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
219 final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
220
221 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs);
222 return new ScatterZipOutputStream(bs, sc);
223 }
224
225
226
227
228
229
230 public ScatterStatistics getStatisticsMessage() {
231 return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
232 }
233
234
235
236
237
238
239
240
241 public final void submit(final Callable<? extends Object> callable) {
242 submitStreamAwareCallable(() -> {
243 callable.call();
244 return tlScatterStreams.get();
245 });
246 }
247
248
249
250
251
252
253
254
255
256 public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
257 futures.add(executorService.submit(callable));
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275 public void writeTo(final ZipArchiveOutputStream targetStream) throws IOException, InterruptedException, ExecutionException {
276
277 try {
278
279 try {
280 for (final Future<?> future : futures) {
281 future.get();
282 }
283 } finally {
284 executorService.shutdown();
285 }
286
287 executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS);
288
289
290 compressionDoneAt = System.currentTimeMillis();
291
292 for (final Future<? extends ScatterZipOutputStream> future : futures) {
293 final ScatterZipOutputStream scatterStream = future.get();
294 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
295 }
296
297 for (final ScatterZipOutputStream scatterStream : streams) {
298 scatterStream.close();
299 }
300
301 scatterDoneAt = System.currentTimeMillis();
302 } finally {
303 closeAll();
304 }
305 }
306 }