我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java并发编程之阻塞队列(BlockingQueue)详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java并发编程之阻塞队列(BlockingQueue)详解

大家好,我是小黑,一个在互联网苟且偷生的农民工。

队列

学过数据结构的同学应该都知道,队列是数据结构中一种特殊的线性表结构,和平时使用的List,Set这些数据结构相比有点特殊,它的特殊之处在于它只允许在队列的头部(Head)进行删除操作,在尾部(Tail)进行插入操作,这种方式的队列我们称之为先进先出队列(FIFO)。

在JDK1.5中推出了队列这一数据结构的具体实现,接口Queue是对于队列的定义,并有一些列具有特殊功能的队列实现。

在Queue接口中定义了队列的如下方法:

其中add(E)并非Queue接口新定义,而是从Collection接口继承而来的。

阻塞队列

BlockingQueue接口也是在JDK1.5中推出,存放在java.util.concurrent包中,继承自Queue,所以在BlockingQueue中有Queue的所有方法。

从名字就可以看出BlockingQueue是一种阻塞队列,它支持在检索元素时如果队列为空可以一直阻塞等待直到有元素可以获取,同样在添加元素时如果队列已满会阻塞等待队列中有空闲的存储空间。

BlockingQueue的方法可以归纳为四类:

  • 在操作时如不能立即满足,会直接抛出异常
  • 在操作时如不能立即满足,则返回特殊的值,如插入、移除方法会返回false,检查方法会返回null
  • 在操作时如不能立即满足,则会阻塞等待,直到操作成功
  • 在操作时如不能立即满足,则会阻塞等待给定的时间长度,时间到达后如果还不能满足则返回null

这四类方法总结如下。

因为在BlockingQueue的一些方法中,会通过null表示某种操作的失败,所以不允许在BlockingQueue中存放null值元素,会在操作时抛出NullPointerExection异常。

BlockingQueue因为是一个容器嘛,所以它也有容量的限制,在具体实现类中有可以设置容量的实现类,也有不可以设置容量的实现类,不能设置容量的实现类容量默认为Integer.MAX_VALUE。

BlockingQueue是定义在java.util.concurrent包中,那么它在并发情况下到底是不是线程安全的呢?

在JDK提供的BlockingQueue的具体实现类中,上面表格中的方法实现都是线程安全的,在内部都使用了锁或者其他形式的并发控制保证操作的原子性。

但是有一点要注意,就是一些批量处理的方法例如addAll、containsAll、retainAll和removeAll这些方法并不一定是线程安全的,使用时注意。

说完BlockingQueue接口我们接下来看看它都有哪些具体的实现呢?以及在它们内部是如何做到线程安全和阻塞的呢?

ArrayBlockingQueue

ArrayBlockingQueue是一个底层由数组支持额有界阻塞队列。

重要属性

先来看看ArrayBlockingQueue中都有哪些属性。


// 存放元素的数组
final Object[] items;
// 用来记录取元素的下标,用于下一次在take,poll,remove,peek方法中使用
int takeIndex;
// 用来记录添加元素的下标,用于下一次put,offer,add等方法使用
int putIndex;
// 记录队列中元素数量
int count;
// 用于控制并发访问时保证线程安全的锁
final ReentrantLock lock;
// 用于队列空时阻塞和唤醒等待线程的条件
private final Condition notEmpty;
// 用于队列满时阻塞和唤醒等待线程的条件
private final Condition notFull;

我们通过这些队列中的属性基本可以知道ArrayBlockingQueue中都有哪些重要信息,可以看出ArrayBlockingQueue就是使用Object[]来存放元素的。

那么应该如何创建一个ArrayBlockingQueue呢?

构造方法


public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

默认的构造方法需要传入一个int类型的capacity表示该队列的容量。在该构造方法中会调用另一个构造方法,传入一个默认值false。


public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

从这个方法我们看出传入的false表示会在内部用于创建一个ReentrantLock对象,我们都知道ReentrantLock支持公平和非公平的实现,我们猜想一下,这里的这个fair值是不是表示该阻塞队列对于阻塞排队的线程支持公平和非公平的策略呢?这里先卖个关子,在后面的方法中我们具体说。

除了这两种创建的方式,ArrayBlockingQueue还支持传入一个Collection集合。


public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
    // 先创建一个ArrayBlockingQueue实例
    this(capacity, fair);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = 0;
        try {
            // 循环将collection中的元素放入queue中
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // 如果collection的元素个数超出queue的容量大小,会抛出异常
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

添加元素

先来看看添加一个新元素到ArrayBlockingQueue是如何实现的,怎样保证线程安全的。

add(e)


public boolean add(E e) {
    // 调用父类中的add(e)方法
    return super.add(e);
}
public boolean add(E e) {
    // 这里会直接调用offer(e)方法,如果offer方法返回false,则直接抛出异常
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

add方法的实现逻辑本质上是对offer方法套了一层壳,如果offer方法返回false时,抛出异常。所以我们直接看offer方法的实现就好。

offer(e)


public boolean offer(E e) {
    // 这里先判断空,如果e为空会抛出空指针异常
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
	// 加锁,保证入队操作的原子性
    lock.lock();
    try {
        // 队列满时直接返回false
        if (count == items.length)
            return false;
        else {
            // 元素入队
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

可以看到offer方法的逻辑还是比较简单的,先检查入参不能为空,然后加锁保证入队操作的原子性,在获取锁成功后入队,如果队列已满则直接返回false,所以offer方法并不会阻塞。

put(e)


public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
	// 可被中断方式获取锁
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 队列满时会阻塞
            notFull.await();
        // 入队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

put方法和offer方法唯一的区别,就是会在队列满的时候使用Condition条件对象notFull阻塞等待。


private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 入队成功,唤醒等待的移除元素操作线程
    notEmpty.signal();
}

在enqueue方法中才会完成对队列中的数组元素的赋值动作,完成之后唤醒阻塞等待的移除元素操作线程。

offer(e,time,unit)


public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    checkNotNull(e);
    // 加锁之前先获取需要等待的时间值
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            // 时间小于等于0时,返回false
            if (nanos <= 0)
                return false;
            // 阻塞等待指定时间
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

offer(e,time,unit)方法与offer(e)方法相比,主要时多了一个等待时间,会在时间到达时如果没有空间添加元素返回false。

移除元素

ArrayBlockingQueue中移除元素的方法主要有remove(),poll(),take(),poll(time,unit)四个。这几个方法的实现逻辑都比较简单,这里不在单独贴代码 。我们来看一下阻塞方法take()的实现即可。

take()


public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
	// 加锁
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果元素数量==0,表示队列中为空,则阻塞等待
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

dequeue()


private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 取出元素之后,唤醒其他等待线程。
    notFull.signal();
    return x;
}

LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表结构的阻塞队列,可以在创建时指定边界大小,也可以不指定,在不指定边界时容量为Integer.MAX_VALUE。

重要属性

我们先来看看在LinkedBlockingQueue中都有哪些重要的属性。


// 内部类Node节点,用来存放链表中的元素
static class Node<E> {
    // 节点元素
	E item;
	// 当前节点的下一个节点,如果为空表示没有下一个节点
	Node<E> next;
	Node(E x) { item = x; }
}
// 队列的容量
private final int capacity;
// 队列中元素的数量
private final AtomicInteger count = new AtomicInteger();
// 头节点
transient Node<E> head;
// 最后一个节点
private transient Node<E> last;
// 获取元素时控制线程安全的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 添加元素时控制线程安全的锁
private final ReentrantLock putLock = new ReentrantLock();
// 控制消费者的条件
private final Condition notEmpty = takeLock.newCondition();
// 控制生产者的条件
private final Condition notFull = putLock.newCondition();

在LinkedBlockingQueue中使用Node来存放元素,和指向下一个节点的链表指针。

构造方法

在LinkedBlockingQueue的构造方法中,会创建一个创建一个不存放元素的Node对象赋值给head和last。


public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 创建一个不存放元素的Node对象赋值给head和last
    last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                // 入队
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

添加元素

offer(e)


public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    // 使用putLock加锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            // 入队
            enqueue(node);
            // 数量+1
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                // 唤醒一个生产者线程
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        // 唤醒消费者线程
        signalNotEmpty();
    // 入队失败情况会返回false
    return c >= 0;
}

对于链表结构的LinkedBlockingQueue来说,入队操作要简单很多,只需要将node节点挂在最后一个节点last的next,然后将自己赋值给last。


private void enqueue(Node<E> node) {
    last = last.next = node;
}

put(e)


public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    // 使用putLock加锁
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 如果队列容量已使用完则阻塞
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

对比结果也和我们最开始的方法汇总表格一样,offer(e)方法会在入队时如果队列已满直接返回false,而put(e)会一直阻塞等待,知道入队成功。

add(e)方法和offer(e,time,unit)方法实现逻辑上没有特殊之处,这里不再放源码。

移除元素

poll()


public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
    // 使用takeLock加锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    // 还有元素时唤醒一个生产者线程
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            // 唤醒生产者线程
            signalNotFull();
        return x;
    }

poll()方法会在元素出队时如果没有元素则直接返回null。


// 出队方法
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; 
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

take()


public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    // 使用takeLock加锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            //阻塞等待
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            // 还有元素时唤醒一个消费者线程
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        // 唤醒生产者线程
        signalNotFull();
    return x;
}

同样,take方法会在没有元素时一直等待。

对比

我们来对比一下ArrayBlockingQueue和LinkedBlockingQueue都有哪些区别。

  • ArrayBlockingQueue基于数组实现,LinkedBlockingQueue基于链表实现
  • ArrayBlockingQueue在添加和移除元素的操作中共用一把锁,LinkedBlockingQueue使用takeLock和putLock两把锁
  • ArrayBlockingQueue在添加和移除元素时直接使用元素的类型处理,LinkedBlockingQueue需要转成Node对象
  • ArrayBlockingQueue创建时必须指定容量,LinkedBlockingQueue可以不指定,默认容量为Integer.MAX_VALUE

由于LinkedBlockingQueue使用两把锁将入队操作和出队操作分离,这会大大提高队列的吞吐量,在高并发情况下生产者和消费者可以并行处理,提高并发性能。

但是LinkedBlockingQueue默认是无界队列,要小心内存溢出风险,所以最好在创建时指定容量大小。

BlockingQueue接口的实现类除了本期介绍的这两种,还有PriorityBlockingQueue,SynchronousQueue,LinkedBlockingDeque等,每一个都有它独特的特性和使用场景,后面我们再单独深入解析。

总结

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注编程网的更多内容!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java并发编程之阻塞队列(BlockingQueue)详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

java 中 阻塞队列BlockingQueue详解及实例

java 中 阻塞队列BlockingQueue详解及实例BlockingQueue很好的解决了多线程中数据的传输,首先BlockingQueue是一个接口,它大致有四个实现类,这是一个很特殊的队列,如果BlockQueue是空的,从Blo
2023-05-31

详解Java阻塞队列(BlockingQueue)的实现原理

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取
2023-05-31

Java多线程案例之阻塞队列详解

阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.阻塞队列能是一种线程安全的数据结构。本文将通过一些示例为大家详细讲讲阻塞队列的原理与使用,感兴趣的小伙伴可以学习一下
2022-11-13

linux中编写自己的并发队列类(Queue 并发阻塞队列)

设计并发队列#include #include using namespace std; template class Queue { public: Queue( )
2022-06-04

Java阻塞队列必看类:BlockingQueue快速了解大体框架和实现思路

这篇文章主要介绍了Java阻塞队列必看类:BlockingQueue快速了解大体框架和实现思路,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2022-11-13

Java并发编程之LinkedBlockingQueue队列怎么使用

这篇文章主要介绍了Java并发编程之LinkedBlockingQueue队列怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java并发编程之LinkedBlockingQueue队列怎么使用文章都会有
2023-06-30

详解Java并发编程中的优先级队列PriorityBlockingQueue

PriorityBlockingQueue是Java中实现了堆数据结构的线程安全的有界阻塞队列。本文将会深入解读PriorityBlockingQueue的源码实现,感兴趣的可以了解一下
2023-05-18

Java多线程之多种锁和阻塞队列的示例分析

这篇文章给大家分享的是有关Java多线程之多种锁和阻塞队列的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。一、悲观锁和乐观锁1.1. 乐观锁顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以
2023-06-15

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录