JDK 1.5 之后,Doug Lea 大神为我们写了很多的工具,整个 concurrent 包基本都是他写的。也为我们程序员写好了很多工具,包括我们之前说的线程池,重入锁,线程协作工具,ConcurrentHashMap 等等,今天我们要讲的是和 ConcurrentHashMap 类似的数据结构,LinkedBolckingQueue,阻塞队列。在生产者消费者模型中,该类可以帮助我们快速的实现业务功能。
- 如何使用?
- 源码分析
1. 如何使用?
我们在生产者消费者模型,生产者向一个数据共享通道存放数据,消费者从相同的数据共享通道获取数据,将生产和消费完全隔离,不仅是生产者消费者,现在流行的消息队列,比如各种MQ,kafka,和这个都差不多。废话不多说,直接来个demo ,看看怎么使用:
public static void main(String[] args) { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(1024); for (int i = 0; i < 5; i++) { final int num = i; new Thread(() -> { try { for (int j = 0; ; j++) { linkedBlockingQueue.put(num + "号线程的" + j + "号商品"); Thread.sleep(5000); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } for (int i = 0; i < 5; i++) { new Thread(() -> { try { for (; ; ) { System.out.println("消费了" + linkedBlockingQueue.take()); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
注意,这里的 put 方法和 take 方法都是阻塞的,不然就不是阻塞队列了,什么意思呢?如果队列满了,put 方法就会等待,直到队列有空为止,因此该方法使用时需要注意,如果业务即时性很高,那么最好使用带有超时选项的 offer (V,long,TimeUnit),方法,同样, take 方法也是如此,当队列中没有的时候,就会阻塞,直到队列中有数据为止。同样可以使用 poll(long, TimeUnit)方法超时退出。
// 如果满了,立即返回false boolean b = linkedBlockingQueue.offer(""); // 如果满了,则等待到给定的时间,如果还满,则返回false boolean b2 = linkedBlockingQueue.offer("", 1000, TimeUnit.MILLISECONDS); // 阻塞直到插入为止 linkedBlockingQueue.put("");
// 如果队列为空,直接返回null Object o3 = linkedBlockingQueue.poll(); // 如果队列为空,一直阻塞到给定的时间 Object o1 = linkedBlockingQueue.poll(1000, TimeUnit.MILLISECONDS); // 阻塞,直到取出数据 Object o = linkedBlockingQueue.take(); // 获取但不移除此队列的头;如果此队列为空,则返回 null。 Object peek = linkedBlockingQueue.peek();
2. 源码分析
阻塞队列,重点看 put 阻塞方法和 take 阻塞方法。
put 方法:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
- 根据给定的值创建一个 Node 对象,该对象有2个属性,一个是 item,一个是 Node 类型的 next,是链表结构的节点。
- 获取 put 的锁,注意,这里,put 锁和 take 锁是分开的。也就是说,当你插入的时候和取出的时候用的不是一把锁,可以高效并发,但是如果两个线程同时插入就会阻塞。
- 获取链表的长度。
- 使用中断锁,如果调用了线程的中断方法,那么,处于阻塞中的线程就会抛出异常。
- 判断如果当前链表长度达到了设置的长度,默认是 int 最大型,就调用 put 锁的伙伴 Condition 对象 notFull 让当前线程挂起等待。 直到 take 方法中会调用 notFull 对象的 signal 方法唤醒。
- 调用 enqueue 方法,将刚刚创建的 Node 节点连接到链表上。
- 将链表长度变量 count 加一。 判断如果加一后,链表长度还小于链表规定的容量,那么就唤醒其他等待在 notFull 对象上的线程,告诉他们可以取数据了。
- 放开锁,让其他线程争夺锁(非公平锁)。
- 如果c是0,表示队列已经有一个数据了,通知唤醒挂在 notEmpty 的线程,告诉他们可以取数据了。
take 方法如下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
- 获取链长度,获取 take 锁。
- 调用可中断的 lock 方法。开始锁住。
- 如果队列是空,则挂起线程。开始等待。
- 如果不为空,则调用 dequeue 方法,拿到头节点的数据,并将头节点更新。
- 将队列长度减一。判断如果队列长度大于1,通知等待在 notEmpty 上的线程,可以拿数据了。
- 解锁。
- 如果变量 c 和 容量相同,而刚刚又消费了一个节点,说明队列不满了,则通知生产者可以添加数据了。
- 返回数据。
boolean offer(E e, long timeout, TimeUnit unit) 源码:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
和 put 方法很相似,步骤如下:- 将时间转成纳秒。
- 获取 put 锁。
- 调用可中断锁方法。
- 如果容量满了,并且设置的等待时间小于0,返回 false,表示插入失败,反之,调用 notFull 方法等待给定的时间,并返回一个负数,当第二次循环的时候,继续判断,如果还是满的并且小于0,返回false。
- 如果容量没有满,或者等待过程被唤醒,则调用 enqueue 插入数据。
- 获取当前链表长度。
- 判断链表长度+1是否小于设置的容量。如果小于,则链表没有满,通知生产者可以添加数据了。
- 释放锁。 如果 c 等于 0,表示之前没有数据,但是现在已经加入一个数据了,可以通知其他的消费者来消费了。
- 返回 true。
E poll(long timeout, TimeUnit unit) 源码分析
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
从源码分析中,我们可以看到,整个阻塞队列就是由重入锁和Condition 组合实现的,和我们之前用 synchronized 加上 wait 和 notify 实现很相似,只是楼主的那个例子没有使用队列,因此无法将锁分开,也就是我们之前说的锁分离的技术。那么,整体的性能当然不能和 Doug Lea 大神的比了。
