Java使用Fork/Join框架来并行执行任务

jopen 10年前

现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机、多核处理器已被广泛应用。在未来,处理器的核心数将会发展的越来越多。

虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。

为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

如下面的示意图所示:


第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。


Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。

  •  public ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool
  •  public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool

创建ForkJoinPool实例后,可以钓鱼ForkJoinPool的submit(ForkJoinTask<T> task)或者invoke(ForkJoinTask<T> task)来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。

  • RecursiveTask代表有返回值的任务
  • RecursiveAction代表没有返回值的任务。


一、RecursiveAction

下面以一个没有返回值的大任务为例,介绍一下RecursiveAction的用法。

大任务是:打印0-200的数值。

小任务是:每次只能打印50个数值。

    import java.util.concurrent.ForkJoinPool;        import java.util.concurrent.RecursiveAction;        import java.util.concurrent.TimeUnit;                //RecursiveAction为ForkJoinTask的抽象子类,没有返回值的任务        class PrintTask extends RecursiveAction {            // 每个"小任务"最多只打印50个数            private static final int MAX = 50;                    private int start;            private int end;                    PrintTask(int start, int end) {                this.start = start;                this.end = end;            }                    @Override            protected void compute() {                // 当end-start的值小于MAX时候,开始打印                if ((end - start) < MAX) {                    for (int i = start; i < end; i++) {                        System.out.println(Thread.currentThread().getName() + "的i值:"                                + i);                    }                } else {                    // 将大任务分解成两个小任务                    int middle = (start + end) / 2;                    PrintTask left = new PrintTask(start, middle);                    PrintTask right = new PrintTask(middle, end);                    // 并行执行两个小任务                    left.fork();                    right.fork();                }            }        }                public class ForkJoinPoolTest {            /**            * @param args            * @throws Exception            */            public static void main(String[] args) throws Exception {                // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool                ForkJoinPool forkJoinPool = new ForkJoinPool();                // 提交可分解的PrintTask任务                forkJoinPool.submit(new PrintTask(0, 200));                forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束                // 关闭线程池                forkJoinPool.shutdown();            }                }  

运行结果如下:
    ForkJoinPool-1-worker-2的i值:75        ForkJoinPool-1-worker-2的i值:76        ForkJoinPool-1-worker-2的i值:77        ForkJoinPool-1-worker-2的i值:78        ForkJoinPool-1-worker-2的i值:79        ForkJoinPool-1-worker-2的i值:80        ForkJoinPool-1-worker-2的i值:81        ForkJoinPool-1-worker-2的i值:82        ForkJoinPool-1-worker-2的i值:83        ForkJoinPool-1-worker-2的i值:84        ForkJoinPool-1-worker-2的i值:85        ForkJoinPool-1-worker-2的i值:86        ForkJoinPool-1-worker-2的i值:87        ForkJoinPool-1-worker-2的i值:88        ForkJoinPool-1-worker-2的i值:89        ForkJoinPool-1-worker-2的i值:90        ForkJoinPool-1-worker-2的i值:91        ForkJoinPool-1-worker-2的i值:92        ForkJoinPool-1-worker-2的i值:93        ForkJoinPool-1-worker-2的i值:94        ForkJoinPool-1-worker-2的i值:95        ForkJoinPool-1-worker-2的i值:96        ForkJoinPool-1-worker-2的i值:97        ForkJoinPool-1-worker-2的i值:98        ForkJoinPool-1-worker-2的i值:99        ForkJoinPool-1-worker-2的i值:50        ForkJoinPool-1-worker-2的i值:51        ForkJoinPool-1-worker-2的i值:52        ForkJoinPool-1-worker-2的i值:53        ForkJoinPool-1-worker-2的i值:54        ForkJoinPool-1-worker-2的i值:55        ForkJoinPool-1-worker-2的i值:56        ForkJoinPool-1-worker-2的i值:57        ForkJoinPool-1-worker-2的i值:58        ForkJoinPool-1-worker-2的i值:59        ForkJoinPool-1-worker-2的i值:60        ForkJoinPool-1-worker-2的i值:61        ForkJoinPool-1-worker-2的i值:62        ForkJoinPool-1-worker-2的i值:63        ForkJoinPool-1-worker-2的i值:64        ForkJoinPool-1-worker-2的i值:65        ForkJoinPool-1-worker-2的i值:66        ForkJoinPool-1-worker-2的i值:67        ForkJoinPool-1-worker-2的i值:68        ForkJoinPool-1-worker-2的i值:69        ForkJoinPool-1-worker-1的i值:175        ForkJoinPool-1-worker-1的i值:176        ForkJoinPool-1-worker-1的i值:177        ForkJoinPool-1-worker-1的i值:178        ForkJoinPool-1-worker-1的i值:179        ForkJoinPool-1-worker-1的i值:180        ForkJoinPool-1-worker-1的i值:181        ForkJoinPool-1-worker-1的i值:182        ForkJoinPool-1-worker-1的i值:183        ForkJoinPool-1-worker-1的i值:184        ForkJoinPool-1-worker-1的i值:185        ForkJoinPool-1-worker-1的i值:186        ForkJoinPool-1-worker-1的i值:187        ForkJoinPool-1-worker-1的i值:188        ForkJoinPool-1-worker-1的i值:189        ForkJoinPool-1-worker-1的i值:190        ForkJoinPool-1-worker-1的i值:191        ForkJoinPool-1-worker-1的i值:192        ForkJoinPool-1-worker-1的i值:193        ForkJoinPool-1-worker-1的i值:194        ForkJoinPool-1-worker-1的i值:195        ForkJoinPool-1-worker-1的i值:196        ForkJoinPool-1-worker-1的i值:197        ForkJoinPool-1-worker-1的i值:198        ForkJoinPool-1-worker-1的i值:199        ForkJoinPool-1-worker-1的i值:150        ForkJoinPool-1-worker-1的i值:151        ForkJoinPool-1-worker-1的i值:152        ForkJoinPool-1-worker-1的i值:153        ForkJoinPool-1-worker-1的i值:154        ForkJoinPool-1-worker-1的i值:155        ForkJoinPool-1-worker-1的i值:156        ForkJoinPool-1-worker-1的i值:157        ForkJoinPool-1-worker-1的i值:158        ForkJoinPool-1-worker-1的i值:159        ForkJoinPool-1-worker-1的i值:160        ForkJoinPool-1-worker-1的i值:161        ForkJoinPool-1-worker-1的i值:162        ForkJoinPool-1-worker-1的i值:163        ForkJoinPool-1-worker-1的i值:164        ForkJoinPool-1-worker-1的i值:165        ForkJoinPool-1-worker-1的i值:166        ForkJoinPool-1-worker-1的i值:167        ForkJoinPool-1-worker-1的i值:168        ForkJoinPool-1-worker-1的i值:169        ForkJoinPool-1-worker-1的i值:170        ForkJoinPool-1-worker-1的i值:171        ForkJoinPool-1-worker-1的i值:172        ForkJoinPool-1-worker-1的i值:173        ForkJoinPool-1-worker-1的i值:174        ForkJoinPool-1-worker-1的i值:125        ForkJoinPool-1-worker-1的i值:126        ForkJoinPool-1-worker-1的i值:127        ForkJoinPool-1-worker-1的i值:128        ForkJoinPool-1-worker-1的i值:129        ForkJoinPool-1-worker-1的i值:130        ForkJoinPool-1-worker-1的i值:131        ForkJoinPool-1-worker-1的i值:132        ForkJoinPool-1-worker-1的i值:133        ForkJoinPool-1-worker-1的i值:134        ForkJoinPool-1-worker-1的i值:135        ForkJoinPool-1-worker-1的i值:136        ForkJoinPool-1-worker-1的i值:137        ForkJoinPool-1-worker-1的i值:138        ForkJoinPool-1-worker-1的i值:139        ForkJoinPool-1-worker-1的i值:140        ForkJoinPool-1-worker-1的i值:141        ForkJoinPool-1-worker-1的i值:142        ForkJoinPool-1-worker-1的i值:143        ForkJoinPool-1-worker-1的i值:144        ForkJoinPool-1-worker-1的i值:145        ForkJoinPool-1-worker-1的i值:146        ForkJoinPool-1-worker-1的i值:147        ForkJoinPool-1-worker-1的i值:148        ForkJoinPool-1-worker-1的i值:149        ForkJoinPool-1-worker-1的i值:100        ForkJoinPool-1-worker-1的i值:101        ForkJoinPool-1-worker-1的i值:102        ForkJoinPool-1-worker-1的i值:103        ForkJoinPool-1-worker-1的i值:104        ForkJoinPool-1-worker-1的i值:105        ForkJoinPool-1-worker-1的i值:106        ForkJoinPool-1-worker-1的i值:107        ForkJoinPool-1-worker-1的i值:108        ForkJoinPool-1-worker-1的i值:109        ForkJoinPool-1-worker-1的i值:110        ForkJoinPool-1-worker-1的i值:111        ForkJoinPool-1-worker-1的i值:112        ForkJoinPool-1-worker-1的i值:113        ForkJoinPool-1-worker-1的i值:114        ForkJoinPool-1-worker-1的i值:115        ForkJoinPool-1-worker-1的i值:116        ForkJoinPool-1-worker-1的i值:117        ForkJoinPool-1-worker-1的i值:118        ForkJoinPool-1-worker-1的i值:119        ForkJoinPool-1-worker-1的i值:120        ForkJoinPool-1-worker-1的i值:121        ForkJoinPool-1-worker-1的i值:122        ForkJoinPool-1-worker-1的i值:123        ForkJoinPool-1-worker-1的i值:124        ForkJoinPool-1-worker-1的i值:25        ForkJoinPool-1-worker-1的i值:26        ForkJoinPool-1-worker-1的i值:27        ForkJoinPool-1-worker-1的i值:28        ForkJoinPool-1-worker-1的i值:29        ForkJoinPool-1-worker-1的i值:30        ForkJoinPool-1-worker-1的i值:31        ForkJoinPool-1-worker-1的i值:32        ForkJoinPool-1-worker-1的i值:33        ForkJoinPool-1-worker-1的i值:34        ForkJoinPool-1-worker-1的i值:35        ForkJoinPool-1-worker-1的i值:36        ForkJoinPool-1-worker-1的i值:37        ForkJoinPool-1-worker-1的i值:38        ForkJoinPool-1-worker-1的i值:39        ForkJoinPool-1-worker-1的i值:40        ForkJoinPool-1-worker-1的i值:41        ForkJoinPool-1-worker-1的i值:42        ForkJoinPool-1-worker-1的i值:43        ForkJoinPool-1-worker-1的i值:44        ForkJoinPool-1-worker-1的i值:45        ForkJoinPool-1-worker-1的i值:46        ForkJoinPool-1-worker-1的i值:47        ForkJoinPool-1-worker-1的i值:48        ForkJoinPool-1-worker-1的i值:49        ForkJoinPool-1-worker-1的i值:0        ForkJoinPool-1-worker-1的i值:1        ForkJoinPool-1-worker-1的i值:2        ForkJoinPool-1-worker-1的i值:3        ForkJoinPool-1-worker-1的i值:4        ForkJoinPool-1-worker-1的i值:5        ForkJoinPool-1-worker-1的i值:6        ForkJoinPool-1-worker-1的i值:7        ForkJoinPool-1-worker-1的i值:8        ForkJoinPool-1-worker-1的i值:9        ForkJoinPool-1-worker-1的i值:10        ForkJoinPool-1-worker-1的i值:11        ForkJoinPool-1-worker-1的i值:12        ForkJoinPool-1-worker-1的i值:13        ForkJoinPool-1-worker-1的i值:14        ForkJoinPool-1-worker-1的i值:15        ForkJoinPool-1-worker-1的i值:16        ForkJoinPool-1-worker-1的i值:17        ForkJoinPool-1-worker-1的i值:18        ForkJoinPool-1-worker-1的i值:19        ForkJoinPool-1-worker-1的i值:20        ForkJoinPool-1-worker-1的i值:21        ForkJoinPool-1-worker-1的i值:22        ForkJoinPool-1-worker-1的i值:23        ForkJoinPool-1-worker-1的i值:24        ForkJoinPool-1-worker-2的i值:70        ForkJoinPool-1-worker-2的i值:71        ForkJoinPool-1-worker-2的i值:72        ForkJoinPool-1-worker-2的i值:73        ForkJoinPool-1-worker-2的i值:74  

从上面结果来看,ForkJoinPool启动了两个线程来执行这个打印任务,这是因为笔者的计算机的CPU是双核的。不仅如此,读者可以看到程序虽然打印了0-199这两百个数字,但是并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印 到199。

二、RecursiveTask


下面以一个有返回值的大任务为例,介绍一下RecursiveTask的用法。

大任务是:计算随机的100个数字的和。

小任务是:每次只能20个数值的和。

    import java.util.Random;        import java.util.concurrent.ForkJoinPool;        import java.util.concurrent.Future;        import java.util.concurrent.RecursiveTask;                //RecursiveTask为ForkJoinTask的抽象子类,有返回值的任务        class SumTask extends RecursiveTask<Integer> {            // 每个"小任务"最多只打印50个数            private static final int MAX = 20;            private int arr[];            private int start;            private int end;                    SumTask(int arr[], int start, int end) {                this.arr = arr;                this.start = start;                this.end = end;            }                    @Override            protected Integer compute() {                int sum = 0;                // 当end-start的值小于MAX时候,开始打印                if ((end - start) < MAX) {                    for (int i = start; i < end; i++) {                        sum += arr[i];                    }                    return sum;                } else {                    System.err.println("=====任务分解======");                    // 将大任务分解成两个小任务                    int middle = (start + end) / 2;                    SumTask left = new SumTask(arr, start, middle);                    SumTask right = new SumTask(arr, middle, end);                    // 并行执行两个小任务                    left.fork();                    right.fork();                    // 把两个小任务累加的结果合并起来                    return left.join() + right.join();                }            }                }                public class ForkJoinPoolTest2 {            /**            * @param args            * @throws Exception            */            public static void main(String[] args) throws Exception {                int arr[] = new int[100];                Random random = new Random();                int total = 0;                // 初始化100个数字元素                for (int i = 0; i < arr.length; i++) {                    int temp = random.nextInt(100);                    // 对数组元素赋值,并将数组元素的值添加到total总和中                    total += (arr[i] = temp);                }                System.out.println("初始化时的总和=" + total);                // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool                ForkJoinPool forkJoinPool = new ForkJoinPool();                // 提交可分解的PrintTask任务                Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0,                        arr.length));                System.out.println("计算出来的总和=" + future.get());                // 关闭线程池                forkJoinPool.shutdown();            }                }  

计算结果如下:
    初始化时的总和=4283        =====任务分解======        =====任务分解======        =====任务分解======        =====任务分解======        =====任务分解======        =====任务分解======        =====任务分解======        计算出来的总和=4283  

从上面结果来看,ForkJoinPool将任务分解了7次,程序通过SumTask计算出来的结果,和初始化数组时统计出来的总和是相等的,这表明计算结果一切正常。



读者还参考以下文章加深对ForkJoinPool的理解:

http://www.infoq.com/cn/articles/fork-join-introduction/

http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/



==================================================================================================

  作者:欧阳鹏  欢迎转载,与人分享是进步的源泉!

  转载请保留原文地址:http://blog.csdn.net/ouyang_peng