1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.compress.utils;
20
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.nio.ByteBuffer;
25 import java.nio.ByteOrder;
26 import java.nio.channels.ClosedChannelException;
27 import java.nio.channels.WritableByteChannel;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class FixedLengthBlockOutputStream extends OutputStream implements WritableByteChannel {
54
55
56
57
58
59
60 private static final class BufferAtATimeOutputChannel implements WritableByteChannel {
61
62 private final OutputStream out;
63 private final AtomicBoolean closed = new AtomicBoolean();
64
65 private BufferAtATimeOutputChannel(final OutputStream out) {
66 this.out = out;
67 }
68
69 @Override
70 public void close() throws IOException {
71 if (closed.compareAndSet(false, true)) {
72 out.close();
73 }
74 }
75
76 @Override
77 public boolean isOpen() {
78 return !closed.get();
79 }
80
81 @Override
82 public int write(final ByteBuffer buffer) throws IOException {
83 if (!isOpen()) {
84 throw new ClosedChannelException();
85 }
86 if (!buffer.hasArray()) {
87 throw new IOException("Direct buffer somehow written to BufferAtATimeOutputChannel");
88 }
89
90 try {
91 final int pos = buffer.position();
92 final int len = buffer.limit() - pos;
93 out.write(buffer.array(), buffer.arrayOffset() + pos, len);
94 buffer.position(buffer.limit());
95 return len;
96 } catch (final IOException e) {
97 try {
98 close();
99 } catch (final IOException ignored) {
100 }
101 throw e;
102 }
103 }
104
105 }
106
107 private final WritableByteChannel out;
108 private final int blockSize;
109 private final ByteBuffer buffer;
110
111 private final AtomicBoolean closed = new AtomicBoolean();
112
113
114
115
116
117
118
119 public FixedLengthBlockOutputStream(final OutputStream os, final int blockSize) {
120 if (os instanceof FileOutputStream) {
121 final FileOutputStream fileOutputStream = (FileOutputStream) os;
122 out = fileOutputStream.getChannel();
123 buffer = ByteBuffer.allocateDirect(blockSize);
124 } else {
125 out = new BufferAtATimeOutputChannel(os);
126 buffer = ByteBuffer.allocate(blockSize);
127 }
128 this.blockSize = blockSize;
129 }
130
131
132
133
134
135
136
137 public FixedLengthBlockOutputStream(final WritableByteChannel out, final int blockSize) {
138 this.out = out;
139 this.blockSize = blockSize;
140 this.buffer = ByteBuffer.allocateDirect(blockSize);
141 }
142
143 @Override
144 public void close() throws IOException {
145 if (closed.compareAndSet(false, true)) {
146 try {
147 flushBlock();
148 } finally {
149 out.close();
150 }
151 }
152 }
153
154
155
156
157
158
159 public void flushBlock() throws IOException {
160 if (buffer.position() != 0) {
161 padBlock();
162 writeBlock();
163 }
164 }
165
166 @Override
167 public boolean isOpen() {
168 if (!out.isOpen()) {
169 closed.set(true);
170 }
171 return !closed.get();
172 }
173
174 private void maybeFlush() throws IOException {
175 if (!buffer.hasRemaining()) {
176 writeBlock();
177 }
178 }
179
180 private void padBlock() {
181 buffer.order(ByteOrder.nativeOrder());
182 int bytesToWrite = buffer.remaining();
183 if (bytesToWrite > 8) {
184 final int align = buffer.position() & 7;
185 if (align != 0) {
186 final int limit = 8 - align;
187 for (int i = 0; i < limit; i++) {
188 buffer.put((byte) 0);
189 }
190 bytesToWrite -= limit;
191 }
192
193 while (bytesToWrite >= 8) {
194 buffer.putLong(0L);
195 bytesToWrite -= 8;
196 }
197 }
198 while (buffer.hasRemaining()) {
199 buffer.put((byte) 0);
200 }
201 }
202
203 @Override
204 public void write(final byte[] b, final int offset, final int length) throws IOException {
205 if (!isOpen()) {
206 throw new ClosedChannelException();
207 }
208 int off = offset;
209 int len = length;
210 while (len > 0) {
211 final int n = Math.min(len, buffer.remaining());
212 buffer.put(b, off, n);
213 maybeFlush();
214 len -= n;
215 off += n;
216 }
217 }
218
219 @Override
220 public int write(final ByteBuffer src) throws IOException {
221 if (!isOpen()) {
222 throw new ClosedChannelException();
223 }
224 final int srcRemaining = src.remaining();
225
226 if (srcRemaining < buffer.remaining()) {
227
228 buffer.put(src);
229 } else {
230 int srcLeft = srcRemaining;
231 final int savedLimit = src.limit();
232
233
234 if (buffer.position() != 0) {
235 final int n = buffer.remaining();
236 src.limit(src.position() + n);
237 buffer.put(src);
238 writeBlock();
239 srcLeft -= n;
240 }
241
242
243 while (srcLeft >= blockSize) {
244 src.limit(src.position() + blockSize);
245 out.write(src);
246 srcLeft -= blockSize;
247 }
248
249 src.limit(savedLimit);
250 buffer.put(src);
251 }
252 return srcRemaining;
253 }
254
255 @Override
256 public void write(final int b) throws IOException {
257 if (!isOpen()) {
258 throw new ClosedChannelException();
259 }
260 buffer.put((byte) b);
261 maybeFlush();
262 }
263
264 private void writeBlock() throws IOException {
265 buffer.flip();
266 final int i = out.write(buffer);
267 final boolean hasRemaining = buffer.hasRemaining();
268 if (i != blockSize || hasRemaining) {
269 final String msg = String.format("Failed to write %,d bytes atomically. Only wrote %,d", blockSize, i);
270 throw new IOException(msg);
271 }
272 buffer.clear();
273 }
274
275 }