详解Python利用APScheduler框架实现定时任务
背景
最近在做一些python工具的时候,常常会碰到定时器问题,总觉着使用threading.timer或者schedule模块非常不优雅。所以这里给自己做个记录,也分享一个定时任务框架APScheduler。具体的架构原理就不细说了,用个例子说明一下怎么简易的使用。
样例代码
先上样例代码,如下:
#!/user/bin/env python
# coding=utf-8
"""
@project : csdn
@author : 剑客阿良_ALiang
@file : apschedule_tool.py
@ide : PyCharm
@time : 2022-03-02 17:34:17
"""
from apscheduler.schedulers.background import BackgroundScheduler
from multiprocessing import Process, Queue
import time
import random
# 具体工作实现
def do_job(q: Queue):
while True:
if not q.empty():
_value = q.get(False)
print('{} poll -> {}'.format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), _value))
else:
break
def put_job(q: Queue):
while True:
_value = str(random.randint(1, 10))
q.put(_value)
print('{} put -> {}'.format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), _value))
time.sleep(1)
if __name__ == '__main__':
q = Queue()
scheduler = BackgroundScheduler()
# 每隔5秒运行一次
scheduler.add_job(do_job, trigger='cron', second='*/5', args=(q,))
scheduler.start()
Process(target=put_job, args=(q,)).start()
代码详解
1、调度器的选择主要取决于编程环境以及 APScheduler 的用途。主要有以下几种调度器:
apscheduler.schedulers.blocking.BlockingScheduler:当调度器是程序中唯一运行的东西时使用,阻塞式。
apscheduler.schedulers.background.BackgroundScheduler:当调度器需要后台运行时使用。
apscheduler.schedulers.asyncio.AsyncIOScheduler:当程序使用 asyncio 框架时使用。
apscheduler.schedulers.gevent.GeventScheduler:当程序使用 gevent 框架时使用。
apscheduler.schedulers.tornado.TornadoScheduler:当构建 Tornado 程序时使用
apscheduler.schedulers.twisted.TwistedScheduler:当构建 Twisted 程序时使用
apscheduler.schedulers.qt.QtScheduler:当构建 Qt 程序时使用
个人觉着BackgroundScheduler已经很够用了,在后台启动定时任务,也不会阻塞进程。
2、trigger后面跟随的类似linux系统下cron写法,样例代码中是每5秒执行一次。
3、这里加了一个多进程通讯的队列multiprocessing.Queue,主要是样例代码解决的场景是我实际工作中常碰到的,举个栗子:多个进程间通讯,其中一个进程需要定时获取另一个进程中的数据。可以参考样例代码。
执行结果
2022-03-02 19:31:27 put -> 4
2022-03-02 19:31:28 put -> 10
2022-03-02 19:31:29 put -> 1
2022-03-02 19:31:30 poll -> 4
2022-03-02 19:31:30 poll -> 10
2022-03-02 19:31:30 poll -> 1
2022-03-02 19:31:30 put -> 2
2022-03-02 19:31:31 put -> 1
2022-03-02 19:31:32 put -> 6
2022-03-02 19:31:33 put -> 4
2022-03-02 19:31:34 put -> 8
2022-03-02 19:31:35 poll -> 2
2022-03-02 19:31:35 poll -> 1
2022-03-02 19:31:35 poll -> 6
2022-03-02 19:31:35 poll -> 4
2022-03-02 19:31:35 poll -> 8
2022-03-02 19:31:35 put -> 8
2022-03-02 19:31:36 put -> 10
2022-03-02 19:31:37 put -> 7
2022-03-02 19:31:38 put -> 2
2022-03-02 19:31:39 put -> 3
2022-03-02 19:31:40 poll -> 8
2022-03-02 19:31:40 poll -> 10
2022-03-02 19:31:40 poll -> 7
2022-03-02 19:31:40 poll -> 2
2022-03-02 19:31:40 poll -> 3
2022-03-02 19:31:40 put -> 5
Process finished with exit code -1
知识点补充
APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。
它有以下三个特点:
- 类似于 Liunx Cron 的调度程序(可选的开始/结束时间)
- 基于时间间隔的执行调度(周期性调度,可选的开始/结束时间)
- 一次性执行任务(在设定的日期/时间运行一次任务)
APScheduler有四种组成部分:
- 触发器(trigger) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
- 作业存储(job store) 存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
- 执行器(executor) 处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
- 调度器(scheduler) 是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。通过配置executor、jobstore、trigger,使用线程池(ThreadPoolExecutor默认值20)或进程池(ProcessPoolExecutor 默认值5)并且默认最多3个(max_instances)任务实例同时运行,实现对job的增删改查等调度控制
示例代码:
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 输出时间
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
sched.start()
到此这篇关于详解Python利用APScheduler框架实现定时任务的文章就介绍到这了,更多相关Python定时任务内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341