我的编程空间,编程开发者的网络收藏夹
学习永远不晚

.Net实现延迟队列

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

.Net实现延迟队列

介绍

具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

使用场景

延时队列在项目中的应用还是比较多的,尤其像电商类平台:

  • 订单成功后,在30分钟内没有支付,自动取消订单
  • 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
  • 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
  • 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

该介绍来自其他文章

方案

下面的例子没有进行封装,所以代码仅供参考

Redis过期事件

注意:

不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大

不保证一定送达

发送即忘策略,不包含持久化

但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。

redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。

配置

需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

-- 取消注释
notify-keyspace-events Ex

-- 注释
#notify-keyspace-events ""

然后重新启动服务器,比如windows

 .\redis-server.exe  .\redis.windows.conf

或者linux中使用docker-compose重新部署redis

  redis:
    container_name: redis
    image: redis
    hostname: redis
    restart: always
    ports: 
      - "6379:6379"
    volumes: 
      - $PWD/redis/redis.conf:/etc/redis.conf
      - /root/common-docker-compose/redis/data:/data
    command: 
      /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件

然后使用客户端订阅事件

-- windows
.\redis-cli
 
-- linux
docker exec -it 容器标识 redis-cli
 
psubscribe __keyevent@0__:expired

控制台订阅

使用StackExchange.Redis组件订阅过期事件

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);

db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");

var subscriber = connectionMultiplexer.GetSubscriber();

//订阅库0的过期通知事件
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
    Console.WriteLine($"key过期 channel:{channel} key:{key}");
});

Console.ReadLine();

输出结果:

key过期 channel:keyevent@0:expired key:orderno:123456

如果启动多个客户端监听,那么多个客户端都可以收到过期事件。

WebApi中订阅

创建RedisListenService继承自:BackgroundService

public class RedisListenService : BackgroundService
{
    private readonly ISubscriber _subscriber;

    public RedisListenService(IServiceScopeFactory serviceScopeFactory)
    {
        using var scope = serviceScopeFactory.CreateScope();
        var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();

        var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
        var db = connectionMultiplexer.GetDatabase(0);
        _subscriber = connectionMultiplexer.GetSubscriber();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        //订阅库0的过期通知事件
        _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
        {
            Console.WriteLine($"key过期 channel:{channel} key:{key}");
        });

        return Task.CompletedTask;
    }
}

注册该后台服务

services.AddHostedService<RedisListenService>();

启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。

RabbitMq延迟队列

版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的

插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine

进入容器

docker exec -it 容器名称/标识 bash

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看是否启用

rabbitmq-plugins list

[E]和[e]表示启用,然后重启服务

rabbitmq-server restart

然后在管理界面添加交换机可以看到

生产消息

发送的消息类型是:x-delayed-message

[HttpGet("send/delay")]
public string SendDelayedMessage()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",//IP地址
        Port = 5672,//端口号
        UserName = "admin",//用户账号
        Password = "123456",//用户密码
        VirtualHost = "customer"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    var exchangeName = "delay-exchange";
    var routingkey = "delay.delay";
    var queueName = "delay_queueName";

    //设置Exchange队列类型
    var argMaps = new Dictionary<string, object>()
    {
        {"x-delayed-type", "topic"}
    };
    //设置当前消息为延时队列
    channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
    channel.QueueDeclare(queueName, true, false, false, argMaps);
    channel.QueueBind(queueName, exchangeName, routingkey);

    var time = 1000 * 5;
    var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
    var body = Encoding.UTF8.GetBytes(message);
    var props = channel.CreateBasicProperties();
    //设置消息的过期时间
    props.Headers = new Dictionary<string, object>()
            {
                {  "x-delay", time }
            };
    channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
    Console.WriteLine("成功发送消息:" + message);

    return "success";
}

消费消息

消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理

public class RabbitmqDelayedHostService : BackgroundService
{
    private readonly IModel _channel;
    private readonly IConnection _connection;

    public RabbitmqDelayedHostService()
    {
        var connFactory = new ConnectionFactory//创建连接工厂对象
        {
            HostName = "localhost",//IP地址
            Port = 5672,//端口号
            UserName = "admin",//用户账号
            Password = "123456",//用户密码
            VirtualHost = "customer"
        };
        _connection = connFactory.CreateConnection();
        _channel = _connection.CreateModel();

        //交换机名称
        var exchangeName = "exchangeDelayed";
        var queueName = "delay_queueName";
        var routingkey = "delay.delay";
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        _channel.QueueDeclare(queueName, true, false, false, argMaps);
        _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
        //声明为手动确认
        _channel.BasicQos(0, 1, false);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = "delay_queueName";

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var routingKey = ea.RoutingKey;
            Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");

            //手动确认
            _channel.BasicAck(ea.DeliveryTag, true);
        };
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _connection.Dispose();
        _channel.Dispose();
        base.Dispose();
    }
}

注册该后台任务

services.AddHostedService<RabbitmqDelayedHostService>();

输出结果

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

其他方案

  • Hangfire延迟队列
BackgroundJob.Schedule(
  () => Console.WriteLine("Delayed!"),
   TimeSpan.FromDays(7));
  • 时间轮
  • Redisson DelayQueue
  • 计时管理器

到此这篇关于.Net实现延迟队列的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持编程网。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

.Net实现延迟队列

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何实现 Java 延迟队列?(java延迟队列怎么实现)

在Java开发中,延迟队列是一种非常实用的数据结构,它允许我们在指定的延迟时间后才执行队列中的元素。本文将详细介绍Java延迟队列的实现方式,帮助你更好地理解和使用它。一、延迟队列的概念延迟队列是一种特殊的队列,其中的元
如何实现 Java 延迟队列?(java延迟队列怎么实现)
Java2024-12-20

怎么在Redis中实现延迟队列和分布式延迟队列

这篇文章给大家介绍怎么在Redis中实现延迟队列和分布式延迟队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1. 实现一个简单的延迟队列。  我们知道目前JAVA可以有DelayedQueue,我们首先开一个Dela
2023-06-15

RabbitMQ 如何实现延迟队列?

实现 RabbitMQ 延迟队列目前主流的实现方式,是采用官方提供的延迟插件来实现。而延迟插件需要先下载插件、然后配置并重启 RabbitMQ 服务,之后就可以通过编写代码的方式实现延迟队列了。

redis延迟队列如何实现

redis 延迟队列的实现采用有序集合,将任务以分数(时间戳)存储,定期检索已到期的任务,删除并执行。步骤如下:创建有序集合 delayed_queue,将任务以分数(时间戳)存储。检索已到期的任务,分数介于 0 到当前时间戳之间。删除已到
redis延迟队列如何实现
2024-06-12

Redis如何实现延迟队列

目录Redis实现延迟队列Redis延迟队列Redis实现延时队列的优化方案延时队列的应用延时队列的实现总结Redis实现延迟队列Redis延迟队列Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Sc
2023-04-28

说说MQ延迟队列实现原理?

延迟队列(Delay Queue)是一种特殊类型的队列,它的主要特点是可以让进入队列的元素在指定的延迟时间之后才被取出进行处理。

Java如何实现异步延迟队列

这篇文章主要介绍“Java如何实现异步延迟队列”,在日常操作中,相信很多人在Java如何实现异步延迟队列问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java如何实现异步延迟队列”的疑惑有所帮助!接下来,请跟
2023-07-05

使用Redis怎么实现延迟队列

本篇文章给大家分享的是有关使用Redis怎么实现延迟队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。方案一:采用通过定时任务采用数据库/非关系型数据库轮询方案。优点:1. 实
2023-06-15

Java 延迟队列的实现方式究竟有哪些?(java延迟队列的实现方式是什么)

在Java开发中,延迟队列是一种非常实用的数据结构,它允许我们在指定的延迟时间后再取出队列中的元素。这种特性在许多场景下都非常有用,比如任务调度、消息队列等。那么,Java延迟队列的实现方式究竟有哪些呢?一、DelayQueue简介
Java 延迟队列的实现方式究竟有哪些?(java延迟队列的实现方式是什么)
Java2024-12-14

Redis实现延迟队列的方案总结

Redis实现的延迟队列适用于处理一些比较简单的业务,如发送邮件、发送通知等,对于复杂的业务不适用于Redis的延迟任务方案。​

RabbitMQ实现延迟队列的技术探讨

RabbitMQ提供了灵活的消息处理机制,使得实现延迟队列成为可能。通过使用rabbitmq-delayed-message-exchange插件或利用RabbitMQ的TTL和死信队列功能,你可以根据实际需求选择适合的方案来实现延迟队列。

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录