《Java源码分析》:LinkedBlockingQueue
LinkedBlockingQueue是大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。
1、LinkedBlockingQueue的继承体系结构
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
LinkedBlockingQueue的继承体系结构如下,和ArrayBlockingQueue一模一样。
2、LinkedBlockingQueue的相关属性介绍
1、链表节点
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
2、private final int capacity;容量边界,如果没有指定,则为Integer.MAX_VALUE
3、private final AtomicInteger count = new AtomicInteger(); 当前队列中存储元素的数量
4、transient Node head;队列的头结点,且始终head.item=null,即此头结点不存储任何元素。
5、private transient Node last;队列的尾结点,且始终last.next=null.
6、private final ReentrantLock takeLock = new ReentrantLock();take操作时所需要加的锁
7、当take操作时如果队列中存储的元素为空,则调用此Condition的await方法。
private final Condition notEmpty = takeLock.newCondition();
8、put操作时所需要加的锁和Condition
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
从上面可以看到,LinkedBlockingQueue阻塞队列的实现采用了两把锁,一个是take锁,一个是put锁,这就说明,put和take操作是可以同时进行的。
3、LinkedBlockingQueue的构造方法
创造一个容量为Integer.MAX_VALUE的队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
创造一个指定大小(必须大于零)的队列,如果参数小于等于零,则抛异常。
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//头结点和尾结点中存储的元素为null
}
4、LinkedBlockingQueue的put方法介绍
函数功能:插入一个元素到队列的末尾,如果队列已满则等待直到有空间进行存储
源码如下:(有详细的注释)
public void put(E e) throws InterruptedException {
//检查是否为null,如果为null,则抛异常
if (e == null) throw new NullPointerException();
//注意:用此局部变量持有一个负数来指示CAS操作是否操作成功。 c = count.getAndIncrement();//利用原子性加一
int c = -1;
Node<E> node = new Node<E>(e);//根据当前的值构造一个节点
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//加锁
try {
//如果已到达最大容量,则等待直到有空间来进行存储才会被唤醒
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);//将节点加入到链表的末尾
c = count.getAndIncrement();//利用原子性加一
//如果当前存储的元素个数小于容量,则唤醒正在等待的生产者的线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();//释放锁
}
//队列不为空,刚刚添加了一个元素,因此需要唤醒一个消费者线程
if (c == 0)
signalNotEmpty();
}
/*
* 将节点加入到链表的末尾
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//唤醒一个消费者线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
put方法实现的思想也比较简单,叙述如下:
1、首先检查元素是否为null,如果是,则抛异常,如果不是,则继续进行
2、加锁
3、检查队列是否已满,如果已满,则调用put所对应的Condition的await方法进行等待,直到有signal来将其唤醒。如果没有满,则进行 4.
4、调用enqueue方法将此节点加入到队列中,并将计数器进行加一操作。
本来到这里就差不多结束了,但是还需要处理两种唤醒操作。
5、如果此时队列中存储的元素小于最大容量,则唤醒其他正在等待生产的线程。
6、由于刚刚生产了元素,因此最后还需要唤醒消费者线程。
7、释放锁
put(e)方法的思想相当简单哈,下面来看take方法
LinkedBlockingQueue的take方法介绍
函数功能:获取队列中队首的元素。
源码如下:(有详细的注释)
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//加take锁
try {
//如果当前队列为空,则一直等待直到队列非空
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();//将头结点取出来
c = count.getAndDecrement();//利用原子性进行进行减一操作
//如果此时的容量是大于1的,则说明还有其它元素可以被消费线程获取,因此唤醒其他消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();//释放锁
}
//如果c的容量是等于capacity,又被消费了一个,因此可以通知生产者线程来进行生产
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
//头结点指向下一个节点,并将头结点中的元素置为null。即头结点的item始终为null。
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* 唤醒一个生产者线程
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
此方法实现的思想也特别简单,叙述如下:
1、加锁
2、判断队列中是否有元素,如果没有,则阻塞等待直至队列非空被唤醒。如果有,则进行 3
3、取出队列中头结点的下一个节点(头结点为傀儡节点)。并将计数器进行减一操作。
4、如果此时的容量是大于1的,则说明还有其它元素可以被消费线程获取,因此唤醒一个正在等待的消费者线程
5、如果此时的容量小于队列的最大容量,则唤醒正在等待的生产者线程进行生产。
6、释放锁
以上就是关于LinkedBlockingQueue的相关分析,比较简单哈。
小结
关于LinkedBlockingQueue我们只需要记住以下几点
1、是基于链表来实现的
2、使用了两个锁,take、put操作各一把锁。以及借用了两个Condition来进行阻塞和唤醒操作。