我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用celery怎么动态设置定时任务

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用celery怎么动态设置定时任务

使用celery怎么动态设置定时任务?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

celery的beat运行过程。

使用celery怎么动态设置定时任务

上图是beat的主要组成结构,beat中包含了一个service对象,service中包含了一个scheduler对象,scheduler中包含了一个schedule字典,schedule中key对应的的value才是真正的定时任务,是整个beat中最小的单元。

首先分别介绍一下各个对象和它们运行的过程,beat是celery.apps.beat.Beat类创建的对象,调用beat.run()方法就可以启动beat,下面是beat.run()方法的源码。

def run(self): print(str(self.colored.cyan( 'celery beat v{0} is starting.'.format(VERSION_BANNER)))) self.init_loader() self.set_process_title() self.start_scheduler()

重点是在run()方法里调用了start_scheduler()方法,而start_scheduler()方法本质上是创建了一个service对象(celery.beat.Service类),并调用service.start()方法,下面是beat.start_scheduler()方法的源码。

def start_scheduler(self): if self.pidfile: platforms.create_pidlock(self.pidfile) service = self.Service( app=self.app, max_interval=self.max_interval, scheduler_cls=self.scheduler_cls, schedule_filename=self.schedule, )  print(self.banner(service))  self.setup_logging() if self.socket_timeout: logger.debug('Setting default socket timeout to %r', self.socket_timeout) socket.setdefaulttimeout(self.socket_timeout) try: self.install_sync_handler(service) service.start() except Exception as exc: logger.critical('beat raised exception %s: %r', exc.__class__, exc, exc_info=True) raise

调用了service.start()之后,会进入一个死循环,先使用self.scheduler.tick()获取下一个任务a的定时点到现在时间的间隔,然后进入睡眠,睡眠结束之后判断如果self.scheduler里的下一个任务a可以执行,就立即执行,并获取self.scheduler里的下下一个任务b的定时点到现在时间的间隔,进入下一次循环。下面是service.start()的源码。

def start(self, embedded_process=False): info('beat: Starting...') debug('beat: Ticking with max interval->%s', humanize_seconds(self.scheduler.max_interval))  signals.beat_init.send(sender=self) if embedded_process: signals.beat_embedded_init.send(sender=self) platforms.set_process_title('celery beat')  try: while not self._is_shutdown.is_set(): interval = self.scheduler.tick() if interval and interval > 0.0: debug('beat: Waking up %s.', humanize_seconds(interval, prefix='in ')) time.sleep(interval) if self.scheduler.should_sync(): self.scheduler._do_sync() except (KeyboardInterrupt, SystemExit): self._is_shutdown.set() finally: self.sync()

service.scheduler默认是celery.beat.PersistentScheduler类的实例对象,而celery.beat.PersistentScheduler其实是celery.beat.Scheduler的子类,所以scheduler.schedule是celery.beat.Scheduler类中的字典,保存的是celery.beat.ScheduleEntry类型的对象。ScheduleEntry的实例对象保存了定时任务的名称、参数、定时信息、过期时间等信息。celery.beat.Scheduler类实现了对schedule的更新方法即update_from_dict(self, dict_)方法。下面是update_from_dict(self, dict_)方法的源码。

def _maybe_entry(self, name, entry): if isinstance(entry, self.Entry): entry.app = self.app return entry return self.Entry(**dict(entry, name=name, app=self.app)) def update_from_dict(self, dict_): self.schedule.update({ name: self._maybe_entry(name, entry) for name, entry in items(dict_) })

可以看到update_from_dict(self, dict_)方法实际上是向schedule中更新了self.Entry的实例对象,而self.Entry从celery.beat.Scheduler的源码知道是celery.beat.ScheduleEntry。

到这里整个流程就粗略的介绍完了,基本过程是这个样子。

使用celery怎么动态设置定时任务

但是从前面start_scheduler()的源码可以看到,beat在内部创建一个service之后,就直接进入死循环了,所以从外面无法拿到service对象,就不能对service里的scheduler对象操作,就不能对scheduler的schedule字典操作,所以就无法在beat运行的过程中动态添加定时任务。

方法介绍

前面介绍完原理,现在来讲一下解决思路。主要思路就是让start_scheduler方法中创建的service暴露出来。所以就想到手写一个类去继承Beat,重写start_scheduler()方法。

import socketfrom celery import platformsfrom celery.apps.beat import Beat  class MyBeat(Beat): ''' 继承Beat 添加一个获取service的方法 ''' def start_scheduler(self): if self.pidfile:  platforms.create_pidlock(self.pidfile) # 修改了获取service的方式 service = self.get_service()  print(self.banner(service))  self.setup_logging() if self.socket_timeout:  logger.debug('Setting default socket timeout to %r',    self.socket_timeout)  socket.setdefaulttimeout(self.socket_timeout) try:  self.install_sync_handler(service)  service.start() except Exception as exc:  logger.critical('beat raised exception %s: %r',    exc.__class__, exc,    exc_info=True)  raise  def get_service(self): ''' 这个是自定义的 目的是为了把service暴露出来,方便对service的scheduler操作,因为定时任务信息都存放在service.scheduler里 :return: ''' service = getattr(self, "service", None) if service is None:  service = self.Service(  app=self.app,  max_interval=self.max_interval,  scheduler_cls=self.scheduler_cls,  schedule_filename=self.schedule,  )  setattr(self, "service", service) return self.service

在MyBeat类中添加一个get_service()方法,如果beat没有servic对象就创建一个,如果有就直接返回,方便对service的scheduler操作。

然后在此基础上实现对定时任务的增删改查操作。

def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',   month_of_year='*', **kwargs): ''' 创建或更新定时任务 :param task_name: 定时任务名称 :param cron_task: task名称 :param minute: 以下是时间 :param hour: :param day_of_week: :param day_of_month: :param month_of_year: :param kwargs: :return: ''' service = beat.get_service() scheduler = service.scheduler entries = dict() entries[task_name] = { 'task': cron_task, 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,    month_of_year=month_of_year, **kwargs), 'options': {'expires': 3600}} scheduler.update_from_dict(entries)  def del_cron_task(task_name: str): ''' 删除定时任务 :param task_name: :return: ''' service = beat.get_service() scheduler = service.scheduler if scheduler.schedule.get(task_name, None) is not None: del scheduler.schedule[task_name]  def get_cron_task(): ''' 获取当前所有定时任务的配置 :return: ''' service = beat.get_service() scheduler = service.scheduler ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()] return ret

但是仅仅是这样还不能解决问题,从前面的serive.start()的源码看到,beat启动后会进入一个死循环,如果直接在主线程启动beat,必然会阻塞在死循环中,所以需要为beat创建一个子线程,这样才影响主线程的其他操作。

flag = False beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',  scheduler_cls=None, # XXX use scheduler  redirect_stdouts=None,  redirect_stdouts_level=None)  # 设置主动启动beat是为了避免使用celery -A celery_demo worker 命令重复启动workerdef run(): ''' 启动Beat :return: ''' beat.run()  def new_thread(): ''' 创建一个线程启动Beat 最多只能创建一个 :return: ''' global flag if not flag: t = threading.Thread(target=run, daemon=True) t.start() # 启动成功2s后才能操作定时任务 否则可能会报错 time.sleep(2) flag = True

可能看到上面的代码有人会想,为什么不在主程序加载完成就启动为beat创建一个子线程,还非要写个函数等待主动调用?这是因为例如在使用django+celery组合时,一般启动django和启动celery woker是两个独立的进程,如果让django在加载代码的时候自动启动beat的子线程,那么在使用celery -A demo_name worker 启动celery时,会重新加载一边django的代码,因为celery需要扫描每个app下的tasks.py文件,加载异步任务函数,这时启动celery woker就会也启动一个beat子线程,可能会造成定时任务重复执行的情况。所以在这里设置成主动开启beat子线程,目的就是为了celery worker启动不重复创建beat线程。

完整的代码如下:

import socketimport timeimport threadingfrom celery import platformsfrom celery.schedules import crontabfrom celery.apps.beat import Beatfrom celery.utils.log import get_loggerfrom celery_demo import celery_app logger = get_logger('celery.beat')flag = False  class MyBeat(Beat): ''' 继承Beat 添加一个获取service的方法 ''' def start_scheduler(self): if self.pidfile:  platforms.create_pidlock(self.pidfile) # 修改了获取service的方式 service = self.get_service()  print(self.banner(service))  self.setup_logging() if self.socket_timeout:  logger.debug('Setting default socket timeout to %r',    self.socket_timeout)  socket.setdefaulttimeout(self.socket_timeout) try:  self.install_sync_handler(service)  service.start() except Exception as exc:  logger.critical('beat raised exception %s: %r',    exc.__class__, exc,    exc_info=True)  raise  def get_service(self): ''' 这个是自定义的 目的是为了把service暴露出来,方便对service的scheduler操作,因为定时任务信息都存放在service.scheduler里 :return: ''' service = getattr(self, "service", None) if service is None:  service = self.Service(  app=self.app,  max_interval=self.max_interval,  scheduler_cls=self.scheduler_cls,  schedule_filename=self.schedule,  )  setattr(self, "service", service) return self.service  beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',  scheduler_cls=None, # XXX use scheduler  redirect_stdouts=None,  redirect_stdouts_level=None)  # 设置主动启动beat是为了避免使用celery -A celery_demo worker 命令重复启动workerdef run(): ''' 启动Beat :return: ''' beat.run()  def new_thread(): ''' 创建一个线程启动Beat 最多只能创建一个 :return: ''' global flag if not flag: t = threading.Thread(target=run, daemon=True) t.start() # 启动成功2s后才能操作定时任务 否则可能会报错 time.sleep(2) flag = True  def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',   month_of_year='*', **kwargs): ''' 创建或更新定时任务 :param task_name: 定时任务名称 :param cron_task: task名称 :param minute: 以下是时间 :param hour: :param day_of_week: :param day_of_month: :param month_of_year: :param kwargs: :return: ''' service = beat.get_service() scheduler = service.scheduler entries = dict() entries[task_name] = { 'task': cron_task, 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,    month_of_year=month_of_year, **kwargs), 'options': {'expires': 3600}} scheduler.update_from_dict(entries)  def del_cron_task(task_name: str): ''' 删除定时任务 :param task_name: :return: ''' service = beat.get_service() scheduler = service.scheduler if scheduler.schedule.get(task_name, None) is not None: del scheduler.schedule[task_name]  def get_cron_task(): ''' 获取当前所有定时任务的配置 :return: ''' service = beat.get_service() scheduler = service.scheduler ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()] return ret

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注编程网行业资讯频道,感谢您对编程网的支持。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用celery怎么动态设置定时任务

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

使用celery怎么动态设置定时任务

使用celery怎么动态设置定时任务?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。celery的beat运行过程。上图是beat的主要组成结构,beat中包含了
2023-06-08

如何使用Python Celery动态添加定时任务

本篇内容介绍了“如何使用Python Celery动态添加定时任务”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、背景实际工作中会有一些耗
2023-07-06

spring怎么动态设置定时任务

在Spring中,可以使用`TaskScheduler`来动态设置定时任务。首先,在Spring配置文件中配置`TaskScheduler`:```xml```然后,在需要动态设置定时任务的类中注入`TaskScheduler`:```ja
2023-08-30

SpringBoot如何设置动态定时任务

这篇文章主要介绍了SpringBoot如何设置动态定时任务的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot如何设置动态定时任务文章都会有所收获,下面我们一起来看看吧。之前写过文章记录怎么在Sp
2023-07-02

怎么用django-celery-beat搭建定时任务

本篇内容主要讲解“怎么用django-celery-beat搭建定时任务”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么用django-celery-beat搭建定时任务”吧!一、创建djan
2023-07-05

spring动态设置定时任务的方法是什么

在Spring中,可以使用`TaskScheduler`接口和`@Scheduled`注解来动态设置定时任务。1. 使用`TaskScheduler`接口:```java@Autowiredprivate TaskScheduler tas
2023-08-25

django怎么动态添加定时任务

在Django中,可以使用celery来实现动态添加定时任务。首先,需要安装Celery:```shellpip install celery```然后,在Django的settings.py文件中配置Celery:```python# s
2023-09-26

linux怎么设置定时任务

在Linux系统中,可以使用crontab命令来设置定时任务。1. 使用以下命令打开crontab编辑器:crontab -e2. 在编辑器中,每一行代表一个定时任务。每行的格式如下:* * * * * command其中,星号(*)代表所
2023-10-18

php定时任务怎么设置

在PHP中设置定时任务通常使用cron job来实现。Cron job是一个在Unix-like操作系统上用来定时执行任务的工具。以下是如何在Linux系统上设置一个PHP定时任务的步骤:打开终端并输入以下命令来编辑cron job表:cr
php定时任务怎么设置
2024-03-04

linux定时任务怎么设置

在Linux中,可以使用cron命令来设置定时任务。使用crontab -e命令来编辑定时任务:$ crontab -e编辑器会打开crontab文件,每行代表一个定时任务。每行的格式如下:* * * * * command其中*代表
2023-10-23

oracle怎么设置定时任务

oracle 定时任务设置指南oracle 数据库通过创建作业(jobs)设置定时任务,执行指定任务:创建作业: create job with procedure schedule every [day | hour | minut
oracle怎么设置定时任务
2024-04-08

Python3定时任务怎么设置

在Python中,可以使用`schedule`模块来设置定时任务。以下是一个示例代码,演示如何每隔一分钟打印一次当前时间:```pythonimport scheduleimport timedef job():print("Current
2023-08-17

celery异步定时任务怎么实现订单定时回滚

这篇文章主要介绍“celery异步定时任务怎么实现订单定时回滚”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“celery异步定时任务怎么实现订单定时回滚”文章能帮助大家解决问题。订单回滚用celer
2023-06-30

java定时任务时间怎么设置

在Java中,可以使用`java.util.Timer`和`java.util.TimerTask`类来设置定时任务的时间。首先,创建一个继承自`TimerTask`的子类,重写`run()`方法,定义定时执行的任务。然后,创建一个`Tim
2023-08-11

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录