python—Celery异步分布式
一、Celery异步分布式
Celery 是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息
Celery 用于存储消息以及celery执行的一些消息和结果
对于brokers,官方推荐是rabbitmq和redis
对于backend,也就是指数据库,为了简单一般使用redis
使用redis连接url格式:
redis://:password@hostname:port/db_number
1)定义连接脚本tasks.py
#!/usr/bin/env python
from celery import Celery
broker = "redis://192.168.2.230:6379/1"
backend = "redis://192.168.2.230:6379/2"
app = Celery("tasks", broker=broker, backend=backend)
@app.task
def add(x,y):
return x+y
2)安装启动celery
pip install celery
pip install redis
启动方式:celery -A huang tasks -l info #-l 等同于 --loglevel
3)执行测试 huang.py
#!/usr/bin/env python
from tasks import add
re = add.delay(10,20)
print(re.result) #任务返回值
print(re.ready) #如果任务被执行返回True,其他情况返回False
print(re.get(timeout=1)) #带参数的等待,最后返回结果
print(re.status) #任务当前状态
运行结果:
30
<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>
30
SUCCESS
4)根据成功返回的key或celery界面输出的信息,查看redis存储
说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的
二、celery多进程
1)配置文件 celeryconfig.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from kombu import Exchange,Queue
BROKER_URL = "redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND = "redis://192.168.2.230:6379/4"
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)
CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}
2)tasks.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery
app = Celery()
app.config_from_object("celeryconfig")
@app.task
def taskA(x,y):
return x+y
@app.task
def taskB(x,y,z):
return x+y+z
3)启动celery
celery -A tasks worker --loglevel info
4)执行脚本huang2.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from tasks import taskA,taskB
re = taskA.delay(10,20)
print(re.result) #任务返回值
print(re.ready) #如果任务被执行返回True,其他情况返回False
print(re.get(timeout=1)) #带参数的等待,最后返回结果
print(re.status) #任务当前状态
re2 = taskB.delay(10,20,30)
print(re2.result)
print(re2.ready)
print(re2.get(timeout=1))
print(re2.status)
5)运行结果
None
<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>
30
SUCCESS
None
<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>
60
SUCCESS
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341