пятница, 11 ноября 2016 г.

Stream API и Fork/Join. Параллельные вычисления. Java 8 [10 min reading]

Stream. Что это?


Определим Stream как свободную последовательность элементов, которая не хранит никаких данных и использует коллекции как ресурс. Соответственно Stream не предоставляет непосредственного доступа к данным, а дает возможность применить к ресурсу данных вычислительные операции, которые могут быть выполнены последовательно либо параллельно. Эти вычисления в свою очередь "ленивые" и будут выполнены только после вызова терминальной операции.

Все вместе(ресурс данных, последовательность агрегирующих операций и терминальная операция) это называется stream pipeline.


Stream и параллельные вычисления


В разрезе данной темы для нас важным моментом является как раз возможность выполнять вычисления параллельно. И это не требует от нас ни единой строчки многопоточного кода. Или parallel(), когда мы работаем с готовым stream'ом или parallelStream(), когда создаем его сами.

Но все же интересно, как это работает.

Когда вы запускаем код на многоядерном процессоре, Java 8 "распараллеливает" наш stream, на несколько stream'ов, каждый из которых в отдельном потоке выполняет свою подзадачу и результаты объединяются вместе. За это отвечает наш Fork/Join Framework из прошлого поста.

Пример. Сумма элементов в листе с reduce


Рассмотрим простой пример:

    public static int sumReduce(List numbers) {
        System.out.println(String.format("%"+ 15 +"s %s", "acc", "elem"));
        return numbers.stream()
                .reduce(0,
                        (acc, elem) -> {
                            System.out.println(String.format("accumulator: %d + %d", acc, elem));
                            return acc + elem;
                        });
    }
Этот метод считает последовательно сумму элементов. Выполним его:

    System.out.println(String.format("sum%" + 8 + "s"  +  ":%" + 6 + "d","", 
                sumReduce(asList(1, 2, 3, 4))));
Вывод:

            acc elem
accumulator: 0 + 1
accumulator: 1 + 2
accumulator: 3 + 3
accumulator: 6 + 4
sum        :    10

Последовательное вычисление


Мы воспользовались Stream#reduce:

     * @param identity the identity value for the accumulating function
     * @param accumulator an function for combining two values
     * @return the result of the reduction
     */
    T reduce(T identity, BinaryOperator accumulator);
Reduce c данной сигнатурой обеспечивает выполнение вычислений последовательно - последовательно добавляем к аккумулятору (accumulator), который изначально равен нулю (identity) по одному элементу из листа:

Параллельное вычисление


Теперь вызовем parallelStream():

    public static int sumParallelReduce(List numbers) {
        return numbers.parallelStream()
                .reduce(0,
                        (acc, elem) -> {
                            System.out.println(String.format("accumulator: %d + %d", acc, elem));
                            return acc + elem;
                        });
    }
В этом раз вывод другой:

accumulator: 0 + 4
accumulator: 0 + 2
accumulator: 0 + 1
accumulator: 1 + 2
accumulator: 0 + 3
accumulator: 3 + 4
accumulator: 3 + 7
sum        :    10
Посмотрим на имплементацию reduce в Java 8 в классе ReferencePipeline:

    //класс ReferencePipeline
    @Override
    public final P_OUT reduce(final P_OUT identity, final BinaryOperator accumulator) {
        return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
    }

    //класс ReduceOps
    public static <T, U> TerminalOp<T, U>
    makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
    [...]
    }
Мы видим, что наш accumulator используется еще и как combiner. Но вызывается combiner только в случае параллельных вычислений.

Давайте передадим свой combiner непосредственно через вызов друго метода reduce:

     * @param combiner function for combining two values, which must be
     *                    compatible with the accumulator function
     */
    <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);
Из доки мы видим требование:
combiner function for combining two values, which must compatible with the accumulator function
Еще это можно выразить вот так:
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
В нашем случае это означает, что нам нужно правильно смерджить аккумуляторы из разных потоков, т.е. просто их сложить, и можно было оставить все как есть и не заморачиваться, но для наглядности немного переделаем наш код:

    public static int sumParallelReduce(List numbers) {
        return numbers.parallelStream()
                .reduce(0,
                        (acc, elem) -> {
                            System.out.println(String.format("accumulator: %d + %d", acc, elem));
                            return acc + elem;
                        },
                        (left, right) -> {
                            System.out.println(String.format("combiner: %d + %d", left, right));
                            return left + right;
                        });
    }
Вывод:

accumulator: 0 + 1
accumulator: 0 + 3
accumulator: 0 + 4
accumulator: 0 + 2
combiner: 1 + 2
combiner: 3 + 4
combiner: 3 + 7
sum        :    10
О чем это нам говорит? О том, что наш задача была fork'нута на четыре подзадачи, каждая из которых выполнилась в своем потоке, а после этого результаты были объединены(join):


Вывод


Stream API дает нам возможность выполнять вычисления параллельно, используя для этого Fork/Join Framework, который выбирает подходящий вариант разбивки задачи и объединения результата. При этом, мы работаем с тем же самым common ForkJoinPool, которые мы использовали в прошлом посте:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

Это мы можем увидеть в дебаггере:
Мы видим текущий поток(1), вызов нашего метода(2) из подзадачи(3), которая запущена из общего ForkJoinPool(4).

Метериалы:
Java 8 streams API and parallelism
Java 8 parallel streams - an essential trick

Комментариев нет:

Отправить комментарий