2023-01-14
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

《Java源码分析》:LinkedBlockingQueue

LinkedBlockingQueue是大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

1、LinkedBlockingQueue的继承体系结构

        public class LinkedBlockingQueue<E> extends AbstractQueue<E>
                implements BlockingQueue<E>, java.io.Serializable

LinkedBlockingQueue的继承体系结构如下,和ArrayBlockingQueue一模一样。

2、LinkedBlockingQueue的相关属性介绍

1、链表节点

        static class Node<E> {
            E item;
            Node<E> next;
    
            Node(E x) { item = x; }
        }

2、private final int capacity;容量边界,如果没有指定,则为Integer.MAX_VALUE

3、private final AtomicInteger count = new AtomicInteger(); 当前队列中存储元素的数量

4、transient Node head;队列的头结点,且始终head.item=null,即此头结点不存储任何元素。

5、private transient Node last;队列的尾结点,且始终last.next=null.

6、private final ReentrantLock takeLock = new ReentrantLock();take操作时所需要加的锁

7、当take操作时如果队列中存储的元素为空,则调用此Condition的await方法。

        private final Condition notEmpty = takeLock.newCondition();

8、put操作时所需要加的锁和Condition

       private final ReentrantLock putLock = new ReentrantLock();
    
        private final Condition notFull = putLock.newCondition();

从上面可以看到,LinkedBlockingQueue阻塞队列的实现采用了两把锁,一个是take锁,一个是put锁,这就说明,put和take操作是可以同时进行的。

3、LinkedBlockingQueue的构造方法

创造一个容量为Integer.MAX_VALUE的队列

        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }

创造一个指定大小(必须大于零)的队列,如果参数小于等于零,则抛异常。

        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);//头结点和尾结点中存储的元素为null
        }

4、LinkedBlockingQueue的put方法介绍

函数功能:插入一个元素到队列的末尾,如果队列已满则等待直到有空间进行存储

源码如下:(有详细的注释)

        public void put(E e) throws InterruptedException {
            //检查是否为null,如果为null,则抛异常
            if (e == null) throw new NullPointerException();
            //注意:用此局部变量持有一个负数来指示CAS操作是否操作成功。 c = count.getAndIncrement();//利用原子性加一
            int c = -1;
            Node<E> node = new Node<E>(e);//根据当前的值构造一个节点
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();//加锁
            try {
    
                //如果已到达最大容量,则等待直到有空间来进行存储才会被唤醒
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);//将节点加入到链表的末尾
                c = count.getAndIncrement();//利用原子性加一
                //如果当前存储的元素个数小于容量,则唤醒正在等待的生产者的线程。
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();//释放锁
            }
            //队列不为空,刚刚添加了一个元素,因此需要唤醒一个消费者线程
            if (c == 0)
                signalNotEmpty();
        }
    
        /*
         * 将节点加入到链表的末尾
         */
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    
        //唤醒一个消费者线程
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }

put方法实现的思想也比较简单,叙述如下:

1、首先检查元素是否为null,如果是,则抛异常,如果不是,则继续进行

2、加锁

3、检查队列是否已满,如果已满,则调用put所对应的Condition的await方法进行等待,直到有signal来将其唤醒。如果没有满,则进行 4.

4、调用enqueue方法将此节点加入到队列中,并将计数器进行加一操作。

本来到这里就差不多结束了,但是还需要处理两种唤醒操作。

5、如果此时队列中存储的元素小于最大容量,则唤醒其他正在等待生产的线程。

6、由于刚刚生产了元素,因此最后还需要唤醒消费者线程。

7、释放锁

put(e)方法的思想相当简单哈,下面来看take方法

LinkedBlockingQueue的take方法介绍

函数功能:获取队列中队首的元素。

源码如下:(有详细的注释)

        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();//加take锁
            try {
                //如果当前队列为空,则一直等待直到队列非空
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();//将头结点取出来
                c = count.getAndDecrement();//利用原子性进行进行减一操作
                //如果此时的容量是大于1的,则说明还有其它元素可以被消费线程获取,因此唤醒其他消费者线程
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();//释放锁
            }
            //如果c的容量是等于capacity,又被消费了一个,因此可以通知生产者线程来进行生产
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
        private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            //头结点指向下一个节点,并将头结点中的元素置为null。即头结点的item始终为null。
            head = first;
            E x = first.item;
            first.item = null;
            return x;
        }
    
    
        /**
         * 唤醒一个生产者线程
         */
        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }

此方法实现的思想也特别简单,叙述如下:

1、加锁

2、判断队列中是否有元素,如果没有,则阻塞等待直至队列非空被唤醒。如果有,则进行 3

3、取出队列中头结点的下一个节点(头结点为傀儡节点)。并将计数器进行减一操作。

4、如果此时的容量是大于1的,则说明还有其它元素可以被消费线程获取,因此唤醒一个正在等待的消费者线程

5、如果此时的容量小于队列的最大容量,则唤醒正在等待的生产者线程进行生产。

6、释放锁

以上就是关于LinkedBlockingQueue的相关分析,比较简单哈。

小结

关于LinkedBlockingQueue我们只需要记住以下几点

1、是基于链表来实现的

2、使用了两个锁,take、put操作各一把锁。以及借用了两个Condition来进行阻塞和唤醒操作。

阅读全文