我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Python定时库APScheduler的原理及用法

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Python定时库APScheduler的原理及用法

这篇文章主要讲解了“Python定时库APScheduler的原理及用法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Python定时库APScheduler的原理及用法”吧!

Python学习教程之定时库APScheduler的原理及用法:

APScheduler简介

APscheduler全称Advanced Python Scheduler

作用为在指定的时间规则执行指定的作业。

  • 指定时间规则的方式可以是间隔多久执行,可以是指定日期时间的执行,也可以类似Linux系统中Crontab中的方式执行任务。

  • 指定的任务就是一个Python函数。

APScheduler组件

APScheduler版本 3.6.3

APScheduler中几个重要的概念

Job 作业

作用

Job作为APScheduler最小执行单位。创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。

构建说明

id:指定作业的唯一IDname:指定作业的名字trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此jobcoalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行func:Job执行的函数args:Job执行函数需要的位置参数kwargs:Job执行函数需要的关键字参数

Trigger 触发器

Trigger绑定到Job,在scheduler调度筛选Job时,根据触发器的规则计算出Job的触发时间,然后与当前时间比较确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。Trigger有多种种类,指定时间的DateTrigger,指定间隔时间的IntervalTrigger,像Linux的crontab一样的CronTrigger

目前APScheduler支持触发器:

DateTriggerIntervalTriggerCronTrigger

Executor 执行器

Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。 每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行JobExecutor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的选择需要根据实际的scheduler来选择不同的执行器

目前APScheduler支持的Executor:

AsyncIOExecutorGeventExecutorThreadPoolExecutorProcessPoolExecutorTornadoExecutorTwistedExecutor

Jobstore 作业存储

Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并将job添加到jobstore中。 Jobstore主要是通过pickle库的loads和dumps【实现核心是通过python的__getstate__和__setstate__重写实现】,每次变更时将Job动态保存到存储中,使用时再动态的加载出来,作为存储的可以是redis,也可以是数据库【通过sqlarchemy这个库集成多种数据库】,也可以是mongodb

目前APScheduler支持的Jobstore:

MemoryJobStoreMongoDBJobStoreRedisJobStoreRethinkDBJobStoreSQLAlchemyJobStoreZooKeeperJobStore

Event 事件

Event是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些Event时,做一些具体的操作常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。

目前APScheduler定义的Event

EVENT_SCHEDULER_STARTEDEVENT_SCHEDULER_STARTEVENT_SCHEDULER_SHUTDOWNEVENT_SCHEDULER_PAUSEDEVENT_SCHEDULER_RESUMEDEVENT_EXECUTOR_ADDEDEVENT_EXECUTOR_REMOVEDEVENT_JOBSTORE_ADDEDEVENT_JOBSTORE_REMOVEDEVENT_ALL_JOBS_REMOVEDEVENT_JOB_ADDEDEVENT_JOB_REMOVEDEVENT_JOB_MODIFIEDEVENT_JOB_EXECUTEDEVENT_JOB_ERROREVENT_JOB_MISSEDEVENT_JOB_SUBMITTEDEVENT_JOB_MAX_INSTANCES

Listener 监听事件

Listener表示用户自定义监听的一些Event,当Job触发了EVENT_JOB_MISSED事件时可以根据需求做一些其他处理。

Scheduler 调度器

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则选择TornadoScheduler。

目前APScheduler支持的Scheduler:

AsyncIOSchedulerBackgroundSchedulerBlockingSchedulerGeventSchedulerQtSchedulerTornadoSchedulerTwistedScheduler

这里前面一期的Python学习教程里有提到过!

Scheduler工作流程图

这里重点挑选两个重要的流程画一个简陋的流程图,来看一下scheduler的工作原理。其一个是添加add job,另一是scheduler每次唤醒调度时的执行过程

Scheduler添加job流程

Python定时库APScheduler的原理及用法

Scheduler调度流程

Python定时库APScheduler的原理及用法

APScheduler使用示例

AsyncIO调度示例

import asyncioimport datetimefrom apscheduler.events import EVENT_JOB_EXECUTEDfrom apscheduler.executors.asyncio import AsyncIOExecutorfrom apscheduler.jobstores.redis import RedisJobStore # 需要安装redisfrom apscheduler.schedulers.asyncio import AsyncIOSchedulerfrom apscheduler.triggers.interval import IntervalTriggerfrom apscheduler.triggers.cron import CronTrigger# 定义jobstore 使用redis 存储job信息default_redis_jobstore = RedisJobStore( db=2, jobs_key="apschedulers.default_jobs", run_times_key="apschedulers.default_run_times", host="127.0.0.1", port=6379, password="test")# 定义executor 使用asyncio是的调度执行规则first_executor = AsyncIOExecutor()# 初始化scheduler时,可以直接指定jobstore和executorinit_scheduler_options = { "jobstores": { # first 为 jobstore的名字,在创建Job时直接直接此名字即可 "default": default_redis_jobstore }, "executors": { # first 为 executor 的名字,在创建Job时直接直接此名字,执行时则会使用此executor执行 "first": first_executor }, # 创建job时的默认参数 "job_defaults": { 'coalesce': False, # 是否合并执行 'max_instances': 1 # 最大实例数 }}# 创建schedulerscheduler = AsyncIOScheduler(**init_scheduler_options)# 启动调度scheduler.start()second_redis_jobstore = RedisJobStore( db=2, jobs_key="apschedulers.second_jobs", run_times_key="apschedulers.second_run_times", host="127.0.0.1", port=6379, password="test")scheduler.add_jobstore(second_redis_jobstore, 'second')# 定义executor 使用asyncio是的调度执行规则second_executor = AsyncIOExecutor()scheduler.add_executor(second_executor, "second")# *********** 关于 APScheduler中有关Event相关使用示例 *************# 定义函数监听事件def job_execute(event): """ 监听事件处理 :param event: :return: """ print( "job执行job:\ncode => {}\njob.id => {}\njobstore=>{}".format( event.code, event.job_id, event.jobstore ))# 给EVENT_JOB_EXECUTED[执行完成job事件]添加回调,这里就是每次Job执行完成了我们就输出一些信息scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED)# *********** 关于 APScheduler中有关Job使用示例 *************# 使用的是asyncio,所以job执行的函数可以是一个协程,也可以是一个普通函数,AsyncIOExecutor会根据配置的函数来进行调度,# 如果是协程则会直接丢入到loop中,如果是普通函数则会启用线程处理# 我们定义两个函数来看看执行的结果def interval_func(message): print("现在时间: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) print("我是普通函数") print(message)async def async_func(message): print("现在时间: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) print("我是协程") print(message)# 将上述的两个函数按照不同的方式创造触发器来执行# *********** 关于 APScheduler中有关Trigger使用示例 *************# 使用Trigger有两种方式,一种是用类创建使用,另一个是使用字符串的方式# 使用字符串指定别名, scheduler初始化时已将其定义的trigger加载,所以指定字符串可以直接使用if scheduler.get_job("interval_func_test", "default"): # 存在的话,先删除 scheduler.remove_job("interval_func_test", "default")# 立马开始 2分钟后结束, 每10s执行一次 存储到first jobstore second执行scheduler.add_job(interval_func, "interval", args=["我是10s执行一次,存放在jobstore default, executor default"], seconds=10, id="interval_func_test", jobstore="default", executor="default", start_date=datetime.datetime.now(), end_date=datetime.datetime.now() + datetime.timedelta(seconds=240))# 先创建tiggertrigger = IntervalTrigger(seconds=5)if scheduler.get_job("interval_func_test_2", "second"): # 存在的话,先删除 scheduler.remove_job("interval_func_test_2", "second")# 每隔5s执行一次scheduler.add_job(async_func, trigger, args=["我是每隔5s执行一次,存放在jobstore second, executor = second"], id="interval_func_test_2", jobstore="second", executor="second")# 使用协程的函数执行,且使用cron的方式配置触发器if scheduler.get_job("cron_func_test", "default"): # 存在的话,先删除 scheduler.remove_job("cron_func_test", "default")# 立马开始 每10s执行一次scheduler.add_job(async_func, "cron", args=["我是 每分钟 30s 时执行一次,存放在jobstore default, executor default"], second='30', id="cron_func_test", jobstore="default", executor="default")# 先创建tiggertrigger = CronTrigger(second='20,40')if scheduler.get_job("cron_func_test_2", "second"): # 存在的话,先删除 scheduler.remove_job("cron_func_test_2", "second")# 每隔5s执行一次scheduler.add_job(async_func, trigger, args=["我是每分钟 20s 40s时各执行一次,存放在jobstore second, executor = second"], id="cron_func_test_2", jobstore="second", executor="second")# 使用创建trigger对象直接创建print("启动: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))asyncio.get_event_loop().run_forever()

输出结果部分截取

启动之后,每隔5s运行一次的JOB

启动: 2019-12-05 14:13:11【这部分是定义的协程函数输出的内容】现在时间: 2019-12-05 14:13:16 我是协程我是每隔5s执行一次,存放在jobstore second, executor = second【这部分是监听job执行完成之后的回调输出】job执行job:code => 4096job.id => interval_func_test_2jobstore=>second

在20s和40s时各执行一次的Job

现在时间: 2019-12-05 14:13:20我是协程我是每分钟 20s 40s时各执行一次,存放在jobstore second, executor = secondjob执行job:code => 4096job.id => cron_func_test_2jobstore=>second

每隔10s执行一次的job

现在时间: 2019-12-05 14:13:21我是普通函数我是10s执行一次,存放在jobstore default, executor default现在时间: 2019-12-05 14:13:21我是协程我是每隔5s执行一次,存放在jobstore second, executor = secondjob执行job:code => 4096job.id => interval_func_testjobstore=>defaultjob执行job:code => 4096job.id => interval_func_test_2jobstore=>second

每隔5s执行一次的Job

现在时间: 2019-12-05 14:13:26我是协程我是每隔5s执行一次,存放在jobstore second, executor = secondjob执行job:code => 4096job.id => interval_func_test_2jobstore=>second

每分钟30s时执行一次

现在时间: 2019-12-05 14:13:30我是协程我是 每分钟 30s 时执行一次,存放在jobstore default, executor defaultjob执行job:code => 4096job.id => cron_func_testjobstore=>default

感谢各位的阅读,以上就是“Python定时库APScheduler的原理及用法”的内容了,经过本文的学习后,相信大家对Python定时库APScheduler的原理及用法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Python定时库APScheduler的原理及用法

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Python定时库APScheduler的原理及用法

这篇文章主要讲解了“Python定时库APScheduler的原理及用法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Python定时库APScheduler的原理及用法”吧!Python学
2023-06-02

Python定时库Apscheduler怎么用

小编给大家分享一下Python定时库Apscheduler怎么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧! 在Python中需要执行定时任务,可以使用Apscheduler。 Apsched
2023-06-25

Python metaclass的原理及应用

这篇文章主要介绍“Python metaclass的原理及应用”,在日常操作中,相信很多人在Python metaclass的原理及应用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Python metacl
2023-06-02

golang定时器Timer的用法和实现原理解析

这篇文章主要介绍了golang定时器Ticker,本文主要来看一下Timer的用法和实现原理,需要的朋友可以参考以下内容
2023-05-15

Python max函数中key的用法及原理解析

目录一、背景二、原理三、用法四、实例一、背景 起源于一个问题:怎样找到字符串中出现次数最多的字符 其实使用max函数就能很轻松的解决这个问题: 代码:str1 = "AAAaaa8888899sssss" print(max(str1, k
2022-06-02

LRUCache的实现原理及利用python实现的方法

简介 LRU(Least Recently Used)最近最少使用,最近有时间和空间最近的歧义,所以我更喜欢叫它近期最少使用算法。它的核心思想是,如果一个数据被访问过,我们有理由相信它在将来被访问的概率就越高。于是当LRU缓存达到设定的最大
2022-06-04

golang定时器Timer的用法和实现原理是什么

本篇内容介绍了“golang定时器Timer的用法和实现原理是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!TimerTimer是一种单
2023-07-06

python定时任务sched库用法简单实例

sched可用于定时任务,唯一需要注意的就是,这些任务在一个线程中运行,如果前面的任务耗时过长,则后面的任务将顺延执行,下面这篇文章主要给大家介绍了关于python定时任务sched库用法的相关资料,需要的朋友可以参考下
2023-01-11

python定时任务schedule库用法详细讲解

python中有一个轻量级的定时任务调度的库schedule,下面这篇文章主要给大家介绍了关于python定时任务schedule库用法的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
2023-01-11

python定时任务timeloop库用法实例详解

有些时候我们需要每隔一段时间就要执行一段程序,或者是往复循环执行某一个任务,下面这篇文章主要给大家介绍了关于python定时任务timeloop库用法的相关资料,需要的朋友可以参考下
2023-01-11

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录