我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Go语言dolphinscheduler任务调度处理

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Go语言dolphinscheduler任务调度处理

目录

简介

自动化

使用

例子

任务结果检查

测试连接

重跑任务

小结

简介

dolphinscheduler是一个可视化DAG工作流任务调度平台,在大数据领域做任务调用非常流行

提供了类似azkaban工作流调度,比azkaban更强的可视化DAG,支持大数据领域flink,spark,shell,python,java,scala,http等各种类型任务

官网传送门: https://dolphinscheduler.apache.org/zh-cn/

自动化

为什么需要自动化任务处理,当你的dolphinscheduler有几百上千个任务,管理是非常耗时的,如果每个任务都配置邮件告警,那一有问题整天都在救火

此时就需要任务结果监控和任务重跑来解决 失败任务和任务自动重跑,避免浪费过多时间在维护dolphinscheduler任务上

使用

在调用api之前需要为用户申请token,按图操作

dolphinscheduler提供类似swagge接口UI工具,访问doc地址访问

http://ip:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn

例子

该demo还是使用了http请求包(HttpRequest),json数据搜索包(go-jmespath)

任务结果检查

填坑说明

日期处理: 使用了%20转译空格,使用Sprintf方法拼接字符串

多种数据类型: 使用interface{}来支持int,string等多种数据类型

数据转换1: 将byte数据转成json格式,方便搜索

数据转换2: 将interface{}数据转成字符串切片,方便使用

该方法可以做成周期性任务运行,将失败的job查出来,后续是要告警通知,还是根据job名称查出对应id进行重跑任务

package main
import (
   "encoding/json"
   "fmt"
   "github.com/jmespath/go-jmespath"
   "github.com/kirinlabs/HttpRequest"
   "time"
)
var (
   url = "http://ip:12345/dolphinscheduler"
   token = "xxxxxxx"
   req *HttpRequest.Request
)
func init() {
   req = HttpRequest.NewRequest().Debug(true).SetTimeout(time.Second*5).
      SetHeaders(map[string]string{
         "token":token,
      })
}
func main() {
   //testConn()
   jobCheck()
}
func jobCheck()  {
   //获取日期
   today := time.Now().Format("2006-01-02")
   tomorrow := time.Now().AddDate(0, 0, +1).Format("2006-01-02")
   //拼接日期 %20是空格的转译
   fmt.Println(fmt.Sprintf("%v%v",today,"%2000:00:00"))
   fmt.Println(fmt.Sprintf("%v%v",tomorrow,"%2000:00:00"))
   //需要检查的项目名称
   projects := []string{"jdOrder","jdPlay"}
   //需要检查的时间段 页码是int类型,日期是string类型
   m := make(map[string]interface{})
   m["pageNo"] = 1
   m["pageSize"] = 22
   m["stateType"] = "FAILURE"
   m["startDate"] = fmt.Sprintf("%v%v",today,"%2000:00:00")
   m["endDate"] = fmt.Sprintf("%v%v",tomorrow,"%2000:00:00")
   for _, project := range projects {
      resp, _ := req.Get(url+"/projects/"+project+"/task-instance/list-paging",m)
      if resp.StatusCode() != 200 {
         fmt.Println("job检查状态码不符期望: ",resp.StatusCode())
         return
      }
      fmt.Println("resp",resp)
      //将返回数据从byte转成json格式
      body, _ := resp.Body()
      var i interface{}
      var s []string
      _ = json.Unmarshal(body, &i)
      //搜索出需要的字段对应数据
      processInstanceNames, _ := jmespath.Search("data.totalList[*].processInstanceName", i)
      //将interface转成[]string
      for _,v := range processInstanceNames.([]interface{}) {
         s = append(s,v.(string))
      }
      //打印出结果
      for _,v := range s {
         fmt.Println(v)
      }
   }
}
测试连接

如果上小节任务跑不成功,可以先运行该方法,测试连接正确性

func testConn() {
   resp, _ := req.Get(url + "/projects/query-project-list")
   fmt.Println("resp",resp)
   body, _ := resp.Body()
   var i interface{}
   _ = json.Unmarshal(body, &i)
   fmt.Println("i",i)
}
重跑任务

重跑任务其实就是再次启动任务,直接调用start_job既可

项目名称和ID需要通过该接口获取,这个是固定的

http://ip:12345/dolphinscheduler/projects/monitor/process/list-paging

调用示例: startJob("ads_jd_order",678)

func startJob(projectName string,projectId int)  {
   m := make(map[string]interface{})
   m["failureStrategy"] = "CONTINUE"
   m["warningGroupId"] = 0
   m["warningType"] = "NONE"
   m["runMode"] = "RUN_MODE_SERIAL"
   m["processInstancePriority"] = "MEDIUM"
   m["workerGroup"] = "default"
   m["processDefinitionId"] = projectId
   resp, _ := req.JSON().Post(url+"projects/" + projectName+"/executors/start-process-instance",m)
   if resp.StatusCode() != 200 {
      fmt.Println("job开始状态码不符期望: ",resp.StatusCode())
      return
   }
}
小结

dolphinscheduler api调用有文档,不太复杂,但网上资料较少,需要自行摸索,以上就是Go语言dolphinscheduler任务调度处理的详细内容,更多关于Go语言dolphinscheduler任务调度的资料请关注编程网其它相关文章!


免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Go语言dolphinscheduler任务调度处理

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Go语言dolphinscheduler任务调度处理

目录简介自动化使用例子任务结果检查测试连接重跑任务小结简介dolphinscheduler是一个可视化DAG工作流任务调度平台,在大数据领域做任务调用非常流行 提供了类似azkaban工作流调度,比azkaban更强的可视化DAG,支持大数
2022-06-08

在Go语言中如何处理并发任务调度的问题?

在Go语言中如何处理并发任务调度的问题?随着计算机技术的快速发展,处理并发任务的需求越来越多。Go语言作为一种以并发编程为核心特性的编程语言,提供了丰富的并发编程模型和工具,能够很好地解决并发任务调度的问题。在Go语言中,可以使用gorou
2023-10-22

如何处理Go语言中的并发任务的任务调度和任务优先级问题?

如何处理Go语言中的并发任务的任务调度和任务优先级问题?随着计算机硬件的发展和多核处理器的普及,处理并发任务已成为程序开发过程中的重要环节。Go语言作为一门支持原生并发的编程语言,其并发模型的设计可以有效地处理并发任务。然而,在实际开发中,
2023-10-22

如何处理Go语言中的并发任务的任务调度和任务执行报告问题?

如何处理Go语言中的并发任务的任务调度和任务执行报告问题?引言:并发任务的任务调度和任务执行报告是Go语言中常见的问题之一。在实际开发中,我们经常需要同时处理多个任务,但是如何高效地调度和执行这些任务,并且能够准确地知道任务的执行情况,对于
2023-10-22

怎么用C语言实现任务调度

这篇文章主要介绍“怎么用C语言实现任务调度”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“怎么用C语言实现任务调度”文章能帮助大家解决问题。任务调度模式结构整体上的结构属于线性结构,结合链表和定时器来
2023-07-05

如何解决Go语言中的并发任务的任务依赖和任务调度图问题?

如何解决Go语言中的并发任务的任务依赖和任务调度图问题?在Go语言中,通过并发方式执行任务可以显著提高程序的性能和效率。然而,当任务之间存在依赖关系并且需要按照特定的顺序执行时,我们则需要解决并发任务中的任务依赖和任务调度图问题。本文将介绍
2023-10-22

如何在go语言中实现分布式任务调度的功能

在Go语言中实现分布式任务调度功能可以借助一些开源项目和库来简化开发过程。以下是一个基本的实现步骤:1. 使用Go语言编写任务执行代码:首先,编写任务执行代码。这些代码定义了要执行的任务逻辑。可以使用Go语言中的goroutine和chan
2023-10-12

在Go语言中如何解决并发任务分布式调度问题?

在Go语言中如何解决并发任务分布式调度问题?随着云计算和大数据的发展,分布式系统的应用越来越广泛。在分布式系统中,并发任务的调度是一个非常重要的问题。Go语言作为一种高效的并发编程语言,为解决并发任务分布式调度问题提供了很好的支持。在Go语
2023-10-22

在Go语言中如何解决并发任务分布式调度问题

在Go语言中,可以使用以下几种方式来解决并发任务分布式调度问题:1. 使用goroutine和channel:在Go语言中,可以使用goroutine来并发执行任务,并使用channel来进行任务的分发和结果的收集。可以创建一个任务队列,将
2023-10-09

在Go语言中如何解决并发任务优先级调度问题?

在Go语言中如何解决并发任务优先级调度问题?在日常的开发中,我们经常会遇到并发任务优先级调度的问题。例如,当我们需要同时处理多个任务时,有些任务可能比其他任务更重要,需要优先执行。在Go语言中,我们可以使用goroutine和channel
2023-10-22

如何解决Go语言中的并发任务的分布式任务队列和任务调度策略问题?

如何解决Go语言中的并发任务的分布式任务队列和任务调度策略问题?引言:在分布式系统中,任务的分发和调度是一个关键问题。在Go语言中,通过使用并发技术可以有效地管理和执行任务。本文将介绍如何使用分布式任务队列和任务调度策略来解决Go语言中的并
2023-10-22

Go语言中如何处理并发任务重试问题?

Go语言中如何处理并发任务重试问题?在并发编程中,任务重试是一个常见的问题。当一个任务执行失败后,我们可能希望重新执行该任务直到成功为止。Go语言的并发模型使得处理并发任务重试问题变得相对简单。本文将介绍如何在Go语言中处理并发任务重试问题
2023-10-22

Go语言中如何处理并发任务重试问题

Go语言中可以使用goroutine和channel来处理并发任务重试问题。首先,创建一个goroutine来执行任务,并使用channel来通信任务的结果。可以使用select语句来处理任务的结果,如果任务成功完成,则将结果发送到一个成功
2023-10-09

在Go语言中如何解决并发任务的优先级调度问题

在Go语言中,可以使用goroutine和channel来解决并发任务的优先级调度问题。首先,我们需要定义一个包含任务的结构体,包括任务的名称和优先级。例如:```gotype Task struct {Name stringPr
2023-10-09

在Go语言中如何解决并发任务的优先级调度问题?

在Go语言中如何解决并发任务的优先级调度问题?Go语言提供了丰富的并发相关的特性,使得我们可以轻松地实现并发任务的优先级调度。在Go语言中,我们可以使用goroutine和channel来完成任务的并发执行和通信。本文将介绍如何使用goro
2023-10-22

如何处理Go语言中的并发任务的任务丢失和任务重复问题?

如何处理Go语言中的并发任务的任务丢失和任务重复问题?在Go语言中,使用并发可以提高程序的运行效率,但同时也带来了一些问题,其中最常见的就是任务丢失和任务重复问题。当多个goroutine并发执行任务时,有可能出现某些任务被丢失,或者某些任
2023-10-22

如何处理Go语言中的并发任务的任务丢失和任务重复问题

在Go语言中,可以使用以下几种方法来处理并发任务的任务丢失和任务重复问题:1. 使用带缓冲的通道:可以使用带缓冲的通道来存储任务,当任务到达时,先将任务放入通道中,然后再进行处理。这样可以避免任务丢失,并且可以灵活地控制任务的处理速度。如果
2023-10-09

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录