Golang如何实现CronJob
这篇文章主要讲解了“Golang如何实现CronJob”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Golang如何实现CronJob”吧!
引言
最近做了一个需求,是定时任务相关的。以前定时任务都是通过 linux crontab 去实现的,现在服务上云(k8s)了,尝试了 k8s 的 CronJob,由于公司提供的是界面化工具,使用、查看起来很不方便。于是有了本文,通过一个单 pod 去实现一个常驻服务,去跑定时任务。
经过筛选,选用了cron这个库,它支持 linux cronjob 语法取配置定时任务,还支持@every 10s、@hourly 等描述符去配置定时任务,完全满足我们要求,比如下面的例子:
package mainimport ("fmt""github.com/natefinch/lumberjack""github.com/robfig/cron/v3""github.com/sirupsen/logrus")type CronLogger struct {clog *logrus.Logger}func (l *CronLogger) Info(msg string, keysAndValues ...interface{}) {l.clog.WithFields(logrus.Fields{"data": keysAndValues,}).Info(msg)}func (l *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) {l.clog.WithFields(logrus.Fields{"msg": msg,"data": keysAndValues,}).Warn(err.Error())}func main() {logger := logrus.New()_logger := &lumberjack.Logger{Filename: "./test.log",MaxSize: 50,MaxAge: 15,MaxBackups: 5,}logger.SetOutput(_logger)logger.SetFormatter(&logrus.JSONFormatter{DisableHTMLEscape: true,})c := cron.New(cron.WithLogger(&CronLogger{clog: logger,}))c.AddFunc("*/5 * * * *", func() {fmt.Println("你的流量包即将过期了")})c.AddFunc("*/2 * * * *", func() {fmt.Println("你的转码包即将过期了")})c.Start()for {select {}}}
使用了 cronjob、并结合了 golang 的 log 组建,输出日志到文件,使用很方便。
但是,在使用过程中,发现还有些不足,缺少某些功能,比如我很想使用的查看任务列表。
类库介绍
扩展性强
此类库扩展性挺强,通过 JobWrapper 去包装一个任务,NewChain(w1, w2, w3).Then(job),相关实现如下:
type JobWrapper func(Job) Jobtype Chain struct { wrappers []JobWrapper}func NewChain(c ...JobWrapper) Chain { return Chain{c}}func (c Chain) Then(j Job) Job { for i := range c.wrappers { j = c.wrappers[len(c.wrappers)-i-1](j) } return j}
比如当前脚本如果还没有执行完,下次任务时间又到了,就可以通过如下默认提供的 wrapper 去避免继续执行。可以看到最后执行的任务 j.Run() 被包装在了一个函数闭包中,并且根据闭包中的 channel 去判断是否执行,避免重复执行。首次执行的时候,容量为 1 的 channel 中已经有数据了,重复执行时,channel 无数据,默认跳过,等上次任务执行完成后,又像 channel 中写入一条数据,下次 channel 可以读出数据,又可以执行任务了:
func SkipIfStillRunning(j Job) Job { var ch = make(chan struct{}, 1) ch <- struct{}{} return FuncJob(func() { select { case v := <-ch: defer func() { ch <- v }() j.Run() default: // "skip" } })}
主流程
cron 主流程是启动一个协程,里面有双重 for 循环,下面我们来一起分析一下。
定时器
第一层循环,首先计算下次最早执行任务的时间跟当前时间间隔 gap,然后设置定时器为 gap,这里很巧妙,定时器间隔不是 1s/次,而是跟下次任务的时间相关,这样就避免了无用的定时器循环,也让执行时间更精准,不存在设置小了浪费资源,设置大了误差大的情况。接下来进入第二层循环。
sort.Sort(byTime(c.entries))timer = time.NewTimer(c.entries[0].Next.Sub(now))
事件循环
事件循环中,包含了很多事件,比如 添加任务、停止、移除任务,当 cron 启动之后,这些任务都是异步的。比如添加任务,不会直接将任务信息写入内存中,而是进入事件循环,加入之后,重新计算第一二层循环,避免了正在修改任务信息,又执行任务信息,然后出错的情况。
有人可能会问了,为何不在事件中加锁,这样也能避免内存竞争。我想说,我们执行的是脚本任务,有的事件可能很长,可能会阻塞有些事件,所以这些事件都放在循环中,避免了加锁,也满足了要求。
for { select { case now = <-timer.C: // 执行任务 case newEntry := <-c.add: // 添加任务 case replyChan := <-c.snapshot: // 获取任务信息 case <-c.stop: // 停止任务 case id := <-c.remove: // 移除任务 } break}
类库改造
在了解了项目的基本情况之后,对项目做了部分改造,方便使用。
打印任务列表信息
在主循环汇总加入了信号量监听,当触发信号量 SIGUSR1,将任务信息输出到日志:
usrSig := make(chan os.Signal, 1)signal.Notify(usrSig, syscall.SIGUSR1)for {select {case <-usrSig:// 启动单独的协程去打印定时任务执行信息continue}break}
根据名称移除脚本
目前脚本只能根据脚本 id 去移除要执行的任务,执行过程中,也不能通过命令去移除任务,不是太方便。比如有个脚本马上要执行了,但是该脚本发现问题了,这时候生产环境的话,就需要更新代码,然后重启服务去下线脚本任务,这时候,黄花菜可能都凉了。
所以我也是通过信号量,来处理运行之后,运行中移除任务的问题,收到信号量之后,读取文件中的内容,根据命令去处理 runing 中的内存:
usrSig2 := make(chan os.Signal, 1)signal.Notify(usrSig2, syscall.SIGUSR2)......case <-usrSig2:actionByte, err := os.ReadFile("/tmp/cron.action")...... //校验命令正确性action := strings.Fields(string(actionByte))switch action[0] {case "removeTag":timer.Stop()now = c.now()c.removeEntryByTag(action[1])c.logger.Info("removedByTag", "tag", action[1])}......
改造效果
由于原项目已经 2 年多没有个更新过了,就算发起 pr 估计也不会被处理,所以 fork 一份放在了这里aizuyan/cron进行改造,下面是改进之后的代码:
package mainimport (// 加载配置文件"fmt""github.com/aizuyan/cron/v3")func main() {c := cron.New(cron.WithLogger(cron.DefaultLogger))c.AddFuncWithTag("流量包过期", "*/5 * * * *", func() {fmt.Println("你的流量包即将过期了")})c.AddFuncWithTag("转码包过期", "*/2 * * * *", func() {fmt.Println("你的转码包即将过期了")})c.Start()for {select {}}}
对每个定时任务增加了一个名称标识,当任务启动后,当我们执行 kill -SIGUSR1 <pid> 的时候,会看到 stdout 输出了运行的任务列表信息:
+----+------------+-------------+---------------------+---------------------+
| ID | TAG | SPEC | PREV | NEXT |
+----+------------+-------------+---------------------+---------------------+
| 2 | 转码包过期 | */2 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:22:00 |
| 1 | 流量包过期 | */5 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:25:00 |
+----+------------+-------------+---------------------+---------------------+
执行 kill -SIGUSR2 <pid>,移除转码包过期任务,避免了使用 ID 容易出错的问题。
cat /tmp/cron.action removeTag 转码包过期// {"data":["tag","转码包过期"],"level":"info","msg":"removedByTag","time":"2023-04-02T18:32:56+08:00"}
感谢各位的阅读,以上就是“Golang如何实现CronJob”的内容了,经过本文的学习后,相信大家对Golang如何实现CronJob这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341