Вспомогательный класс потока для Java 8


Компания, в которой я работаю, застрял с Java 8, и потоки выполнения в Java 8-это хорошо, но немного грубо вокруг краев.

Вот это вспомогательный класс, я слепленная, используя различные фрагменты из межсетях. Но это беспокоит меня немного, потому что я не понимаю некоторых Spliterator магия, и нет времени, чтобы погрузиться глубже в нее. Конечно, я писал тесты, но было бы приятно получить обратную связь.

package com.acme;

import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public final class Streams {

    private Streams() {
        // do not instantiate
    }

    public static <A, B, C> Stream<C> zipWith(Stream<A> leftStream, Stream<B> rightStream,
            BiFunction<A, B, C> combiner) {
        Spliterator<A> lefts = leftStream.spliterator();
        Spliterator<B> rights = rightStream.spliterator();
        return StreamSupport
                .stream(new Spliterators.AbstractSpliterator<C>(Long.min(lefts.estimateSize(), rights.estimateSize()),
                        lefts.characteristics() & rights.characteristics()) {
                    @Override
                    public boolean tryAdvance(Consumer<? super C> action) {
                        return lefts.tryAdvance(
                                left -> rights.tryAdvance(right -> action.accept(combiner.apply(left, right))));
                    }
                }, leftStream.isParallel() || rightStream.isParallel());
    }

    public static <A, C> Stream<C> zipWithIndex(Stream<A> leftStream, BiFunction<A, Integer, C> combiner) {
        return zipWith(leftStream, Stream.iterate(0, index -> index + 1), combiner);
    }

    public static <A> Stream<A> takeWhile(Stream<A> stream, Predicate<? super A> predicate) {
        Spliterator<A> spliterator = stream.spliterator();
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<A>(spliterator.estimateSize(), 0) {
            boolean stillGoing = true;
            @Override
            public boolean tryAdvance(Consumer<? super A> consumer) {
                if (stillGoing) {
                    boolean hadNext = spliterator.tryAdvance(elem -> {
                        if (predicate.test(elem)) {
                            consumer.accept(elem);
                        } else {
                            stillGoing = false;
                        }
                    });
                    return hadNext && stillGoing;
                }
                return false;
            }
        }, false);
    }

    public static <A> Stream<A> dropWhile(Stream<A> stream, Predicate<? super A> predicate) {
        Spliterator<A> spliterator = stream.spliterator();
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<A>(spliterator.estimateSize(), 0) {
            boolean dropped;
            public boolean tryAdvance(Consumer<? super A> action) {
                if (dropped) {
                    return spliterator.tryAdvance(action);
                }
                while (true) {
                    if (!(!dropped && spliterator.tryAdvance(t -> {
                        if (!predicate.test(t)) {
                            dropped = true;
                            action.accept(t);
                        }
                    }))) break;
                }
                return dropped;
            }
            public void forEachRemaining(Consumer<? super A> action) {
                while (!dropped) {
                    if (!tryAdvance(action)) {
                        return;
                    }
                }
                spliterator.forEachRemaining(action);
            }
        }, false);
    }

    public static <A> Stream<A> from(Iterable<A> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    public static <A> Stream<A> from(Optional<A> optional) {
        return optional.map(Stream::of).orElseGet(Stream::empty);
    }
}


219
1
задан 13 февраля 2018 в 12:02 Источник Поделиться
Комментарии
1 ответ


  • Я сделал подобные расширения потока в прошлом. Однако, я использовал шаблон "декоратор". Таким образом, способность к цепочке операций потока (в том числе добавления) остается неизменным. (это не относится к from() методы)

  • В zipWith()Я считаю это более разумно указать :

    leftStream.isParallel() && rightStream.isParallel()

    Если какой-либо поток не параллелен, кто может использовать операции на нем, которые не подходят для параллельной обработки.


  • в dropWhile()это является ненужным, чтобы переопределить forEachRemaining. реализация по умолчанию делегатов tryAdvanceи что работает просто отлично.

1
ответ дан 21 февраля 2018 в 08:02 Источник Поделиться