我的编程空间,编程开发者的网络收藏夹
学习永远不晚

并发编程之CyclicBarrier原理与使用

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

并发编程之CyclicBarrier原理与使用

 前言

控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。

控制并发流程的工具类主要有:


简介

从字面意思看,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。它的作用就是会让所有线程都等待完成后才会继续下一步行动。

举个例子,就像生活中我们会约朋友到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但这个餐厅规定必须等到所有人到期之后才会让我们进去。这里的朋友们就各个线程,餐厅就是CyclicBarrier。

在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。下图演示了这一过程。


应用场景

可用于多线程计数数据,最后合并计数结果的场景。

使用CyclicBarrier实现等待的线程都被称为参与方。参与方只需要执行cyclicBarrier.await() 就可以实现等待。由于CyclicBarrier内部维护了一个显示锁,这可以知道参与方中谁最后一个执行cyclicBarrier.await() 。当最后一个线程执行完,会使得使用相应CyclicBarrier实例的其他参与方被唤醒,而最后一个线程自身不会被暂停。其流程图如下:


  1. public static void main(String[] args) { 
  2.         CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() ->{ 
  3.             System.out.println("****召唤神龙"); 
  4.         }); 
  5.         for(int i = 1;i <= 7; i++){ 
  6.             int finalI = i; 
  7.             new Thread(() -> { 
  8.                 System.out.println(Thread.currentThread().getName() + "\t 收集到第"+ finalI +"颗龙珠"); 
  9.                 try { 
  10.                     cyclicBarrier.await(); 
  11.                 } catch (InterruptedException e) { 
  12.                     e.printStackTrace(); 
  13.                 } catch (BrokenBarrierException e) { 
  14.                     e.printStackTrace(); 
  15.                 } 
  16.             },String.valueOf(i)).start(); 
  17.         } 
  18.     } 

 源码分析

CyclicBarrier 类图


CyclicBarrier是包含了 “ReentrantLock对象lock” 和 “Condition对象trip”,它是通过独占锁实现的。

其内部主要变量和方法如下:

成员变量

//同步操作锁

private final ReentrantLock lock = new ReentrantLock();

  1. //同步操作锁 
  2. private final ReentrantLock lock = new ReentrantLock(); 
  3. //线程拦截器 
  4. private final Condition trip = lock.newCondition(); 
  5. //每次拦截的线程数 
  6. private final int parties; 
  7. //换代前执行的任务 
  8. private final Runnable barrierCommand; 
  9. //表示栅栏的当前代 
  10. private Generation generation = new Generation(); 
  11. //计数器 
  12. private int count
  13.   
  14. //静态内部类Generation 
  15. private static class Generation { 
  16.   boolean broken = false

 可以看到 CyclicBarrier 内部是通过条件队列 trip 来对线程进行阻塞的,并且其内部维护了两个 int 型的变量 parties 和 count:

  • parties 表示每次拦截的线程数,该值在构造时进行赋值;
  • count 是内部计数器,它的初始值和 parties 相同,以后随着每次 await 方法的调用而减 1,直到减为 0 就将所有线程唤醒。

CycliBarrier 有一个静态内部类 Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局有些,利用它可以实现循环等待。barrierCommand 表示换代前执行的任务,当 count 减为 0 时表示本局游戏结束,需要转到下一句。在转到下一句游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定 barrierCommand 来执行自己的任务。

构造函数

主要提供了两个构造方法

  1. public CyclicBarrier(int parties) { 
  2.  this(parties, null); 
  3.  
  4. public CyclicBarrier(int parties, Runnable barrierAction) { 
  5.     if (parties <= 0) throw new IllegalArgumentException(); 
  6.     // parties表示“必须同时到达barrier的线程个数”。 
  7.     this.parties = parties; 
  8.     // count表示“处在等待状态的线程个数”。 
  9.     this.count = parties; 
  10.     // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。 
  11.     this.barrierCommand = barrierAction; 

 解析:

  • parties 是参与线程的个数
  • 第二个构造方法有一个Runnable参数,这个参数的意思是最后一个到达线程要执行的动作。

重要方法

CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。

await()方法

  1. //非定时等待 
  2. public int await() throws InterruptedException, BrokenBarrierException { 
  3.   try { 
  4.     return dowait(false, 0L); 
  5.   } catch (TimeoutException toe) { 
  6.     throw new Error(toe); 
  7.   } 
  8.   
  9. //定时等待 
  10. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { 
  11.   return dowait(true, unit.toNanos(timeout)); 

 解析:

  • 线程调用await()表示总结已经到达栅栏
  • BrokenBarrierException表示栅栏已经被破坏,破坏的原因可能是其中一个线程await()时被中断或者超时。

dowait()方法

可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法,只不过是传入的参数不同而已。下面我们就来看看dowait方法都做了些什么。

  1. //核心等待方法 
  2. private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { 
  3.   // 显示锁 
  4.   final ReentrantLock lock = this.lock; 
  5.   lock.lock(); 
  6.   try { 
  7.     final Generation g = generation; 
  8.     //检查当前栅栏是否被打翻 
  9.     if (g.broken) { 
  10.       throw new BrokenBarrierException(); 
  11.     } 
  12.     //检查当前线程是否被中断 
  13.     if (Thread.interrupted()) { 
  14.       //如果当前线程被中断会做以下三件事 
  15.       //1.打翻当前栅栏 
  16.       //2.唤醒拦截的所有线程 
  17.       //3.抛出中断异常 
  18.       breakBarrier(); 
  19.       throw new InterruptedException(); 
  20.     } 
  21.     //每次都将计数器的值减1 
  22.     int index = --count; 
  23.     //计数器的值减为0则需唤醒所有线程并转换到下一代 
  24.     if (index == 0) { 
  25.       boolean ranAction = false
  26.       try { 
  27.         //唤醒所有线程前先执行指定的任务 
  28.         final Runnable command = barrierCommand; 
  29.         if (command != null) { 
  30.           command.run(); 
  31.         } 
  32.         ranAction = true
  33.         //唤醒所有线程并转到下一代 
  34.         nextGeneration(); 
  35.         return 0; 
  36.       } finally { 
  37.         //确保在任务未成功执行时能将所有线程唤醒 
  38.         if (!ranAction) { 
  39.           breakBarrier(); 
  40.         } 
  41.       } 
  42.     } 
  43.   
  44.     //如果计数器不为0则执行此循环 
  45.     for (;;) { 
  46.       try { 
  47.         //根据传入的参数来决定是定时等待还是非定时等待 
  48.         if (!timed) { 
  49.           trip.await(); 
  50.         }else if (nanos > 0L) { 
  51.           nanos = trip.awaitNanos(nanos); 
  52.         } 
  53.       } catch (InterruptedException ie) { 
  54.         //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程 
  55.         if (g == generation && ! g.broken) { 
  56.           breakBarrier(); 
  57.           throw ie; 
  58.         } else { 
  59.           //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作 
  60.           Thread.currentThread().interrupt(); 
  61.         } 
  62.       } 
  63.       //如果线程因为打翻栅栏操作而被唤醒则抛出异常 
  64.       if (g.broken) { 
  65.         throw new BrokenBarrierException(); 
  66.       } 
  67.       //如果线程因为换代操作而被唤醒则返回计数器的值 
  68.       if (g != generation) { 
  69.         return index
  70.       } 
  71.       //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 
  72.       if (timed && nanos <= 0L) { 
  73.         breakBarrier(); 
  74.         throw new TimeoutException(); 
  75.       } 
  76.     } 
  77.   } finally { 
  78.     lock.unlock(); 
  79.   } 

 上面执行的代码相对比较容易看懂,我们再来看一下执行流程:


  1. 获得显示锁,判断当前线程状态是否被中断,如果是,则执行 breakBarrier 方法,唤醒之前阻塞的所有线程,并将计数器重置;
  2. 计数器 count 减 1,如果 count == 0,表示最后一个线程达到栅栏,接着执行之前指定的 Runnable 接口,同时执行 nextGeneration 方法进入下一代;
  3. 否则,进入自旋,判断当前线程是进入定时等待还是非定时等待,如果在等待过程中被中断,执行 breakBarrier 方法,唤醒之前阻塞的所有线程;
  4. 判断是否是因为执行 breakBarrier 方法而被唤醒,如果是,则抛出异常;
  5. 判断是否是正常的换代操作而被唤醒,如果是,则返回计数器的值;
  6. 判断是否是超时而被唤醒,如果是,则唤醒之前阻塞的所有线程,并抛出异常;
  7. 释放锁。

breakBarrier()方法

  1. private void breakBarrier() { 
  2.  generation.broken = true;//栅栏被打破 
  3.  count = parties;//重置count 
  4.  trip.signalAll();//唤醒之前阻塞的线程 

 nextGeneration()方法

  1. private void nextGeneration() { 
  2.  //唤醒所以的线程 
  3.  trip.signalAll(); 
  4.  //重置计数器 
  5.  count = parties; 
  6.  //重新开始 
  7.  generation = new Generation(); 

 reset()方法

接下来看看栅栏重置的方法

  1. // 重置barrier到初始状态,所有还在等待中的线程最终会抛出BrokenBarrierException。 
  2. public void reset() { 
  3.  final ReentrantLock lock = this.lock; 
  4.     lock.lock(); 
  5.     try { 
  6.      breakBarrier();   // break the current generation 
  7.         nextGeneration(); // start a new generation 
  8.     } finally { 
  9.      lock.unlock(); 
  10.     } 

 其它方法

CyclicBarrier 其它还提供了例如getParties,isBroken,getNumberWaiting等方法,都比较简单,其中除了getParties由于parties被final修饰不可变,其余方法都会先去获得互斥锁。

  1.  
  2. public boolean isBroken() { 
  3.     final ReentrantLock lock = this.lock; 
  4.     lock.lock(); 
  5.     try { 
  6.         return generation.broken; 
  7.     } finally { 
  8.         lock.unlock(); 
  9.     } 
  10.  
  11.  
  12. public int getNumberWaiting() { 
  13.     final ReentrantLock lock = this.lock; 
  14.     lock.lock(); 
  15.     try { 
  16.         return parties - count
  17.     } finally { 
  18.         lock.unlock(); 
  19.     } 

 总结

CountDownLatch和CyclicBarrier区别

  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
  • CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再才执行;
  • CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的;
  • CountDownLathch是一个计数器,线程完成一个记录一个,计数器递减,只能用一次。如下图:

CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递减,提供reset功能,可以多次使用。如下图:

PS:以上代码提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

PS:这里有一个技术交流群(QQ群:1158819530),方便大家一起交流,持续学习,共同进步,有需要的可以加一下。

 

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

并发编程之CyclicBarrier原理与使用

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

并发编程之CyclicBarrier原理与使用

控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。
CyclicBarrier2024-12-03

并发编程之Exchanger原理与使用

Exchanger是适用在两个线程之间数据交换的并发工具类,它的作用是找到一个同步点,当两个线程都执行到了同步点(exchange方法)之后(有一个没有执行到就一直等待,也可以设置等待超时时间),就将自身线程的数据与对方交换。

并发编程之Phaser原理与应用

JDK5中引入了CyclicBarrier和CountDownLatch这两个并发控制类,而JDK7中引入的Phaser按照官方的说法是提供了一个功能类似但是更加灵活的实现。接下来我们带着几个问题来研究一下Phaser与(CountDown

并发编程之Semaphore原理与应用

控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。
Semaphore2024-12-03

如何使用Java高并发编程CyclicBarrier

本篇内容介绍了“如何使用Java高并发编程CyclicBarrier”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!什么是CyclicBarr
2023-06-15

Java并发实例之CyclicBarrier的使用

最近一直整并发这块东西,顺便写点Java并发的例子,给大家做个分享,也强化下自己记忆,如果有什么错误或者不当的地方,欢迎大家斧正。CyclicBarrier是一种多线程并发控制实用工具,和CountDownLatch非常类似,它也可以实现线
2023-05-30

Java高并发之CyclicBarrier怎么使用

这篇文章主要介绍了Java高并发之CyclicBarrier怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java高并发之CyclicBarrier怎么使用文章都会有所收获,下面我们一起来看看吧。Jav
2023-07-05

Java高并发编程基础三大利器之CyclicBarrier

前面一篇文章我们《Java高并发编程基础三大利器之CountDownLatch》它有一个缺点,就是它的计数器只能够使用一次,也就是说当计数器(state)减到为 0的时候,如果 再有线程调用去 await() 方法,该线程会直接通过,不会再

并发编程之ThreadPoolExecutor线程池原理解析

在介绍线程池之前,我们先回顾下线程的基本知识。其中线程池包括ThreadPoolExecutor 默认线程和ScheduledThreadPoolExecutor 定时线程池 ,本篇重点介绍ThreadPoolExecutor线程池。

并发编程之ForkJoin框架原理分析

Java7 又提供了的一个用于并行执行的任务的框架 Fork/Join ,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。在介绍Fork/Join 框架之前我们先了解几个概念:CPU密集型、IO密集型,再逐步

如何在Java中利用CyclicBarrier实现并发编程

如何在Java中利用CyclicBarrier实现并发编程?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。 使用JAVA编写并发程序的时候,我们需要仔细去思考一
2023-05-31

Java并发编程之CountDownLatch的使用

CountDownLatch是一个倒数的同步器,常用来让一个线程等待其他N个线程执行完成再继续向下执行,本文主要介绍了CountDownLatch的具体使用方法,感兴趣的可以了解一下
2023-05-20

Go并发编程:资源管理与锁的使用

go并发编程中资源管理和锁的使用至关重要。go提供了并发安全类型、通道和waitgroup来管理共享资源访问,而互斥锁、读写锁和原子操作则用于控制对资源的访问。实战案例展示了如何使用sync.waitgroup同步对共享计数器的访问,确保并
Go并发编程:资源管理与锁的使用
2024-05-11

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录