BlockingQueue接口及ArrayBlockingQueue实现类的方法
这篇文章主要介绍“BlockingQueue接口及ArrayBlockingQueue实现类的方法”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“BlockingQueue接口及ArrayBlockingQueue实现类的方法”文章能帮助大家解决问题。
队列是一种 FIFO
(先进先出)的数据结构,本文要讲的 BlockingQueue
也是一种队列,而且强调了线程安全的特性。
BlockingQueue
全称:java.util.concurrent.BlockingQueue
。它是是一个线程安全的队列接口,多个线程能够以并发的方式从队列中插入数据,取出数据的同时不会出现线程安全的问题。
生产者和消费者例子
BlockingQueue
通常用于消费者线程向队列存入数据,消费者线程从队列中取出数据,具体如下
生产者线程不停的向队列中插入数据,直到队列满了,生产者线程被阻塞
消费者线程不停的从队列中取出数据,直到队列为空,消费者线程被阻塞
(推荐教程:Java教程)
BlockingQueue 方法
BlockingQueue
提供 4 种不同类型的方法用于插入数,取出数据以及检查数据,具体如下
操作失败,抛出异常
无论成功/失败,立即返回
true/false
如果队列为空/满,阻塞当前线程
如果队列为空/满,阻塞当前线程并有超时机制插入
add(o)
offer(o)
put(o)
offer(o, timeout, timeunit)
取出remove(o)
poll()
take()
poll(timeout, timeunit)
检查element()
peek()
BlockingQueue 的具体实现类
BlockingQueue
只是一个接口,在实际开发中有如下的类实现了该接口。
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
ArrayBlockingQueue 的使用
这里以 BlockingQueue
接口的具体实现类 ArrayBlockingQueue
举例。通过 ArrayBlockingQueue
实现一个消费者和生产者多线程模型。
核心内容如下:
以
ArrayBlockingQueue
作为生产者和消费者的数据容器通过
ExecutorService
启动 3 个线程,2 两个生产者,1 个消费者指定数据总量
生产者线程
ArrayBlockingQueueProducer
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;public class ArrayBlockingQueueProducer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueProducer.class); // 容器 private ArrayBlockingQueue<String> queue; // 生产指定的数量 private AtomicInteger numberOfElementsToProduce; public ArrayBlockingQueueProducer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) { this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce; } @Override public void run() { try { while (numberOfElementsToProduce.get() > 0) { try { // 向队列中存入任务 String task = String.format("task_%s", numberOfElementsToProduce.getAndUpdate(x -> x-1)); queue.put(task); logger.info("thread {}, produce task {}", Thread.currentThread().getName(), task); // 任务为0,生产者线程退出 if (numberOfElementsToProduce.get() == 0) { break; } } catch (Exception e) { e.printStackTrace(); } } } catch (Exception e) { logger.error(this.getClass().getName().concat(". has error"), e); } }}
消费者线程
ArrayBlockingQueueConsumer
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;public class ArrayBlockingQueueConsumer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueConsumer.class); private ArrayBlockingQueue<String> queue; private AtomicInteger numberOfElementsToProduce; public ArrayBlockingQueueConsumer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) { this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce; } @Override public void run() { try { while (!queue.isEmpty() || numberOfElementsToProduce.get() >= 0) { // 从队列中获取任务,并执行任务 String task = queue.take(); logger.info("thread {} consume task {}", Thread.currentThread().getName(),task); // 队列中数据为空,消费者线程退出 if (queue.isEmpty()) { break; } } } catch (Exception e) { logger.error(this.getClass().getName().concat(". has error"), e); } }}
测试TestBlockingQueue
import com.ckjava.synchronizeds.appCache.WaitUtils;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicInteger;public class TestBlockingQueue { public static void main(String[] args) { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(10); ExecutorService executorService = Executors.newFixedThreadPool(3); // 最多生产 5 个数据 AtomicInteger numberOfElementsToProduce = new AtomicInteger(5); // 2 个生产者线程 executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce)); executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce)); // 1 个消费者线程 executorService.submit(new ArrayBlockingQueueConsumer(arrayBlockingQueue, numberOfElementsToProduce)); executorService.shutdown(); WaitUtils.waitUntil(() -> executorService.isTerminated(), 1000L); }}
输出如下:
13:54:17.884 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_513:54:17.884 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_513:54:17.884 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_413:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_413:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_213:54:17.887 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_313:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_313:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_113:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_213:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_1
关于“BlockingQueue接口及ArrayBlockingQueue实现类的方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341