1. Fork/Join Framework
Запустить вычисления параллельно в Java 8 можно всего лишь изменением вызова одного метода. Если у вас уже есть Stream объект вы можете вызвать у него parallel(). Если у вы получаете Stream из коллекции, тогда делайте это через parallelStream().
Параллельные вычисления основаны в Java на разбивке задачи на подзадачи, решение каждой из них параллельно в отдельном потоке, последующием объединение результатов. За это отвечает fork/join framework.
2. ForkJoinPool
Fork/join framework это реализация ExecutorService интерфейса, который позволяет вам насладиться всеми преимуществами многопоточности. Фреймворк базируется на ForkJoinPool, которые реализует work stealing алгоритм:
Когда процессор выполнил своя работу, он смотрит на очередь задач и другого процессора и "крадет" его задачу из очередиПолучить доступ к инстансу ForkJoinPool можно следующими способами:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
[...]
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
// в конструкторе можем задать уровень параллелизма (2 в нашем случае)
3. ForkJoinTask
ForkJoinPool выполняет задачи типа ForkJoinTask. Это базовый тип. На практике используется один из двух специализированных типов RecursiveTask(возращает результат) или RecursiveAction(не возвращает). Вы наследуете свою задачу от одного из этих классов и передаете объект в метод ForkJoinPool#invoke , там ваша задача разбиватся на подзадачи(fork) и происходит формирование объединенного результата без ручного вмешательства (join). Происходит это с помощью переопределенного метода ForkJoinTask#compute.
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
Если работа достаточно мала - то она выполняется, если нет - форкается на две части и ожидается результат выполнения (ForkJoinPool#invokeAll).
4. Пример
Мне это напомнило merge sort, поэтому предлагаю рассмотреть детальнее на его примере.
Наследуем RecursiveAction:
public class MergeForkJoin extends RecursiveAction {
[...]
public MergeForkJoin(Comparable[] a, int low, int high) {
this.a = a;
this.aux = new Comparable[a.length];
this.low = low;
this.high = high;
}
[...]
}
Реализуем compute():
@Override
protected void compute() {
if (low >= high) return;
int mid = low + (high-low)/2;
MergeForkJoin left = new MergeForkJoin(a, low, mid);
MergeForkJoin right = new MergeForkJoin(a, mid+1, high);
invokeAll(left, right);
merge(this.a, this.aux, this.low, mid, this.high);
}
Это, пожалуй, самая важная часть. Здесь происходит fork - разбивка на подзадачи и запуск их параллельно (invokeAll(left, right)), ожидание результата и последующий merge.
Сам merge я взял из Merge.java:
private static void merge(Comparable[] a, Comparable[] aux, int lo, int mid, int hi) {
// copy to aux[]
for (int k = lo; k <= hi; k++) {
aux[k] = a[k];
}
// merge back to a[]
int i = lo, j = mid+1;
for (int k = lo; k <= hi; k++) {
if (i > mid) a[k] = aux[j++];
else if (j > hi) a[k] = aux[i++];
else if (less(aux[j], aux[i])) a[k] = aux[j++];
else a[k] = aux[i++];
}
}
Теперь запуск:
public static void main(String[] args) {
String[] a = "S O R T E X A M P L E".split(" ");
MergeForkJoin mergeForkJoin = new MergeForkJoin(a, 0, a.length -1);
mergeForkJoin.sort();
show(mergeForkJoin.a);
}
[...]
public void sort() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.invoke(this);
}
Как и было сказано выше - передаем нашу задачу в ForkJoinPool#invoke5. Вывод
Фреймворк спроектирован так, чтобы позволить разбить задачу на подзадачи используя свободную процессорную мощность и улучшить производительность программы.
Материалы:
Fork/Join
Parallelism
Fork/Join Framework в Java 7
Guide to the Fork/Join
Код здесь
Комментариев нет:
Отправить комментарий