Java并发编程:阻塞队列BlockingQueue
Alexandria8
7年前
<h3>阻塞队列BlockingQueue简介</h3> <p>阻塞队列BlockingQueue是JDK1.5并发新特性中的内容,阻塞队列首先是一个队列,同样实现了Collection接口。阻塞队列提供了可阻塞的put和take方法,以及支持定时的poll和offer方法。</p> <p>阻塞队列跟普通队列相比,首页它是线程安全的,另外还提供了两个附加操作:当队列为空时,从队列中获取元素的操作将被阻塞;当队列填满是,向队列添加元素将被阻塞。这两个附加操作分别由BlockingQueue提供的两个take和put方法支持。如果队列已经满了,那么put方法将被阻塞直到有空间可用;如果队列为空,那么take方法将被阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远不会充满,因此在无界队列上面put方法也永远不会被阻塞。</p> <p>BlockingQueue提供了4中类型的处理方法:</p> <table> <tbody> <tr> <th>方法\处理方式</th> <th>抛出异常</th> <th>返回特殊值</th> <th>一直阻塞</th> <th>超时退出</th> </tr> <tr> <td>插入方法</td> <td>add(e)</td> <td>offer(e)</td> <td>put(e)</td> <td>offer(e,time,unit)</td> </tr> <tr> <td>移除方法</td> <td>remove()</td> <td>poll()</td> <td>take()</td> <td>poll(time,unit)</td> </tr> <tr> <td>检查方法</td> <td>element()</td> <td>peek()</td> <td>不可用</td> <td>不可用</td> </tr> </tbody> </table> <ul> <li><strong>抛出异常:</strong> 当队列满时,再向队列中插入元素,则会抛出IllegalStateException异常。当队列空时,再向队列中获取元素,则会抛出NoSuchElementException异常。</li> <li><strong>返回特殊值:</strong> 当队列满时,向队列中添加元素,则返回false,否则返回true。当队列为空时,向队列中获取元素,则返回null,否则返回元素。</li> <li><strong>一直阻塞:</strong> 当阻塞队列满时,如果生产者向队列中插入元素,则队列会一直阻塞当前线程,直到队列可用或响应中断退出。当阻塞队列为空时,如果消费者线程向阻塞队列中获取数据,则队列会一直阻塞当前线程,直到队列空闲或响应中断退出。</li> <li><strong>超时退出:</strong> 当队列满时,如果生产线程向队列中添加元素,则队列会阻塞生产线程一段时间,超过指定的时间则退出返回false。当队列为空时,消费线程从队列中移除元素,则队列会阻塞一段时间,如果超过指定时间退出返回null。</li> </ul> <h3>Java提供的7个阻塞队列</h3> <table> <tbody> <tr> <th>队列</th> <th>有界性</th> <th>锁</th> <th>数据结构</th> </tr> <tr> <td>ArrayBlockingQueue</td> <td>有界</td> <td>加锁</td> <td>arraylist</td> </tr> <tr> <td>LinkedBlockingQueue</td> <td>可选有界</td> <td>加锁</td> <td>单向linkedlist</td> </tr> <tr> <td>PriorityBlockingQueue</td> <td>无界</td> <td>加锁</td> <td>Heap</td> </tr> <tr> <td>DelayQueue</td> <td>无界</td> <td>加锁</td> <td>Heap</td> </tr> <tr> <td>SynchronousQueue</td> <td>有界</td> <td>无锁(JDK1.6)</td> <td>~</td> </tr> <tr> <td>LinkedTransferQueue</td> <td>无界</td> <td>无锁</td> <td>单向linkedlist</td> </tr> <tr> <td>LinkedBlockingDeque</td> <td>可选有界</td> <td>加锁</td> <td>双向linkedlist</td> </tr> </tbody> </table> <p>在多线程环境中,通过队列可以很容易的实现数据共享。在基于队列的生产者-消费者模型中,数据生产时,生产者就把数据放入队列,当消费者准备使用数据时就从队列中取出数据。生产者不需要知道消费者的标识或者数量,或者他们是唯一的生产者。同样,消费者也不需要知道生产者来自何处。BlockingQqueue简化了生产者-消费者的过程,它支持任意数量的生产者-消费者。一种最常见的生产者-消费者模式就是线程池与工作队列的组合,在Executor执行框架中就体现了这种模式。</p> <h3>阻塞队列BlockingQueue的成员介绍</h3> <p>ArrayBlockingQueue</p> <p>ArrayBlockingQueue是一个基于数组的阻塞队列实现,内部维护了一个定长数组,以便缓存数据。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。</p> <ul> <li><strong>ArrayBlockingQueue(int capacity)</strong> 创建一个带有给定的(固定)容量和默认访问策略(非公平锁)的 ArrayBlockingQueue。capacity是队列容量。</li> <li><strong>ArrayBlockingQueue(int capacity, boolean fair)</strong> 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。fair访问策略如果为 true,则按照 FIFO 顺序访问插入或移除时受阻塞线程的队列,如果为 false,则访问顺序是不确定的。fair是“可重入的独占锁(ReentrantLock)”的类型。fair为true,表示是公平锁,fair为false,表示是非公平锁。</li> <li>ArrayBlockingQueue(int capacity, boolean fair, Collectionc) 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。</li> </ul> <p>由于ArrayBlockingQueue内部只维护一个ReentrantLock类型的lock锁对象,所以在生成者-消费者模型中,并不能真正的实现并行,这一点不同于LinkedBlockingQueue,LinkedBlockingQueue内部维护了两个锁。事实上ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。</p> <p>ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。</p> <p>LinkedBlockingQueue</p> <p>LinkedBlockingQueue是一个单向链表实现的阻塞队列,支持真正的并行操作,因为内部使用ReentrantLock实现插入锁(putLock)和取出锁(takeLock),维护了两个所对象。其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。</p> <p>在开发中新建LinkedBlockingQueue实例的时候,一般要指定其大小,如果没有指定大小,大小默认是Integer.MAX_VALUE,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。在线程池框架Executors中newSingleThreadExecutor和newFixedThreadPool方法内部维护的都是LinkedBlockingQueue。</p> <p>PriorityBlockingQueue</p> <p>PriorityBlockingQueue是一个按照优先级排序的队列,如果想要某个队列不是按照FIFO的顺序来处理元素,该队列非常有用,内部维护一个堆的数据结构。PriorityBlockingQueue既可以根据元素的自然顺序进行排序,如果元素实现了Comparable接口,也可以根据Comparator进行比较。该队列看似有界队列,实际上它会自动扩容,因此是无界队列,因此在生产者-消费者模型中,生产者并不会真正的阻塞,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是非公平锁。</p> <p>DelayQueue</p> <p>DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。</p> <p>DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。</p> <p>SynchronousQueue</p> <p>SynchronousQueue是这样一种阻塞队列,其中每个 put 必须等待一个take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有,它不会为队列中的元素维护存储空间。与其它队列不同的是,它维护一组线程,这些线程在等待着元素加入或者移除队列。不能在同步队列上进行peek,因为仅在试图要取得元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)添加元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头是尝试添加到队列中的首个已排队线程元素; 如果没有已排队线程,则不添加元素并且头为null。SynchronousQueue类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。</p> <p>SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。</p> <p>创建SynchronousQueue有两种构造方法,一种时SynchronousQueue(),默认采用非公平的形式,从JDK1.6开始SynchronousQueue的实现采用了一种性能更好的无锁算法。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(LIFO Stack);公平竞争模式则使用先进先出队列(FIFO),性能上两者是相当的,一般情况下,FIFO通常可以支持更大的吞吐量,但LIFO可以更大程度的保持线程的本地化。另外一种SynchronousQueue(boolean fair),可以自己指定访问方式是否采用公平方式。</p> <p>LinkedTransferQueue</p> <p>LinkedTransferQueue是JDK1.7中新引入的队列,该队列的实现基于CAS无锁机制,它也是一个基于链表实现的无界队列。相比前面队列它多transfer和tryTransfer方法。</p> <p>LinkedBlockingDeque</p> <p>LinkedBlockingDeque一个基于已链接节点的、任选范围的阻塞双端队列。可选的容量范围构造方法参数是一种防止过度膨胀的方式。如果未指定容量,那么容量将等于 Integer.MAX_VALUE。只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。要想支持阻塞功能,队列的容量一定是固定的,否则无法在入队的时候挂起线程。也就是capacity是final类型的。</p> <h3>阻塞队列示例</h3> <p>这是一个使用LinkedBlockedQueue设计实现的简单的生产者-消费者模式。</p> <pre> <code class="language-java">public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<String> queue; private static final int DEFAULT_SLEEP = 1000; private static AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { String data = null; Random r = new Random(); System.out.println("启动生产者线程!"); try { while (isRunning) { Thread.sleep(r.nextInt(DEFAULT_SLEEP)); data = "data:" + count.incrementAndGet(); queue.put(data); System.out.println("将数据:" + data + "放入队列..."); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生产者线程!"); } } public void stop() { isRunning = false; } } public class Consumer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<String> queue; private static final int DEFAULT_SLEEP = 1000; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { System.out.println("启动消费者线程!"); Random r = new Random(); try { while (isRunning) { String data = queue.take(); if (null != data) { System.out.println("正在消费:" + data); Thread.sleep(r.nextInt(DEFAULT_SLEEP)); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者线程!"); } } public void stop() { isRunning = false; } } public class MainTest { public static void main(String[] args) throws InterruptedException { // 声明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行20s Thread.sleep(20 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2 * 1000); consumer.stop(); // 退出Executor service.shutdown(); } } </code></pre> <p>如果不使用阻塞队列,使用Object.wait()和Object.notify()非阻塞队列实现生产者-消费者模式,生产者线程在缓冲区为满的时候,消费者在缓冲区为空的时候,都应该暂停运行。然后用notify 和notifyAll通知等待中的线程重新开始执行。</p> <h3>参考资料</h3> <p>Java并发编程实践</p> <p><a href="/misc/goto?guid=4959755059589594030" rel="nofollow,noindex">Java中的阻塞队列(BlockingQueue)</a></p> <p><a href="/misc/goto?guid=4959755059690079442" rel="nofollow,noindex">Java多线程-工具篇-BlockingQueue</a></p> <p><a href="/misc/goto?guid=4959755059785424720" rel="nofollow,noindex">聊聊并发(七)——Java中的阻塞队列</a></p> <p><a href="/misc/goto?guid=4959755059874872446" rel="nofollow,noindex">阻塞队列</a></p> <p><a href="/misc/goto?guid=4959755059966614778" rel="nofollow,noindex">JDK7之前已有的队列实现</a></p> <p><a href="/misc/goto?guid=4959755060067510093" rel="nofollow,noindex">ArrayBlockingQueue</a></p> <p> </p> <p>来自:http://www.sunnyang.com/733.html</p> <p> </p>