java之线程池简单实现

13年前

  以前做的东西,实现一个简单的多线程机制,开始之前,现说说原理性的东西吧,下面是我在ibm开发者上搜到的内容

线程池的技术背景

  在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象, 以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用 已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。

  多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:

  假设在一台服务器完成一项任务的时间为T

      T1 创建线程的时间        T2 在线程中执行任务的时间,包括线程间同步所需时间        T3 线程销毁的时间  

  显然T = T1+T2+T3。注意这是一个极度简化的假设。

  可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到 这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。

  线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

  线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目。在看一个例子:

  假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。我们比较利用线程池技术和不利于线程池技术的服务器 处理这些请求时所产生的线程总数。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限(以下简称线程池尺寸),而如果 服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池尺寸是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处 理请求时浪费时间,从而提高效率。

  这些都是假设,不能充分说明问题,下面我将讨论线程池的简单实现并对该程序进行对比测试,以说明线程技术优点及应用领域。

一般一个简单线程池至少包含下列组成部分

  • 线程池管理器(ThreadPoolManager):用于创建并管理线程池
  • 工作线程(WorkThread): 线程池中线程
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
  • 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

  线程池管理器至少有下列功能:创建线程池,销毁线程池,添加新任务。下面就是小弟的实现,还是欢迎拍砖哈:

public class ThreadPoolManager {      private static ThreadPoolManager instance = null;      private List<Upload> taskQueue = Collections.synchronizedList(new LinkedList<Upload>());//任务队列     private WorkThread[] workQueue ;    //工作线程(真正执行任务的线程)     private static int worker_num = 5;    //工作线程数量(默认工作线程数量是5)     private static int worker_count = 0;            private ThreadPoolManager(){          this(5);      }      private ThreadPoolManager(int num){          worker_num = num;          workQueue = new WorkThread[worker_num];          for(int i=0;i<worker_num;i++){              workQueue[i] = new WorkThread(i);          }      }            public static synchronized ThreadPoolManager getInstance(){          if(instance==null)              instance = new ThreadPoolManager();          return instance;      }            public void addTask(Upload task){          //对任务队列的操作要上锁         synchronized (taskQueue) {              if(task!=null){                  taskQueue.add(task);                  taskQueue.notifyAll();                  System.out.println("task id "+task.getInfo() + " submit!");              }                            }      }            public void BatchAddTask(Upload[] tasks){          //对任务队列的修改操作要上锁         synchronized (taskQueue) {              for(Upload e:tasks){                  if(e!=null){                      taskQueue.add(e);                      taskQueue.notifyAll();                      System.out.println("task id "+e.getInfo() + " submit!");                  }              }                  }      }            public void destory(){          System.out.println("pool begins to destory ...");          for(int i = 0;i<worker_num;i++){              workQueue[i].stopThread();              workQueue[i] = null;          }          //对任务队列的操作要上锁         synchronized (taskQueue) {              taskQueue.clear();          }                    System.out.println("pool ends to destory ...");      }            private class WorkThread extends Thread{          private int taksId ;          private boolean isRuning = true;          private boolean isWaiting = false;                                         public WorkThread(int taskId){              this.taksId= taskId;              this.start();          }                    public boolean isWaiting(){              return isWaiting;          }          // 如果任务进行中时,不能立刻终止线程,需要等待任务完成之后检测到isRuning为false的时候,退出run()方法         public void stopThread(){              isRuning = false;          }                    @Override          public void run() {              while(isRuning){                  Upload temp = null;                  //对任务队列的操作要上锁                 synchronized (taskQueue) {                      //任务队列为空,等待新的任务加入                     while(isRuning&&taskQueue.isEmpty()){                          try {                              taskQueue.wait(20);                          } catch (InterruptedException e) {                              System.out.println("InterruptedException occre...");                              e.printStackTrace();                          }                      }                      if(isRuning)                          temp = taskQueue.remove(0);                  }                  //当等待新任务加入时候,终止线程(调用stopThread函数)造成 temp = null                 if(temp!=null){                      System.out.println("task info: "+temp.getInfo()+ " is begining");                      isWaiting = false;                      temp.uploadPic();                      isWaiting = true;                      System.out.println("task info: "+temp.getInfo()+ " is finished");                  }                  }          }      }  }

  然后定义任务接口(Task):这里我定义的是上传图片的功能接口(这里用抽象类或者接口随你自己).

Upload
public abstract class Upload {      protected String info;      abstract boolean uploadPic();      public String getInfo(){          return info;      }  }

  然后定义具体任务类:我这里简单,让它睡眠2s。当然你也可以定义很多实现Upload的任务类。

TaskUpload
public class TaskUpload extends Upload {            public TaskUpload(String info){          this.info = info;      }      public String getInfo(){          return info;      }      @Override      public boolean uploadPic()  {          // TODO Auto-generated method stub         System.out.println(info+"sleep begin ....");          try {              Thread.sleep(2000);          } catch (InterruptedException e) {              // TODO Auto-generated catch block             e.printStackTrace();          }          System.out.println(info+"sleep end ....");          return false;      }  }

  最后,测试这个简单的线程池:

public class ThreadPoolManagerTest {          public static void main(String[] args) {          // TODO Auto-generated method stub         Upload[] tasks = createBatchTask(7);          ThreadPoolManager pool = ThreadPoolManager.getInstance();          pool.BatchAddTask(tasks);          pool.destory();      }      private static Upload[] createBatchTask(int n){          Upload[] tasks = new TaskUpload[n];          for(int i = 0;i<n ;i++ ){              tasks[i] = new TaskUpload("task id is "+ i);          }          return tasks;      }  }

线程池技术适用范围及应注意的问题

  线程池的应用范围:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速相应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。



参考:   线程池的介绍及简单实现