ForkJoin框架

it2022-05-05  141

Fork/Join框架

1. Fork/Join框架简介

Fork/Join框架是java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果得到大任务结果的框架。Fork指的就是把一个大任务分割成若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。

比如计算1+2+3+4+5+……+10000,可以分割成10个子任务,每个子任务负责分别对1000个数进行求和,最终汇总这10个子任务的结果。

2. Fork/Join框架使用说明

使用Fork/Join框架我们需要两个类:

ForkJoinTask:Fork/Join任务,提供fork()和Join()方法,通常情况下继承它的两个子类: RecursiveAction:返回没有结果的任务。RecursiveTask:返回有结果的任务。ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

从ForkJoinTask的两个子类的名字中就可以看到,这是一种采用递归方式实现的任务分割,任务分割的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部,当一个工作线程的队列里面暂时没有任务时,它会随机从其他工作线程队列的尾部获取一个任务去执行。

3. Fork/Join框架实例

3.1 需求:

计算1 + 2 + 3 +……+10 的结果。

3.2 需求分析设计:

使用Fork/Join框架首先要考虑的就是如何分割任务,和分割任务的粒度,这里我们考虑每个子任务最多执行两个数相加,那我们分割的阈值就是2,Fork/Join框架会把这个任务fork成5个子任务,然后再join5个子任务的结果。因为是有结果的任务,所以必须继承RecursiveTask。

3.3 代码实例

package com.wangjun.thread; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class ForkJoinTaskTest extends RecursiveTask<Integer>{ private static final int THRESHOLD = 2; // 阈值 private int start; private int end; public ForkJoinTaskTest(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if(end - start < THRESHOLD) { System.out.println(Thread.currentThread().getName() + ":" + start + "-" + end); for(int i = start; i <= end; i++) { sum += i; } }else { // 任务大于阈值,拆分子任务 int middle = start + (end - start)/2; ForkJoinTaskTest task1 = new ForkJoinTaskTest(start, middle); ForkJoinTaskTest task2 = new ForkJoinTaskTest(middle + 1, end); // 执行子任务 task1.fork(); task2.fork(); // 等待子任务执行完成,并得到其结果 int result1 = task1.join(); int result2 = task2.join(); // 合并子任务 sum = result1 + result2; } return sum; } public static void main(String[] args) throws InterruptedException, ExecutionException { int start = 1; int end = 10; ForkJoinTaskTest task = new ForkJoinTaskTest(start, end); // 用来执行ForkJoinTask ForkJoinPool pool = new ForkJoinPool(); // 执行任务 Future<Integer> result = pool.submit(task); System.out.println("最后的结果为:" + result.get()); } }

运行结果:

ForkJoinPool-1-worker-2:3-3 ForkJoinPool-1-worker-0:4-5 ForkJoinPool-1-worker-3:6-7 ForkJoinPool-1-worker-1:1-2 ForkJoinPool-1-worker-3:8-8 ForkJoinPool-1-worker-0:9-10 最后的结果为:55

可以看到ForkJoinTask与一般的任务在于它需要实现compute()方法,在这个方法里面首先需要判断是否需要分割子任务,如果需要分割,每个子任务再调用fork方法时,就会进入compute()方法,如果不需要分割,则执行当前任务并返回结果,使用join方法会等待子任务结果的返回。可以看到这是一种递归的实现。

转载于:https://www.cnblogs.com/scuwangjun/p/9129615.html


最新回复(0)