转载java_eye(http://www.iteye.com/topic/711162)
一:分析题目
从题中可以看到“很大的List”以及“充分利用多核CPU”,这就已经充分告诉我们要采用多线程(任务)进行编写。具体怎么做呢?大概的思路就是分割List,每一小块的List采用一个线程(任务)进行计算其和,最后等待所有的线程(任务)都执行完后就可得到这个“很大的List”中所有整数的和。
二:具体分析和技术方案
既然我们已经决定采用多线程(任务),并且还要分割List,每一小块的List采用一个线程(任务)进行计算其和,那么我们必须要等待所有的线程(任务)完成之后才能得到正确的结果,那么怎么才能保证“等待所有的线程(任务)完成之后输出结果呢”?这就要靠java.util.concurrent包中的CyclicBarrier类了。它是一个同步辅助类,它允许一组线程(任务)互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程(任务)的程序中,这些线程(任务)必须不时地互相等待,此时 CyclicBarrier 很有用。简单的概括其适应场景就是:当一组线程(任务)并发的执行一件工作的时候,必须等待所有的线程(任务)都完成时才能进行下一个步骤。具体技术方案步骤如下:
示意图如下:
三:详细编码实现
代码中有很详细的注释,这里就不解释了。
有人可能对barrier=new CyclicBarrier(threadCounts+1);//创建的线程数和主线程main有点不解,不是采用的线程(任务)数是threadCounts个吗?怎么为CyclicBarrier设置的给定数量的线程参与者比我们要采用的线程数多一个呢?答案就是这个多出来的一个用于控制main主线程的,主线程也要等待,它要等待其他所有的线程完成才能输出sum值,这样才能保证sum值的正确性,如果main不等待的话,那么结果将是不可预料的。
四:总结
本文主要通过一个淘宝的面试题为引子,介绍了并发的一点小知识,主要是介绍通过CyclicBarrier同步辅助器辅助多个并发任务共同完成一件工作。Java SE5的java.util.concurrent引入了大量的设计来解决并发问题,使用它们有助于我们编写更加简单而健壮的并发程序。
附mathfox提到的ExecutorService.invokeAll()方法的实现
这个不用自己控制等待,invokeAll执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。sdh5724也说用了同步,性能不好。这个去掉了同步,根据返回结果的 Future 列表相加就得到总和了。
从题中可以看到“很大的List”以及“充分利用多核CPU”,这就已经充分告诉我们要采用多线程(任务)进行编写。具体怎么做呢?大概的思路就是分割List,每一小块的List采用一个线程(任务)进行计算其和,最后等待所有的线程(任务)都执行完后就可得到这个“很大的List”中所有整数的和。
二:具体分析和技术方案
既然我们已经决定采用多线程(任务),并且还要分割List,每一小块的List采用一个线程(任务)进行计算其和,那么我们必须要等待所有的线程(任务)完成之后才能得到正确的结果,那么怎么才能保证“等待所有的线程(任务)完成之后输出结果呢”?这就要靠java.util.concurrent包中的CyclicBarrier类了。它是一个同步辅助类,它允许一组线程(任务)互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程(任务)的程序中,这些线程(任务)必须不时地互相等待,此时 CyclicBarrier 很有用。简单的概括其适应场景就是:当一组线程(任务)并发的执行一件工作的时候,必须等待所有的线程(任务)都完成时才能进行下一个步骤。具体技术方案步骤如下:
- 分割List,根据采用的线程(任务)数平均分配,即list.size()/threadCounts。
- 定义一个记录“很大List”中所有整数和的变量sum,采用一个线程(任务)处理一个分割后的子List,计算子List中所有整数和(subSum),然后把和(subSum)累加到sum上。
- 等待所有线程(任务)完成后输出总和(sum)的值。
示意图如下:
三:详细编码实现
代码中有很详细的注释,这里就不解释了。
- /**
- * 计算List中所有整数的和<br>
- * 采用多线程,分割List计算
- * @author 飞雪无情
- * @since 2010-7-12
- */
- public class CountListIntegerSum {
- private long sum;//存放整数的和
- private CyclicBarrier barrier;//障栅集合点(同步器)
- private List<Integer> list;//整数集合List
- private int threadCounts;//使用的线程数
- public CountListIntegerSum(List<Integer> list,int threadCounts) {
- this.list=list;
- this.threadCounts=threadCounts;
- }
- /**
- * 获取List中所有整数的和
- * @return
- */
- public long getIntegerSum(){
- ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
- int len=list.size()/threadCounts;//平均分割List
- //List中的数量没有线程数多(很少存在)
- if(len==0){
- threadCounts=list.size();//采用一个线程处理List中的一个元素
- len=list.size()/threadCounts;//重新平均分割List
- }
- barrier=new CyclicBarrier(threadCounts+1);
- for(int i=0;i<threadCounts;i++){
- //创建线程任务
- if(i==threadCounts-1){//最后一个线程承担剩下的所有元素的计算
- exec.execute(new SubIntegerSumTask(list.subList(i*len,list.size())));
- }else{
- exec.execute(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1))));
- }
- }
- try {
- barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
- } catch (InterruptedException e) {
- System.out.println(Thread.currentThread().getName()+":Interrupted");
- } catch (BrokenBarrierException e) {
- System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
- }
- exec.shutdown();
- return sum;
- }
- /**
- * 分割计算List整数和的线程任务
- * @author lishuai
- *
- */
- public class SubIntegerSumTask implements Runnable{
- private List<Integer> subList;
- public SubIntegerSumTask(List<Integer> subList) {
- this.subList=subList;
- }
- public void run() {
- long subSum=0L;
- for (Integer i : subList) {
- subSum += i;
- }
- synchronized(CountListIntegerSum.this){//在CountListIntegerSum对象上同步
- sum+=subSum;
- }
- try {
- barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
- } catch (InterruptedException e) {
- System.out.println(Thread.currentThread().getName()+":Interrupted");
- } catch (BrokenBarrierException e) {
- System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
- }
- System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
- }
- }
- }
/** * 计算List中所有整数的和<br> * 采用多线程,分割List计算 * @author 飞雪无情 * @since 2010-7-12 */ public class CountListIntegerSum { private long sum;//存放整数的和 private CyclicBarrier barrier;//障栅集合点(同步器) private List<Integer> list;//整数集合List private int threadCounts;//使用的线程数 public CountListIntegerSum(List<Integer> list,int threadCounts) { this.list=list; this.threadCounts=threadCounts; } /** * 获取List中所有整数的和 * @return */ public long getIntegerSum(){ ExecutorService exec=Executors.newFixedThreadPool(threadCounts); int len=list.size()/threadCounts;//平均分割List //List中的数量没有线程数多(很少存在) if(len==0){ threadCounts=list.size();//采用一个线程处理List中的一个元素 len=list.size()/threadCounts;//重新平均分割List } barrier=new CyclicBarrier(threadCounts+1); for(int i=0;i<threadCounts;i++){ //创建线程任务 if(i==threadCounts-1){//最后一个线程承担剩下的所有元素的计算 exec.execute(new SubIntegerSumTask(list.subList(i*len,list.size()))); }else{ exec.execute(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1)))); } } try { barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处 } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName()+":Interrupted"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName()+":BrokenBarrier"); } exec.shutdown(); return sum; } /** * 分割计算List整数和的线程任务 * @author lishuai * */ public class SubIntegerSumTask implements Runnable{ private List<Integer> subList; public SubIntegerSumTask(List<Integer> subList) { this.subList=subList; } public void run() { long subSum=0L; for (Integer i : subList) { subSum += i; } synchronized(CountListIntegerSum.this){//在CountListIntegerSum对象上同步 sum+=subSum; } try { barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处 } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName()+":Interrupted"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName()+":BrokenBarrier"); } System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum); } } }
有人可能对barrier=new CyclicBarrier(threadCounts+1);//创建的线程数和主线程main有点不解,不是采用的线程(任务)数是threadCounts个吗?怎么为CyclicBarrier设置的给定数量的线程参与者比我们要采用的线程数多一个呢?答案就是这个多出来的一个用于控制main主线程的,主线程也要等待,它要等待其他所有的线程完成才能输出sum值,这样才能保证sum值的正确性,如果main不等待的话,那么结果将是不可预料的。
- /**
- * 计算List中所有整数的和测试类
- * @author 飞雪无情
- * @since 2010-7-12
- */
- public class CountListIntegerSumMain {
- /**
- * @param args
- */
- public static void main(String[] args) {
- List<Integer> list = new ArrayList<Integer>();
- int threadCounts = 10;//采用的线程数
- //生成的List数据
- for (int i = 1; i <= 1000000; i++) {
- list.add(i);
- }
- CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts);
- long sum=countListIntegerSum.getIntegerSum();
- System.out.println("List中所有整数的和为:"+sum);
- }
- }
/** * 计算List中所有整数的和测试类 * @author 飞雪无情 * @since 2010-7-12 */ public class CountListIntegerSumMain { /** * @param args */ public static void main(String[] args) { List<Integer> list = new ArrayList<Integer>(); int threadCounts = 10;//采用的线程数 //生成的List数据 for (int i = 1; i <= 1000000; i++) { list.add(i); } CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts); long sum=countListIntegerSum.getIntegerSum(); System.out.println("List中所有整数的和为:"+sum); } }
四:总结
本文主要通过一个淘宝的面试题为引子,介绍了并发的一点小知识,主要是介绍通过CyclicBarrier同步辅助器辅助多个并发任务共同完成一件工作。Java SE5的java.util.concurrent引入了大量的设计来解决并发问题,使用它们有助于我们编写更加简单而健壮的并发程序。
附mathfox提到的ExecutorService.invokeAll()方法的实现
这个不用自己控制等待,invokeAll执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。sdh5724也说用了同步,性能不好。这个去掉了同步,根据返回结果的 Future 列表相加就得到总和了。
- /**
- * 使用ExecutorService的invokeAll方法计算
- * @author 飞雪无情
- *
- */
- public class CountSumWithCallable {
- /**
- * @param args
- * @throws InterruptedException
- * @throws ExecutionException
- */
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- int threadCounts =19;//使用的线程数
- long sum=0;
- ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
- List<Callable<Long>> callList=new ArrayList<Callable<Long>>();
- //生成很大的List
- List<Integer> list = new ArrayList<Integer>();
- for (int i = 0; i <= 1000000; i++) {
- list.add(i);
- }
- int len=list.size()/threadCounts;//平均分割List
- //List中的数量没有线程数多(很少存在)
- if(len==0){
- threadCounts=list.size();//采用一个线程处理List中的一个元素
- len=list.size()/threadCounts;//重新平均分割List
- }
- for(int i=0;i<threadCounts;i++){
- final List<Integer> subList;
- if(i==threadCounts-1){
- subList=list.subList(i*len,list.size());
- }else{
- subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1));
- }
- //采用匿名内部类实现
- callList.add(new Callable<Long>(){
- public Long call() throws Exception {
- long subSum=0L;
- for(Integer i:subList){
- subSum+=i;
- }
- System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
- return subSum;
- }
- });
- }
- List<Future<Long>> futureList=exec.invokeAll(callList);
- for(Future<Long> future:futureList){
- sum+=future.get();
- }
- exec.shutdown();
- System.out.println(sum);
- }
- }
/** * 使用ExecutorService的invokeAll方法计算 * @author 飞雪无情 * */ public class CountSumWithCallable { /** * @param args * @throws InterruptedException * @throws ExecutionException */ public static void main(String[] args) throws InterruptedException, ExecutionException { int threadCounts =19;//使用的线程数 long sum=0; ExecutorService exec=Executors.newFixedThreadPool(threadCounts); List<Callable<Long>> callList=new ArrayList<Callable<Long>>(); //生成很大的List List<Integer> list = new ArrayList<Integer>(); for (int i = 0; i <= 1000000; i++) { list.add(i); } int len=list.size()/threadCounts;//平均分割List //List中的数量没有线程数多(很少存在) if(len==0){ threadCounts=list.size();//采用一个线程处理List中的一个元素 len=list.size()/threadCounts;//重新平均分割List } for(int i=0;i<threadCounts;i++){ final List<Integer> subList; if(i==threadCounts-1){ subList=list.subList(i*len,list.size()); }else{ subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1)); } //采用匿名内部类实现 callList.add(new Callable<Long>(){ public Long call() throws Exception { long subSum=0L; for(Integer i:subList){ subSum+=i; } System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum); return subSum; } }); } List<Future<Long>> futureList=exec.invokeAll(callList); for(Future<Long> future:futureList){ sum+=future.get(); } exec.shutdown(); System.out.println(sum); } }