我的编程空间,编程开发者的网络收藏夹
学习永远不晚

运用.net core中实例讲解RabbitMQ

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

运用.net core中实例讲解RabbitMQ

一、RabbitMQ简介

是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang(高并发语言)语言来编写的,并且RabbitMQ是基于AMQP协议的。

(1) AMQP协议

Advanced Message Queuing Protocol(高级消息队列协议)

(2)AMQP专业术语

(多路复用->在同一个线程中开启多个通道进行操作)

  • Server:又称broker,接受客户端的链接,实现AMQP实体服务
  • Connection:连接,应用程序与broker的网络连接
  • Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
  • Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
  • virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有相同名称的Exchange 或 Queue。
  • Exchange:交换机,接收消息,根据路由键转单消息到绑定队列
  • Binding: Exchange和Queue之间的虚拟链接,binding中可以包换routing key
  • Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)

(3)RabbitMQ整体架构

ClientA(生产者)发送消息到Exchange1(交换机),同时带上RouteKey(路由Key),Exchange1找到绑定交换机为它和绑定传入的RouteKey的队列,把消息转发到对应的队列,消费者Client1,Client2,Client3只需要指定对应的队列名即可以消费队列数据。

交换机和队列多对多关系,实际开发中一般是一个交换机对多个队列,防止设计复杂化。

二、安装RabbitMQ

安装方式不影响下面的使用,这里用Docker安装

 #15672端口为web管理端的端口,5672为RabbitMQ服务的端口
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

输入:ip:5672访问验证。

建一个名为develop的Virtual host(虚拟主机)使用,项目中一般是一个项目建一个Virtual host用,能够隔离队列。

切换Virtual host

三、RabbitMQ六种队列模式在.NetCore中使用

(1)简单队列

最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式

描述:一个生产者 P 发送消息到队列 Q,一个消费者 C 接收

建一个RabbitMQHelper.cs类


/// <summary>
    /// RabbitMQ帮助类
    /// </summary>
    public class RabbitMQHelper
    {
        private static ConnectionFactory factory;
        private static object lockObj = new object();
        /// <summary>
        /// 获取单个RabbitMQ连接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection()
        {
            if (factory == null)
            {
                lock (lockObj)
                {
                    if (factory == null)
                    {
                         factory = new ConnectionFactory
                        {
                            HostName = "172.16.2.84",//ip
                            Port = 5672,//端口
                            UserName = "admin",//账号
                            Password = "123456",//密码
                            VirtualHost = "develop" //虚拟主机
                        };
                    }
                }
            }
            return factory.CreateConnection();
        }
    }

生产者代码

新建发送类Send.cs


public static void SimpleSendMsg()
        {
            string queueName = "simple_order";//队列名
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {//创建队列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    for (var i = 0; i < 10; i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i + 1}";
                        var body = Encoding.UTF8.GetBytes(message);//发送消息
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
                        Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");
                    }
                }
            }
        }

创建队列参数解析:

durable:是否持久化。

exclusive:排他队列,只有创建它的连接(connection)能连,创建它的连接关闭,会自动删除队列。

autoDelete:被消费后,消费者数量都断开时自动删除队列。

arguments:创建队列的参数。

发送消息参数解析:

exchange:交换机,为什么能传空呢,因为RabbitMQ内置有一个默认的交换机,如果传空时,就会用默认交换机。

routingKey:路由名称,这里用队列名称做路由key。

mandatory:true告诉服务器至少将消息route到一个队列种,否则就将消息return给发送者;false:没有找到路由则消息丢弃。

执行效果:

队列产生10条消息。

消费者代码

新建Recevie.cs类


public static void SimpleConsumer()
        {
            string queueName = "simple_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    //创建队列
                     channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        //消费者业务处理
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}");
                        i++;
                    };
                    channel.BasicConsume(queueName, true, consumer);
                }
            }
        }

消费者只需要知道队列名就可以消费了,不需要Exchange和routingKey。

注:消费者这里有一个创建队列,它本身不需要,是预防消费端程序先执行,没有队列会报错。

执行效果:

消息已经被消费完。

(2)工作队列模式

一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式

生产者P发送消息到队列,多个消费者C消费队列的数据。

工作队列也称为公平性队列模式,循环分发,RabbitMQ将按顺序将每条消息发送给下一个消费者,每个消费者将获得相同数量的消息。

生产者

Send.cs代码:


/// <summary>
        /// 工作队列模式
        /// </summary>
        public static void WorkerSendMsg()
        {
            string queueName = "worker_order";//队列名
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    //创建队列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    for ( var i=0;i<10;i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i+1}";
                        var body = Encoding.UTF8.GetBytes(message);
                        //发送消息到rabbitmq
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body);
                        Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");
                    }
                }
            }
        }

参数durable:true,需要持久化,实际项目中肯定需要持久化的,不然重启RabbitMQ数据就会丢失了。

执行效果:

写入10条数据,有持久化标识D

消费端

Recevie代码:


public static void WorkerConsumer()
        {
            string queueName = "worker_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    //创建队列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    //prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能
                    channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);
                    int i = 1;
                    int index = new Random().Next(10);
                    consumer.Received += (model, ea) =>
                    {
                        //处理业务
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},消费者:{index},队列{queueName}消费消息长度:{message.Length}");
                         channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了               Thread.Sleep(1000);                        i++;
                    };
                    channel.BasicConsume(queueName,autoAck:false, consumer);
                }
            }
        }

BasicQos参数解析:

prefetchSize:每条消息大小,一般设为0,表示不限制。

prefetchCount:1,作用限流,告诉RabbitMQ不要同时给一个消费者推送多于N个消息,消费者会把N条消息缓存到本地一条条消费,如果不设,RabbitMQ会进可能快的把消息推到客户端,导致客户端内存升高。设置合理可以不用频繁从RabbitMQ 获取能提升消费速度和性能,设的太多的话则会增大本地内存,需要根据机器性能合理设置,官方建议设为30。

global:是否为全局设置。

这些限流设置针对消费者autoAck:false时才有效,如果是自动Ack的,限流不生效。

执行两个消费者,效果:

可以看到消费者号的标识,8,2,8,2是平均的,一个消费者5个,RabbitMQ上也能看到有2个消费者,Unacked数是2,因为每个客户端的限流数是1。

工作队列模式也是很常用的队列模式。

(3)发布订阅模式

Pulish/Subscribe,无选择接收消息,一个消息生产者,一个交换机(交换机类型为fanout),多个消息队列,多个消费者。称为发布/订阅模式

在应用中,只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。

生产者P只需把消息发送到交换机X,绑定这个交换机的队列都会获得一份一样的数据。

应用场景:适合于用同一份数据源做不同的业务。

生产者代码


/// <summary>
        /// 发布订阅, 扇形队列
        /// </summary>
        public static void SendMessageFanout()
        {
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    string exchangeName = "fanout_exchange";
                    //创建交换机,fanout类型
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                    string queueName1 = "fanout_queue1";
                    string queueName2 = "fanout_queue2";
                    string queueName3 = "fanout_queue3";
                    //创建队列
                    channel.QueueDeclare(queueName1, false, false, false);
                    channel.QueueDeclare(queueName2, false, false, false);
                    channel.QueueDeclare(queueName3, false, false, false);

                    //把创建的队列绑定交换机,routingKey不用给值,给了也没意义的
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    //向交换机写10条消息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "", null, body);
                        Console.WriteLine($"发送Fanout消息:{message}");
                    }
                }
            }
        }

执行代码:

向交换机发送10条消息,则绑定这个交换机的3个队列都会有10条消息。

消费端的代码和工作队列的一样,只需知道队列名即可消费,声明时要和生产者的声明一样。

(4)路由模式(推荐使用)

在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。

上图是一个结合日志消费级别的配图,在路由模式它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式。

生产者P发送数据是要指定交换机(X)和routing发送消息 ,指定的routingKey=error,则队列Q1和队列Q2都会有一份数据,如果指定routingKey=into,或=warning,交换机(X)只会把消息发到Q2队列。

生产者代码


/// <summary>
        /// 路由模式,点到点直连队列
        /// </summary>
        public static void SendMessageDirect()
        {
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    //声明交换机对象,fanout类型
                    string exchangeName = "direct_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //创建队列
                    string queueName1 = "direct_errorlog";
                    string queueName2 = "direct_alllog";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把创建的队列绑定交换机,direct_errorlog队列只绑定routingKey:error
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");
                    //direct_alllog队列绑定routingKey:error,info
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    //向交换机写10条错误日志和10条Info日志
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} error Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "error", properties, body);
                        Console.WriteLine($"发送Direct消息error:{message}");

                        string message2 = $"RabbitMQ Direct {i + 1} info Message";
                        var body2 = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "info", properties, body2);
                        Console.WriteLine($"info:{message2}");

                    }
                }
            }
        }

这里创建一个direct类型的交换机,两个路由key,一个error,一个info,两个队列,一个队列只绑定error,一个队列绑定error和info,向error和info各发10条消息。

执行代码:

查看RabbitMQ管理界面,direct_errorlog队列10条,而direct_alllog有20条,因为direct_alllog队列两个routingKey的消息都进去了。

点进去看下两个队列绑定的交换机和routingKey

消费者代码

消费者和工作队列一样,只需根据队列名消费即可,这里只消费direct_errorlog队列作示例


public static void DirectConsumer()
        {
            string queueName = "direct_errorlog";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    //创建队列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    ///prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能
                    ///global:是否设为全局的
                    ///prefetchSize:单条消息大小,通常设0,表示不做限制
                    //是autoAck=false才会有效
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    int i = 1;
                    consumer.Received += (model, ea) =>
                    {
                        //处理业务
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}");
                        channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了
                        i++;
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }

普通场景中推荐使用路由模式,因为路由模式有交换机,有路由key,能够更好的拓展各种应用场景。

(5)主题模式

topics(主题)模式跟routing路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由键 routingKey,类似于SQL中 = 和 like 的关系。

P 表示为生产者、 X 表示交换机、C1C2 表示为消费者,红色表示队列。

topics 模式与 routing 模式比较相近,topics 模式不能具有任意的 routingKey,必须由一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),比如 "lazy.orange.a"。topics routingKey 中可以存在两种特殊字符"*"与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

以上图为例:

如果发送消息的routingKey设置为:

aaa.orange.rabbit,那么消息会路由到Q1与Q2,

routingKey=aaa.orange.bb的消息会路由到Q1,

routingKey=lazy.aa.bb.cc的消息会路由到Q2;

routingKey=lazy.aa.rabbit的消息会路由到 Q2(只会投递给Q2一次,虽然这个routingKey 与 Q2 的两个 bindingKey 都匹配);

没匹配routingKey的消息将会被丢弃。

生产者代码


public static void SendMessageTopic()
        {
            //创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    //声明交换机对象,fanout类型
                    string exchangeName = "topic_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //队列名
                    string queueName1 = "topic_queue1";
                    string queueName2 = "topic_queue2";
                    //路由名
                    string routingKey1 = "*.orange.*";
                    string routingKey2 = "*.*.rabbit";
                    string routingKey3 = "lazy.#";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把创建的队列绑定交换机,routingKey指定routingKey
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);
                    //向交换机写10条消息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);
                        channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);
                        Console.WriteLine($"发送Topic消息:{message}");
                    }
                }
            }
        }

这里演示了 routingKey为aaa.orange.rabbit,和lazy.aa.rabbit的情况,第一个匹配到Q1和Q2,第二个匹配到Q2,所以应该Q1是10条,Q2有20条,

执行后看rabbitMQ界面:

(6)RPC模式

与上面其他5种所不同之处,该模式是拥有请求/回复的。也就是有响应的,上面5种都没有。

RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的处理业务,处理完后然后在A服务器继续执行下去,把异步的消息以同步的方式执行。

客户端(C)声明一个排他队列自己订阅,然后发送消息到RPC队列同时也把这个排他队列名也在消息里传进去,服务端监听RPC队列,处理完业务后把处理结果发送到这个排他队列,然后客户端收到结果,继续处理自己的逻辑。

RPC的处理流程:

  • 当客户端启动时,创建一个匿名的回调队列。
  • 客户端为RPC请求设置2个属性:replyTo:设置回调队列名字;correlationId:标记request。
  • 请求被发送到rpc_queue队列中。
  • RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
  • 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。

服务端代码


public class RPCServer
    {
        public static void RpcHandle()
        {

            var connection = RabbitMQHelper.GetConnection();
            {
                var channel = connection.CreateModel();
                {
                    string queueName = "rpc_queue";
                    channel.QueueDeclare(queue: queueName, durable: false,
                      exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(queue: queueName,
                      autoAck: false, consumer: consumer);
                    Console.WriteLine("【服务端】等待RPC请求...");

                    consumer.Received += (model, ea) =>
                    {
                        string response = null;

                        var body = ea.Body.ToArray();
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;

                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine($"【服务端】接收到数据:{ message},开始处理");
                            response = $"消息:{message},处理完成";
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine("错误:" + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                              basicProperties: replyProps, body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                              multiple: false);
                        }
                    };
                }
            }
        }
       
    }

客户端


public class RPCClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RPCClient()
        {
            connection = RabbitMQHelper.GetConnection();

            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            var correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId; //给消息id
            props.ReplyTo = replyQueueName;//回调的队列名,Client关闭后会自动删除

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                //监听的消息Id和定义的消息Id相同代表这条消息服务端处理完成
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            //发送消息
            channel.BasicPublish(
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
            //等待回复
            return respQueue.Take();
        }

        public void Close()
        {
            connection.Close();
        }
    }

执行代码


static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            //启动服务端,正常逻辑是在另一个程序
            RPCServer.RpcHandle();
            //实例化客户端
            var rpcClient = new RPCClient();
            string message = $"消息id:{new Random().Next(1, 1000)}";
            Console.WriteLine($"【客服端】RPC请求中,{message}");
            //向服务端发送消息,等待回复
            var response = rpcClient.Call(message);
            Console.WriteLine("【客服端】收到回复响应:{0}", response);
            rpcClient.Close();
            Console.ReadKey();
        }

测试效果:

z执行完,客服端close后,可以接着自己的下一步业务处理。

总结

以上便是RabbitMQ的6中模式在.net core中实际使用,其中(1)简单队列,(2)工作队列,(4)路由模式,(6)RPC模式的交换机类型都是direct,(3)发布订阅的交换机是fanout,(5)topics的交换机是topic。正常场景用的是direct,默认交换机也是direct类型的,推荐用(4)路由模式,因为指定交换机名比起默认的交换机会容易扩展场景,其他的交换机看业务场景所需使用。

下面位置可以看到交换机类型,amq.开头那几个是内置的,避免交换机过多可以直接使用。

到此这篇关于运用.net core中实例讲解RabbitMQ的文章就介绍到这了,更多相关.net core RabbitMQ内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

运用.net core中实例讲解RabbitMQ

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

.NET Core中RabbitMQ使用死信队列的实现

.NETCore中RabbitMQ死信队列用于存储因处理失败而被拒绝的消息。可通过设置x-dead-letter-exchange和x-dead-letter-routing-key属性或使用DeadLetterAdvancedConsumer特性实现。死信策略定义了消息被拒绝后的处理方式,包括拒绝退避、死信重路由或丢弃。最佳实践包括定义明确的死信策略、定期监视队列、使用发布者重试机制、定期清除队列以避免增长过大。
.NET Core中RabbitMQ使用死信队列的实现
2024-04-02

.NET Core中RabbitMQ使用死信队列如何实现

本篇内容介绍了“.NET Core中RabbitMQ使用死信队列如何实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!在.NET Core中
2023-07-05

Android中SharedPreference使用实例讲解

SharedPreference方面的内容还算是比较简单易懂的,在此还是主要贴上效果与代码,最后也是附上源码。 首先是输入账号admin,密码123,选择记住密码登陆。 登陆后就直接跳转页面。 随后再次打开app可以发现
2022-06-06

实例讲解Python中sys.argv[]的用法

sys.argv[]说白了就是一个从程序外部获取参数的桥梁,这个“外部”很关键,所以那些试图从代码来说明它作用的解释一直没看明白。因为我们从外部取得的参数可以是多个,所以获得的是一个列表(list),也就是说sys.argv其实可以看作是一
2022-06-02

实例讲解Android中SQLiteDatabase使用方法

SQLite数据库是android系统内嵌的数据库,小巧强大,能够满足大多数SQL语句的处理工作,而SQLite数据库仅仅是个文件而已。虽然SQLite的有点很多,但并不是如同PC端的mysql般强大,而且android系统中不允许通过JD
2022-06-06

Oracle中ROW_NUMBER()OVER()函数用法实例讲解

目录1. 说明:2. 原理:3.语法:4.示例一:5. 示例二总结oracle中ROW_NUMBER() OVER()函数用法1. 说明:ROW_NUMBER() OVER() 函数的作用:分组排序2. 原理:row_number(
2023-04-25

Log4net在.Net Winform项目中的使用实例详解

Log4net是一款开源的.NET日志记录库,用于记录应用程序的事件和错误信息。在Winform项目中使用Log4net分六步:引入Log4net库创建日志记录器配置日志记录记录日志消息查看日志输出探索其他设置(过滤器、滚动策略、异步记录等)Log4net提供了丰富的配置选项,可根据需要轻松定制日志记录行为。
Log4net在.Net Winform项目中的使用实例详解
2024-04-02

举例讲解Python中的身份运算符的使用方法

Python身份运算符 身份运算符用于比较两个对象的存储单元以下实例演示了Python所有身份运算符的操作:#!/usr/bin/pythona = 20 b = 20if ( a is b ):print "Line 1 - a and
2022-06-04

Android App在ViewPager中使用Fragment的实例讲解

据说Android最推荐的是在ViewPager中使用FragMent,即ViewPager中的页面不像前面那样用LayoutInflater直接从布局文件加载,而是一个个Fragment。注意这里的Fragment 是android.su
2022-06-06

实例讲解Android中ContentProvider组件的使用方法

ContentProvider基本使用 为了在应用程序之间交换数据,android提供了ContentProvider,ContentProvider是不同应用程序之间进行数据交换的标准API,当一个应用程序需要把自己的数据暴露给其他程序使
2022-06-06

实例讲解Vue中customRef函数的使用方法

Vue中如何使用customRef函数?下面本篇文章就来带大家了解一下VueJs中customRef函数的使用方法,希望对大家有所帮助!
2023-05-14

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录