Java中的ForkJoin
是适合计算密集型任务的多线程调度框架。Fork
其实这里就是指任务拆分,而Join
则是合并任务执行的结果。ForkJoin
的思路很简单,就是将一个大任务按照指定的策略拆分成几个小任务,通过线程池放到不同的线程中执行,小任务还可以继续拆分,像这样递归拆分直到满足某一条件,小任务全部运行完成后再合并计算结果,最终达到充分利用多核处理器进行运算的能力。
ForkJoinPool:具体执行ForkJoin
任务的线程池,线程池默认大小为处理器核心数。
RecursiveTask:具体实现递归调用和拆分的任务。
下面例子中,我们使用ForkJoin
方式计算1+2+...+1000
的值。
ComputeTask.java
package com.gacfox.demo;
import java.util.concurrent.RecursiveTask;
public class ComputeTask extends RecursiveTask<Integer> {
private final int start;
private final int end;
private int sum = 0;
public ComputeTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= 100) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = start + 100;
ComputeTask task1 = new ComputeTask(start, middle);
ComputeTask task2 = new ComputeTask(middle + 1, end);
task1.fork();
task2.fork();
sum = task1.join() + task2.join();
}
return sum;
}
}
Main.java
package com.gacfox.demo;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
ComputeTask task = new ComputeTask(1, 1000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> result = forkJoinPool.submit(task);
try {
int i = result.get();
System.out.println("计算结果:" + i);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
代码其实很简单,我们对于整个计算任务不断将其拆分,直到计算的范围小于100,每个任务都提交到ForkJoinPool
线程池中,最终效果就是整个计算任务被拆分后交给了处理器的多个核心来并行执行。