【Python模块】rabbitMQ
RabbitMQ介绍:
父进程与子进程间,同一父继承可以用multiprocess的Manager模块来实现数据互访。
作用:RabbitMQ是为了实现相互独立的两个进程数据互访。
应用场景:不需要立即操作的数据。比如:发消息,发通知,发红包等。其它常见场景包括最终一致性、广播、错峰流控等等。
同类产品有:ActiveMQ、RabbitMQ、Kafka 、 ZeroMQ等。
RabbitMQ特点:
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ 中的概念模型
消息模型:
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。RabbitMQ 基本概念
上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。Connection
网络连接,比如一个TCP连接。Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。Broker
表示消息队列服务器实体。
AMQP 中的消息路由
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
direct
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
fanout
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*只匹配一个单词。
#.a 可以匹配b.c.d.e.a , *.a只以可匹配b.a或c.a或e.a,而不能匹配a.b.a
RabbitMQ安装:
Windows7:
安装erlang(32bit or 64bit)。
设置erlang系统环境变量:添加系统变量ERLANG_HOME,路径:erlang安装路径。追加path -->路径%ERLANG_HOME%\bin(注:eshell版本:V10.0.1)
安装rabbitMQ。
设置rabbitMQ系统环境变量:添加系统变量RABBITMQ_HOME,路径:erlang安装路径。追加path -->路径%RABBITMQ_HOME%\sbin
开启rabbitmq服务:在cmd下运行,net start rabbitMQ
检查rabbitMQ是否安装成功:cmd下运行 rabbitctl list_users,没有报错,并显示用户名,成功
添加rabbitMQ的WEB管理插件:cmd下运行 rabbitmq-plusins enable rabbitmq_management
浏览器里输入:http://localhost:15672,访问WEB管理。
注:设置系统环境变量的目的在于,在任何目录下,都可以运行bat或exe文件。否则需要进入程序相应目录。
LINUX(在线安装):
当前linux版本--Centos 7,erlang版本19.0,rabbitmq版本3.6.6,(erlang与rabbitmq版本对应表:http://www.rabbitmq.com/which-erlang.html):
安装前需要安装gcc
安装前需要安装perl
安装wget:rpm -y install wget (wget是执行网络下载)
下载erlang: wget 地址:http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
安装erlang。rpm -ivh 包名
安装socat包,yum -y install socat
下载并安装rabbitmq。地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el6.noarch.rpm
安装rabbitmq。rpm -ivh 包名
设置环境变量,并指向rabbitmq的bin目录,可以不需要进入程序目录,并直接运行程序。
vi ~/.user_profile ,添加 export PATH=$PATH:/usr/lib/rabbitmq/bin
启动rabbitmq服务。service rabbitmq-server start
使用教程:https://blog.csdn.net/qq_22075041/article/details/78855708
RabbitMQ简单配置:
查看用户:
默认只有guest用户,一般供本地测试用。把guest从配置文件中的红色列表中删除[{rabbit, [{loopback_users, []}]}]可改变guest为外部用户。
查看当前rabbitmq用户:
rabbitmqctl list_users
添加用户:
rabbitmqctl add_user david david1
添加用户标识:
rabbitmqctl set_user_tags david administrator
添加用户权限:
rabbitmqctl set_permissions -p / david ".*" ".*" ".*"
/ 是默认的vhost名,权限<conf> <write> <read>
用正则表达式来匹配特定的资源,如'^(amq\.gen.*|amq\.default)$'可以匹配server生成的和默认的exchange,'^$'不匹配任何资源
添加一个新的vhost
rabbitmqctl add_vhost host1
外部连接都是需要指定一个vhost,所以要访问需要按上面步骤,给一个用户配置权限
重启rabbitmq服务
service rabbitmq-server restart
查看服务状态
service rabbitmq-server status
python中RabbitMQ相关的方法。
对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。
方法或属性 参数 作用 示例 pika.PlianCredentials(name,pw) name:rabbitmq的用户名
pw:rabbitmq的密码
创建rabbitmq登陆凭证 credential=pika.PlianCredentials('david','123456') pika.ConnectionParameters(host,port,virtual_host,credential) host:rabbitmq服务器地址
prot:端口,5672
virtual_host:指定虚拟host
credential:登陆rabbitmq凭证
连接到rabbitmq时的参数设置
如果是本地的rabbitmq用下面语句:
pika.ConnectionParameters('localhost')
parameter=pika.ConnectionParameters('192.168.0.4',5672,'/',credential)
rabbitmq是本地只写地址:
parameter=pika.ConnnectionParameters('localhost')
pika.BlockingConnection(parameters) parameter: 阻塞式连接到rabbitmq connect=pika.BlockingConnection(parameter) pika.channel() 建立通道 channel = connect.channel() delivery_tag:该消息的index
requeue:默认True
更改为False,不把拒收的消息重新放入queue
单条消息拒收 basic_nack() delivery_tag:该消息的index
mutiple:是否批量,默认False
更改为True,一次性ack比delivery_tag小的queue
requeue:默认True
更改为False,不把拒收消息重新放入队列
消息拒收,可以多条 basic_qos() prefetch_size:
prefetch_count:默认0
值越高,处理数据条数越大,适合高性能电脑
all_channels:
改变均衡处理信息的条数。 queue_declare(queue,passive,durable,exclusive,auto_delete,arguments) queue:队列名
passive:
durable:True、开启持久化
exclusive:True、开启生成随机队列名
auto_delete:
argument:
声明一个队列。如果已存在不同属性的队列报错。 普通声明:
queue_declare(queue='test')
带持久化:
queue_declare(queue='test',durable=True)
自动生成queue名:
queue_declare(exclusive=True)
exchange_declare() exchange:名字
exchange_type:默认direct、
可选类型:topic\fanout\direct\
passive:
druable: True、持久化
auto_delete:
internal:
arguments:
声明一个exchange,如果存在不同属性的exchange会报错 默认direct类型:
exchange_declare(exchange='test1')
持久化:
exchange_declare(exchange='test2',durable=True)
queue_delete() queue:队列名
if_unused:
if_empty:
删除已声明的queue queue_delete(queue='test') delete_exchange() exchange:
if_unused:
删除已声明的exchange queue_delete(exchange='test1') method.queue 获取随机生成的queue名字 声明随机queue:
result = channel.queue_declare(exclusive = True)
获取queue name:
q_name = result.method.queue
queue_bind() queue:队列名
exchange:要绑定的exchange
routing_key:要绑定的routing_key
arguments:
把声明的随机queue绑定到指定的exchange或者routing_key上 把上面的随机Q绑定到exchange和使用routing_key过滤:
channel.queue_bind(queue=q_name,exchange='test1',routing_key='a')
basic_publish() exchange:目标exchange
routing_key:目标routing_key
body:消息正文
properties:发送的消息属性
mandatory:默认False
更改为True,服务器没有对应的queue,那么会调用basic.return方法将消息返还给生产者。
immediate:默认False
更改为True,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
把信息发送到rabbitmq channel=basic_publish(exchange='test1',routing_key = 'a',body='hello word!')
basic_publish的properties:
pika.BasicProperties()
content_type:
content_encoding:
headers:
delivery_mode:声明信息持久化
priority:
correlation_id:指定为回调函数参数props中correlation_id
reply_to:回调队列名
expiration:
meddage_id:
timestamp:
type:
user_id:
app_id:
cluster_id:
指定发送的信息属性
1、使信息持久化,需要声明queue持久化和delivery_mode=2信息持久化
2、要实现回调,客户端至少发送带有
reply_to
以及correlation_id
两个属性的信息basic_consume() consumer_callback:回调函数名
queue:接收的队列名
no_ack:自动消毁队列
exclusive:
consumer_tag:
argument:
basic_ack() delivery_tag:默认0
该消息的index
mutiple:是否批量,默认False
更改为True,一次性ack比delivery_tag小的queue
当consume的no_ack属性是False时,通知rabbitmq删除queue 回调函数参数 属性和方法 channel 包含channel的一切属性和方法 method consumer_tag:
delivery_tag:
exchange:
redelivered:
routing_key:
properties basic_publish通过properties传入的参数 body basic_publish发送的消息
每种方式Queue是必须有的,因为只有Queue才能读取数据。
轮询方式接收:publisher端直接send到Routing Key。RECV端直接从queue读取。
fanout:publisher端Send到Exchange。Consumer端声明Queue,绑定到Exchange,从Exchange读取。
direct:publisher端send到Exchange,并进行通过Routing_key匹配关键字。Consumer端声明Queue,绑定到Exchange,读取时写上完整的关键字,只读取关键字匹配内容。
topic:和direct模式一样,不过Routing_key支持模糊匹配。
Python中的RabbitMQ实例:
默认轮询方式:
生产者把生产的消息放入queue,多个消费者依次取出不同的消息。Q1,Q2两个消费者,生产者放入信息D1,D2时,Q1只能接收到D1,Q2只能接收到D2.
# 发送端
import pika
credential = pika.PlainCredentials('host_admin','111111') # 设置连接到Vhost的用户名和密码
connect = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.1.104',
5672, # web管理端口是15672,程序存取消息端口是5672
'host1', # vhost名字。
credential))
channel = connect.channel()
channel.queue_declare(queue = "say", durable = True) # durable消息持久化。
channel.basic_publish(exchange = '',
routing_key = 'say',
body = 'D1',
properties=pika.BasicProperties(delivery_mode=2) # delivery_mode=2 消息持久化,但是阅后即消。
)
# 发送第第二条数据,只是为了更明显的看出来试验结果。
channel.basic_publish(exchange = '',
routing_key = 'say',
body = 'D2',
properties=pika.BasicProperties(delivery_mode=2)
)
channel.close()
# 接收端
credential = pika.PlainCredentials('host_admin','111111')
connect = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.104',
5672,
'host1',
credential))
def callback(ch,method,properties,body):
print(method.queue)
channel = connect.channel()
channel.queue_declare(queue = "say",durable = True)
channel.basic_comsume(callback,
queue='say',
no_ack=False)
channel.start_consume()
fanout 广播方式
忽略routing_key,客户端接收时,只需绑定exchange就可以,所有客户端同时接到信息。
import pika
# 发送端
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost')) # 本地方式连接,不用验证
channel = connect.channel()
channel.exchange_declare(exchange='fan_test',exchange_type='fanout')
channel.basick_publish(exchange='fantest',
routing_key='',
body='this is test!'
)
channel.close()
# 接收端
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost'))
channel = connect.channel()
result=channel.queue_declare(exclusive=True) # 随机生成一个queue名字
q_name=result.method.queue # 获取随机生成的queue名字
channel.queue_bind(queue=q_name, # 把queue名字绑定到exchange
exchange='fantest')
def callback(ch,method,properties,body):
print(body)
channel.basic_consume(callback,
queue=q_name,
no_ack=True)
channel.start_consume()
direct方式
经过routing,只发送给完全匹配routing_key和queue
# 发送端
import pika
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='ex_test',exchange_type='direct')
channel.basic_publish(exchange='ex_test',
routing_key='R1',
queue='',
body='this is test of the direct to R1!')
channel.basic_publish(exchange='ex_test',
routing_key='R2', # 更改了routing_key,客户端routing_key,R1接收不到
queue='',
body='this is a test of direct to R2!')
channel.close()
# 接收端
connect = pika.BlockingConnection(pika.Connectionparameters('localhost'))
channel = connect.channel()
result = channel.queue_declare(exclusive = True)
q_name = result.method.queue
channel.queue_bind(queue=q_name,
exchange='ex_test')
def callback(ch,method,properties,body):
print(method.routing_key,body)
channel.basic_consume(callback,
queue=q_name,
routing_key='R1')
channel.start_consume()
topic方式:
可以匹配不同名字的routing_key
# 发送端
import pika
routing_key = ["a.a","b.a","c.b.a","d.c","d.c.b"]
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='ex_topic',exchange_type='topic')
for i in routing_key:
message = "routingkey:{}".format(i)
channel.basic_publish(exchange="ex_topic",
routing_key = i,
body = message)
channel.close()
# 接收端:
import pika
routing = ["*.a","#.a"]
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
def callback(ch,method,propreties,body):
print(method.queue,body)
channel.exchange_declare(exchange="ex_topic",exchange_type="topic")
result = channel.queue_declare(exclusive= True)
q_name = result.method.queue
for i in routing:
channel.queue_bind(exchange="ex_topic",routing_key=i,queue=q_name)
channel.basic_consume(callcack,
queue=q_name,
no_ack=True)
channel.start_consuming()
参考文章:https://www.jianshu.com/p/79ca08116d57
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341