我的编程空间,编程开发者的网络收藏夹
学习永远不晚

APScheduler的使用是怎么样的

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

APScheduler的使用是怎么样的

今天就跟大家聊聊有关APScheduler的使用是怎么样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

  1.简介

  APScheduler 是一款Python开发的定时任务工具, 跨平台运行, 不依赖Linux系统的crontab服务, 在windows上也可以运行

  官方文档的地址是 https://apscheduler.readthedocs.io/en/latest/index.html

  简单介绍

  APScheduler具有四种组件

  触发器(triggers) 指定定时任务的执行的时机

  存储器(job stores) 可以定时持久化存储, 可以保存在数据库中或redis

  # 存储在redis中

  from apscheduler.jobstores.redis import RedisJobStore

  # 存储在mongo中

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  # 存储在数据库中

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  执行器(executors) 在定时任务执行时, 进程或者线程的方式执行任务

  调度器(schedulers)

  # 以后台的方式运行

  from apscheduler.schedulers.background import BackgroundScheduler

  # 以阻塞的方式运行, 前台运行

  from apscheduler.schedulers.background import BlockingScheduler

  对添加的任务可以做持久保存

  2.安装

  pip install apscheduler

  3. 触发器 Trigger

  date在特定的时间日期执行

  from datetime import date

  from apscheduler.schedulers.blocking import BlockingScheduler

  sched = BlockingScheduler()

  def my_job(text):

  print(text)

  # 在2019年11月6日00:00:00执行

  sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))

  # 在2019年11月6日16:30:05, 可以指定运行的详细时间

  sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))

  # 运行时间也可以是字符串的形式

  sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text])

  # 立即执行

  sched.add_job(my_job, 'date')

  sched.start()

  interval:以固定的时间间隔运行作业时使用

  weeks (int) – 间隔的周数

  days (int) – 间隔的天数

  hours (int) – 间隔的小时

  minutes (int) –间隔的分钟

  seconds (int) – 间隔的秒

  start_date (datetime|str) – 间隔时间的起点

  end_date (datetime|str) – 间隔时间的结束点

  timezone (datetime.tzinfo|str) – 时区

  jitter (int|None) – 将作业执行 延迟的时间

  from datetime import datetime

  # 每两小时执行一次

  sched.add_job(job_function, 'interval', hours=2)

  # 在2018年10月10日09:30:00 到2019年6月15日11:00:00的时间内,每两小时执行一次

  sched.add_job(job_function, 'interval', hours=2, start_date='2018-10-10 09:30:00', end_date='2019-06-15 11:00:00')

  cron:在一天中的特定时间定期运行作业时使用

  常见的参数

  year (int|str) – 4位数的年份

  month (int|str) – month (1-12)

  day (int|str) – day (1-31)

  week (int|str) – ISO week (1-53)

  day_of_week (int|str) –工作日的编号或名称(0-6或周一,周二,周三,周四,周五,周六,周日)

  hour (int|str) – 小时(0-23)

  minute (int|str) – 分钟 (0-59)

  second (int|str) – 秒 (0-59)

  start_date (datetime|str) –最早触发的日期/时间(包括)

  end_date (datetime|str) – 结束触发的日期/时间(包括)

  timezone (datetime.tzinfo|str) – 时区

  jitter (int|None) – 将执行作业延迟几秒执行

  常见的表达式类型

  # 在6、7、8、11、12月的第三个周五的00:00, 01:00, 02:00和03:00 执行

  sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

  # 在2014年5月30日前的周一到周五的5:30执行

  sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

  # 执行的方式 用装饰器的形式, 每个月的最后一个星期日执行

  @sched.scheduled_job('cron', id='my_job_id', day='last sun')

  def some_decorated_task():

  print("I am printed at 00:00:00 on the last Sunday of every month!")

  # 可以使用标准的crontab表达式执行

  sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

  # 延迟120秒执行

  sched.add_job(job_function, 'cron', hour='*', jitter=120)

  calendarinterval:在一天的特定时间以日历为基础的间隔运行作业时使用

  参数和 interval 中的参数设置相同

  from datetime import datetime

  from apscheduler.schedulers.blocking import BlockingScheduler

  def job_function():

  print("Hello World")

  sched = BlockingScheduler()

  # 每个月的15:36:00 执行这个任务

  sched.add_job(job_function, 'calendarinterval', months=1, hour=15, minute=36)

  # 从今天开始 每两个月的 15点36分执行, 时间范围是 2019-6-16到 2020-3-26

  sched.add_job(job_function, 'calendarinterval', months=2, start_date='2019-06-16',

  end_date='2020-03-16', hour=15, minute=36)

  sched.start()

  4. 储存器

  REDIS_CONF = {

  "password": "xxxxx",

  "host": "192.168.137.120",

  "port": 6379,

  "db": 0}

  from apscheduler.jobstores.redis import RedisJobStore

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  # 存储器

  job_stores = {

  # 使用redis存储

  'redis': RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF),

  # 使用mongo存储

  'mongo': MongoDBJobStore(),

  # 数据库存储

  'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

  }

  # 执行器

  executors = {

  'default': ThreadPoolExecutor(20), # 20个线程

  'processpool': ProcessPoolExecutor(5) # 5个进程

  }

  job_defaults = {

  'coalesce': False, # 相同任务触发多次

  'max_instances': 3 # 每个任务最多同时触发三次

  }

  # 使用配置, 启动

  scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

  5. 执行器 在定时任务该执行时,以进程或线程方式执行任务

  # 线程的方式执行

  from apscheduler.executors.pool import ThreadPoolExecutor

  executors = {

  'default': ThreadPoolExecutor(20) # 最多20个线程同时执行

  }

  scheduler = BackgroundScheduler(executors=executors)

  # 进程的方式

  executors = {

  'default': ProcessPoolExecutor(5) # 最多5个进程同时执行

  }

  6.调度器

  BlockingScheduler: 作为独立进程时使用

  from apscheduler.schedulers.blocking import BlockingScheduler

  scheduler = BlockingScheduler()

  scheduler.start()

  # 此处程序会发生阻塞复制代码

  BackgroundScheduler 后台运行, 在框架中使用

  from apscheduler.schedulers.background import BackgroundScheduler

  scheduler = BackgroundScheduler()

  scheduler.start()

  # 此处程序不会发生阻塞复制代码

  AsyncIOScheduler : 当你的程序使用了asyncio的时候使用。

  GeventScheduler : 当你的程序使用了gevent的时候使用。

  TornadoScheduler : 当你的程序基于Tornado的时候使用。

  TwistedScheduler : 当你的程序使用了Twisted的时候使用

  QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。

  7. 配置的三中方法

  方法1

  from pytz import utc

  from apscheduler.schedulers.background import BackgroundScheduler

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

  jobstores = {

  'mongo': MongoDBJobStore(),

  'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

  }

  executors = {

  'default': ThreadPoolExecutor(20), # 最大线程数

  'processpool': ProcessPoolExecutor(5) # 最大进程数

  }

  job_defaults = {

  'coalesce': False,

  'max_instances': 3 # 同一个任务启动实例的最大个数

  }

  # 配置的使用方式

  scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)郑州妇科检查哪家好 http://www.zzkdfk.com/

  方法2

  from apscheduler.schedulers.background import BackgroundScheduler

  # 使用字典的形式添加配置

  scheduler = BackgroundScheduler({

  'apscheduler.jobstores.mongo': {

  'type': 'mongodb'

  },

  'apscheduler.jobstores.default': {

  'type': 'sqlalchemy',

  'url': 'sqlite:///jobs.sqlite'

  },

  'apscheduler.executors.default': {

  'class': 'apscheduler.executors.pool:ThreadPoolExecutor',

  'max_workers': '20'

  },

  'apscheduler.executors.processpool': {

  'type': 'processpool',

  'max_workers': '5'

  },

  'apscheduler.job_defaults.coalesce': 'false',

  'apscheduler.job_defaults.max_instances': '3',

  'apscheduler.timezone': 'UTC',

  })

  方法3

  from pytz import utc

  from apscheduler.schedulers.background import BackgroundScheduler

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  from apscheduler.executors.pool import ProcessPoolExecutor

  jobstores = {

  'mongo': {'type': 'mongodb'},

  'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

  }

  executors = {

  'default': {'type': 'threadpool', 'max_workers': 20},

  'processpool': ProcessPoolExecutor(max_workers=5)

  }

  job_defaults = {

  'coalesce': False,

  'max_instances': 3

  }

  scheduler = BackgroundScheduler()

  # 使用调度器对象的 configure属性增加 存储器, 执行器 存储器 的配置

  scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

  8. 定时任务启动

  scheduler.start()

  对于BlockingScheduler ,程序会阻塞在这,防止退出,作为独立进程时使用。

  对于BackgroundScheduler,可以在应用程序中使用。不再以单独的进程使用。

  9. 任务管理

  方式1

  job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任务

  job.remove() # 删除任务

  job.pause() # 暂定任务

  job.resume() # 恢复任务

  job.shutdown() # 关闭调度

  job.shutdown(wait=False) # 不等待正在运行的任务

  方式2

  scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任务

  scheduler.remove_job('my_job_id') # 删除任务

  scheduler.pause_job('my_job_id') # 暂定任务

  scheduler.resume_job('my_job_id') # 恢复任务

  修改调度, 修改调度的配置属性

  job.modify(max_instances=6, name='Alternate name')

  # 更改触发器

  scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

  获取作业列表 get_jobs() 方法, 返回的是Job实例列表

  10.日志的使用

  项目中没有使用日志记录,

  import logging

  logging.basicConfig()

  logging.getLogger('apscheduler').setLevel(logging.DEBUG)

  集成到项目中的日志中

  logger = logging.getLogger("django")

  ......

  scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)

  scheduler._logger = logger

  11.完整的例子

  REDIS_CONF = {

  "password": "xxxxx",

  "host": "192.168.137.120",

  "port": 6379,

  "db": 0}

  logger = logging.getLogger("django")

  jobs_key = 'collection_api_apscheduler.jobs'

  run_times_key = 'collection_api_apscheduler.run_times'

  job_stores = {

  'default': RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF)

  }

  executors = {

  'default': {'type': 'threadpool', 'max_workers': 60}

  }

  job_defaults = {

  'coalesce': True, # 相同任务同时触发多次时,只运行一次

  'max_instances': 3,

  'misfire_grace_time': 30, # 过期30秒依然执行该任务

  }

  scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)

  scheduler._logger = logger

  # 如果持久化的调度器中作业列表, 调度器继续执行

  if scheduler.get_jobs():

  scheduler.resume()

  # 添加定时任务

  scheduler.add_job(handle_news_task, 'date', id='handle_news_task', replace_existing=True)

  scheduler.add_job(......)

  scheduler.start()

看完上述内容,你们对APScheduler的使用是怎么样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注编程网行业资讯频道,感谢大家的支持。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

APScheduler的使用是怎么样的

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

APScheduler的使用是怎么样的

今天就跟大家聊聊有关APScheduler的使用是怎么样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。  1.简介  APScheduler 是一款Python开发的定时任务工具,
2023-06-02

Python的函数使用是怎么样的

今天就跟大家聊聊有关Python的函数使用是怎么样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1 跳出循环-breakpython提供了一种方便快捷的跳出循环的方法-break,
2023-06-22

Git的使用方法是怎么样的

这篇文章将为大家详细讲解有关Git的使用方法是怎么样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。在傻瓜式部署方式出现问题之后,我们该通过什么方法搭建网站呢?其实京东云擎本身已经给出了答案
2023-06-10

Sublime Text的使用体验是怎么样的

本篇文章为大家展示了Sublime Text的使用体验是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。说服我使用一种新编辑器是很难的,但鉴于听到Sublime Text获得如此多的赞扬,我最终决
2023-06-17

linux的awk的使用是怎样的

这篇文章将为大家详细讲解有关linux的awk的使用是怎样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。awk是一个强大的文本分析工具,相对于grep的查找,sed的编辑,awk在其对数据
2023-06-06

awk中的使用循环是怎么样的

awk中的使用循环是怎么样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。导读awk 脚本有三个主要部分:BEGIN 和 END 函数(都可选),用户自己写的每次要执行的函数。
2023-06-05

Python第三方模块apscheduler安装和使用的方法是什么

这篇“Python第三方模块apscheduler安装和使用的方法是什么”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Pyt
2023-07-05

Elasticsearch的安装使用是怎样的

本篇文章给大家分享的是有关Elasticsearch的安装使用是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。导读ElasticSearch是一个基于Lucene的搜索服
2023-06-04

Python中怎么使用apscheduler定时执行任务

今天小编给大家分享一下Python中怎么使用apscheduler定时执行任务的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
2023-06-29

PHP中select2的使用是怎样的

这期内容当中小编将会给大家带来有关PHP中select2的使用是怎样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。普通的select下拉选框效果单调不好看。有时候客户的需求总是很特别的编写。又要支持下拉
2023-06-04

使用pbdigg的心得感受是怎么样的

使用pbdigg的心得感受是怎么样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。接触PBdIGG到今天已经是第六天了,有很多心得分享给大家,首先我觉得它是一款非常y优秀的ph
2023-06-12

ElasticSearch使用过程是怎样的

ElasticSearch使用过程是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。这里介绍ElasticSearch的必备知识:从入门、索引管理到映射详解
2023-06-15

java中TestNG使用是怎样的

java中TestNG使用是怎样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。一、TestNG介绍TestNG是Java中的一个测试框架, 类似于JUnit 和NUnit,
2023-06-22

RK3399 /RK3288 ADB使用是怎样的

RK3399 /RK3288 ADB使用是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。RK3399 /RK3288 ADB使用Linux安装adb1. 安装ad
2023-06-05

使用APScheduler怎么实现一个定时任务

使用APScheduler怎么实现一个定时任务?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一、安装apschedulerpip install apschedulerpi
2023-06-14

Docker容器使用过程是怎么样的

今天就跟大家聊聊有关Docker容器使用过程是怎么样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Docker 容器使用Docker 客户端docker 客户端非常简单 ,我们可以
2023-06-06

Java filter中的chain.doFilter使用是怎样的

Java filter中的chain.doFilter使用是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。filter中的chain.doFilter使用
2023-06-25

Java Keytool 命令使用是怎样的

Java Keytool 命令使用是怎样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。Java Keytool 命令使用如果没有Keytool工具,请先安装jre/jdk(如
2023-06-04

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录