2023-01-14
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

《Java源码分析》:BlockingQueue

2016年8月27日21:10:51,今天是学校报道的第一天,从今天开始,我也就研三了,哎,有时候只能感叹时间过的真快,自己的研究生生活只剩下最后一年。在最后的一年里,希望自己的一切都顺利吧,其中包括:找一份好的工作,顺利将手上的项目完成,顺利完成自己的毕设。

说了这么多,还是开始本篇博文的正题吧。在前一段时间研究了下J.U.C包下面并发类库的原理,也都形成了相应的博文,唯独关于阻塞队列BlockingQueue以及它的4个实现类ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue的源码分析形成相应的博客。

今天抽空,又看了下这几个类的源码,也就做点记录并形成了这篇博文。

BlockingQueue

BlockingQueue是一个队列,还额外支持如下的操作:当获取元素时,如果队列为空,则一直等待直到队列非空。当存储元素时,如果队列中没有空间进行存储,则一直等待直到有空间进行存储。

1.BlockingQueue定义的常用方法如下:

Throwsexception Specialvalue Blocks Timesout
Throwsexception Specialvalue Blocks Timesout
Insert add(e) offer(e) put(e) offer(e,time,unit)
Remove remove() poll() take() poll(long,TimeUnit)
Examine element() peek() notapplicable notapplicable

上面就是BlockingQueue接口中提供的方法列表。

1、往队列中添加元素的方法有4钟,分别为:add(e)/offer(e)/put(e)/offer(e,time,unit)

2、往队列中取元素的方法有4种,分别为:remove()/poll()/take()/poll(long,TimeUnit).

3、检查队列中的元素有2种,分别为:element()/peek().

由于put()/take()方法是在并发中会发生阻塞,因此,我们以研究这两种方法的源码实现。

2、ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组且有界的阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。即队列头保存的是在队列中待的时间最长的元素。队列尾则是待的时间最短的元素。元素插入到对尾,在队首获取元素。

2.1ArrayBlockingQueue继承体系结构

        public class ArrayBlockingQueue<E> extends AbstractQueue<E>
                implements BlockingQueue<E>, java.io.Serializable

即继承了AbstractQueue接口并实现了BlockingQueue、Serializable接口。

2.2、ArrayBlockingQueue的相关属性

        final Object[] items;
        int takeIndex;
        int putIndex;
        int count;
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;

ArrayBlockingQueue是基于数组的,因此会有一个数组来保存元素。还有两个指针来指向头结点和为节点的位置。当然还存在锁和Condition。Condition的原理和应用相当简单(调用await方法的线程会被阻塞直到其它线程发送signal来唤醒或者是被中断)。对Condition不熟悉的朋友,可以看我之前对Condition类分析的这篇博文:http://blog.csdn.net/u010412719/article/details/52089561

2.3 ArrayBlockingQueue的构造方法

        //创建一个指定大小的队列对象。
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

构造方法就是创建一个指定大小的队列对象。要说明的一点是第二个参数,fair,如果为true,则表示创建一个公平的队列,即所有等待的消费者或者是生产者是按照顺序来访问这个队列。

在API文档中,对公平性由如下的介绍:此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。

2.4 ArrayBlockingQueue类中的put方法

函数功能:插入一个元素到队列的末尾,如果队列已满则等待

源代码如下:(有详细的注释)

        public void put(E e) throws InterruptedException {
            checkNotNull(e);//检查是否为空,如果为空,则抛NullPointerException
            //获取锁,
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //检查是否已满,如果已满,则调用Condition的await方法等待并释放锁
                while (count == items.length)
                    notFull.await();
                enqueue(e);//如果没满,则直接加入到队列中
            } finally {
                lock.unlock();//最后释放锁
            }
        }
    
        private static void checkNotNull(Object v) {
            if (v == null)
                throw new NullPointerException();
        }
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)//转换下指针
                putIndex = 0;
            count++;
            //当添加元素后,则唤醒一个消费者
            notEmpty.signal();
        }

函数功能:插入一个元素到队列的末尾,如果队列已满则等待

此方法的实现思路如下:

1、判断要添加的元素是否为null,如果为null,则抛异常。否则进行 2

2、加锁

3、检测队列是否已满,如果已满,则等待并释放锁(condtion的await方法会释放锁),如果没有满,则将元素加入到队列中即可。

4、释放锁

看了put方法的源码,是不是思路相当的清晰、简单哈。

2.5 ArrayBlockingQueue类中的take方法

源码如下:

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //如果队列中存储的元素为空,则等待直至队列中非空
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;//置为null
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            //唤醒等待的生产者
            notFull.signal();
            return x;
        }

函数功能:取出队列中队首的元素,如果为空,则等待直至队列为非空。

实现思路如下:

1、加锁

2、检查队列是否为空,如果为空,则阻塞等待,否则取出队首的元素并返回。

3、释放锁。

是不是特简单,相信写过生产消费者模型的我们对上面put、take方法实现的代码特别熟悉哈,因为ArrayBlockingQueue的实现就是和生产消费者模型思路一模一样

ArrayBlockingQueue中的其它插入元素的方法基本和put方法一致,获取元素的其它方法与take方法基本一致,这里就不对这些方法的实现一一详细介绍了哈。

小结

关于ArrayBlockingQueue的介绍就到这里为止了哈,确实实现思想特别简单哈。

需要我们记住的是:

1、ArrayBlockingQueue队列是基于数组+Condition类来实现的。

2、线程安全是通过ReentrantLock来保证的。

3、队列中不允许元素为null.

关于BlockingQueue的其它实现类LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue将在明天介绍。

阅读全文