我的编程空间,编程开发者的网络收藏夹
学习永远不晚

【JavaEE】多线程案例-阻塞队列

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

【JavaEE】多线程案例-阻塞队列

在这里插入图片描述

1. 前言

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

  • 在队列为空时,获取元素的线程会等待队列变为非空
  • 当队列满时,存储元素的线程会等待队列可用

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

2. 什么是生产者-消费者模型

生产者消费者模型是一种多线程并发协作的模型,由两类线程和一个缓冲区组成:生产者线程生产数据并把数据放在缓冲区,消费者线程从缓冲区取数据并消费。生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品。当存储空间为空时,消费者阻塞;当存储空间满时,生产者阻塞。
在这里插入图片描述

3. 生产者-消费者模型的意义

  1. 解耦合
  2. 削峰填谷

3.1 解耦合

两个模块的联系越紧密,耦合度就越高,耦合度越高就意味着两个模块的影响程度很大,当一个模块出现问题的时候,另一个模块也会受到影响而导致出现问题,特别是对分布式系统来说:解耦合是非常重要的。

在这里插入图片描述
假设上面是一个简单的分布式系统,服务器 A 和服务器 B 之间直接进行交互(服务器 A 向服务器 B 发送请求并接收服务器 B 返回的信息,服务器 B 向服务器 A 发送请求,以及接收服务器 A 返回的信息),服务器 A 和服务器 B 之间的耦合度比较高,当两个服务器之间的一个发生故障的时候就会导致两个服务器都无法使用。

不仅如此,当我们想要再添加一个服务器 C 与服务器 A 之间进行交互的时候,不仅需要对服务器 C 做出修改,还需要对服务器 A 作出修改。

相比上面的情况,如果我们使用生产者-消费者模型的话就可以解决上面的耦合度过高的问题。

在这里插入图片描述
服务器 A 接收到客户端发来的请求不是直接发送给服务器 B ,而是将接收到的请求加入到阻塞队列中,然后服务器 B 从阻塞队列中获取到请求,这样就避免了两个服务器之间进行直接的交互,降低了耦合性;不仅如此,当我们需要额外添加一个服务器 C 的时候,就不需要对服务器 A 做出修改,而是直接从阻塞队列获取请求信息。

在这里插入图片描述

3.2 削峰填谷

当客户端向服务器 A 短时间发出大量请求信息的话,那么当服务器 A 接收到客户端发来的请求的时候,就会立即将收到的所有信息都发送给服务器 B ,但是由于虽然服务器 A 能够接收的请求量可以很多,但是服务器 B 却不能一次接收这么多请求,就会导致服务器 B 会挂掉。

如果使用生产者-消费者莫模型的话,当客户端向服务器 A 短时间发送大量请求的话,服务器 A 不会将请求发送给 B ,而是发送给阻塞队列中,当阻塞队列满了的时候,服务器 A 就会停止向阻塞队列中发送请求,陷入阻塞状态,等服务器 B 向阻塞队列中受到请求使得阻塞队列容量减少的时候,服务器 A 才会继续向阻塞队列中发送收到的请求,这样就避免了服务器 B 短时间内受到大量的请求而挂掉的情况;如果阻塞队列中收到的请求信息被读取完的时候,服务器 B 就会停止从阻塞队列中读取请求,进入阻塞状态,直到服务器 A 向阻塞队列中发送请求。
在这里插入图片描述

4. 如何使用Java标准库提供的阻塞队列

当知道了生产者-消费者模型的意义之后,我们就来看看如何使用阻塞队列。在Java标准库中提供了阻塞队列 BlockingQueue 可以直接使用。

在这里插入图片描述
因为 BlockingQueu 是一个接口,无法实例化,所以需要创建出实现了 BlockingQueue 接口的类,而 ArrayBlockingQueue 和 LinkedBlockingQueue 实现了这个接口。

我们还可以观察到,BlockingQueue 还继承了 Queue ,也就是说我们也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞队列中不使用这两个方法,因为这两个方法不具有阻塞特性,而是使用 put 和 take 方法。

public class Demo1 {    public static void main(String[] args) throws InterruptedException {        BlockingQueue<String> queue = new LinkedBlockingQueue<>();        queue.put("123");        queue.put("234");        queue.put("345");        queue.put("456");        System.out.println(queue.take());        System.out.println(queue.take());        System.out.println(queue.take());        System.out.println(queue.take());        System.out.println(queue.take());    }}

在这里插入图片描述
这里向阻塞队列中加入了四个数据,但是读取的时候读取了五次,所以看到线程进入了阻塞状态。

5. 自己实现一个阻塞队列

阻塞队列是建立在队列的基础上的,所以要想实现一个阻塞队列,首先需要实现出来一个队列,那么就先来看看如何实现出一个循环队列。

5.1 实现出循环队列

队列比较容易实现,但是循环队列该如何实现呢?当数据到达数组的最后的时候,将数组的下标修改为0,这样就可以达到循环的目的。

在这里插入图片描述
当 tail == head 的时候有两种情况:

  1. 队列中没有数据的时候
  2. 队列满了的时候

为了区分这两种情况,我们可以使用两种方法:

  1. 浪费一个空间,当tail++之后,如果tail + 1 == head,则表示队列满了,将 tail 修改为 0
  2. 定义一个size变量来表示队列中有效数据的个数,当size == queue.length的时候,表示队列满了
class MyQueue {    private final String[] data = new String[1000];    private int size;    private int head = 0;    private int tail = 0;    public void put(String str) {        //当添加数据的时候需要判断队列的容量是否已经满了        if(size == data.length) return;        data[tail++] = str;        size++;        if(tail == data.length) tail = 0;    }    public String take() {    //读取数据的时候需要判断队列是否为空        if(size == 0) return null;        String ret = data[head++];        size--;        if(head == data.length) head = 0;        return ret;    }}

5.2 实现阻塞队列

阻塞队列就是在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。并且因为阻塞队列运用的环境是多线程,需要考虑到线程安全的问题。

5.2.1 加锁

当需要进行查询和修改的操作时,需要对该操作进行加锁。因为我们的 put 和 take 基本上都在查询和修改数据,所以可以将这两个操作直接进行加锁操作。

class MyBlockingQueue {    private final String[] data = new String[1000];    private int size;    private int head = 0;    private int tail = 0;    public void put(String str) {        synchronized (this) {            if(size == data.length) return;            data[tail++] = str;            size++;            if(tail == data.length) tail = 0;        }    }    public String take() {        synchronized (this) {            if(size == 0) return null;            String ret = data[head++];            size--;            if(head == data.length) head = 0;            return ret;        }    }}

5.2.2 进行阻塞操作

当进行完加锁操作之后,我们还需要实现阻塞的作用,当添加数据的时候,如果队列中容量满了的时候就进入阻塞等待状态,直到进行了 take 读取数据操作删除数据的时候,才停止等待;当读取数据的时候,如果队列为空,那么该线程就进入阻塞等待状态,直到进行了 put 操作。

class MyBlockingQueue {    private final String[] data = new String[1000];    private int size;    private int head = 0;    private int tail = 0;    public void put(String str) throws InterruptedException {        synchronized (this) {            if(size == data.length) {                this.wait();            }            data[tail++] = str;            size++;            if(tail == data.length) tail = 0;            //这个 notify 用来唤醒 take 操作的等待            this.notify();        }    }    public String take() throws InterruptedException {        synchronized (this) {            if(size == 0) {                this.wait();            }            String ret = data[head++];            size--;            if(head == data.length) head = 0;            //这个 notify 用来唤醒 put 操作的等待            this.notify();            return ret;        }    }}

5.2.3 解决因被 interrupt 唤醒 waiting 状态的问题

当使用了 wait 和 notify 对 put 和 take 操作进行了阻塞等待和唤醒操作之后,我们还需要注意,难道只有 notify 才会唤醒 WAITING 状态吗?前面我们学习了使用 interrupt 来终止线程,但是 interrup 还会唤醒处于 WAITING 状态的线程,也就是说这里的 WAITING 状态的线程不仅可以被 notify 唤醒,还可以被 interrupt 唤醒。

  1. 当线程是因为 put 操作队列满了的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 take 操作的 notify 唤醒的时候就意味着此时队列还是满的,当进行添加操作的时候,就会将有效的数据覆盖掉;
  2. 当线程是因为 take 操作队列为空的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 put 操作的 notify 唤醒的时候就意味着此时队列还是空的,如果进行删除操作,并没有意义。

为了解决 WAITING 状态被 interrupt 唤醒而造成的问题,当线程被唤醒的时候,需要进行判断 size 是否还等于 0 或者 queue.length,如果还等于,就继续进入 WAITING 状态,但是光一次判断是不够的,因为还可能是被 interrupt 唤醒的,所以需要进行多次判断,可以用 while 循环来解决。

class MyBlockingQueue {    private final String[] data = new String[1000];    private int size;    private int head = 0;    private int tail = 0;    public void put(String str) throws InterruptedException {        synchronized (this) {            while(size == data.length) {                this.wait();            }            data[tail++] = str;            size++;            if(tail == data.length) tail = 0;            //这个 notify 用来唤醒 take 操作的等待            this.notify();        }    }    public String take() throws InterruptedException {        synchronized (this) {            while(size == 0) {                this.wait();            }            String ret = data[head++];            size--;            if(head == data.length) head = 0;            //这个 notify 用来唤醒 put 操作的等待            this.notify();            return ret;        }    }}

5.2.4 解决因指令重排序造成的问题

因为 put 和 take 操作要进行读和写的操作,可能会因为指令重排序的问题造成其他问题,这里就需要使用 volatile 解决指令重排序问题。

class MyBlockingQueue {    private final String[] data = new String[1000];    private volatile int size;    private volatile int head = 0;    private volatile int tail = 0;    public void put(String str) throws InterruptedException {        synchronized (this) {            while(size == data.length) {                this.wait();            }            data[tail++] = str;            size++;            if(tail == data.length) tail = 0;            //这个 notify 用来唤醒 take 操作的等待            this.notify();        }    }    public String take() throws InterruptedException {        synchronized (this) {            while(size == 0) {                this.wait();            }            String ret = data[head++];            size--;            if(head == data.length) head = 0;            //这个 notify 用来唤醒 put 操作的等待            this.notify();            return ret;        }    }}

测试实现的阻塞队列

public class Demo4 {    public static void main(String[] args) {        MyBlockingQueue queue = new MyBlockingQueue();        Thread t1 = new Thread(() -> {            while(true) {                try {                    System.out.println("消费元素" + queue.take());                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }            }        });        Thread t2 = new Thread(() -> {            int count = 1;            while(true) {                try {                    queue.put(count + "");                    System.out.println("生产元素" + count);                    count++;                    Thread.sleep(100);                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }            }        });        t1.start();        t2.start();    }}

让生产速度较慢,使得读取操作阻塞等待插入数据才执行。

在这里插入图片描述

public class Demo4 {    public static void main(String[] args) {        MyBlockingQueue queue = new MyBlockingQueue();        Thread t1 = new Thread(() -> {            while(true) {                try {                    System.out.println("消费元素" + queue.take());                    Thread.sleep(100);                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }            }        });        Thread t2 = new Thread(() -> {            int count = 1;            while(true) {                try {                    queue.put(count + "");                    System.out.println("生产元素" + count);                    count++;                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }            }        });        t1.start();        t2.start();    }}

在这里插入图片描述

让生产 put 操作进入阻塞等待状态。

来源地址:https://blog.csdn.net/m0_73888323/article/details/132862839

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

【JavaEE】多线程案例-阻塞队列

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java多线程案例之阻塞队列详解

阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.阻塞队列能是一种线程安全的数据结构。本文将通过一些示例为大家详细讲讲阻塞队列的原理与使用,感兴趣的小伙伴可以学习一下
2022-11-13

线程池02-LinkedBlockingQueue 阻塞队列

首先,我们先了解一下什么是阻塞队列:当队列满了时,队列会阻塞插入元素的线程,直到队列不满;当队列为空时,获取元素的线程会等待队列变成非空。常用到的方法上面是对阻塞队列的简单了解,下面重点分析一下LinkedBlockingQueue。源码分析Node节点可以看

	线程池02-LinkedBlockingQueue 阻塞队列
2021-10-24

Java多线程之多种锁和阻塞队列的示例分析

这篇文章给大家分享的是有关Java多线程之多种锁和阻塞队列的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。一、悲观锁和乐观锁1.1. 乐观锁顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以
2023-06-15

python 简单搭建阻塞式单进程,多进程,多线程服务的实例

我们可以通过这样子的方式去理解apache的工作原理 1 单进程TCP服务(堵塞式) 这是最原始的服务,也就是说只能处理个客户端的连接,等当前客户端关闭后,才能处理下个客户端,是属于阻塞式等待from socket import * ser
2022-06-04

python中队列基本操作和多线程队列的示例分析

这篇文章给大家分享的是有关python中队列基本操作和多线程队列的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。一、队列基本操作from queue import Queueq = Queue(5) #
2023-06-29

【Java系列】多线程案例学习——单例模式

个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习JavaEE的一点学习心得,欢迎大家
【Java系列】多线程案例学习——单例模式
2023-12-23

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录