我的编程空间,编程开发者的网络收藏夹
学习永远不晚

详解Redis Stream做消息队列

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

详解Redis Stream做消息队列

List

众所周知redis数据结构中的list的lpush与rpop可以用于常规消息队列,从集合的最左端写入,最右端弹出消费。并且支持多个生产者与多个消费者并发拿数据,数据只能由一个消费者拿到。

但这个方案并不能保证消费者消费消息后是否成功处理的问题(服务挂掉或处理异常等),机制属于点对点模式不能做广播模式(发布/订阅模式)

Pub/sub

于是redis提供了相应的发布订阅功能,为了解除点对点的强绑定模式引入了Channel管道

当生产者向管道中发布消息,订阅了该管道的消费者能够同时接收到该消息,而且为了简化订阅多个管道需要显式关注多个名称提供了pattern能力。

详解Redis Stream做消息队列

通过名称匹配如果接收消息的频道wmyskxz.chat,consumer3也会收到消息。

但这个方案也有很大的诟病就是不会持久化,如果服务挂掉重启数据就全丢弃了,也没有提供ack机制,不保证数据可靠性,不管有没有消费成功发后既忘。

Stream

stream的话结构很像kafka的设计思想,提供了consumer group和offset机制,结构上感觉跟kafka的topic差不多,只是没有对应partation副本机制,而是一个追加消息的链表结构。客户端调用XADD时候自动创建stream。每个消息都会持久化并存在唯一的id标识

详解Redis Stream做消息队列

Consumer Group

消费者组的概念跟kafka的消费者概念如出一辙,消费者既可以用XREAD命令进行独立消费,也可以多个消费者同时加入一个消费者组。一条消息只能由一个消费者组中的一个消费者消费。这样可以在分布式系统中保证消息的唯一性。

其实这个特性我后来仔细琢磨了一下当时自认为无懈可击的流式图表为了保证分布式系统消息唯一做了redis分布式锁。有点鸡肋,明明消费者组已经保证了数据的唯一性。只能说加锁可以压缩资源成本

last_delivered_id

用于标识消费者组消费在stream上消费位置的游标,每个消费者组都有一个stream内唯一的名称,消费者组不会自动创建,需要用XGROUP CREATE显式创建。

pending_ids

每个消费者内部都有一个状态变量。用来表示已经被客户端消费但没有ack的消费。目的是为了保证客户端至少消费了消息一次(atleastonce)。如果消费者收到了消息处理完了但是没有回复ack,就会导致列表不断增长,如果有很多消费组的话,那么这个列表占用的内存就会放大

curd

  • xadd 追加消息
  • xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  • xrange 获取消息列表,会自动过滤已经删除的消息
  • xlen 消息长度
  • del 删除Stream

pending_ids如何避免消息丢失

在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。

但是pending_ids里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到pending_ids中的消息ID列表。

不过此时xreadgroup的起始消息必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的pending_ids消息以及自last_delivered_id之后的新消息。

嵌入SpringBoot

redis stream虽然还是有一些弊端,但是相比较而言用kafka之类的消息组件太重,redis用作消息队列已经很合适了。

这里简单提一下思路,本质上是提供一个管理消息的一个小功能,定义一个注解用于创建stream管道

详解Redis Stream做消息队列

创建一个注解类,标注该注解的类必须继承StreamListener<String, ObjectRecord<String, Object>>类且重写onMessage方法。方法上也加这个注解

创建一个config类实现BeanPostProcessor接口,重写bean声明周期postProcessAfterInitializationpostProcessBeforeInitialization方法。该方法会在spring启动流程里的refresh方法加载bean的声明周期中扫描到所有加了注解的bean。

通过线程池挨个创建stream的group组与stream的consumer监听连接,config类记得继承DisposableBean类在destroy方法里把连接关掉免得oom。

注册redis stream api提供的consumer容器

这里一定注意pollTimeout参数,看名字就知道默认拉取数据时间间隔,这个参数如果写的值很小或者写0,你就看你cpu高不高就完了。

@Bean("listenerContainer")
@DependsOn(value = "redisConnectionFactory")
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() {
   StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>>
   options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
         .BATchSize(10)
         .serializer(new StringRedisSerializer())
         .executor(new ForkJoinPool())
         .pollTimeout(Duration.ofSeconds(3))
         .targetType(Object.class)
         .build();
   return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}

创建消费者

private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
   StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream();

   if (stringRedisTemplate.hasKey(streamKey)) {
      StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey);

      AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false);

      groups.forEach(groupInfo -> {
         if (Objects.equals(group, groupInfo.getRaw().get("name"))) {
            groupHasKey.set(true);
         }
      });

      if (groups.isEmpty() || !groupHasKey.get()) {
         creatGroup(streamKey, group);
      } else {
         groups.stream().forEach(g -> {
            log.info("XInfoGroups:{}", g);
            StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName());
            log.info("XInfoConsumers:{}", consumers);
         });
      }
   } else {
      creatGroup(streamKey, group);
   }
   StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
   Consumer consumer = Consumer.from(group, consumerName);

   Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener);
   listenerContainer.start();
   this.containerList.add(listenerContainer);
   return subscription;
}

到此这篇关于详解Redis Stream做消息队列的文章就介绍到这了,更多相关Redis Stream内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

详解Redis Stream做消息队列

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

redis怎么做消息队列

redis 消息队列Redis 是一个内存数据库,兼具消息代理的功能,可以用来构建高效的消息队列系统。如何使用 Redis 构建消息队列要使用 Redis 构建消息队列,需要执行以下步骤:创建队列列表和消息列表:使用 LPUSH 命令
redis怎么做消息队列
2024-05-21

redis stream 实现消息队列的实践

目录Redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pending 等待列表消息确认消息转移信息监控SpringBo
2022-08-10

java如何用redis做消息队列

本指南提供了如何在Java应用中使用Redis作为消息队列的全面教程。内容涵盖依赖项、Redis配置、连接与发送消息、订阅频道、数据结构、发布/订阅模式、持久化和最佳实践。通过遵循本指南,Java开发人员可以轻松将Redis集成到他们的应用中,实现高效、可扩展的消息传递。
java如何用redis做消息队列
2024-04-02

redis中怎么用list做消息队列

本文小编为大家详细介绍“redis中怎么用list做消息队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“redis中怎么用list做消息队列”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。leftPush消息入
2023-06-29

详解Redis用链表实现消息队列

前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样。 链表实现消息队列Redis链表
2022-06-04

redis中的消息队列

这期内容当中的小编将会给大家带来有关redis中的消息队列介绍,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。一、认识消息队列1.1 消息队列概念“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字
2022-11-30

详解redis是如何实现队列消息的ack

前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需
2022-06-04

redis消息队列丢失如何解决

要解决Redis消息队列丢失的问题,可以考虑以下几点:1. 持久化存储:Redis提供了持久化存储的功能,可以将消息在内存中的数据持久化到硬盘上,以防止数据丢失。可以使用Redis的RDB快照或者AOF日志来实现持久化存储。2. 设置合适的
2023-09-04

怎么使用redis消息队列

要使用Redis作为消息队列,你需要按照以下步骤进行操作:1. 安装并启动Redis服务。2. 在你的应用程序中引入Redis的客户端库,如redis-py(Python)、Jedis(Java)、StackExchange.Redis(C
2023-08-24

编程热搜

目录