Fork-Join框架
关于
【位于Java并行包“java.util.concurrent”中】
Fork/Join 框架:是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个(互不依赖的)小任务,最终汇总每个小任务结果后得到大任务结果的框架,这种开发方法也叫分治编程。
- 分治思想,迭代实现;
solve(任务): if(任务已经划分到足够小): 顺序执行任务 else: for(划分任务得到子任务) solve(子任务) 结合所有子任务的结果到上一层循环 return 最终结合的结果
- 分治编程可以极大地利用CPU资源,提高任务执行的效率,也是目前与多线程有关的前沿技术;
- 与其他“ExecutorService”相关的实现(线程池):
- 相同的是,Fork/Join框架会将任务分配给线程池中的线程;
- 而与之不同的是,Fork/Join框架在执行任务时使用了“工作窃取算法”。
- (Java 8 Stream的并行操作底层就是用到了Fork/Join框架!!!)
传统分治编程
在不使用 Fork-Join 框架时,使用普通的线程池实现:
- 向线程池提交了一个任务(大任务),并规定好任务切割阀值。
- 由线程池中线程(假设是线程A)执行大任务,若发现大任务的大小超过阀值,则切割为两个子任务,并调用“submit()”提交到线程池,得到返回的子任务的“Future”对象。
- 线程A就调用(上一步返回的)“Future”的“get()”方法阻塞等待子任务的执行结果。
- 池中的其他线程(除线程A外的其他线程,线程A被阻塞)执行两个子任务:根据子任务大小,进行:1、迭代以上步骤;2、计算并返回结果。
缺点:
- 每个进行任务切割的线程(如:步骤中的线程A)都会被阻塞,直到其所有子任务完成;
- 如果任务太大了,需要切割多次,那么就会有多个线程被阻塞,性能将会急速下降;
- 如果线程池的线程数量有上限,且小于任务切割次数,则极可能造成池中所有线程被阻塞,从而无法执行任务;
工作窃取算法
“工作窃取(work-stealing)”:在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。
- 每个线程中维护了一个双端队列来存储所需要执行的任务;
- 允许线程从其他线程的双端队列中窃取一个最晚的任务来执行,以此避免线程发生竞争。
- 被窃取任务线程从队头获取任务;
- 窃取任务线程从队尾获取任务;
- 当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”;
优点:
- 充分利用线程进行并行计算(允许窃取)
- 减少了线程间的竞争(双端队列的使用)
缺点:
- 在某些情况下会存在竞争(双端队列中只有一个任务)
- 消耗了更多的系统资源(用于窃取操作?)
Fork-Join框架
Fork/Join 框架创建的任务需要通过“ForkJoinPool”来启动,“ForkJoinPool”是一个线程池,比较特殊的是:其线程数量是根据 CPU 的核心数来设置的。而“ForkJoinPool”是通过工作窃取(work-stealing)算法来提高 CPU 的利用率的。
Fork/Join 框架核心类:主要由子任务(ForkJoinTask)、任务调度(ForkJoinPool)两部分组成:
ForkJoinPool
ForkJoin框架中:“任务调度器”。【继承了“AbstractExecutorService”】
- 启动 Fork/Join 任务,用来执行 Task。或生成新的“ForkJoinWorkerThread”,执行“ForkJoinWorkerThread”间的“work-stealing”逻辑;
- ForkJoinPool管理着执行池中的“线程”(“ForkJoinWorkerThread”)和“任务队列”(“WorkQueue”),此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。
public class ForkJoinPool extends AbstractExecutorService { // 任务队列 volatile WorkQueue[] workQueues; // 线程的运行状态 volatile int runState; // 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; // 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响 static final ForkJoinPool common; // 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用 // 其他构造方法都是源自于此方法 // parallelism: 并行度, // 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量 private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, // 工作线程工厂 UncaughtExceptionHandler handler, // 拒绝任务的handler int mode, // 同步模式 String workerNamePrefix) { // 线程名prefix this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); } ... }
- “WorkQueue”:双端队列,“ForkJoinTask”存放在这里【“外部任务”】。
- 每一个工作线程(“ForkJoinWorkerThread”)都维护了一个工作队列【“内部任务”】;
- 当工作线程(“ForkJoinWorkerThread”)在处理自己的工作队列时,会从队列首取任务来执行(FIFO);
- 如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。
- “runState”:ForkJoinPool的运行状态;
- “SHUTDOWN”状态用负数表示,其他用2的幂次表示;
- 和“ThreadPoolExecutor”一样实现了自己的线程池,提供了三种调度子任务的方法:
- “execute()”:异步执行指定任务,无返回结果;
- “invoke()”、“invokeAll()”:异步执行指定任务,等待完成才返回结果;
- “submit()”:异步执行指定任务,并立即返回一个Future对象(保存了异步计算的结果);
(ForkJoinWorkerThread)
ForkJoin框架中:执行任务的工作线程。【继承了“Thread”类】
- 即“ForkJoinPool”线程池里的线程;
- 每个线程都维护着一个内部队列,用于存放“ForkJoinTask”【“内部任务”】;
ForkJoinTask
Fork/Join框架中:实际执行的“任务类”,是一个类似普通线程的实体,但是比普通线程轻量得多。【实现了“Future”接口】
- “fork()”方法:使用线程池中的空闲线程异步提交任务;
- (即:把任务推入当前工作线程的工作队列里)
public final ForkJoinTask<V> fork() { Thread t; // ForkJoinWorkerThread 是执行ForkJoinTask的专有线程,由ForkJoinPool管理 // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else // 如果不是则将线程加入队列 // 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中 ForkJoinPool.common.externalPush(this); return this; }
- “join()”方法:等待处理任务的线程处理完毕,获得返回值;
public final V join() { int s; // doJoin()方法来获取当前任务的执行状态 if ((s = doJoin() & DONE_MASK) != NORMAL) // 任务异常,抛出异常 reportException(s); // 任务正常完成,获取返回值 return getRawResult(); } /** * doJoin()方法用来返回当前任务的执行状态 **/ private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; // 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态) return (s = status) < 0 ? s : // 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? // 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它) // tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true // doExec()方法执行任务 (w = (wt = (ForkJoinWorkerThread)t).workQueue). // 如果是处于顶端并且任务执行完毕,返回结果 tryUnpush(this) && (s = doExec()) < 0 ? s : // 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务 // awaitJoin():使用自旋使任务执行完成,返回结果 wt.pool.awaitJoin(w, this, 0L) : // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果 externalAwaitDone(); }
RecursiveAction 与 RecursiveTask
通常情况下,不直接继承ForkJoinTask,而是继承其子类:
- 两个子类都有执行主要计算的方法compute();
- “RecursiveAction”:用于无结果返回的子任务;
- compute()返回void;
- “RecursiveTask”:用于有结果返回的子任务;
- compute()有具体的返回值;
compute 方法的实现模式,一般是:
if 任务足够小
直接返回结果
else
分割成N个子任务
依次调用每个子任务的fork方法执行子任务
依次调用每个子任务的join方法合并执行结果
Fork/Join 总结
- Fork/Join 的任务,分为:
- “外部任务”:放在 ForkJoinPool 的全局队列里;
- “内部任务”:放在 ForkJoinPool 池中的每个线程内维护的队列里;
Fork/Join 总结:
- ForkJoinPool 的每个工作线程都维护着一个双端工作队列(“WorkQueue”),队列中存放着是任务(“ForkJoinTask”);
- 任务分割:
- 每个工作线程在运行中产生新的任务(调用“fork()”方法)时,得到的子任务会被放入工作线程的工作队列队尾(作为内部任务);
- 处理任务队列:
- 工作线程在处理自己的工作队列时,使用的是FIFO方式,也就是说每次从队首取出任务来执行。
- 窃取任务:
- 工作线程在处理完“内部队列”后,会尝试窃取一个任务(1、来自于刚刚提交到“pool”的“外部任务”,2、或来自于其他工作线程的“内部任务”),窃取的任务位于其他线程的工作队列的队尾,也就是说工作线程在窃取其他工作线程的任务时,使用的是LIFO方式。
- 获取结果:
- 在遇到“join()”时,如果需要join的任务尚未完成,则会先处理其他任务,并等待其完成。
- 执行本线程“内部队列”的其他任务,
- 或者(内部任务队列为空)扫描其他的任务队列进行窃取任务;
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
Fork/Join 示例
使用Fork/Join进行累加
示例:“从1+2+...10亿,每个任务只能处理1000个数相加,超过1000个的自动分解成小任务并行处理”:
- “RecursiveTask”:递归任务类;
- 这里需要计算结果,所以任务继承的是“RecursiveTask”类。
- ForkJoinTask需要实现“compute”方法:
- 判断任务是否小于等于阈值1000,如果是就直接执行任务;
- 否则,分割成两个子任务,并调用子任务的“fork”方法(迭代:进入子任务的“compute”方法),然后使用“join”方法(阻塞等待子任务执行完,并得到其结果)组合结果并返回。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinTask extends RecursiveTask<Long> {
private static final long MAX = 1000000000L;
private static final long THRESHOLD = 1000L;
private long start;
private long end;
public ForkJoinTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end - start <= THRESHOLD) {
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long mid = (start + end) / 2;
ForkJoinTask task1 = new ForkJoinTask(start, mid);
task1.fork();
ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);
task2.fork();
return task1.join() + task2.join();
}
}
public static void main(String[] args) {
// 串行计算
System.out.println("test");
long start = System.currentTimeMillis();
Long sum = 0L;
for (long i = 0L; i <= MAX; i++) {
sum += i;
}
System.out.println(sum);
System.out.println(System.currentTimeMillis() - start + "ms");
System.out.println("--------------------");
// 使用Fork/Join计算
System.out.println("testForkJoin");
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
Long sum = forkJoinPool.invoke(new ForkJoinTask(1, MAX));
System.out.println(sum);
System.out.println(System.currentTimeMillis() - start + "ms");
}
}
程序输出:
test
500000000500000000
4992ms
--------------------
testForkJoin
500000000500000000
508ms
通过斐波那契数列:Fork/Join、递归、循环
斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和: 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······ 如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。
通过不同方式计算斐波那契数列:
- 使用“Fork/Join”:
public class FibonacciTest { class Fibonacci extends RecursiveTask<Integer> { int n; public Fibonacci(int n) { this.n = n; } // 主要的实现逻辑都在compute()里 @Override protected Integer compute() { // 这里先假设 n >= 0 if (n <= 1) { return n; } else { // f(n-1) Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); // f(n-2) Fibonacci f2 = new Fibonacci(n - 2); f2.fork(); // f(n) = f(n-1) + f(n-2) return f1.join() + f2.join(); } } } @Test public void testFib() throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors()); long start = System.currentTimeMillis(); Fibonacci fibonacci = new Fibonacci(40); Future<Integer> future = forkJoinPool.submit(fibonacci); System.out.println(future.get()); long end = System.currentTimeMillis(); System.out.println(String.format("耗时:%d millis", end - start)); } }
- 输出:
CPU核数:4 计算结果:102334155 耗时:9490 millis
- 使用“递归”:
// 普通递归,复杂度为O(2^n) public int plainRecursion(int n) { if (n == 1 || n == 2) { return 1; } else { return plainRecursion(n -1) + plainRecursion(n - 2); } } @Test public void testPlain() { long start = System.currentTimeMillis(); int result = plainRecursion(40); long end = System.currentTimeMillis(); System.out.println("计算结果:" + result); System.out.println(String.format("耗时:%d millis", end -start)); }
- 输出:
计算结果:102334155 耗时:436 millis
- 使用“循环”:
// 通过循环来计算,复杂度为O(n) private int computeFibonacci(int n) { // 假设n >= 0 if (n <= 1) { return n; } else { int first = 1; int second = 1; int third = 0; for (int i = 3; i <= n; i ++) { // 第三个数是前两个数之和 third = first + second; // 前两个数右移 first = second; second = third; } return third; } } @Test public void testComputeFibonacci() { long start = System.currentTimeMillis(); int result = computeFibonacci(40); long end = System.currentTimeMillis(); System.out.println("计算结果:" + result); System.out.println(String.format("耗时:%d millis", end -start)); }
- 输出:(0表示耗时几乎可以忽略不计)
计算结果:102334155 耗时:0 millis
为什么在这里普通的递归或循环效率更快呢?
- 因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。
若计算任务比较简单(如上),则直接使用单线程更快(Fork/Join的线程协作开支远大于多核计算提高的效率);只有当计算任务复杂,才应考虑使用Fork/Join框架,以充分利用多核CPU来提高计算速度(Java 8 Stream)。
FAQ
- ForkJoinPool 使用 submit 与 invoke 提交的区别?
- invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码。
- submit 是异步执行,只有在 Future 调用 get 的时候会阻塞。【Future???】
- 继承 RecursiveTask 与 RecursiveAction的区别?
- RecursiveTask:适用于有返回值的场景。
- RecursiveAction:适合于没有返回值的场景。
- 子任务调用 fork 与 invokeAll 的区别?
- fork:让子线程自己去完成任务,父线程监督子线程执行,浪费父线程。
- invokeAll:子父线程共同完成任务,可以更好的利用线程池。