Java ThreadPool

jopen 12年前

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列,主要使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue
handler: 线程池对拒绝任务的处理策略

创建新线程

使用ThreadFactory创建新线程.如果没有另外说明,则在同一个ThreadGroup中一律使用 Executors.defaultThreadFactory()创建线程,并且这些线程具有相同的NORM_PRIORITY优先级和非守护进程状态.通过提供不同的ThreadFactory,可以改变线程的名称,线程组,优先级,守护进程状态,等等.如果从newThread返回null时 ThreadFactory未能创建线程,则执行程序将继续运行,但不能执行任何任务.

如果池中当前有多于corePoolSize的线程,则这些多出的线程在空闲时间超过keepAliveTime时将会终止(参见 getKeepAliveTime(java.util.concurrent.TimeUnit)).这提供了当池处于非活动状态时减少资源消耗的方法.如果池后来变得更为活动,则可以创建新的线程.也可以使用方法 setKeepAliveTime(long,java.util.concurrent.TimeUnit)动态地更改此参数.

排队

所有BlockingQueue都可用于传输和保持提交的任务.可以使用此队列与池大小进行交互:

  • 如果运行的线程少于corePoolSize,则Executor始终首选添加新的线程,而不进行排队.
  • 如果运行的线程等于或多于corePoolSize,则Executor始终首选将请求加入队列,而不添加新的线程.
  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝.

根据以上的三种类型衍生出三种策略

  1. 直接提交.工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们.在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程.此策略可以避免在处理可能具有内部依赖性的请求集时出现锁.直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务.当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性.
  2. 无界队列.使用无界队列(不具有预定义容量的LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待.这样,创建的线程就不会超过 corePoolSize.(因此maximumPoolSize 的值也就无效了)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列.例如,在 Web 页服务器中.这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性.
  3. 有界队列.当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于有助于防止资源耗尽,但是可能较难调整和控制.队列大小和最大池大小可能需要相互折衷,使用大型队列和小型池可以最大限度地降低 CPU 使用率,操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量.如果任务频繁阻塞(如I/O 操作),则系统可能为超过您许可的更多线程安排时间.使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量.

举例

new ThreadPoolExecutor(2, 3, 30, TimeUnit.SECONDS, new SynchronousQueue(), new RecorderThreadFactory("CookieRecorderPool"), new ThreadPoolExecutor.CallerRunsPolicy());

当池中线程已经有两个正在运行,此时继续来了一个任务a,根据前面介绍的”如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程.”所以A被添加到queue中.

此时又来了一个任务B,且池中的两个线程还没有忙完,但是由于使用的SynchronousQueue,所以一定无法加入进去.此时便满足了上面提到的”如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝.”所以此时会新建一个线程来运行这个任务.但如果这三个任务都还没完成,连续又来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了 maximumPoolSize,所以只好执行异常策略了.

所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列).对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁.
如何理解?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中.

使用无界队列策略,即LinkedBlockingQueue,根据前文提到的规则:”如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队.”那么当任务继续增加,会发生什么呢?如果运行的线程等于或多于corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程,此时任务变加入队列之中了,对于无界队列来说,总是可以加入的(除非资源耗尽).换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行.所以开发时要防止任务疯涨.

综上所述,在使用时需要注意以下几点:

  1. ThreadPoolExecutor的使用还是很有技巧的,需要关注corePoolSize,maximumPoolSizes以及Queue的选择.
  2. 使用无界queue可能会耗尽系统资源.
  3. 使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小.
  4. 线程数自然也有开销,所以需要根据不同应用进行调节.

来自:http://www.pigg.co/java-threadpool-queue.html