/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  
18  package org.apache.commons.compress.utils;
19  
20  import java.io.File;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.NonWritableChannelException;
25  import java.nio.channels.SeekableByteChannel;
26  import java.nio.file.Files;
27  import java.nio.file.Path;
28  import java.nio.file.StandardOpenOption;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.List;
33  import java.util.Objects;
34  
35  /**
36   * Implements a read-only {@link SeekableByteChannel} that concatenates a collection of other {@link SeekableByteChannel}s.
37   * <p>
38   * This is a lose port of <a href=
39   * "https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">
40   * MultiReadOnlySeekableByteChannel</a>
41   * by Tim Underwood.
42   * </p>
43   *
44   * @since 1.19
45   */
46  public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {
47  
48      private static final Path[] EMPTY_PATH_ARRAY = {};
49  
50      /**
51       * Concatenates the given files.
52       *
53       * @param files the files to concatenate
54       * @throws NullPointerException if files is null
55       * @throws IOException          if opening a channel for one of the files fails
56       * @return SeekableByteChannel that concatenates all provided files
57       */
58      public static SeekableByteChannel forFiles(final File... files) throws IOException {
59          final List<Path> paths = new ArrayList<>();
60          for (final File f : Objects.requireNonNull(files, "files")) {
61              paths.add(f.toPath());
62          }
63          return forPaths(paths.toArray(EMPTY_PATH_ARRAY));
64      }
65  
66      /**
67       * Concatenates the given file paths.
68       *
69       * @param paths the file paths to concatenate, note that the LAST FILE of files should be the LAST SEGMENT(.zip) and these files should be added in correct
70       *              order (e.g.: .z01, .z02... .z99, .zip)
71       * @return SeekableByteChannel that concatenates all provided files
72       * @throws NullPointerException if files is null
73       * @throws IOException          if opening a channel for one of the files fails
74       * @throws IOException          if the first channel doesn't seem to hold the beginning of a split archive
75       * @since 1.22
76       */
77      public static SeekableByteChannel forPaths(final Path... paths) throws IOException {
78          final List<SeekableByteChannel> channels = new ArrayList<>();
79          for (final Path path : Objects.requireNonNull(paths, "paths")) {
80              channels.add(Files.newByteChannel(path, StandardOpenOption.READ));
81          }
82          if (channels.size() == 1) {
83              return channels.get(0);
84          }
85          return new MultiReadOnlySeekableByteChannel(channels);
86      }
87  
88      /**
89       * Concatenates the given channels.
90       *
91       * @param channels the channels to concatenate
92       * @throws NullPointerException if channels is null
93       * @return SeekableByteChannel that concatenates all provided channels
94       */
95      public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
96          if (Objects.requireNonNull(channels, "channels").length == 1) {
97              return channels[0];
98          }
99          return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
100     }
101 
102     private final List<SeekableByteChannel> channelList;
103 
104     private long globalPosition;
105 
106     private int currentChannelIdx;
107 
108     /**
109      * Concatenates the given channels.
110      *
111      * @param channels the channels to concatenate
112      * @throws NullPointerException if channels is null
113      */
114     public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
115         this.channelList = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(channels, "channels")));
116     }
117 
118     @Override
119     public void close() throws IOException {
120         IOException first = null;
121         for (final SeekableByteChannel ch : channelList) {
122             try {
123                 ch.close();
124             } catch (final IOException ex) {
125                 if (first == null) {
126                     first = ex;
127                 }
128             }
129         }
130         if (first != null) {
131             throw new IOException("failed to close wrapped channel", first);
132         }
133     }
134 
135     @Override
136     public boolean isOpen() {
137         return channelList.stream().allMatch(SeekableByteChannel::isOpen);
138     }
139 
140     /**
141      * Gets this channel's position.
142      * <p>
143      * This method violates the contract of {@link SeekableByteChannel#position()} as it will not throw any exception when invoked on a closed channel. Instead
144      * it will return the position the channel had when close has been called.
145      * </p>
146      */
147     @Override
148     public long position() {
149         return globalPosition;
150     }
151 
152     @Override
153     public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
154         if (newPosition < 0) {
155             throw new IllegalArgumentException("Negative position: " + newPosition);
156         }
157         if (!isOpen()) {
158             throw new ClosedChannelException();
159         }
160         globalPosition = newPosition;
161         long pos = newPosition;
162         for (int i = 0; i < channelList.size(); i++) {
163             final SeekableByteChannel currentChannel = channelList.get(i);
164             final long size = currentChannel.size();
165 
166             final long newChannelPos;
167             if (pos == -1L) {
168                 // Position is already set for the correct channel,
169                 // the rest of the channels get reset to 0
170                 newChannelPos = 0;
171             } else if (pos <= size) {
172                 // This channel is where we want to be
173                 currentChannelIdx = i;
174                 final long tmp = pos;
175                 pos = -1L; // Mark pos as already being set
176                 newChannelPos = tmp;
177             } else {
178                 // newPosition is past this channel. Set channel
179                 // position to the end and substract channel size from
180                 // pos
181                 pos -= size;
182                 newChannelPos = size;
183             }
184             currentChannel.position(newChannelPos);
185         }
186         return this;
187     }
188 
189     /**
190      * Sets the position based on the given channel number and relative offset
191      *
192      * @param channelNumber  the channel number
193      * @param relativeOffset the relative offset in the corresponding channel
194      * @return global position of all channels as if they are a single channel
195      * @throws IOException if positioning fails
196      */
197     public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
198         if (!isOpen()) {
199             throw new ClosedChannelException();
200         }
201         long globalPosition = relativeOffset;
202         for (int i = 0; i < channelNumber; i++) {
203             globalPosition += channelList.get(i).size();
204         }
205 
206         return position(globalPosition);
207     }
208 
209     @Override
210     public synchronized int read(final ByteBuffer dst) throws IOException {
211         if (!isOpen()) {
212             throw new ClosedChannelException();
213         }
214         if (!dst.hasRemaining()) {
215             return 0;
216         }
217 
218         int totalBytesRead = 0;
219         while (dst.hasRemaining() && currentChannelIdx < channelList.size()) {
220             final SeekableByteChannel currentChannel = channelList.get(currentChannelIdx);
221             final int newBytesRead = currentChannel.read(dst);
222             if (newBytesRead == -1) {
223                 // EOF for this channel -- advance to next channel idx
224                 currentChannelIdx += 1;
225                 continue;
226             }
227             if (currentChannel.position() >= currentChannel.size()) {
228                 // we are at the end of the current channel
229                 currentChannelIdx++;
230             }
231             totalBytesRead += newBytesRead;
232         }
233         if (totalBytesRead > 0) {
234             globalPosition += totalBytesRead;
235             return totalBytesRead;
236         }
237         return -1;
238     }
239 
240     @Override
241     public long size() throws IOException {
242         if (!isOpen()) {
243             throw new ClosedChannelException();
244         }
245         long acc = 0;
246         for (final SeekableByteChannel ch : channelList) {
247             acc += ch.size();
248         }
249         return acc;
250     }
251 
252     /**
253      * @throws NonWritableChannelException since this implementation is read-only.
254      */
255     @Override
256     public SeekableByteChannel truncate(final long size) {
257         throw new NonWritableChannelException();
258     }
259 
260     /**
261      * @throws NonWritableChannelException since this implementation is read-only.
262      */
263     @Override
264     public int write(final ByteBuffer src) {
265         throw new NonWritableChannelException();
266     }
267 
268 }