我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Python下定时任务框架APSched

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Python下定时任务框架APSched

今天准备实现一个功能需要用到定时执行任务,所以就看到了Python的一个定时任务框架APScheduler,试了一下感觉还不错。

1.APScheduler简介: 

APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date、固定时间间隔interval 、以及类似于Linux上的定时任务crontab类型的定时任务。并且该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便。

2.APScheduler安装:

APScheduler的安装相对来说也非常简单,可以直接利用pip安装,如果没有pip可以下载源码,利用源码安装。

1).利用pip安装:(推荐)

# pip install apscheduler

2).基于源码安装:https://pypi.python.org/pypi/APScheduler/

# python setup.py install

3.基本概念

APScheduler有四种组件及相关说明:

 1) triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了他们自己初始化配置外,触发器完全是无状态的。

 2)job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其它作业存储器可以将任务作业保存到各种数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。

3) executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器将会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务如比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据根据实际需求可以同时使用两种执行器。

4) schedulers(调度器):调度器是将其它部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器,相反调度器提供了处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如可以添加。修改、移除任务作业。

 APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:

     BlockingScheduler:适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用。

     BackgroundScheduler: 适合于要求任何在程序后台运行的情况,当希望调度器在应用后台执行时使用。

     AsyncIOScheduler:适合于使用asyncio框架的情况

     GeventScheduler: 适合于使用gevent框架的情况

     TornadoScheduler: 适合于使用Tornado框架的应用

     TwistedScheduler: 适合使用Twisted框架的应用

     QtScheduler: 适合使用QT的情况

4.配置调度器

APScheduler提供了许多不同的方式来配置调度器,你可以使用一个配置字典或者作为参数关键字的方式传入。你也可以先创建调度器,再配置和添加作业,这样你可以在不同的环境中得到更大的灵活性。

1)下面一个简单的示例:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
 
def test_job():
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
 
scheduler = BlockingScheduler()
'''
 #该示例代码生成了一个BlockingScheduler调度器,使用了默认的默认的任务存储MemoryJobStore,以及默认的执行器ThreadPoolExecutor,并且最大线程数为10。
'''
scheduler.add_job(test_job, 'interval', seconds=5, id='test_job')
'''
 #该示例中的定时任务采用固定时间间隔(interval)的方式,每隔5秒钟执行一次。
 #并且还为该任务设置了一个任务id
'''
scheduler.start()

2)如果想执行一些复杂任务,如上边所说的同时使用两种执行器,或者使用多种任务存储方式,并且需要根据具体情况对任务的一些默认参数进行调整。可以参考下面的方式。(http://apscheduler.readthedocs.io/en/latest/userguide.html)

 第一种方式:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

第二种方式:

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

第三种方式:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

5.对任务作业的基本操作:

1).添加作业有两种方式:第一种可以直接调用add_job(),第二种使用scheduled_job()修饰器。

而add_job()是使用最多的,它可以返回一个apscheduler.job.Job实例,因而可以对它进行修改或者删除,而使用修饰器添加的任务添加之后就不能进行修改。

例如使用add_job()添加作业:

#!/usr/bin/env python
#-*- coding:UTF-8
import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1(f):
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), f

def job2(arg1, args2, f):
    print f, args1, args2

def job3(**args):
    print args

'''
APScheduler支持以下三种定时任务:
cron: crontab类型任务
interval: 固定时间间隔任务
date: 基于日期时间的一次性任务
'''
scheduler = BlockingScheduler()
#循环任务示例
scheduler.add_job(job1, 'interval', seconds=5, args=('循环',), id='test_job1')
#定时任务示例
scheduler.add_job(job1, 'cron', second='*/5', args=('定时',), id='test_job2')
#一次性任务示例
scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=('一次',), id='test_job3')
'''
传递参数的方式有元组(tuple)、列表(list)、字典(dict)
注意:不过需要注意采用元组传递参数时后边需要多加一个逗号
'''
#基于list
scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4')
#基于tuple
scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5')
#基于dict
scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job7')
print scheduler.get_jobs()
scheduler.start()

#带有参数的示例
scheduler.add_job(job2, 'interval', seconds=5, args=['a','b'], id='test_job4')
scheduler.add_job(job2, 'interval', seconds=5, args=('a','b',), id='test_job5')
scheduler.add_job(job3, 'interval', seconds=5, kwargs={'a':1,'b':2}, id='test_job6')
print scheduler.get_jobs()
scheduler.start()

或者使用scheduled_job()修饰器来添加作业:

@sched.scheduled_job('cron', second='*/5' ,id='my_job_id',)
def test_task():
    print("Hello world!")

2).获得任务列表:

可以通过get_jobs方法来获取当前的任务列表,也可以通过get_job()来根据job_id来获得某个任务的信息。并且apscheduler还提供了一个print_jobs()方法来打印格式化的任务列表。

例如:

scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')
print scheduler.get_job('my_job_id')
print scheduler.get_jobs()

3).修改任务:

修改任务任务的属性可以使用apscheduler.job.Job.modify()或者modify_job()方法,可以修改除了id的其它任何属性。

例如:

job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job' name='test_job')
job.modify(max_instances=5, name='my_job')

4).删除任务:

删除调度器中的任务有可以用remove_job()根据job ID来删除指定任务或者使用remove(),如果使用remove()需要事先保存在添加任务时返回的实例对象,任务删除后就不会在执行。

注意:通过scheduled_job()添加的任务只能使用remove_job()进行删除。

例如:

job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')
job.remove()

或者

scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')
scheduler.remove_job('my_job')

5).暂停与恢复任务:

暂停与恢复任务可以直接操作任务实例或者调度器来实现。当任务暂停时,它的运行时间会被重置,暂停期间不会计算时间。

暂停任务:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复任务

apscheduler.job.Job.resume()
apscheduler.schedulers.BaseScheduler.resume_job()

6).启动调度器

可以使用start()方法启动调度器,BlockingScheduler需要在初始化之后才能执行start(),对于其他的Scheduler,调用start()方法都会直接返回,然后可以继续执行后面的初始化操作。

例如:

from apscheduler.schedulers.blocking import BlockingScheduler
 
def my_job():
    print "Hello world!"
 
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=5)
scheduler.start()

7).关闭调度器:

使用下边方法关闭调度器:

scheduler.shutdown()

默认情况下调度器会关闭它的任务存储和执行器,并等待所有正在执行的任务完成,如果不想等待,可以进行如下操作:

scheduler.shutdown(wait=False)

注意:

当出现No handlers could be found for logger “apscheduler.scheduler”次错误信息时,说明没有

 logging模块的logger存在,所以需要添加上,对应新增内容如下所示(仅供参考):

import logging
 logging.basicConfig(level=logging.DEBUG,
            format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
           datafmt='%a, %d %b %Y %H:%M:%S',
            filename='/var/log/aaa.txt',
            filemode='a')


免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Python下定时任务框架APSched

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Python下定时任务框架APSched

今天准备实现一个功能需要用到定时执行任务,所以就看到了Python的一个定时任务框架APScheduler,试了一下感觉还不错。1.APScheduler简介: APScheduler是Python的一个定时任务框架,可以很方便的满足用户定
2023-01-31

详解定时任务框架Quartz的使用

Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,完全由Java开发,可以用来执行定时任务,本文就来带大家聊聊它的具体使用
2023-02-13

ThreadPoolTaskScheduler轻量级多线程定时任务框架

ThreadPoolTaskScheduler轻量级多线程定时任务框架 前言一、ThreadPoolTaskScheduler是什么?二、上干货1.ThreadPoolTaskScheduler常用的api介绍2.springboo
2023-08-30

Python定时任务

在项目中,我们可能遇到有定时任务的需求。其一:定时执行任务。例如每天早上 8 点定时推送早报。其二:每隔一个时间段就执行任务。比如:每隔一个小时提醒自己起来走动走动,避免长时间坐着。今天,我跟大家分享下 Python 定时任务的实现方法。1
2023-01-31

web分布式定时任务调度框架怎么使用

这篇文章主要讲解了“web分布式定时任务调度框架怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“web分布式定时任务调度框架怎么使用”吧!一、业务背景1.1 为什么需要使用定时任务调度
2023-06-04

springboot整合quartz定时任务框架的方法是什么

今天小编给大家分享一下springboot整合quartz定时任务框架的方法是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下
2023-06-26

怎么使用golang定时任务框架修改数据库

使用Golang定时任务框架修改数据库,你可以按照以下步骤进行操作:1. 导入所需的包:goimport ("github.com/robfig/cron""database/sql"_ "github.com/go-sql-driver/
2023-10-20

Python实现定时任务

Python下实现定时任务的方式有很多种方式。下面介绍几种 循环sleep:这是一种最简单的方式,在循环里放入要执行的任务,然后sleep一段时间再执行。缺点是,不容易控制,而且sleep是个阻塞函数。def timer(n): '''''
2022-06-04

Linux下的定时任务和延时任务的详解

at at + timeat 17:23 at> touch /mnt/file{1..9} ##延迟动作 at> 键入ctrl+d ##表示发起动作 at -l | atq ##查看当前任务 at -d | atrm ##取消指定
2022-06-04

java中常用的定时任务框架单体是怎样的

本篇文章为大家展示了java中常用的定时任务框架单体是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。一、Timer+TimerTask这是jdk自带的java.util.Timer类,这个类允
2023-06-22

python任务调度框架怎么使用

Python任务调度框架最常用的是APScheduler,下面是APScheduler的使用步骤:1. 安装APScheduler:在终端中运行`pip install apscheduler`来安装APScheduler库。2. 导入AP
2023-09-21

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录