我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Go语言k8s kubernetes使用leader election实现选举

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Go语言k8s kubernetes使用leader election实现选举

一、背景

在kubernetes的世界中,很多组件仅仅需要一个实例在运行,比如controller-manager或第三方的controller,但是为了高可用性,需要组件有多个副本,在发生故障的时候需要自动切换。因此,需要利用leader election的机制多副本部署,单实例运行的模式。应用程序可以使用外部的组件比如ZooKeeper或Etcd等中间件进行leader eleaction, ZooKeeper的实现是采用临时节点的方案,临时节点存活与客户端与ZooKeeper的会话期间,在会话结束后,临时节点会被立刻删除,临时节点被删除后,其他处于被动状态的服务实例会竞争生成临时节点,生成临时节点的客户端(服务实例)就变成Leader,从而保证整个集群中只有一个活跃的实例,在发生故障的时候,也能快速的实现主从之间的迁移。Etcd是一个分布式的kv存储组件,利用Raft协议维护副本的状态服务,Etcd的Revision机制可以实现分布式锁的功能,Etcd的concurrency利用的分布式锁的能力实现了选Leader的功能(本文更多关注的是k8s本身的能力,Etcd的concurrency机制不做详细介绍)。

kubernetes使用的Etcd作为底层的存储组件,因此我们是不是有可能利用kubernetes的API实现选leader的功能呢?其实kubernetes的SIG已经提供了这方面的能力,主要是通过configmap/lease/endpoint的资源实现选Leader的功能。

二、官网代码示例

kubernetes官方提供了一个使用的例子,源码在:github.com/kubernetes/…

选举的过程中,每个实例的状态有可能是:

  • 选择成功->运行业务代码
  • 等待状态,有其他实例成为了leader。当leader放弃锁后,此状态的实例有可能会成为新的leader
  • 释放leader的锁,在运行的业务代码退出

在稳定的环境中,实例一旦成为了leader,通常情况是不会释放锁的,会保持一直运行的状态,这样有利于业务的稳定和Controller快速的对资源的状态变化做成相应的操作。只有在网络不稳定或误操作删除实例的情况下,才会触发leader的重新选举。

kubernetes官方提供的选举例子详解如下:

package main
import (
  "context"
  "flag"
  "os"
  "os/signal"
  "syscall"
  "time"
  "github.com/google/uuid"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  clientset "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
  "k8s.io/client-go/tools/clientcmd"
  "k8s.io/client-go/tools/leaderelection"
  "k8s.io/client-go/tools/leaderelection/resourcelock"
  "k8s.io/klog/v2"
)
func buildConfig(kubeconfig string) (*rest.Config, error) {
  if kubeconfig != "" {
    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
      return nil, err
    }
    return cfg, nil
  }
  cfg, err := rest.InClusterConfig()
  if err != nil {
    return nil, err
  }
  return cfg, nil
}
func main() {
  klog.InitFlags(nil)
  var kubeconfig string
  var leaseLockName string
  var leaseLockNamespace string
  var id string
  // kubeconfig 指定了kubernetes集群的配置文文件路径
  flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
  // 锁的拥有者的ID,如果没有传参数进来,就随机生成一个
  flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
  // 锁的ID,对应kubernetes中资源的name
  flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
  // 锁的命名空间
  flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
  // 解析命令行参数
  flag.Parse()
  if leaseLockName == "" {
    klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
  }
  if leaseLockNamespace == "" {
    klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
  }
  // leader election uses the Kubernetes API by writing to a
  // lock object, which can be a LeaseLock object (preferred),
  // a ConfigMap, or an Endpoints (deprecated) object.
  // Conflicting writes are detected and each client handles those actions
  // independently.
  config, err := buildConfig(kubeconfig)
  if err != nil {
    klog.Fatal(err)
  }
  // 获取kubernetes集群的客户端,如果获取不到,就抛异常退出
  client := clientset.NewForConfigOrDie(config)
  // 模拟Controller的逻辑代码
  run := func(ctx context.Context) {
    // complete your controller loop here
    klog.Info("Controller loop...")
    // 不退出
    select {}
  }
  // use a Go context so we can tell the leaderelection code when we
  // want to step down
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  // listen for interrupts or the Linux SIGTERM signal and cancel
  // our context, which the leader election code will observe and
  // step down
  // 处理系统的系统,收到SIGTERM信号后,会退出进程
  ch := make(chan os.Signal, 1)
  signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-ch
    klog.Info("Received termination, signaling shutdown")
    cancel()
  }()
  // we use the Lease lock type since edits to Leases are less common
  // and fewer objects in the cluster watch "all Leases".
  
  // 根据参数,生成锁。这里使用的Lease这种类型资源作为锁
  lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
      Name:      leaseLockName,
      Namespace: leaseLockNamespace,
    },
    // 跟kubernetes集群关联起来
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
      Identity: id,
    },
  }
  // start the leader election code loop
  
  // 注意,选举逻辑启动时候,会传入ctx参数,如果ctx对应的cancel函数被调用,那么选举也会结束
  leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    // 选举使用的锁
    Lock: lock,
    // IMPORTANT: you MUST ensure that any code you have that
    // is protected by the lease must terminate **before**
    // you call cancel. Otherwise, you could have a background
    // loop still running and another process could
    // get elected before your background loop finished, violating
    // the stated goal of the lease.
    //主动放弃leader,当ctx canceled的时候
    ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second,  // 选举的任期,60s一个任期,如果在60s后没有renew,那么leader就会释放锁,重新选举
    RenewDeadline:   15 * time.Second,  // renew的请求的超时时间
    RetryPeriod:     5 * time.Second, // leader获取到锁后,renew leadership的间隔。非leader,抢锁成为leader的间隔(有1.2的jitter因子,详细看代码)
    
    // 回调函数的注册
    Callbacks: leaderelection.LeaderCallbacks{
      
      // 成为leader的回调
      OnStartedLeading: func(ctx context.Context) {
        // we're notified when we start - this is where you would
        // usually put your code
        // 运行controller的逻辑
        run(ctx)
      },
      OnStoppedLeading: func() {
        // we can do cleanup here
        // 退出leader的
        klog.Infof("leader lost: %s", id)
        os.Exit(0)
      },
      OnNewLeader: func(identity string) {
        // 有新的leader当选
        // we're notified when new leader elected
        if identity == id {
          // I just got the lock
          return
        }
        klog.Infof("new leader elected: %s", identity)
      },
    },
  })
}

启动一个实例,观察日志输出和kubernetes集群上的lease资源,启动命令

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1

可以看到,日志有输出,id=1的实例获取到资源了。

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 I1023 17:00:21.670298 94227 leaderelection.go:248] attempting to acquire leader lease default/example... I1023 17:00:21.784234 94227 leaderelection.go:258] successfully acquired lease default/example I1023 17:00:21.784316 94227 main.go:78] Controller loop...

在kubernetes的集群上,看到

我们接着启动一个实例,id=2,日志中输出

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 I1023 17:05:00.555145 95658 leaderelection.go:248] attempting to acquire leader lease default/example... I1023 17:05:00.658202 95658 main.go:151] new leader elected: 1

可以看出,id=2的实例,没有获取到锁,并且观察到id=1的锁获取到了实例。接着我们尝试退出id=1的实例,观察id=2的实例是否会成为新的leader

三、锁的实现

kubernets的资源都可以实现Get/Create/Update的操作,因此,理论上所有的资源都可以作为锁的底层。kubernetes 提供了Lease/Configmap/Endpoint作为锁的底层。

锁的状态转移如下:

锁需要实现以下的接口

type Interface interface {
  // Get returns the LeaderElectionRecord
  Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
  // Create attempts to create a LeaderElectionRecord
  Create(ctx context.Context, ler LeaderElectionRecord) error
  // Update will update and existing LeaderElectionRecord
  Update(ctx context.Context, ler LeaderElectionRecord) error
  // RecordEvent is used to record events
  RecordEvent(string)
  // Identity will return the locks Identity
  Identity() string
  // Describe is used to convert details on current resource lock
  // into a string
  Describe() string
}

理论上,有Get/Create/Update三个方法,就可以实现锁的机制了。但是,需要保证update和create操作的原子性,这个就是kuberenetes的机制保证了。第二章的官网代码例子中,leaderelection.RunOrDie使用的RunOrDie接口,其实就是调用Run接口,而Run接口实现非常简单:

func (le *LeaderElector) Run(ctx context.Context) {
  defer runtime.HandleCrash()
  defer func() {
    le.config.Callbacks.OnStoppedLeading()
  }()
  // 获取锁,如果没有获取到,就一直等待
  if !le.acquire(ctx) {
    return // ctx signalled done
  }
  ctx, cancel := context.WithCancel(ctx)
  defer cancel()
  // 获取到锁后,需要调用回调函数中的OnStartedLeading,运行controller的代码
  go le.config.Callbacks.OnStartedLeading(ctx)
  
  // 获取到锁后,需要不断地进行renew操作
  le.renew(ctx)
}

LeaderElector关键是需要acquire和renew的操作,acquire和renew操作代码如下:

func (le *LeaderElector) acquire(ctx context.Context) bool {
  ctx, cancel := context.WithCancel(ctx)
  defer cancel()
  succeeded := false
  desc := le.config.Lock.Describe()
  klog.Infof("attempting to acquire leader lease %v...", desc)
  // 此接口会阻塞,利用定时的机制,获取锁,如果获取不到一直循环,除非ctx被取消。
  wait.JitterUntil(func() {
    // 获取锁
    succeeded = le.tryAcquireOrRenew(ctx)
    le.maybeReportTransition()
    if !succeeded {
      klog.V(4).Infof("failed to acquire lease %v", desc)
      return
    }
    le.config.Lock.RecordEvent("became leader")
    le.metrics.leaderOn(le.config.Name)
    klog.Infof("successfully acquired lease %v", desc)
    cancel()
  }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
  return succeeded
}
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
  ctx, cancel := context.WithCancel(ctx)
  defer cancel()
  // 循环renew机制,renew成功,不会返回true,导致Until会不断循环
  wait.Until(func() {
    //RenewDeadline的实现在这里,如果renew超过了RenewDeadline,会导致renew失败,主退出
    timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
    defer timeoutCancel()
    err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
      // renew锁
      return le.tryAcquireOrRenew(timeoutCtx), nil
    }, timeoutCtx.Done())
    le.maybeReportTransition()
    desc := le.config.Lock.Describe()
    if err == nil {
      klog.V(5).Infof("successfully renewed lease %v", desc)
      // renew成功
      return
    }
    le.config.Lock.RecordEvent("stopped leading")
    le.metrics.leaderOff(le.config.Name)
    klog.Infof("failed to renew lease %v: %v", desc, err)
    cancel()
  }, le.config.RetryPeriod, ctx.Done())
  // if we hold the lease, give it up
  if le.config.ReleaseOnCancel {
    le.release()
  }
}

关键的实现在于tryAcquireOrRenew,而tryAcquireOrRenew就是依赖锁的状态转移机制完成核心逻辑。

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
  now := metav1.Now()
  leaderElectionRecord := rl.LeaderElectionRecord{
    HolderIdentity:       le.config.Lock.Identity(),
    LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
    RenewTime:            now,
    AcquireTime:          now,
  }
  // 1. obtain or create the ElectionRecord
  // 检查锁有没有
  oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
  if err != nil {
    // 没有锁的资源,就创建一个
    if !errors.IsNotFound(err) {
      klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
      return false
    }
    if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
      klog.Errorf("error initially creating leader election record: %v", err)
      return false
    }
    //对外宣称自己成为了leader
    le.setObservedRecord(&leaderElectionRecord)
    return true
  }
  // 2. Record obtained, check the Identity & Time
  if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
    // 这个机制很重要,会如果leader会不断正常renew这个锁,oldLeaderElectionRawRecord会一直发生变化,发生变化会更新le.observedTime
    le.setObservedRecord(oldLeaderElectionRecord)
    le.observedRawRecord = oldLeaderElectionRawRecord
  }
  // 如果还没超时并且此实例不是leader(leader是其他实例),那么就直接退出
  if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
    le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
    !le.IsLeader() {
    klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
    return false
  }
  // 3. We're going to try to update. The leaderElectionRecord is set to it's default
  // here. Let's correct it before updating.
  // 如果是leader,就更新时间RenewTime,保证其他实例(非主)可以观察到:主还活着
  if le.IsLeader() {
    leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
  } else {
  // 不是leader,那么锁就发生了转移
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
  }
  // 更新锁
  // update the lock itself
  if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
    klog.Errorf("Failed to update lock: %v", err)
    return false
  }
  le.setObservedRecord(&leaderElectionRecord)
  return true
}

以上就是Go语言 k8s kubernetes 使用leader election的详细内容,更多关于Go k8s leader election选举的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Go语言k8s kubernetes使用leader election实现选举

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

怎么使用Go语言实现Spark

这篇文章主要介绍了怎么使用Go语言实现Spark的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用Go语言实现Spark文章都会有所收获,下面我们一起来看看吧。为什么使用Go语言实现SparkGo语言的成长
2023-07-06

go语言使用什么实现的

go语言使用称为Go Runtime的虚拟机实现的。Go语言的虚拟机是由C语言实现的,它负责Go程序的运行和管理,这个虚拟机可以在不同的操作系统上运行,包括Linux、Windows、macOS等,还提供了垃圾回收机制,可以自动管理内存,减
2023-07-10

怎么使用Go语言实现时间轮

本文小编为大家详细介绍“怎么使用Go语言实现时间轮”,内容详细,步骤清晰,细节处理妥当,希望这篇“怎么使用Go语言实现时间轮”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。时间轮概述时间轮是一种基于时间概念的循环缓
2023-07-05

怎么使用Go语言实现Ping工具

这篇文章主要讲解了“怎么使用Go语言实现Ping工具”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么使用Go语言实现Ping工具”吧!Ping是一种众所周知的网络诊断工具,它通过向目标服务
2023-07-06

使用Go语言实现谷歌翻译功能

本指南详细介绍了如何使用Go语言集成Google翻译API,实现文本翻译和语言识别功能。通过安装依赖项、创建服务账户、配置客户端,可以轻松翻译文本,并识别输入文本的语言。文中提供了示例代码,展示了如何使用这些功能翻译文本并检测语言,帮助开发人员轻松实现翻译功能。
使用Go语言实现谷歌翻译功能
2024-04-02

使用go语言怎么实现数组比较

这篇文章给大家介绍使用go语言怎么实现数组比较,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。比较两个数组是否相等如果两个数组类型相同(包括数组的长度,数组中元素的类型)的情况下,我们可以直接通过较运算符(==和!=)来
2023-06-15

在 Go 语言中如何使用/实现切面?

小伙伴们有没有觉得学习Golang很有意思?有意思就对了!今天就给大家带来《在 Go 语言中如何使用/实现切面?》,以下内容将会涉及到,若是在学习中对其中部分知识点有疑问,或许看了本文就能帮到你!问题内容我想监控任意方法调用,并且我对方法调
在 Go 语言中如何使用/实现切面?
2024-04-04

如何使用Go语言实现的api网关

小编给大家分享一下如何使用Go语言实现的api网关,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!浏览器的请求去请求目标地址,然后获得结果它再发送给浏览器。对于Go语言来说,实现转发只需要简单的一行代码即可实现,如下所示:h
2023-06-21

使用Go语言创建Web应用:简便实现

有志者,事竟成!如果你在学习Golang,那么本文《使用Go语言创建Web应用:简便实现》,就很适合你!文章讲解的知识点主要包括,若是你对本文感兴趣,或者是想搞懂其中某个知识点,就请你继续往下看吧~Go语言是一种开源的编程语言,它以其高效的
使用Go语言创建Web应用:简便实现
2024-04-04

如何使用go语言实现字符串比较

今天小编给大家分享的是如何使用go语言实现字符串比较,相信很多人都不太了解,为了让大家更加了解,所以给大家总结了以下内容,一起往下看吧。一定会有所收获的哦。go语言比较字符串的方法:1、使用“==”运算符,语法“字符串1==字符串2”;2、
2023-06-15

Go语言使用组合的思想实现继承

这篇文章主要为大家详细介绍了在Go里面如何使用组合的思想实现“继承”,文中的示例代码讲解详细,对我们学习Go语言有一定的帮助,需要的可以了解一下
2022-12-16

如何使用Go语言实现Websocket心跳检测

如何使用Go语言实现Websocket心跳检测Websocket是一种在Web应用中实现双向通信的协议,它能够允许服务器主动向客户端推送数据。在一些实时性要求高的应用中,我们可能需要实时监测客户端的状态,确保连接的稳定性。为了实现这个目标,
如何使用Go语言实现Websocket心跳检测
2023-12-14

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录