我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java中SynchronousQueue的底层实现原理剖析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java中SynchronousQueue的底层实现原理剖析

上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。

但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0。

作用就是一个线程往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据。

这样特殊的队列,有什么应用场景呢?

1. SynchronousQueue用法

先看一个SynchronousQueue的简单用例:


public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建SynchronousQueue队列
        BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();

        // 2. 启动一个线程,往队列中放3个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 入队列 1");
                synchronousQueue.put(1);
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 入队列 2");
                synchronousQueue.put(2);
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 入队列 3");
                synchronousQueue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 3. 等待1000毫秒
        Thread.sleep(1000L);

        // 4. 再启动一个线程,从队列中取出3个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

输出结果:

Thread-0 入队列 1
Thread-1 出队列 1
Thread-0 入队列 2
Thread-1 出队列 2
Thread-0 入队列 3
Thread-1 出队列 3

从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。

由于SynchronousQueue是BlockingQueue的实现类,所以也实现类BlockingQueue中几组抽象方法:

为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。

操作抛出异常返回特定值阻塞阻塞一段时间
放数据addofferputoffer(e, time, unit)
取数据removepolltakepoll(time, unit)
查看数据(不删除)element()peek()不支持不支持

这几组方法的不同之处就是:

  • 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。
  • 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。
  • 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

工作中使用最多的就是offer、poll阻塞指定时间的方法。

2. SynchronousQueue应用场景

SynchronousQueue的特点:

队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。

这种特殊的实现逻辑有什么应用场景呢?

我的理解就是,如果你希望你的任务需要被快速处理,就可以使用这种队列。

Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

newCachedThreadPool线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒。

如果你使用newCachedThreadPool线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务。

你想想,这处理效率,杠杠滴!

当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。

3. SynchronousQueue源码解析

3.1 SynchronousQueue类属性

public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    // 转换器,取数据和放数据的核心逻辑都在这个类里面
    private transient volatile Transferer<E> transferer;

    // 默认的构造方法(使用非公平队列)
    public SynchronousQueue() {
        this(false);
    }

    // 有参构造方法,可以指定是否使用公平队列
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

    // 转换器实现类
    abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }

    // 基于栈实现的非公平队列
    static final class TransferStack<E> extends Transferer<E> {
    }

    // 基于队列实现的公平队列
    static final class TransferQueue<E> extends Transferer<E> {
    }

}

可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列。

// 使用非公平队列(基于栈实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
// 使用公平队列(基于队列实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);

本次就常用的栈实现来剖析SynchronousQueue的底层实现原理。

3.2 栈底层结构

栈结构,是非公平的,遵循先进后出。

使用个case测试一下:


public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建SynchronousQueue队列
        SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();

        // 2. 启动一个线程,往队列中放1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 入队列 0");
                synchronousQueue.put(0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 3. 等待1000毫秒
        Thread.sleep(1000L);

        // 4. 启动一个线程,往队列中放1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 入队列 1");
                synchronousQueue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 5. 等待1000毫秒
        Thread.sleep(1000L);

        // 6. 再启动一个线程,从队列中取出1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 7. 等待1000毫秒
        Thread.sleep(1000L);

        // 8. 再启动一个线程,从队列中取出1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

输出结果:

Thread-0 入队列 0
Thread-1 入队列 1
Thread-2 出队列 1
Thread-3 出队列 0

从输出结果中可以看出,符合栈结构先进后出的顺序。

3.3 栈节点源码

栈中的数据都是由一个个的节点组成的,先看一下节点类的源码:

// 节点
static final class SNode {
    // 节点值(取数据的时候,该字段为null)
    Object item;
    // 存取数据的线程
    volatile Thread waiter;
    // 节点模式
    int mode;
    // 匹配到的节点
    volatile SNode match;
    // 后继节点
    volatile SNode next;
}

item

节点值,只在存数据的时候用。取数据的时候,这个值是null。

waiter

存取数据的线程,如果没有对应的接收线程,这个线程会被阻塞。

mode

节点模式,共有3种类型:

类型值类型描述类型的作用
0REQUEST表示取数据
1DATA表示存数据
2FULFILLING表示正在等待执行(比如取数据的线程,等待其他线程放数据)

3.4 put/take流程

放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的。

先看一下数据流转的过程,方便理解源码。

还是以上面的case为例:

  • Thread0先往SynchronousQueue队列中放入元素0
  • Thread1再往SynchronousQueue队列放入元素1
  • Thread2从SynchronousQueue队列中取出一个元素

第一步:Thread0先往SynchronousQueue队列中放入元素0

把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据。

第二步:Thread1再往SynchronousQueue队列放入元素1

把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0。

第三步:Thread2从SynchronousQueue队列中取出一个元素

这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶。

item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1。

然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点。

3.5 put/take源码实现

先看一下put方法源码:

// 放数据
public void put(E e) throws InterruptedException {
    // 不允许放null元素
    if (e == null)
        throw new NullPointerException();
    // 调用转换器实现类,放元素
    if (transferer.transfer(e, false, 0) == null) {
        // 如果放数据失败,就中断当前线程,并抛出异常
        Thread.interrupted();
        throw new InterruptedException();
    }
}

核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解。

// 取数据和放数据操作,共用一个方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null;
    // e为空,说明是取数据,否则是放数据
    int mode = (e == null) ? REQUEST : DATA;

    for (; ; ) {
        SNode h = head;
        // 1. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据)
        if (h == null || h.mode == mode) {
            // 2. 判断节点是否已经超时
            if (timed && nanos <= 0) {
                // 3. 如果栈顶节点已经被取消,就删除栈顶节点
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
                // 4. 把本次操作包装成SNode,压入栈顶
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 5. 挂起当前线程,等待被唤醒
                SNode m = awaitFulfill(s, timed, nanos);
                // 6. 如果这个节点已经被取消,就删除这个节点
                if (m == s) {
                    clean(s);
                    return null;
                }
                // 7. 把s.next设置成head
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 8. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型
        } else if (!isFulfilling(h.mode)) {
            // 9. 再次判断如果栈顶节点已经被取消,就删除栈顶节点
            if (h.isCancelled())
                casHead(h, h.next);
                // 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶
            else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                // 11. 使用死循环,直到匹配到对应的节点
                for (; ; ) {
                    // 12. 遍历下个节点
                    SNode m = s.next;
                    // 13. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。
                    if (m == null) {
                        casHead(s, null);
                        s = null;
                        break;
                    }
                    SNode mn = m.next;
                    // 14. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。
                    if (m.tryMatch(s)) {
                        casHead(s, mn);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else
                        // 15. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配
                        s.casNext(m, mn);
                }
            }
        } else {
            // 16. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型,
            // 就再执行一遍上面第11步for循环中的逻辑(很少概率出现)
            SNode m = h.next;
            if (m == null)
                casHead(h, null);
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))
                    casHead(h, mn);
                else
                    h.casNext(m, mn);
            }
        }
    }
}

transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点。

transfer方法中调用了awaitFulfill方法,作用是挂起当前线程。

// 等待被唤醒
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 1. 计算超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 2. 计算自旋次数
    int spins = (shouldSpin(s) ?
            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        // 3. 如果已经匹配到其他节点,直接返回
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            // 4. 超时时间递减
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 5. 自旋次数减一
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            s.waiter = w;
        // 6. 开始挂起当前线程
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

awaitFulfill方法的逻辑也很简单,就是挂起当前线程。

take方法底层使用的也是transfer方法:

// 取数据
public E take() throws InterruptedException {
    // // 调用转换器实现类,取数据
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    // 没取到,就中断当前线程
    Thread.interrupted();
    throw new InterruptedException();
}

4. 总结

  • SynchronousQueue是一种特殊的阻塞队列,队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。
  • SynchronousQueue底层是基于栈和队列两种数据结构实现的。
  • Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。
  • 如果希望你的任务需要被快速处理,可以使用SynchronousQueue队列。

到此这篇关于Java中SynchronousQueue的底层实现原理剖析的文章就介绍到这了,更多相关Java SynchronousQueue内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java中SynchronousQueue的底层实现原理剖析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java中SynchronousQueue的底层实现原理剖析

BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0。本文就来剖析一下SynchronousQueue的底层实现原理,感兴趣的可以了解一下
2022-11-21

Java必会的Synchronized底层原理剖析

synchronized作为Java程序员最常用同步工具,很多人却对它的用法和实现原理一知半解,以至于还有不少人认为synchronized是重量级锁,性能较差,尽量少用。但不可否认的是synchronized依然是并发首选工具,本文就来详细讲讲
2022-11-13

Java中集合底层原理分析

这篇文章将为大家详细讲解有关Java中集合底层原理分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、Collection集合Collection接口是单列集合类的父接口,这种集合可以将数据一个一个的存
2023-06-15

源码剖析Golang中map扩容底层的实现

之前的文章详细介绍过Go切片和map的基本使用,以及切片的扩容机制。本文针对map的扩容,会从源码的角度全面的剖析一下map扩容的底层实现,需要的可以参考一下
2023-03-06

HashMap的底层实现原理

这篇文章主要介绍“HashMap的底层实现原理”,在日常操作中,相信很多人在HashMap的底层实现原理问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”HashMap的底层实现原理”的疑惑有所帮助!接下来,请跟
2023-06-04

源码分析Java中ThreadPoolExecutor的底层原理

这篇文章主要带大家从源码分析一下Java中ThreadPoolExecutor的底层原理,文中的示例代码讲解详细,具有一定的学习价值,需要的可以参考一下
2023-05-19

一文带你彻底剖析Java中Synchronized原理

Synchronized是Java中的隐式锁,它的获取锁和释放锁都是隐式的,完全交由JVM帮助我们操作,在了解Synchronized关键字之前,首先要学习的知识点就是Java的对象结构,本文介绍的非常详细,需要的朋友可以参考下
2023-05-18

Java并发编程之synchronized底层实现原理分析

Java并发编程中的synchronized关键字通过JVM内部锁机制实现线程同步。锁对象与其监视器锁关联,单个线程同一时刻只能访问锁对象保护的代码段。获取锁时,线程需先获取锁对象监视器锁,释放锁时需释放该锁。每个Java对象的对象头中存储锁状态,偏向锁、轻量级锁、重量级锁会根据不同场景用于优化性能。
Java并发编程之synchronized底层实现原理分析
2024-04-02

Synchronized的底层实现原理(原理解析,面试必备)

synchronized 一. synchronized解读 1.1 简单描述 synchronized关键字解决的是多个线程之间访问资源的同步性,synchronized 翻译为中文的意思是同步,也称之为同步锁。 synchronize
2023-08-19

如何从Hotspot源码层面剖析java多态实现原理

今天给大家介绍一下如何从Hotspot源码层面剖析java多态实现原理。文章的内容小编觉得不错,现在给大家分享一下,觉得有需要的朋友可以了解一下,希望对大家有所帮助,下面跟着小编的思路一起来阅读吧。C++是如何实现多态的多态的实现,现在几乎
2023-06-29

微信小程序底层的实现原理分析

这篇文章主要为大家展示了“微信小程序底层的实现原理分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“微信小程序底层的实现原理分析”这篇文章吧。从map组件说起在今天公布的开发文档里,我们知道使用
2023-06-26

剖析 Swoole 扩展的底层原理,发挥其最大潜力

Swoole 扩展作为 PHP 语言的高性能并发网络框架,因其强大的功能和优异的性能受到了广泛的关注和应用。为了充分发挥 Swoole 扩展的潜力,深入了解其底层原理至关重要。本文将剖析 Swoole 扩展的底层原理,帮助开发者更好地理解和使用 Swoole。
剖析 Swoole 扩展的底层原理,发挥其最大潜力
2024-02-05

Java中Set集合的使用和底层原理解析

这篇文章主要介绍了Java中Set集合的使用和底层原理,Set集合的功能上基本上与Collection的API一致,Set集合没有扩展额外的API,本文通过示例代码给大家详细讲解,需要的朋友可以参考下
2022-12-10

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录