我的编程空间,编程开发者的网络收藏夹
学习永远不晚

JUC循环屏障CyclicBarrier与CountDownLatch区别详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

JUC循环屏障CyclicBarrier与CountDownLatch区别详解

前言

jdk中提供了许多的并发工具类,大家可能比较熟悉的有CountDownLatch,主要用来阻塞一个线程运行,直到其他线程运行完毕。而jdk还有一个功能类似并发工具类CyclicBarrier,你知道它的作用吗?和CountDownLatch有什么区别呢?

对于CountDownLatch不了解的可以参考# CountDownLatch源码硬核解析

介绍和使用

CyclicBarrier,循环屏障,用来进行线程协作,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,会触发自己运行,运行完后,屏障又会开门,所有被屏障拦截的线程又可以继续运行。所以CyclicBarrier 是可以重用的。

为了更好的理解,我们举个例子,如下图所示:

我们将屏障想成栅栏,5个线程想成5头猪。5头猪开始往前跑,直到都跑到栅栏前,栅栏开始做个自己的任务,比如看看猪多重。然后打开栅栏,猪又会继续跑,跑到下一个栅栏,就这样循环....

API介绍

构造方法

  • public CyclicBarrier(int parties): 创建parties个线程任务的循环CyclicBarrier
  • public CyclicBarrier(int parties, Runnable barrierAction): 当parties个线程到到屏障出,自己执行任务barrierAction

常用API

  • int await():线程调用 await 方法通知 CyclicBarrier 本线程已经到达屏障

基本使用

我们将上面猪猪的例子通过CyclicBarrier简单做一个实现。

@Slf4j(topic = "c.CyclicBarrierPig")
public class CyclicBarrierPig {
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(5);
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("主人看看哪个猪跑最快,最肥...");
        });
        // 循环跑3次
        for (int i = 0; i < 3; i++) {
            // 5条猪开始跑
            for(int j = 0; j<5; j++) {
                int finalJ = j;
                service.submit(() -> {
                   log.info("pig{} is run .....", finalJ);
                    try {
                        // 随机时间,模拟跑花费的时间
                        Thread.sleep(new Random().nextInt(5000));
                        log.info("pig{} reach barrier .....", finalJ);
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        service.shutdown();
    }
}

运行结果:

实现原理

我们已经明白CyclicBarrier的基本使用了,那我们看看它是如何实现的。

成员属性

  • 全局锁:利用可重入锁实现的工具类
// barrier 实现是依赖于Condition条件队列,condition 条件队列必须依赖lock才能使用
private final ReentrantLock lock = new ReentrantLock();
// 线程挂起实现使用的 condition 队列,当前代所有线程到位,这个条件队列内的线程才会被唤醒
private final Condition trip = lock.newCondition();
  • 线程数量
// 代表多少个线程到达屏障开始触发线程任务
private final int parties;	
// 表示当前“代”还有多少个线程未到位,初始值为 parties
private int count;			
  • 当前代中最后一个线程到位后要执行的任务
private final Runnable barrierCommand;
  • 代, 也是用标记一次循环
// 表示 barrier 对象当前 代
private Generation generation = new Generation();
private static class Generation {
    // 表示当前“代”是否被打破,如果被打破再来到这一代的线程 就会直接抛出 BrokenException 异常
    // 且在这一代挂起的线程都会被唤醒,然后抛出 BrokerException 异常。
    boolean broken = false;
}

构造方法

public CyclicBarrie(int parties, Runnable barrierAction) {
    // 因为小于等于 0 的 barrier 没有任何意义
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    // 可以为 null
    this.barrierCommand = barrierAction;
}

成员方法

  • await():阻塞等待所有线程到位
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
// timed:表示当前调用await方法的线程是否指定了超时时长,如果 true 表示线程是响应超时的
// nanos:线程等待超时时长,单位是纳秒
private int dowait(boolean timed, long nanos) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 获取当前代
        final Generation g = generation;
        // 【如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常】
        if (g.broken)
            throw new BrokenBarrierException();
		// 如果当前线程被中断了,则打破当前代,然后当前线程抛出中断异常
        if (Thread.interrupted()) {
            // 设置当前代的状态为 broken 状态,唤醒在 trip 条件队列内的线程
            breakBarrier();
            throw new InterruptedException();
        }
        // 逻辑到这说明,当前线程中断状态是 false, 当前代的 broken 为 false(未打破状态)
        // 假设 parties 给的是 5,那么index对应的值为 4,3,2,1,0
        int index = --count;
        // 条件成立说明当前线程是最后一个到达 barrier 的线程,【需要开启新代,唤醒阻塞线程】
        if (index == 0) {
            // 栅栏任务启动标记
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    // 启动触发的任务
                    command.run();
                // run()未抛出异常的话,启动标记设置为 true
                ranAction = true;
                // 开启新的一代,这里会【唤醒所有的阻塞队列】
                nextGeneration();
                // 返回 0 因为当前线程是此代最后一个到达的线程,index == 0
                return 0;
            } finally {
                // 如果 command.run() 执行抛出异常的话,会进入到这里
                if (!ranAction)
                    breakBarrier();
            }
        }
        // 自旋,一直到条件满足、当前代被打破、线程被中断,等待超时
        for (;;) {
            try {
                // 根据是否需要超时等待选择阻塞方法
                if (!timed)
                    // 当前线程释放掉 lock,【进入到 trip 条件队列的尾部挂起自己】,等待被唤醒
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 被中断后来到这里的逻辑
                // 当前代没有变化并且没有被打破
                if (g == generation && !g.broken) {
                    // 打破屏障
                    breakBarrier();
                    // node 节点在【条件队列】内收到中断信号时 会抛出中断异常
                    throw ie;
                } else {
                    // 等待过程中代变化了,完成一次自我打断
                    Thread.currentThread().interrupt();
                }
            }
			// 唤醒后的线程,【判断当前代已经被打破,线程唤醒后依次抛出 BrokenBarrier 异常】
            if (g.broken)
                throw new BrokenBarrierException();
            // 当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑
            if (g != generation)
                return index;
			// 当前线程 trip 中等待超时,然后主动转移到阻塞队列
            if (timed && nanos <= 0L) {
                breakBarrier();
                // 抛出超时异常
                throw new TimeoutException();
            }
        }
    } finally {
        // 解锁
        lock.unlock();
    }
}
  • breakBarrier():打破 Barrier 屏障
private void breakBarrier() {
    // 将代中的 broken 设置为 true,表示这一代是被打破了,再来到这一代的线程,直接抛出异常
    generation.broken = true;
    // 重置 count 为 parties
    count = parties;
    // 将在trip条件队列内挂起的线程全部唤醒,唤醒后的线程会检查当前是否是打破的,然后抛出异常
    trip.signalAll();
}
  • nextGeneration():开启新的下一代
private void nextGeneration() {
    // 将在 trip 条件队列内挂起的线程全部唤醒
    trip.signalAll();
    // 重置 count 为 parties
    count = parties;
    // 开启新的一代,使用一个新的generation对象,表示新的一代,新的一代和上一代【没有任何关系】
    generation = new Generation();
}

和CountDownLatch的区别

相同点

二者都能让一个或多个线程阻塞等待,都可以用在多个线程间的协调,起到线程同步的作用。

不同点

  • CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以反复 使用。
  • CountDownLatch.await 一般阻塞工作线程,所有的进行预备工作的线程执行 countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工作线程达到指定屏障,所有的线程才会返回各自执行自己的工作。
  • CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  • CountDownLatch 会阻塞主线程,CyclicBarrier 不会阻塞主线程,只会阻塞子线程。

总结

本文讲解了CyclicBarrier 的基本功能和使用,同时讲解了它大致的实现。关于CyclicBarrier 具体有什么使用场景,你可能还是比较迷惑,我再举个例子,比如CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景,更多关于JUC循环屏障的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

JUC循环屏障CyclicBarrier与CountDownLatch区别详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

JUC循环屏障CyclicBarrier与CountDownLatch区别详解

这篇文章主要为大家介绍了JUC循环屏障CyclicBarrier与CountDownLatch区别详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-12-08

Java CountDownLatch计数器与CyclicBarrier循环屏障怎么定义

本文小编为大家详细介绍“Java CountDownLatch计数器与CyclicBarrier循环屏障怎么定义”,内容详细,步骤清晰,细节处理妥当,希望这篇“Java CountDownLatch计数器与CyclicBarrier循环屏障
2023-07-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录