Using the Fork/Join Framework in Java to Execute Tasks in Parallel
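Modern computers have moved toward multiple CPUs. Multicore processors are now everywhere, not only in ordinary PCs but even in smartphones, and processor core counts will keep growing.
Although multicore hardware is mature, many applications are not prepared for it and therefore cannot take full advantage of multiple cores.
To fully exploit multiple CPUs and cores, a software system should be able to "mine" the computing power of every CPU and never leave any CPU idle. One way to achieve this is to split a task into multiple "small tasks" and run them in parallel on multiple processor cores. Once all of the "small tasks" have finished, their results are merged.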
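As shown in the diagram below, the process has two steps:
Step 1: split the task. First, we need a fork class that splits the large task into subtasks. A subtask may still be too large, so splitting continues until every subtask is small enough.
Step 2: execute the tasks and merge the results. The subtasks are placed in double-ended queues (deques), and several worker threads take tasks from those deques and execute them. The results of the subtasks are all placed in one queue, and a thread takes the data from that queue and merges it.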
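The splitting step can be illustrated on its own, without any threads. The following is a minimal sequential sketch, assuming a hypothetical class `SplitSketch` and a threshold of 50 elements; it only records the ranges that step 1 would produce and does not execute or merge anything.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Assumed threshold for this sketch: ranges with fewer than 50 elements are "small enough"
    static final int THRESHOLD = 50;

    // Step 1 only: recursively halve [start, end) and record every range that is small enough
    static void split(int start, int end, List<int[]> leaves) {
        if (end - start < THRESHOLD) {
            leaves.add(new int[] {start, end});
            return;
        }
        int middle = (start + end) / 2;
        split(start, middle, leaves); // the left half may still be too large, so recurse
        split(middle, end, leaves);   // same for the right half
    }

    public static void main(String[] args) {
        List<int[]> leaves = new ArrayList<>();
        split(0, 200, leaves);
        for (int[] r : leaves) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

For [0, 200) it prints eight ranges of 25 elements, [0, 25) through [175, 200): a 50-element range is not below the threshold, so it is halved once more.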
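Java provides ForkJoinPool to support splitting a task into multiple "small tasks" that are computed in parallel, and then combining the results of those "small tasks" into the overall result.
ForkJoinPool is an implementation of ExecutorService, so it is a special kind of thread pool. ForkJoinPool provides the following two commonly used constructors:
- public ForkJoinPool(int parallelism): creates a ForkJoinPool with parallelism parallel worker threads
- public ForkJoinPool(): creates a ForkJoinPool whose parallelism is the value returned by Runtime.getRuntime().availableProcessors()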
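A quick way to see what the two constructors do is to read back the parallelism level with getParallelism(). The class name `PoolConstructors` is hypothetical, and the printed values depend on the machine running it. Note that parallelism is the pool's target for active worker threads rather than a fixed thread count.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolConstructors {
    public static void main(String[] args) {
        // Explicit parallelism: the pool targets 4 active worker threads
        ForkJoinPool fourWorkers = new ForkJoinPool(4);
        // No-arg constructor: parallelism defaults to the number of available processors
        ForkJoinPool defaultPool = new ForkJoinPool();

        System.out.println("new ForkJoinPool(4) parallelism: " + fourWorkers.getParallelism());
        System.out.println("new ForkJoinPool() parallelism:  " + defaultPool.getParallelism());
        System.out.println("availableProcessors():           " + Runtime.getRuntime().availableProcessors());

        // Release the worker threads once the pools are no longer needed
        fourWorkers.shutdown();
        defaultPool.shutdown();
    }
}
```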
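After creating a ForkJoinPool instance, you can call its submit(ForkJoinTask<T> task) or invoke(ForkJoinTask<T> task) method to execute a given task. submit() returns immediately with a handle to the task (a ForkJoinTask, which is also a Future), whereas invoke() blocks until the task completes and returns its result. ForkJoinTask represents a task that can be split and whose partial results can be merged. ForkJoinTask is an abstract class with two abstract subclasses, RecursiveAction and RecursiveTask:
- RecursiveTask represents a task that returns a value.
- RecursiveAction represents a task that does not return a value.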
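To make the difference between submit() and invoke(), and between the two task types, concrete, here is a minimal sketch. The class names `TaskKinds`, `Answer` and `Flag` are hypothetical, and the tasks are deliberately trivial (they do not split at all).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class TaskKinds {
    // RecursiveTask<V>: compute() returns a value
    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // RecursiveAction: compute() returns nothing, so the task works through side effects
    static class Flag extends RecursiveAction {
        final AtomicBoolean done = new AtomicBoolean(false);

        @Override
        protected void compute() {
            done.set(true);
        }
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();

        // invoke(): blocks the caller and returns the task's result directly
        int fromInvoke = pool.invoke(new Answer());

        // submit(): returns at once with a ForkJoinTask, which also implements Future
        ForkJoinTask<Integer> handle = pool.submit(new Answer());
        int fromSubmit = handle.get(); // get() blocks until the result is ready

        // A RecursiveAction has result type Void, so invoke() just waits and returns null
        Flag flag = new Flag();
        pool.invoke(flag);

        System.out.println(fromInvoke + " " + fromSubmit + " " + flag.done.get()); // prints "42 42 true"
        pool.shutdown();
    }
}
```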
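1. RecursiveAction
The following example, a large task with no return value, shows how to use RecursiveAction.
Large task: print the numbers from 0 to 199 (the range [0, 200)).
Small task: each subtask prints fewer than 50 numbers (halving 200 → 100 → 50 → 25, each subtask ends up printing 25).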
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// RecursiveAction is an abstract subclass of ForkJoinTask for tasks that return no result
class PrintTask extends RecursiveAction {
    // Threshold: a task covering fewer than MAX numbers prints them directly
    private static final int MAX = 50;
    private final int start;
    private final int end;

    PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        // Once end - start is smaller than MAX, print the numbers directly
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + " i = " + i);
            }
        } else {
            // Split the large task into two smaller tasks
            int middle = (start + end) / 2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            // Run both subtasks in parallel and wait for both to finish,
            // so this task completes only after all of its subtasks have
            invokeAll(left, right);
        }
    }
}

public class ForkJoinPoolTest {
    public static void main(String[] args) throws Exception {
        // Parallelism defaults to Runtime.getRuntime().availableProcessors()
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // invoke() blocks until the PrintTask and all of its subtasks have completed
        forkJoinPool.invoke(new PrintTask(0, 200));
        // Shut down the pool
        forkJoinPool.shutdown();
    }
}
```
The output is as follows:
```
ForkJoinPool-1-worker-2 i = 75
ForkJoinPool-1-worker-2 i = 76
ForkJoinPool-1-worker-2 i = 77
ForkJoinPool-1-worker-2 i = 78
ForkJoinPool-1-worker-2 i = 79
ForkJoinPool-1-worker-2 i = 80
ForkJoinPool-1-worker-2 i = 81
ForkJoinPool-1-worker-2 i = 82
ForkJoinPool-1-worker-2 i = 83
ForkJoinPool-1-worker-2 i = 84
ForkJoinPool-1-worker-2 i = 85
ForkJoinPool-1-worker-2 i = 86
ForkJoinPool-1-worker-2 i = 87
ForkJoinPool-1-worker-2 i = 88
ForkJoinPool-1-worker-2 i = 89
ForkJoinPool-1-worker-2 i = 90
ForkJoinPool-1-worker-2 i = 91
ForkJoinPool-1-worker-2 i = 92
ForkJoinPool-1-worker-2 i = 93
ForkJoinPool-1-worker-2 i = 94
ForkJoinPool-1-worker-2 i = 95
ForkJoinPool-1-worker-2 i = 96
ForkJoinPool-1-worker-2 i = 97
ForkJoinPool-1-worker-2 i = 98
ForkJoinPool-1-worker-2 i = 99
ForkJoinPool-1-worker-2 i = 50
ForkJoinPool-1-worker-2 i = 51
ForkJoinPool-1-worker-2 i = 52
ForkJoinPool-1-worker-2 i = 53
ForkJoinPool-1-worker-2 i = 54
ForkJoinPool-1-worker-2 i = 55
ForkJoinPool-1-worker-2 i = 56
ForkJoinPool-1-worker-2 i = 57
ForkJoinPool-1-worker-2 i = 58
ForkJoinPool-1-worker-2 i = 59
ForkJoinPool-1-worker-2 i = 60
ForkJoinPool-1-worker-2 i = 61
ForkJoinPool-1-worker-2 i = 62
ForkJoinPool-1-worker-2 i = 63
ForkJoinPool-1-worker-2 i = 64
ForkJoinPool-1-worker-2 i = 65
ForkJoinPool-1-worker-2 i = 66
ForkJoinPool-1-worker-2 i = 67
ForkJoinPool-1-worker-2 i = 68
ForkJoinPool-1-worker-2 i = 69
ForkJoinPool-1-worker-1 i = 175
ForkJoinPool-1-worker-1 i = 176
ForkJoinPool-1-worker-1 i = 177
ForkJoinPool-1-worker-1 i = 178
ForkJoinPool-1-worker-1 i = 179
ForkJoinPool-1-worker-1 i = 180
ForkJoinPool-1-worker-1 i = 181
ForkJoinPool-1-worker-1 i = 182
ForkJoinPool-1-worker-1 i = 183
ForkJoinPool-1-worker-1 i = 184
ForkJoinPool-1-worker-1 i = 185
ForkJoinPool-1-worker-1 i = 186
ForkJoinPool-1-worker-1 i = 187
ForkJoinPool-1-worker-1 i = 188
ForkJoinPool-1-worker-1 i = 189
ForkJoinPool-1-worker-1 i = 190
ForkJoinPool-1-worker-1 i = 191
ForkJoinPool-1-worker-1 i = 192
ForkJoinPool-1-worker-1 i = 193
ForkJoinPool-1-worker-1 i = 194
ForkJoinPool-1-worker-1 i = 195
ForkJoinPool-1-worker-1 i = 196
ForkJoinPool-1-worker-1 i = 197
ForkJoinPool-1-worker-1 i = 198
ForkJoinPool-1-worker-1 i = 199
ForkJoinPool-1-worker-1 i = 150
ForkJoinPool-1-worker-1 i = 151
ForkJoinPool-1-worker-1 i = 152
ForkJoinPool-1-worker-1 i = 153
ForkJoinPool-1-worker-1 i = 154
ForkJoinPool-1-worker-1 i = 155
ForkJoinPool-1-worker-1 i = 156
ForkJoinPool-1-worker-1 i = 157
ForkJoinPool-1-worker-1 i = 158
ForkJoinPool-1-worker-1 i = 159
ForkJoinPool-1-worker-1 i = 160
ForkJoinPool-1-worker-1 i = 161
ForkJoinPool-1-worker-1 i = 162
ForkJoinPool-1-worker-1 i = 163
ForkJoinPool-1-worker-1 i = 164
ForkJoinPool-1-worker-1 i = 165
ForkJoinPool-1-worker-1 i = 166
ForkJoinPool-1-worker-1 i = 167
ForkJoinPool-1-worker-1 i = 168
ForkJoinPool-1-worker-1 i = 169
ForkJoinPool-1-worker-1 i = 170
ForkJoinPool-1-worker-1 i = 171
ForkJoinPool-1-worker-1 i = 172
ForkJoinPool-1-worker-1 i = 173
ForkJoinPool-1-worker-1 i = 174
ForkJoinPool-1-worker-1 i = 125
ForkJoinPool-1-worker-1 i = 126
ForkJoinPool-1-worker-1 i = 127
ForkJoinPool-1-worker-1 i = 128
ForkJoinPool-1-worker-1 i = 129
ForkJoinPool-1-worker-1 i = 130
ForkJoinPool-1-worker-1 i = 131
ForkJoinPool-1-worker-1 i = 132
ForkJoinPool-1-worker-1 i = 133
ForkJoinPool-1-worker-1 i = 134
ForkJoinPool-1-worker-1 i = 135
ForkJoinPool-1-worker-1 i = 136
ForkJoinPool-1-worker-1 i = 137
ForkJoinPool-1-worker-1 i = 138
ForkJoinPool-1-worker-1 i = 139
ForkJoinPool-1-worker-1 i = 140
ForkJoinPool-1-worker-1 i = 141
ForkJoinPool-1-worker-1 i = 142
ForkJoinPool-1-worker-1 i = 143
ForkJoinPool-1-worker-1 i = 144
ForkJoinPool-1-worker-1 i = 145
ForkJoinPool-1-worker-1 i = 146
ForkJoinPool-1-worker-1 i = 147
ForkJoinPool-1-worker-1 i = 148
ForkJoinPool-1-worker-1 i = 149
ForkJoinPool-1-worker-1 i = 100
ForkJoinPool-1-worker-1 i = 101
ForkJoinPool-1-worker-1 i = 102
ForkJoinPool-1-worker-1 i = 103
ForkJoinPool-1-worker-1 i = 104
ForkJoinPool-1-worker-1 i = 105
ForkJoinPool-1-worker-1 i = 106
ForkJoinPool-1-worker-1 i = 107
ForkJoinPool-1-worker-1 i = 108
ForkJoinPool-1-worker-1 i = 109
ForkJoinPool-1-worker-1 i = 110
ForkJoinPool-1-worker-1 i = 111
ForkJoinPool-1-worker-1 i = 112
ForkJoinPool-1-worker-1 i = 113
ForkJoinPool-1-worker-1 i = 114
ForkJoinPool-1-worker-1 i = 115
ForkJoinPool-1-worker-1 i = 116
ForkJoinPool-1-worker-1 i = 117
ForkJoinPool-1-worker-1 i = 118
ForkJoinPool-1-worker-1 i = 119
ForkJoinPool-1-worker-1 i = 120
ForkJoinPool-1-worker-1 i = 121
ForkJoinPool-1-worker-1 i = 122
ForkJoinPool-1-worker-1 i = 123
ForkJoinPool-1-worker-1 i = 124
ForkJoinPool-1-worker-1 i = 25
ForkJoinPool-1-worker-1 i = 26
ForkJoinPool-1-worker-1 i = 27
ForkJoinPool-1-worker-1 i = 28
ForkJoinPool-1-worker-1 i = 29
ForkJoinPool-1-worker-1 i = 30
ForkJoinPool-1-worker-1 i = 31
ForkJoinPool-1-worker-1 i = 32
ForkJoinPool-1-worker-1 i = 33
ForkJoinPool-1-worker-1 i = 34
ForkJoinPool-1-worker-1 i = 35
ForkJoinPool-1-worker-1 i = 36
ForkJoinPool-1-worker-1 i = 37
ForkJoinPool-1-worker-1 i = 38
ForkJoinPool-1-worker-1 i = 39
ForkJoinPool-1-worker-1 i = 40
ForkJoinPool-1-worker-1 i = 41
ForkJoinPool-1-worker-1 i = 42
ForkJoinPool-1-worker-1 i = 43
ForkJoinPool-1-worker-1 i = 44
ForkJoinPool-1-worker-1 i = 45
ForkJoinPool-1-worker-1 i = 46
ForkJoinPool-1-worker-1 i = 47
ForkJoinPool-1-worker-1 i = 48
ForkJoinPool-1-worker-1 i = 49
ForkJoinPool-1-worker-1 i = 0
ForkJoinPool-1-worker-1 i = 1
ForkJoinPool-1-worker-1 i = 2
ForkJoinPool-1-worker-1 i = 3
ForkJoinPool-1-worker-1 i = 4
ForkJoinPool-1-worker-1 i = 5
ForkJoinPool-1-worker-1 i = 6
ForkJoinPool-1-worker-1 i = 7
ForkJoinPool-1-worker-1 i = 8
ForkJoinPool-1-worker-1 i = 9
ForkJoinPool-1-worker-1 i = 10
ForkJoinPool-1-worker-1 i = 11
ForkJoinPool-1-worker-1 i = 12
ForkJoinPool-1-worker-1 i = 13
ForkJoinPool-1-worker-1 i = 14
ForkJoinPool-1-worker-1 i = 15
ForkJoinPool-1-worker-1 i = 16
ForkJoinPool-1-worker-1 i = 17
ForkJoinPool-1-worker-1 i = 18
ForkJoinPool-1-worker-1 i = 19
ForkJoinPool-1-worker-1 i = 20
ForkJoinPool-1-worker-1 i = 21
ForkJoinPool-1-worker-1 i = 22
ForkJoinPool-1-worker-1 i = 23
ForkJoinPool-1-worker-1 i = 24
ForkJoinPool-1-worker-2 i = 70
ForkJoinPool-1-worker-2 i = 71
ForkJoinPool-1-worker-2 i = 72
ForkJoinPool-1-worker-2 i = 73
ForkJoinPool-1-worker-2 i = 74
```
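The output shows that ForkJoinPool started two worker threads for this printing task, because the author's computer has a dual-core CPU. Moreover, although the program prints all 200 numbers from 0 to 199, it does not print them in order: the task was split into subtasks that run in parallel, so the numbers are not printed sequentially from 0 to 199. Within each 25-number subtask the values appear in increasing order, although another thread's output can interleave with them (worker-2 prints 50 to 69, then worker-1's lines appear, then worker-2 prints 70 to 74).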
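Out-of-order output does not mean anything is lost. One way to check that every number is handled exactly once is to collect the values instead of printing them. The sketch below is a hypothetical variant of PrintTask (the class name `CollectTask` and its `run()` helper are assumptions) that records each value in a ConcurrentLinkedQueue and uses invokeAll so that pool.invoke() returns only after all subtasks have finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class CollectTask extends RecursiveAction {
    // Same threshold as PrintTask: ranges with fewer than 50 numbers are handled directly
    private static final int MAX = 50;
    private final int start;
    private final int end;
    // Thread-safe sink shared by all subtasks, used instead of System.out
    private final ConcurrentLinkedQueue<Integer> seen;

    CollectTask(int start, int end, ConcurrentLinkedQueue<Integer> seen) {
        this.start = start;
        this.end = end;
        this.seen = seen;
    }

    @Override
    protected void compute() {
        if (end - start < MAX) {
            for (int i = start; i < end; i++) {
                seen.add(i); // record the value instead of printing it
            }
        } else {
            int middle = (start + end) / 2;
            // invokeAll forks both halves and waits for them to complete
            invokeAll(new CollectTask(start, middle, seen), new CollectTask(middle, end, seen));
        }
    }

    // Runs the task for [start, end) and returns the recorded values in sorted order
    static List<Integer> run(int start, int end) {
        ConcurrentLinkedQueue<Integer> seen = new ConcurrentLinkedQueue<>();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke() returns only after the root task, and therefore every subtask, is done
            pool.invoke(new CollectTask(start, end, seen));
        } finally {
            pool.shutdown();
        }
        List<Integer> sorted = new ArrayList<>(seen);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<Integer> values = run(0, 200);
        System.out.println("count = " + values.size()
                + ", first = " + values.get(0)
                + ", last = " + values.get(values.size() - 1));
    }
}
```

It prints `count = 200, first = 0, last = 199` regardless of which worker ran which range; only the order of the unsorted queue varies between runs.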
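2. RecursiveTask
The following example, a large task that returns a value, shows how to use RecursiveTask.
Large task: compute the sum of 100 random numbers.
Small task: each subtask sums fewer than 20 numbers.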
```java
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

// RecursiveTask is an abstract subclass of ForkJoinTask for tasks that return a result
class SumTask extends RecursiveTask<Integer> {
    // Each "small task" sums fewer than MAX numbers
    private static final int MAX = 20;
    private final int[] arr;
    private final int start;
    private final int end;

    SumTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // Once end - start is smaller than MAX, sum the elements directly
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        } else {
            System.err.println("=====task split=====");
            // Split the large task into two smaller tasks
            int middle = (start + end) / 2;
            SumTask left = new SumTask(arr, start, middle);
            SumTask right = new SumTask(arr, middle, end);
            // Run the two subtasks in parallel
            left.fork();
            right.fork();
            // Wait for both subtasks and merge their partial sums
            return left.join() + right.join();
        }
    }
}

public class ForkJoinPoolTest2 {
    public static void main(String[] args) throws Exception {
        int[] arr = new int[100];
        Random random = new Random();
        int total = 0;
        // Fill the array with 100 random numbers in [0, 100)
        for (int i = 0; i < arr.length; i++) {
            int temp = random.nextInt(100);
            // Store the element and add it to the expected total
            total += (arr[i] = temp);
        }
        System.out.println("Initial sum = " + total);
        // Parallelism defaults to Runtime.getRuntime().availableProcessors()
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Submit the splittable SumTask; submit() returns a Future immediately
        Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0, arr.length));
        // get() blocks until the whole computation has finished
        System.out.println("Computed sum = " + future.get());
        // Shut down the pool
        forkJoinPool.shutdown();
    }
}
```
The output is as follows:
```
Initial sum = 4283
=====task split=====
=====task split=====
=====task split=====
=====task split=====
=====task split=====
=====task split=====
=====task split=====
Computed sum = 4283
```
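The output shows that ForkJoinPool split the task 7 times, and that the sum computed by SumTask equals the total recorded while initializing the array, which confirms that the computation is correct. The count of 7 follows from the threshold: the 100-element range is split once, its two 50-element halves are each split, and the four 25-element quarters are each split, giving 1 + 2 + 4 = 7; the resulting 12- and 13-element ranges are below MAX = 20 and are summed directly.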
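The number of splits does not depend on thread scheduling, only on the array length and MAX. The sketch below, a hypothetical `CountingSumTask` with a `run()` helper, counts splits with a shared AtomicInteger and uses deterministic data (0 to 99) so the expected values are known in advance. It also swaps in a common alternative to forking both halves: fork the left half and compute the right half in the current thread, which saves one task hand-off per split; the Fibonacci example in the RecursiveTask Javadoc uses the same fork/compute/join shape.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingSumTask extends RecursiveTask<Integer> {
    // Same threshold as SumTask: ranges with fewer than 20 elements are summed directly
    private static final int MAX = 20;

    private final int[] arr;
    private final int start;
    private final int end;
    // Shared by every subtask of one run; counts how often a task decided to split
    private final AtomicInteger splits;

    CountingSumTask(int[] arr, int start, int end, AtomicInteger splits) {
        this.arr = arr;
        this.start = start;
        this.end = end;
        this.splits = splits;
    }

    @Override
    protected Integer compute() {
        if (end - start < MAX) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }
        splits.incrementAndGet();
        int middle = (start + end) / 2;
        CountingSumTask left = new CountingSumTask(arr, start, middle, splits);
        CountingSumTask right = new CountingSumTask(arr, middle, end, splits);
        left.fork();                    // make the left half available to other workers
        int rightSum = right.compute(); // work on the right half in the current thread
        return left.join() + rightSum;  // wait for the left half, then merge
    }

    // Runs the task on a fresh pool and returns {sum, number of splits}
    static int[] run(int[] arr) {
        AtomicInteger splits = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            int sum = pool.invoke(new CountingSumTask(arr, 0, arr.length, splits));
            return new int[] {sum, splits.get()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] arr = new int[100];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i; // deterministic data instead of random numbers: 0 + 1 + ... + 99
        }
        int[] result = run(arr);
        System.out.println("sum = " + result[0] + ", splits = " + result[1]);
    }
}
```

Running it prints `sum = 4950, splits = 7`, matching the seven split messages in the SumTask output above.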
For a deeper understanding of ForkJoinPool, readers can also refer to the following articles:
http://www.infoq.com/cn/articles/fork-join-introduction/
http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/
==================================================================================================
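Author: 欧阳鹏 (Ouyang Peng). Reposting is welcome; sharing with others is the source of progress!
If you repost this article, please keep the link to the original: http://blog.csdn.net/ouyang_peng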