001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.io.function;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.List;
025import java.util.NoSuchElementException;
026import java.util.Objects;
027import java.util.Optional;
028import java.util.Spliterator;
029import java.util.Spliterators;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.concurrent.atomic.AtomicReference;
032import java.util.function.BiFunction;
033import java.util.function.IntFunction;
034import java.util.function.ToDoubleFunction;
035import java.util.function.ToIntFunction;
036import java.util.function.ToLongFunction;
037import java.util.function.UnaryOperator;
038import java.util.stream.Collector;
039import java.util.stream.DoubleStream;
040import java.util.stream.IntStream;
041import java.util.stream.LongStream;
042import java.util.stream.Stream;
043import java.util.stream.StreamSupport;
044
045import org.apache.commons.io.IOExceptionList;
046
047/**
048 * Like {@link Stream} but throws {@link IOException}.
049 *
050 * @param <T> the type of the stream elements.
051 * @since 2.12.0
052 */
053public interface IOStream<T> extends IOBaseStream<T, IOStream<T>, Stream<T>> {
054
055    /**
056     * Constructs a new IOStream for the given Stream.
057     *
058     * @param <T> the type of the stream elements.
059     * @param stream The stream to delegate.
060     * @return a new IOStream.
061     */
062    static <T> IOStream<T> adapt(final Stream<T> stream) {
063        return IOStreamAdapter.adapt(stream);
064    }
065
066    /**
067     * This class' version of {@link Stream#empty()}.
068     *
069     * @param <T> the type of the stream elements
070     * @return an empty sequential {@code IOStreamImpl}.
071     * @see Stream#empty()
072     */
073    static <T> IOStream<T> empty() {
074        return IOStreamAdapter.adapt(Stream.empty());
075    }
076
077    /**
078     * Like {@link Stream#iterate(Object, UnaryOperator)} but for IO.
079     *
080     * @param <T> the type of stream elements.
081     * @param seed the initial element.
082     * @param f a function to be applied to the previous element to produce a new element.
083     * @return a new sequential {@code IOStream}.
084     */
085    static <T> IOStream<T> iterate(final T seed, final IOUnaryOperator<T> f) {
086        Objects.requireNonNull(f);
087        final Iterator<T> iterator = new Iterator<T>() {
088            @SuppressWarnings("unchecked")
089            T t = (T) IOStreams.NONE;
090
091            @Override
092            public boolean hasNext() {
093                return true;
094            }
095
096            @Override
097            public T next() throws NoSuchElementException {
098                try {
099                    return t = t == IOStreams.NONE ? seed : f.apply(t);
100                } catch (final IOException e) {
101                    final NoSuchElementException nsee = new NoSuchElementException();
102                    nsee.initCause(e);
103                    throw nsee;
104                }
105            }
106        };
107        return adapt(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false));
108    }
109
110    /**
111     * Null-safe version of {@link StreamSupport#stream(java.util.Spliterator, boolean)}.
112     *
113     * Copied from Apache Commons Lang.
114     *
115     * @param <T> the type of stream elements.
116     * @param values the elements of the new stream, may be {@code null}.
117     * @return the new stream on {@code values} or {@link Stream#empty()}.
118     */
119    static <T> IOStream<T> of(final Iterable<T> values) {
120        return values == null ? empty() : adapt(StreamSupport.stream(values.spliterator(), false));
121    }
122
123    /**
124     * Null-safe version of {@link Stream#of(Object[])} for an IO stream.
125     *
126     * @param <T> the type of stream elements.
127     * @param values the elements of the new stream, may be {@code null}.
128     * @return the new stream on {@code values} or {@link Stream#empty()}.
129     */
130    @SafeVarargs // Creating a stream from an array is safe
131    static <T> IOStream<T> of(final T... values) {
132        return values == null || values.length == 0 ? empty() : adapt(Arrays.stream(values));
133    }
134
135    /**
136     * Returns a sequential {@code IOStreamImpl} containing a single element.
137     *
138     * @param t the single element
139     * @param <T> the type of stream elements
140     * @return a singleton sequential stream
141     */
142    static <T> IOStream<T> of(final T t) {
143        return adapt(Stream.of(t));
144    }
145
146    /**
147     * Like {@link Stream#allMatch(java.util.function.Predicate)} but throws {@link IOException}.
148     *
149     * @param predicate {@link Stream#allMatch(java.util.function.Predicate)}.
150     * @return Like {@link Stream#allMatch(java.util.function.Predicate)}.
151     * @throws IOException if an I/O error occurs.
152     */
153    @SuppressWarnings("unused") // thrown by Erase.
154    default boolean allMatch(final IOPredicate<? super T> predicate) throws IOException {
155        return unwrap().allMatch(t -> Erase.test(predicate, t));
156    }
157
158    /**
159     * Like {@link Stream#anyMatch(java.util.function.Predicate)} but throws {@link IOException}.
160     *
161     * @param predicate {@link Stream#anyMatch(java.util.function.Predicate)}.
162     * @return Like {@link Stream#anyMatch(java.util.function.Predicate)}.
163     * @throws IOException if an I/O error occurs.
164     */
165    @SuppressWarnings("unused") // thrown by Erase.
166    default boolean anyMatch(final IOPredicate<? super T> predicate) throws IOException {
167        return unwrap().anyMatch(t -> Erase.test(predicate, t));
168    }
169
170    /**
171     * TODO Package-private for now, needs IOCollector?
172     *
173     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
174     * would be ideal to have only one.
175     *
176     * Like {@link Stream#collect(Collector)}.
177     *
178     * Package private for now.
179     *
180     * @param <R> Like {@link Stream#collect(Collector)}.
181     * @param <A> Like {@link Stream#collect(Collector)}.
182     * @param collector Like {@link Stream#collect(Collector)}.
183     * @return Like {@link Stream#collect(Collector)}.
184     */
185    default <R, A> R collect(final Collector<? super T, A, R> collector) {
186        return unwrap().collect(collector);
187    }
188
189    /**
190     * Like
191     * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
192     *
193     * @param <R> Like
194     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
195     * @param supplier Like
196     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
197     * @param accumulator Like
198     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
199     * @param combiner Like
200     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
201     * @return Like
202     *         {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
203     * @throws IOException if an I/O error occurs.
204     */
205    @SuppressWarnings("unused") // thrown by Erase.
206    default <R> R collect(final IOSupplier<R> supplier, final IOBiConsumer<R, ? super T> accumulator, final IOBiConsumer<R, R> combiner) throws IOException {
207        return unwrap().collect(() -> Erase.get(supplier), (t, u) -> Erase.accept(accumulator, t, u), (t, u) -> Erase.accept(combiner, t, u));
208    }
209
210    /**
211     * Like {@link Stream#count()}.
212     *
213     * @return Like {@link Stream#count()}.
214     */
215    default long count() {
216        return unwrap().count();
217    }
218
219    /**
220     * Like {@link Stream#distinct()}.
221     *
222     * @return Like {@link Stream#distinct()}.
223     */
224    default IOStream<T> distinct() {
225        return adapt(unwrap().distinct());
226    }
227
228    /**
229     * Like {@link Stream#filter(java.util.function.Predicate)}.
230     *
231     * @param predicate Like {@link Stream#filter(java.util.function.Predicate)}.
232     * @return Like {@link Stream#filter(java.util.function.Predicate)}.
233     * @throws IOException if an I/O error occurs.
234     */
235    @SuppressWarnings("unused") // thrown by Erase.
236    default IOStream<T> filter(final IOPredicate<? super T> predicate) throws IOException {
237        return adapt(unwrap().filter(t -> Erase.test(predicate, t)));
238    }
239
240    /**
241     * Like {@link Stream#findAny()}.
242     *
243     * @return Like {@link Stream#findAny()}.
244     */
245    default Optional<T> findAny() {
246        return unwrap().findAny();
247    }
248
249    /**
250     * Like {@link Stream#findFirst()}.
251     *
252     * @return Like {@link Stream#findFirst()}.
253     */
254    default Optional<T> findFirst() {
255        return unwrap().findFirst();
256    }
257
258    /**
259     * Like {@link Stream#flatMap(java.util.function.Function)}.
260     *
261     * @param <R> Like {@link Stream#flatMap(java.util.function.Function)}.
262     * @param mapper Like {@link Stream#flatMap(java.util.function.Function)}.
263     * @return Like {@link Stream#flatMap(java.util.function.Function)}.
264     * @throws IOException if an I/O error occurs.
265     */
266    @SuppressWarnings({ "unused", "resource" }) // thrown by Erase; resource closed by caller.
267    default <R> IOStream<R> flatMap(final IOFunction<? super T, ? extends IOStream<? extends R>> mapper) throws IOException {
268        return adapt(unwrap().flatMap(t -> Erase.apply(mapper, t).unwrap()));
269    }
270
271    /**
272     * TODO Package-private for now, needs IODoubleStream?
273     *
274     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
275     * would be ideal to have only one.
276     *
277     * Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
278     *
279     * @param mapper Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
280     * @return Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
281     * @throws IOException if an I/O error occurs.
282     */
283    @SuppressWarnings("unused") // thrown by Erase.
284    default DoubleStream flatMapToDouble(final IOFunction<? super T, ? extends DoubleStream> mapper) throws IOException {
285        return unwrap().flatMapToDouble(t -> Erase.apply(mapper, t));
286    }
287
288    /**
289     * TODO Package-private for now, needs IOIntStream?
290     *
291     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
292     * would be ideal to have only one.
293     *
294     * Like {@link Stream#flatMapToInt(java.util.function.Function)}.
295     *
296     * @param mapper Like {@link Stream#flatMapToInt(java.util.function.Function)}.
297     * @return Like {@link Stream#flatMapToInt(java.util.function.Function)}.
298     * @throws IOException if an I/O error occurs.
299     */
300    @SuppressWarnings("unused") // thrown by Erase.
301    default IntStream flatMapToInt(final IOFunction<? super T, ? extends IntStream> mapper) throws IOException {
302        return unwrap().flatMapToInt(t -> Erase.apply(mapper, t));
303    }
304
305    /**
306     * TODO Package-private for now, needs IOLongStream?
307     *
308     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
309     * would be ideal to have only one.
310     *
311     * Like {@link Stream#flatMapToLong(java.util.function.Function)}.
312     *
313     * @param mapper Like {@link Stream#flatMapToLong(java.util.function.Function)}.
314     * @return Like {@link Stream#flatMapToLong(java.util.function.Function)}.
315     * @throws IOException if an I/O error occurs.
316     */
317    @SuppressWarnings("unused") // thrown by Erase.
318    default LongStream flatMapToLong(final IOFunction<? super T, ? extends LongStream> mapper) throws IOException {
319        return unwrap().flatMapToLong(t -> Erase.apply(mapper, t));
320    }
321
322    /**
323     * Performs an action for each element gathering any exceptions.
324     *
325     * @param action The action to apply to each element.
326     * @throws IOExceptionList if any I/O errors occur.
327     */
328    default void forAll(final IOConsumer<T> action) throws IOExceptionList {
329        forAll(action, (i, e) -> e);
330    }
331
332    /**
333     * Performs an action for each element gathering any exceptions.
334     *
335     * @param action The action to apply to each element.
336     * @param exSupplier The exception supplier.
337     * @throws IOExceptionList if any I/O errors occur.
338     */
339    default void forAll(final IOConsumer<T> action, final BiFunction<Integer, IOException, IOException> exSupplier) throws IOExceptionList {
340        final AtomicReference<List<IOException>> causeList = new AtomicReference<>();
341        final AtomicInteger index = new AtomicInteger();
342        final IOConsumer<T> safeAction = IOStreams.toIOConsumer(action);
343        unwrap().forEach(e -> {
344            try {
345                safeAction.accept(e);
346            } catch (final IOException innerEx) {
347                if (causeList.get() == null) {
348                    // Only allocate if required
349                    causeList.set(new ArrayList<>());
350                }
351                if (exSupplier != null) {
352                    causeList.get().add(exSupplier.apply(index.get(), innerEx));
353                }
354            }
355            index.incrementAndGet();
356        });
357        IOExceptionList.checkEmpty(causeList.get(), null);
358    }
359
360    /**
361     * Like {@link Stream#forEach(java.util.function.Consumer)} but throws {@link IOException}.
362     *
363     * @param action Like {@link Stream#forEach(java.util.function.Consumer)}.
364     * @throws IOException if an I/O error occurs.
365     */
366    @SuppressWarnings("unused") // thrown by Erase.
367    default void forEach(final IOConsumer<? super T> action) throws IOException {
368        unwrap().forEach(e -> Erase.accept(action, e));
369    }
370
371    /**
372     * Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
373     *
374     * @param action Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
375     * @throws IOException if an I/O error occurs.
376     */
377    @SuppressWarnings("unused") // thrown by Erase.
378    default void forEachOrdered(final IOConsumer<? super T> action) throws IOException {
379        unwrap().forEachOrdered(e -> Erase.accept(action, e));
380    }
381
382    /**
383     * Like {@link Stream#limit(long)}.
384     *
385     * @param maxSize Like {@link Stream#limit(long)}.
386     * @return Like {@link Stream#limit(long)}.
387     */
388    default IOStream<T> limit(final long maxSize) {
389        return adapt(unwrap().limit(maxSize));
390    }
391
392    /**
393     * Like {@link Stream#map(java.util.function.Function)}.
394     *
395     * @param <R> Like {@link Stream#map(java.util.function.Function)}.
396     * @param mapper Like {@link Stream#map(java.util.function.Function)}.
397     * @return Like {@link Stream#map(java.util.function.Function)}.
398     * @throws IOException if an I/O error occurs.
399     */
400    @SuppressWarnings("unused") // thrown by Erase.
401    default <R> IOStream<R> map(final IOFunction<? super T, ? extends R> mapper) throws IOException {
402        return adapt(unwrap().map(t -> Erase.apply(mapper, t)));
403    }
404
405    /**
406     * TODO Package-private for now, needs IOToDoubleFunction?
407     *
408     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
409     * would be ideal to have only one.
410     *
411     * Like {@link Stream#mapToDouble(ToDoubleFunction)}.
412     *
413     * Package private for now.
414     *
415     * @param mapper Like {@link Stream#mapToDouble(ToDoubleFunction)}.
416     * @return Like {@link Stream#mapToDouble(ToDoubleFunction)}.
417     */
418    default DoubleStream mapToDouble(final ToDoubleFunction<? super T> mapper) {
419        return unwrap().mapToDouble(mapper);
420    }
421
422    /**
423     * TODO Package-private for now, needs IOToIntFunction?
424     *
425     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
426     * would be ideal to have only one.
427     *
428     * Like {@link Stream#mapToInt(ToIntFunction)}.
429     *
430     * Package private for now.
431     *
432     * @param mapper Like {@link Stream#mapToInt(ToIntFunction)}.
433     * @return Like {@link Stream#mapToInt(ToIntFunction)}.
434     */
435    default IntStream mapToInt(final ToIntFunction<? super T> mapper) {
436        return unwrap().mapToInt(mapper);
437    }
438
439    /**
440     * TODO Package-private for now, needs IOToLongFunction?
441     *
442     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
443     * would be ideal to have only one.
444     *
445     * Like {@link Stream#mapToLong(ToLongFunction)}.
446     *
447     * Package private for now.
448     *
449     * @param mapper Like {@link Stream#mapToLong(ToLongFunction)}.
450     * @return Like {@link Stream#mapToLong(ToLongFunction)}.
451     */
452    default LongStream mapToLong(final ToLongFunction<? super T> mapper) {
453        return unwrap().mapToLong(mapper);
454    }
455
456    /**
457     * Like {@link Stream#max(java.util.Comparator)}.
458     *
459     * @param comparator Like {@link Stream#max(java.util.Comparator)}.
460     * @return Like {@link Stream#max(java.util.Comparator)}.
461     * @throws IOException if an I/O error occurs.
462     */
463    @SuppressWarnings("unused") // thrown by Erase.
464    default Optional<T> max(final IOComparator<? super T> comparator) throws IOException {
465        return unwrap().max((t, u) -> Erase.compare(comparator, t, u));
466    }
467
468    /**
469     * Like {@link Stream#min(java.util.Comparator)}.
470     *
471     * @param comparator Like {@link Stream#min(java.util.Comparator)}.
472     * @return Like {@link Stream#min(java.util.Comparator)}.
473     * @throws IOException if an I/O error occurs.
474     */
475    @SuppressWarnings("unused") // thrown by Erase.
476    default Optional<T> min(final IOComparator<? super T> comparator) throws IOException {
477        return unwrap().min((t, u) -> Erase.compare(comparator, t, u));
478    }
479
480    /**
481     * Like {@link Stream#noneMatch(java.util.function.Predicate)}.
482     *
483     * @param predicate Like {@link Stream#noneMatch(java.util.function.Predicate)}.
484     * @return Like {@link Stream#noneMatch(java.util.function.Predicate)}.
485     * @throws IOException if an I/O error occurs.
486     */
487    @SuppressWarnings("unused") // thrown by Erase.
488    default boolean noneMatch(final IOPredicate<? super T> predicate) throws IOException {
489        return unwrap().noneMatch(t -> Erase.test(predicate, t));
490    }
491
492    /**
493     * Like {@link Stream#peek(java.util.function.Consumer)}.
494     *
495     * @param action Like {@link Stream#peek(java.util.function.Consumer)}.
496     * @return Like {@link Stream#peek(java.util.function.Consumer)}.
497     * @throws IOException if an I/O error occurs.
498     */
499    @SuppressWarnings("unused") // thrown by Erase.
500    default IOStream<T> peek(final IOConsumer<? super T> action) throws IOException {
501        return adapt(unwrap().peek(t -> Erase.accept(action, t)));
502    }
503
504    /**
505     * Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
506     *
507     * @param accumulator Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
508     * @return Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
509     * @throws IOException if an I/O error occurs.
510     */
511    @SuppressWarnings("unused") // thrown by Erase.
512    default Optional<T> reduce(final IOBinaryOperator<T> accumulator) throws IOException {
513        return unwrap().reduce((t, u) -> Erase.apply(accumulator, t, u));
514    }
515
516    /**
517     * Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
518     *
519     * @param identity Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
520     * @param accumulator Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
521     * @return Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
522     * @throws IOException if an I/O error occurs.
523     */
524    @SuppressWarnings("unused") // thrown by Erase.
525    default T reduce(final T identity, final IOBinaryOperator<T> accumulator) throws IOException {
526        return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u));
527    }
528
529    /**
530     * Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
531     *
532     * @param <U> Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
533     * @param identity Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
534     * @param accumulator Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
535     * @param combiner Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
536     * @return Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
537     * @throws IOException if an I/O error occurs.
538     */
539    @SuppressWarnings("unused") // thrown by Erase.
540    default <U> U reduce(final U identity, final IOBiFunction<U, ? super T, U> accumulator, final IOBinaryOperator<U> combiner) throws IOException {
541        return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u), (t, u) -> Erase.apply(combiner, t, u));
542    }
543
544    /**
545     * Like {@link Stream#skip(long)}.
546     *
547     * @param n Like {@link Stream#skip(long)}.
548     * @return Like {@link Stream#skip(long)}.
549     */
550    default IOStream<T> skip(final long n) {
551        return adapt(unwrap().skip(n));
552    }
553
554    /**
555     * Like {@link Stream#sorted()}.
556     *
557     * @return Like {@link Stream#sorted()}.
558     */
559    default IOStream<T> sorted() {
560        return adapt(unwrap().sorted());
561    }
562
563    /**
564     * Like {@link Stream#sorted(java.util.Comparator)}.
565     *
566     * @param comparator Like {@link Stream#sorted(java.util.Comparator)}.
567     * @return Like {@link Stream#sorted(java.util.Comparator)}.
568     * @throws IOException if an I/O error occurs.
569     */
570    @SuppressWarnings("unused") // thrown by Erase.
571    default IOStream<T> sorted(final IOComparator<? super T> comparator) throws IOException {
572        return adapt(unwrap().sorted((t, u) -> Erase.compare(comparator, t, u)));
573    }
574
575    /**
576     * Like {@link Stream#toArray()}.
577     *
578     * @return {@link Stream#toArray()}.
579     */
580    default Object[] toArray() {
581        return unwrap().toArray();
582    }
583
584    /**
585     * TODO Package-private for now, needs IOIntFunction?
586     *
587     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
588     * would be ideal to have only one.
589     *
590     * Like {@link Stream#toArray(IntFunction)}.
591     *
592     * Package private for now.
593     *
594     * @param <A> Like {@link Stream#toArray(IntFunction)}.
595     * @param generator Like {@link Stream#toArray(IntFunction)}.
596     * @return Like {@link Stream#toArray(IntFunction)}.
597     */
598    default <A> A[] toArray(final IntFunction<A[]> generator) {
599        return unwrap().toArray(generator);
600    }
601
602}