1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.compress.utils;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.NonWritableChannelException;
25 import java.nio.channels.SeekableByteChannel;
26 import java.nio.file.Files;
27 import java.nio.file.Path;
28 import java.nio.file.StandardOpenOption;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Objects;
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Read-only {@link SeekableByteChannel} that presents an ordered list of channels as one
 * concatenated channel.
 *
 * <p>
 * The mutating operations {@link #truncate(long)} and {@link #write(ByteBuffer)} always throw
 * {@link NonWritableChannelException}. {@link #read(ByteBuffer)} and both positioning setters are
 * {@code synchronized} on this instance.
 * </p>
 */
public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {

    private static final Path[] EMPTY_PATH_ARRAY = {};

    /**
     * Creates a channel that concatenates the given files, in order.
     *
     * @param files the files to concatenate, must not be {@code null}
     * @return the opened channel of the only file if exactly one file is given, otherwise a channel
     *         concatenating all of them
     * @throws NullPointerException if {@code files} is {@code null}
     * @throws IOException if opening a channel for one of the files fails
     */
    public static SeekableByteChannel forFiles(final File... files) throws IOException {
        final List<Path> paths = new ArrayList<>();
        for (final File f : Objects.requireNonNull(files, "files")) {
            paths.add(f.toPath());
        }
        return forPaths(paths.toArray(EMPTY_PATH_ARRAY));
    }

    /**
     * Creates a channel that concatenates the given paths, in order. Every path is opened with
     * {@link StandardOpenOption#READ}.
     *
     * <p>
     * If opening one of the paths fails, the channels opened so far are closed again before the
     * failure is propagated; errors raised while closing them are attached to the propagated
     * exception as suppressed exceptions.
     * </p>
     *
     * @param paths the paths to concatenate, must not be {@code null}
     * @return the opened channel of the only path if exactly one path is given, otherwise a channel
     *         concatenating all of them
     * @throws NullPointerException if {@code paths} is {@code null}
     * @throws IOException if opening a channel for one of the paths fails
     */
    public static SeekableByteChannel forPaths(final Path... paths) throws IOException {
        final List<SeekableByteChannel> channels = new ArrayList<>();
        try {
            for (final Path path : Objects.requireNonNull(paths, "paths")) {
                channels.add(Files.newByteChannel(path, StandardOpenOption.READ));
            }
        } catch (final IOException | RuntimeException ex) {
            // Do not leak the channels that were opened before the failure.
            closeAfterFailure(channels, ex);
            throw ex;
        }
        if (channels.size() == 1) {
            return channels.get(0);
        }
        return new MultiReadOnlySeekableByteChannel(channels);
    }

    /**
     * Creates a channel that concatenates the given channels, in order.
     *
     * @param channels the channels to concatenate, must not be {@code null}
     * @return {@code channels[0]} itself if exactly one channel is given, otherwise a channel
     *         concatenating all of them
     * @throws NullPointerException if {@code channels} is {@code null}
     */
    public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
        if (Objects.requireNonNull(channels, "channels").length == 1) {
            return channels[0];
        }
        return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
    }

    /**
     * Closes every channel in {@code opened}, recording close failures on {@code failure} as
     * suppressed exceptions so that the original failure stays the one that is reported.
     */
    private static void closeAfterFailure(final List<SeekableByteChannel> opened, final Throwable failure) {
        for (final SeekableByteChannel ch : opened) {
            try {
                ch.close();
            } catch (final IOException closeEx) {
                failure.addSuppressed(closeEx);
            }
        }
    }

    /** Unmodifiable snapshot of the wrapped channels, in concatenation order. */
    private final List<SeekableByteChannel> channelList;

    /** Position within the concatenated channel. */
    private long globalPosition;

    /** Index into {@link #channelList} of the channel the next read starts with. */
    private int currentChannelIdx;

    /**
     * Concatenates the given channels. The list is copied, so later changes to {@code channels} do
     * not affect this instance.
     *
     * @param channels the channels to concatenate, must not be {@code null}
     * @throws NullPointerException if {@code channels} is {@code null}
     */
    public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
        this.channelList = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(channels, "channels")));
    }

    /**
     * Closes all wrapped channels. A failure to close one channel does not prevent the attempt to
     * close the remaining ones.
     *
     * @throws IOException if at least one wrapped channel failed to close; its cause is the first
     *         failure, and any further failures are attached as suppressed exceptions
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        for (final SeekableByteChannel ch : channelList) {
            try {
                ch.close();
            } catch (final IOException ex) {
                if (failure == null) {
                    failure = new IOException("failed to close wrapped channel", ex);
                } else {
                    // Keep later failures visible instead of silently dropping them.
                    failure.addSuppressed(ex);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Tells whether this channel is open.
     *
     * @return {@code true} only if every wrapped channel reports itself as open
     */
    @Override
    public boolean isOpen() {
        return channelList.stream().allMatch(SeekableByteChannel::isOpen);
    }

    /**
     * Returns the position within the concatenated channel.
     *
     * <p>
     * This method does not check whether the channel is open and therefore never throws.
     * </p>
     *
     * @return the current position
     */
    @Override
    public long position() {
        return globalPosition;
    }

    /**
     * Sets the position within the concatenated channel and repositions every wrapped channel
     * accordingly: channels before the one containing the position are moved to their end, the
     * containing channel is moved to the remaining offset and all later channels are rewound to
     * {@code 0}.
     *
     * @param newPosition the new position, must not be negative
     * @return this channel
     * @throws IllegalArgumentException if {@code newPosition} is negative
     * @throws ClosedChannelException if this channel is not open
     * @throws IOException if querying the size of, or positioning, a wrapped channel fails
     */
    @Override
    public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
        if (newPosition < 0) {
            throw new IllegalArgumentException("Negative position: " + newPosition);
        }
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        globalPosition = newPosition;
        // Offset still to be consumed; -1 marks that the containing channel has been found.
        long pos = newPosition;
        for (int i = 0; i < channelList.size(); i++) {
            final SeekableByteChannel currentChannel = channelList.get(i);
            final long size = currentChannel.size();

            final long newChannelPos;
            if (pos == -1L) {
                // Position is already set in an earlier channel: rewind this one so that
                // subsequent reads continue from its start.
                newChannelPos = 0;
            } else if (pos <= size) {
                // The requested position lies within (or at the end of) this channel.
                currentChannelIdx = i;
                final long tmp = pos;
                pos = -1L;
                newChannelPos = tmp;
            } else {
                // The requested position lies beyond this channel: mark it as fully consumed
                // and carry the remaining offset over to the next channel.
                pos -= size;
                newChannelPos = size;
            }
            currentChannel.position(newChannelPos);
        }
        return this;
    }

    /**
     * Sets the position based on a channel index and an offset relative to the start of that
     * channel. The global position is the sum of the sizes of all channels preceding
     * {@code channelNumber} plus {@code relativeOffset}.
     *
     * @param channelNumber the number of leading channels whose sizes are skipped
     * @param relativeOffset the offset added on top of the skipped sizes
     * @return this channel
     * @throws ClosedChannelException if this channel is not open
     * @throws IOException if querying the size of, or positioning, a wrapped channel fails
     */
    public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        // Named differently from the globalPosition field to avoid shadowing it.
        long targetPosition = relativeOffset;
        for (int i = 0; i < channelNumber; i++) {
            targetPosition += channelList.get(i).size();
        }

        return position(targetPosition);
    }

    /**
     * Reads from the wrapped channels into {@code dst}, moving on to the next channel whenever the
     * current one is exhausted, until {@code dst} is full or no channel is left.
     *
     * @param dst the buffer to read into
     * @return the number of bytes read, {@code 0} if {@code dst} has no space remaining, or
     *         {@code -1} if no bytes could be read
     * @throws ClosedChannelException if this channel is not open
     * @throws IOException if reading from a wrapped channel fails
     */
    @Override
    public synchronized int read(final ByteBuffer dst) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        if (!dst.hasRemaining()) {
            return 0;
        }

        int totalBytesRead = 0;
        while (dst.hasRemaining() && currentChannelIdx < channelList.size()) {
            final SeekableByteChannel currentChannel = channelList.get(currentChannelIdx);
            final int newBytesRead = currentChannel.read(dst);
            if (newBytesRead == -1) {
                // The current channel is exhausted: continue with the next one.
                currentChannelIdx += 1;
                continue;
            }
            if (currentChannel.position() >= currentChannel.size()) {
                // The read reached the end of the current channel: advance right away.
                currentChannelIdx++;
            }
            totalBytesRead += newBytesRead;
        }
        if (totalBytesRead > 0) {
            globalPosition += totalBytesRead;
            return totalBytesRead;
        }
        return -1;
    }

    /**
     * Returns the size of the concatenated channel.
     *
     * @return the sum of the sizes of all wrapped channels
     * @throws ClosedChannelException if this channel is not open
     * @throws IOException if querying the size of a wrapped channel fails
     */
    @Override
    public long size() throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        long acc = 0;
        for (final SeekableByteChannel ch : channelList) {
            acc += ch.size();
        }
        return acc;
    }

    /**
     * Always fails because this channel is read-only.
     *
     * @throws NonWritableChannelException always
     */
    @Override
    public SeekableByteChannel truncate(final long size) {
        throw new NonWritableChannelException();
    }

    /**
     * Always fails because this channel is read-only.
     *
     * @throws NonWritableChannelException always
     */
    @Override
    public int write(final ByteBuffer src) {
        throw new NonWritableChannelException();
    }

}