2023-01-14
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

《Java源码分析》:CountDownLatch

Latch:闭锁。

有人把Latch比喻成是一个门,在门打开之前,所有想进门的线程都被阻塞,在门打开之后,所有想进门的线程全部通过,且门打开之后就不能再关闭。

CountDownLatch是一个同步辅助类,允许一个或多个线程等待直到其它线程的一些操作已经准备完成。

CountDownLatch是JDK 5+里面闭锁的一个实现,允许一个或者多个线程等待某个事件的发生。CountDownLatch有一个正数计数器,countDown方法对计数器做减操作,await方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。

即CountDownLatch里面有一个计数器,当计数器不为零时所有线程一直阻塞。当计数器为零时,则所有等待此门的线程就全部唤醒开始工作。

下面这个例子就很好的介绍了CountDownLatch的含义和用法。

        import java.util.concurrent.CountDownLatch;
    
        public class CountDownLatchDemo {
            private static final int NUM = 10;
            private static CountDownLatch  doneSignal = new CountDownLatch(NUM);
            private static CountDownLatch startSignal = new CountDownLatch(1);
    
            public static void main(String[] args) {
                for(int i=0;i<NUM;i++){
                    new Thread(){
    
                        @Override
                        public void run() {
                            try {
                                //System.out.println(Thread.currentThread().getName()+"   等待一个signal....");
                                startSignal.await();
                                System.out.println(Thread.currentThread().getName()+"  is running...");
                                doneSignal.countDown();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                    }.start();
                }
                //模拟为其它线程的运行准备资源,例如,在所有准备从数据库中读数据的线程之前连接好数据库等操作
                init();
                startSignal.countDown();//运行到这里,就会将上面的线程全部激活
                try {
                    System.out.println("main线程awaiting....");
                    doneSignal.await();//main线程在这里等待,等到上面的所有线程全部执行完毕后
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("main线程又开始运行");
                System.out.println("main线程运行结束");
    
            }
    
            private static void init() {
                System.out.println("main为所有的线程的运行做准备。。。。");
            }
    
        }

运行结果:

    main为所有的线程的运行做准备。。。。
    main线程awaiting....
    Thread-0  is running...
    Thread-2  is running...
    Thread-4  is running...
    Thread-6  is running...
    Thread-8  is running...
    Thread-1  is running...
    Thread-3  is running...
    Thread-5  is running...
    Thread-7  is running...
    Thread-9  is running...
    main线程又开始运行
    main线程运行结束

源码分析

首先看下CountDownLa的构造函数,构造函数需要传入一个大于的零的数。

从构造函数中可以看到,CountDownLatch类是直接委托给实现了AQS类的内部类Sync类实现的。

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        Sync(int count) {
            setState(count);//调用AQS类的setState设置状态位
        }

分析await()方法的内部实现

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        /*
        具体如下:
        1、检测中断标志位
        2、调用tryAcquireShared方法来检查AQS标志位state是否等于0,如果state等于0,则说明不需要等待,立即返回,否则进行3
        3、调用doAcquireSharedInterruptibly方法进入AQS同步队列进行等待,并不断的自旋检测是否需要唤醒
        */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
    
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
        /*
            函数功能:根据AQS的状态位state来返回值,
            如果为state=0,返回 1
            如果state=1,则返回-1
        */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    
        /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {//如果大于零,则说明需要唤醒
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

调用countDown方法的内部实现

        /**
         * Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
         *
         * <p>If the current count is greater than zero then it is decremented.
         * If the new count is zero then all waiting threads are re-enabled for
         * thread scheduling purposes.
         *
         * <p>If the current count equals zero then nothing happens.
         */
        public void countDown() {
            sync.releaseShared(1);
        }
    
    
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();//释放所有正在等待的线程节点
                return true;
            }
            return false;
        }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

由于里面的代码逻辑和前面博文中介绍Semaphore类、ReentrantLock基本一致,这里就不再介绍。

小结

只需要记住:CountDownLatch是一个同步辅助类,当CountDownLatch类中的计数器减少为零之前所有调用await方法的线程都会被阻塞,如果计数器减少为零,则所有线程被唤醒继续运行。

一般的应用场景为:

1、其它的一些线程需要某个线程做准备工作。例如:数据库的连接等。

2、某个线程需要等待一些线程工作完之后清理资源。断开数据库的连接等。

阅读全文