Python使用signal定时结束AsyncIOScheduler任务的问题
短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题
- 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)
- 如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了)
原调试代码如下:
from datetime import datetime, timedelta
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
url = 'https://httpbin.org/get?a=1'
async with session.get(url) as res:
print('get', res.status)
return await res.text()
async def post(session):
url = 'https://httpbin.org/post?b=2'
async with session.post(url) as res:
print('post', res.status)
return await res.text()
async def main():
async with aiohttp.ClientSession() as session:
await get(session)
await post(session)
if __name__ == '__main__':
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = AsyncIOScheduler(jobstores=jobstores)
for i in range(10): # 添加10个任务
job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()
Google后发现AsyncIOScheduler的使用需要在scheduler启动后,需要自己调用asyncio.get_event_loop().run_forever()
来启动协程任务。
但是一旦run_forever()则就会阻塞至死。除非有KeyboardInterrupt, SystemExit等异常或者强杀来停止其运行。
此时想到使用Python的signal来定时发送信号,修改后程序如下,可以正常延迟停止(感觉有点像模拟Go的defer)。
# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
url = 'https://httpbin.org/get?a=1'
async with session.get(url) as res:
print('get', res.status)
return await res.text()
async def post(session):
url = 'https://httpbin.org/post?b=2'
async with session.post(url) as res:
print('post', res.status)
return await res.text()
async def main():
async with aiohttp.ClientSession() as session:
await get(session)
await post(session)
if __name__ == '__main__':
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = AsyncIOScheduler(jobstores=jobstores)
for i in range(10): # 添加10个任务
job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()
signal.alarm(20) # 20秒后终止程序
asyncio.get_event_loop().run_forever() # 永远运行
到此这篇关于Python使用signal定时结束AsyncIOScheduler任务的文章就介绍到这了,更多相关Python定时结束AsyncIOScheduler任务内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341