среда, 9 ноября 2016 г.

Fork/Join Framework. Параллельные вычисления. Java 8 [5 min reading]

1. Fork/Join Framework


Запустить вычисления параллельно в Java 8 можно всего лишь изменением вызова одного метода. Если у вас уже есть Stream объект вы можете вызвать у него parallel(). Если у вы получаете Stream из коллекции, тогда делайте это через parallelStream().

Параллельные вычисления основаны в Java на разбивке задачи на подзадачи, решение каждой из них параллельно в отдельном потоке, последующием объединение результатов. За это отвечает fork/join framework.


2. ForkJoinPool


Fork/join framework это реализация ExecutorService интерфейса, который позволяет вам насладиться всеми преимуществами многопоточности. Фреймворк базируется на ForkJoinPool, которые реализует work stealing алгоритм:
Когда процессор выполнил своя работу, он смотрит на очередь задач и другого процессора и "крадет" его задачу из очереди
Получить доступ к инстансу ForkJoinPool можно следующими способами:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
[...]
ForkJoinPool forkJoinPool = new ForkJoinPool(2); 
// в конструкторе можем задать уровень параллелизма (2 в нашем случае)

3. ForkJoinTask


ForkJoinPool выполняет задачи типа ForkJoinTask. Это базовый тип. На практике используется один из двух специализированных типов RecursiveTask(возращает результат) или RecursiveAction(не возвращает). Вы наследуете свою задачу от одного из этих классов и передаете объект в метод ForkJoinPool#invoke , там ваша задача разбиватся на подзадачи(fork) и происходит формирование объединенного результата без ручного вмешательства (join). Происходит это с помощью переопределенного метода ForkJoinTask#compute.

if (my portion of the work is small enough)
    do the work directly
else
    split my work into two pieces
    invoke the two pieces and wait for the results

Если работа достаточно мала - то она выполняется, если нет - форкается на две части и ожидается результат выполнения (ForkJoinPool#invokeAll).

4. Пример


Мне это напомнило merge sort, поэтому предлагаю рассмотреть детальнее на его примере.

Наследуем RecursiveAction:


public class MergeForkJoin extends RecursiveAction {
    [...]
    public MergeForkJoin(Comparable[] a, int low, int high) {
        this.a = a;
        this.aux = new Comparable[a.length];
        this.low = low;
        this.high = high;
    }
    [...]
}

Реализуем compute():


    @Override
    protected void compute() {
        if (low >= high) return;
        int mid = low + (high-low)/2;
        MergeForkJoin left = new MergeForkJoin(a, low, mid);
        MergeForkJoin right = new MergeForkJoin(a, mid+1, high);
        invokeAll(left, right);
        merge(this.a, this.aux, this.low, mid, this.high);
    }

Это, пожалуй, самая важная часть. Здесь происходит fork - разбивка на подзадачи и запуск их параллельно (invokeAll(left, right)), ожидание результата и последующий merge.

Сам merge я взял из Merge.java:


    private static void merge(Comparable[] a, Comparable[] aux, int lo, int mid, int hi) {
        // copy to aux[]
        for (int k = lo; k <= hi; k++) {
            aux[k] = a[k];
        }
        // merge back to a[]
        int i = lo, j = mid+1;
        for (int k = lo; k <= hi; k++) {
            if      (i > mid)              a[k] = aux[j++];
            else if (j > hi)               a[k] = aux[i++];
            else if (less(aux[j], aux[i])) a[k] = aux[j++];
            else                           a[k] = aux[i++];
        }
    }

Теперь запуск:


    public static void main(String[] args) {
        String[] a = "S O R T E X A M P L E".split(" ");
        MergeForkJoin mergeForkJoin = new MergeForkJoin(a, 0, a.length -1);
        mergeForkJoin.sort();
        show(mergeForkJoin.a);
    }
    [...]
    public void sort() {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.invoke(this);
    }   
Как и было сказано выше - передаем нашу задачу в ForkJoinPool#invoke

5. Вывод


Фреймворк спроектирован так, чтобы позволить разбить задачу на подзадачи используя свободную процессорную мощность и улучшить производительность программы.

Материалы:
Fork/Join
Parallelism
Fork/Join Framework в Java 7
Guide to the Fork/Join

Код здесь

Комментариев нет:

Отправить комментарий