搜搜吧

查看: 51|回复: 0

Java高并发编程基础三大利器之CyclicBarrier

[复制链接]

托儿所

1万

主题

3万

帖子

8万

积分

Rank: 1

UID
15150
威望
32
搜搜币
45207
在线时间
261 小时
注册时间
2015-10-2
发表于 2021-3-18 10:11:23 | 显示全部楼层 |阅读模式


5bad993e3a205e26efd15cb4cdf5f49b.jpg-wh_651x-s_2612015580.jpg 引言

前面一篇文章我们《Java高并发编程基础三大利器之CountDownLatch》它有一个缺点,就是它的计数器只能够使用一次,也就是说当计数器(state)减到为 0的时候,如果 再有线程调用去 await() 方法,该线程会直接通过,不会再起到等待其他线程执行结果起到同步的作用。为了解决这个问题CyclicBarrier就应运而生了。

什么是CyclicBarrier

CyclicBarrier是什么?把它拆开来翻译就是循环(Cycle)和屏障(Barrier)

e2bea8bbfc913e95288291a6e814f234.png-wh_600x-s_77403869.png

它的主要作用其实和CountDownLanch差不多,都是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障会被打开,所有被屏障阻塞的线程才会继续执行,不过它是可以循环执行的,这是它与CountDownLanch最大的不同。CountDownLanch是只有当最后一个线程把计数器置为0的时候,其他阻塞的线程才会继续执行。学习CyclicBarrier之前建议先去看看这几篇文章:

《Java高并发编程基础之AQS》

《Java高并发编程基础三大利器之Semaphore》

《Java高并发编程基础三大利器之CountDownLatch》

如何使用

我们首先先来看下关于使用CyclicBarrier的一个demo:比如游戏中有个关卡的时候,每次进入下一关的时候都需要进行加载一些地图、特效背景音乐什么的只有全部加载完了才能够进行游戏:

  • /**demo 来源https://blog.csdn.net/lstcui/article/details/107389371
  • * 公众号【java金融】
  • */
  • public class CyclicBarrierExample {
  •     static class PreTaskThread implements Runnable {
  •         private String task;
  •         private CyclicBarrier cyclicBarrier;
  •         public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
  •             this.task = task;
  •             this.cyclicBarrier = cyclicBarrier;
  •         }
  •         @Override
  •         public void run() {
  •             for (int i = 0; i < 4; i++) {
  •                 Random random = new Random();
  •                 try {
  •                     Thread.sleep(random.nextInt(1000));
  •                     System.out.println(String.format("关卡 %d 的任务 %s 完成", i, task));
  •                     cyclicBarrier.await();
  •                 } catch (InterruptedException | BrokenBarrierException e) {
  •                     e.printStackTrace();
  •                 }
  •             }
  •         }
  •         public static void main(String[] args) {
  •             CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
  •                 System.out.println("本关卡所有的前置任务完成,开始游戏... ...");
  •             });
  •             new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
  •             new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
  •             new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
  •         }
  •     }
  • }

输出结果如下:

4e15a5d123e945fdb23598e5641971c8.png-wh_600x-s_629214247.png

我们可以看到每次游戏开始都会等当前关卡把游戏的人物模型,地图数据、背景音乐加载完成后才会开始进行游戏。并且还是可以循环控制的。

源码分析结构组成
  • /** The lock for guarding barrier entry */
  • private final ReentrantLock lock = new ReentrantLock();
  • /** Condition to wait on until tripped */
  • private final Condition trip = lock.newCondition();
  • /** The number of parties */
  • private final int parties;
  • /* The command to run when tripped */
  • private final Runnable barrierCommand;
  • /** The current generation */
  • private Generation generation = new Generation();
  • lock:用于保护屏障入口的锁
  • trip :达到屏障并且不能放行的线程在trip条件变量上等待
  • parties :栅栏开启需要的到达线程总数
  • barrierCommand:最后一个线程到达屏障后执行的回调任务
  • generation:这是一个内部类,通过它实现CyclicBarrier重复利用,每当await达到最大次数的时候,就会重新new 一个,表示进入了下一个轮回。里面只有一个boolean型属性,用来表示当前轮回是否有线程中断。
主要方法
  • public int await() throws InterruptedException, BrokenBarrierException {
  •     try {
  •         return dowait(false, 0L);
  •     } catch (TimeoutException toe) {
  •         throw new Error(toe); // cannot happen
  •     }
  • }
  • * Main barrier code, covering the various policies.
  • */
  • private int dowait(boolean timed, long nanos)
  •     throws InterruptedException, BrokenBarrierException,
  •            TimeoutException {
  •     final ReentrantLock lock = this.lock;
  •     lock.lock();
  •      try {
  •            //获取barrier当前的 “代”也就是当前循环
  •          final Generation g = generation;
  •         if (g.broken)
  •             throw new BrokenBarrierException();
  •         if (Thread.interrupted()) {
  •             breakBarrier();
  •             throw new InterruptedException();
  •         }
  •         // 每来一个线程调用await方法都会进行减1
  •         int index = --count;
  •         if (index == 0) {  // tripped
  •             boolean ranAction = false;
  •             try {
  •                 final Runnable command = barrierCommand;
  •                 // new CyclicBarrier 传入 的barrierCommand, command.run()这个方法是同步的,如果耗时比较多的话,是否执行的时候需要考虑下是否异步来执行。
  •                 if (command != null)
  •                     command.run();
  •                 ranAction = true;
  •                 // 这个方法1. 唤醒所有阻塞的线程,2. 重置下count(count 每来一个线程都会进行减1)和generation,以便于下次循环。
  •                 nextGeneration();
  •                 return 0;
  •             } finally {
  •                 if (!ranAction)
  •                     breakBarrier();
  •             }
  •         }
  •         // loop until tripped, broken, interrupted, or timed out
  •         for (;;) {
  •             try {
  •                  // 进入if条件,说明是不带超时的await
  •                 if (!timed)
  •                      // 当前线程会释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒。
  •                     trip.await();
  •                 else if (nanos > 0L)
  •                      //说明当前线程调用await方法时 是指定了 超时时间的!
  •                     nanos = trip.awaitNanos(nanos);
  •             } catch (InterruptedException ie) {
  •                  //Node节点在 条件队列内 时 收到中断信号时 会抛出中断异常!
  •                 //g == generation 成立,说明当前代并没有变化。
  •                 //! g.broken 当前代如果没有被打破,那么当前线程就去打破,并且抛出异常..
  •                 if (g == generation && ! g.broken) {
  •                     breakBarrier();
  •                     throw ie;
  •                 } else {
  •                     // We're about to finish waiting even if we had not
  •                     // been interrupted, so this interrupt is deemed to
  •                     // "belong" to subsequent execution.
  •                 //执行到else有几种情况?
  •                 //1.代发生了变化,这个时候就不需要抛出中断异常了,因为 代已经更新了,这里唤醒后就走正常逻辑了..只不过设置下 中断标记。
  •                 //2.代没有发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出  brokenBarrier异常。也记录下中断标记位。
  •                     Thread.currentThread().interrupt();
  •                 }
  •             }
  •            //唤醒后,执行到这里,有几种情况?
  •           //1.正常情况,当前barrier开启了新的一代(trip.signalAll())
  •           //2.当前Generation被打破,此时也会唤醒所有在trip上挂起的线程
  •           //3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
  •             if (g.broken)
  •                 throw new BrokenBarrierException();
  •            //唤醒后,执行到这里,有几种情况?
  •         //1.正常情况,当前barrier开启了新的一代(trip.signalAll())
  •         //2.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
  •             if (g != generation)
  •                 return index;
  •            //唤醒后,执行到这里,有几种情况?
  •         //.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
  •             if (timed && nanos <= 0L) {
  •                 breakBarrier();
  •                 throw new TimeoutException();
  •             }
  •         }
  •     } finally {
  •          lock.unlock();
  •     }
  • }
小结

到了这里我们是不是可以知道为啥CyclicBarrier可以进行循环计数?

  • CyclicBarrier采用一个内部类Generation来维护当前循环,每一个await方法都会存储当前的generation,获取到相同generation对象的属于同一组,每当count的次数耗尽就会重新new一个Generation并且重新设置count的值为parties,表示进入下一次新的循环。

从这个await方法我们是不是可以知道只要有一个线程被中断了,当代的 generation的broken 就会被设置为true,所以会导致其他的线程也会被抛出BrokenBarrierException。相当于一个失败其他也必须失败,感觉有“强一致性“的味道。

总结
  • CountDownLanch是为计数器是设置一个值,当多次执行countdown后,计数器减为0的时候所有线程被唤醒,然后CountDownLanch失效,只能够使用一次。
  • CyclicBarrier是当count为0时同样唤醒全部线程,同时会重新设置count为parties,重新new一个generation来实现重复利用。

本文转载自微信公众号「java金融」,可以通过以下二维码关注。转载本文请联系java金融公众号。


Powered by www.sosoba.org Copyright © 2013-2021 搜搜吧社区 小黑屋|手机版|Archiver|地图|联系站长|腾讯云代金券|seo优化服务|搜搜吧
广告服务/项目合作/会员购买:QQ 侵权举报邮箱: fuwu-sosoba@qq.com 举报流程必看 搜搜吧建站时间:创建于2013年07月23日
免责声明:本站所有的内容均来自互联网以及第三方作者自由发布,版权归原作者版权所有,搜搜吧不承担任何的法律责任,若有侵权请来信告知,我们立即删除!
版权声明:搜搜吧影视资源均收集自互联网,没有提供影片资源存储和下载,也未参与录制上传,若本站收录的资源涉及您的版权或知识产权或其他利益,我们会立即删除

GMT+8, 2021-4-16 08:10 , Processed in 0.032599 second(s), 8 queries , Gzip On, Redis On.

快速回复 返回顶部 返回列表