正如每个Java文档所描述的那样,CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。在Java并发中,countdownlatch的概念是一个常见的面试题,所以一定要确保你很好的理解了它。
CountDownLatch是什么 CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier和Semaphore,它们都存在于java.util.concurrent包下,后面会讲解另外俩个。CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
如上图:TA主线程会一直等待,等待T1、T2和T3将计数器减为0,才继续执行。
例子 在这个例子中,我模拟了一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,并且启动类一直在闭锁上等待着。一旦验证和检查了所有外部服务,那么启动类恢复执行。
BaseHealthChecker.java: 这个类是一个Runnable,负责所有特定的外部服务健康的检测。它删除了重复的代码和闭锁的中心控制代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package JUC.tools.CountDownLathDemo;import java.util.concurrent.CountDownLatch;public abstract class BaseHealthChecker implements Runnable { private CountDownLatch _latch; private String _serviceName; private boolean _serviceUp; public BaseHealthChecker (String serviceName, CountDownLatch latch) { super (); this ._latch = latch; this ._serviceName = serviceName; this ._serviceUp = false ; } @Override public void run () { try { verifyService(); _serviceUp = true ; } catch (Throwable t) { t.printStackTrace(System.err); _serviceUp = false ; } finally { if (_latch != null ) { _latch.countDown(); } } } public String getServiceName () { return _serviceName; } public boolean isServiceUp () { return _serviceUp; } public abstract void verifyService () ; }
NetworkHealthChecker.java: 这个类继承了BaseHealthChecker,实现了verifyService()方法。DatabaseHealthChecker.java 和CacheHealthChecker.java 除了服务名和休眠时间外,与NetworkHealthChecker.java是一样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package JUC.tools.CountDownLathDemo;import java.util.concurrent.CountDownLatch;public class CacheHealthChecker extends BaseHealthChecker { public CacheHealthChecker (CountDownLatch latch) { super ("CacheHealthChecker" , latch); } @Override public void verifyService () { System.out.println("Checking " + this .getServiceName()); try { Thread.sleep(7000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this .getServiceName() + " is UP" ); } }
ApplicationStartupUtil.java: 这个类是一个主启动类,它负责初始化闭锁,然后等待,直到所有服务都被检测完。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package JUC.tools.CountDownLathDemo;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class ApplicationStartupUtil { public static void main (String[] args) { boolean result = false ; try { result = ApplicationStartupUtil.checkExternalServices(); } catch (Exception e) { e.printStackTrace(); } System.out.println("External services validation completed !! Result was :: " + result); } private static List<BaseHealthChecker> _services; private static CountDownLatch _latch; private ApplicationStartupUtil () { } private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil (); public static ApplicationStartupUtil getInstance () { return INSTANCE; } public static boolean checkExternalServices () throws Exception { _latch = new CountDownLatch (3 ); _services = new ArrayList <BaseHealthChecker>(); _services.add(new NetworkHealthChecker (_latch)); _services.add(new CacheHealthChecker (_latch)); _services.add(new DatabaseHealthChecker (_latch)); Executor executor = Executors.newFixedThreadPool(_services.size()); for (final BaseHealthChecker v : _services) { executor.execute(v); } _latch.await(); for (final BaseHealthChecker v : _services) { if (!v.isServiceUp()) { return false ; } } return true ; } }
运行结果
1 2 3 4 5 6 7 Checking networkService Checking CacheHealthChecker Checking DatabaseHealth networkService is UP CacheHealthChecker is UP DatabaseHealth is UP External services validation completed !! Result was :: true
源码解析 CountDownLath的类图如下
CountDownLatch是通过共享锁实现的。下面,我们分析CountDownLatch中3个核心函数: CountDownLatch(int count), await(), countDown()。
首先看一下构造函数
1 2 3 4 5 public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); }
里面很简单,创建一个Sync对象,下面看看Sync类,这个类继承了AQS,源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; Sync(int count) { setState(count); } int getCount () { return getState(); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc = = 0 ; } } }
await 这个函数想当我我们前面的lock函数,只是这里换了个名称而已,源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
从上面可以看出,这里await功能是通过获取共享锁来实现的。在我们构造CountDownLath对象的时候,闯进来一个state值,因此默认情况下,这里是获取到不到锁,从而使得线程进入阻塞状态。tryAcquireShared函数比较简单,这里就不仔细说这个函数,可以看上米娜Sync对象中的定义。下面看看doAcquireSharedInterruptibly这个函数,源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
上面流程还是比较简单,具体细节可以看上面的注释,主要的细节在前面的文章中已经解释过,下面重点解释一下setHeadAndPropagate这个函数。
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
主要流程就是判断当前队列中是否有后续需要唤醒获取共享锁的节点,如果有则调用doReleaseShared唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
上面注释已经解释了整体的逻辑,这里重点解释ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
这个条件,通常情况下,如果头结点释放,那么会唤醒后继节点,但是出现这里的情况是因为队列里面只有当前节点,没有需要唤醒的节点,但是如果我们在改变当前节点状态过程中如果有后继节点计入阻塞队列,因为我们已经释放了此节点,会造成后继节点没有唤醒而造成阻塞。因此这里循环判断,直到有新的节点入队。而新节点入队,就会改变当前节点状态,从而使得循环结束。不过出现这种情况很少,因为共享锁获取的时候,入队的节点都会通过shouldParkAfterFailedAcquire将前继节点值设置成为SIGNAL。所以一般情况先,在循环的第一步就会结束。
countDown() 1 2 3 4 5 6 7 8 9 10 11 public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
该函数实际上调用releaseShared(1)释放共享锁,releaseShared()在AQS中实现。releaseShared()的目的是让当前线程释放它所持有的共享锁。它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁,上面已经解释过这个函数。先买看看CountDownLath中实现的tryReleaseShared。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc = = 0 ; } }
tryReleaseShared()的作用是释放共享锁,将锁计数器的值减1。
总结 CountDownLatch是通过共享锁实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是锁计数器的初始状态,表示该“享锁最多能被count给线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待共享锁可用时,才能获取“共享锁进而继续运行。而共享锁可用的条件,就是锁计数器的值为0!而锁计数器的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将锁计数器减1;通过这种方式,必须有count个线程调用countDown()之后,锁计数器才为0,而前面提到的等待线程才能继续运行!
参考
Java多线程系列–“JUC锁”09之 CountDownLatch原理和示例
什么时候使用CountDownLatch