我的编程空间,编程开发者的网络收藏夹
学习永远不晚

go实现一个分布式限流器的方法步骤

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

go实现一个分布式限流器的方法步骤

项目中需要对 api 的接口进行限流,但是麻烦的是,api 可能有多个节点,传统的本地限流无法处理这个问题。限流的算法有很多,比如计数器法,漏斗法,令牌桶法,等等。各有利弊,相关博文网上很多,这里不再赘述。

项目的要求主要有以下几点:

  • 支持本地/分布式限流,接口统一
  • 支持多种限流算法的切换
  • 方便配置,配置方式不确定

go 语言不是很支持 OOP,我在实现的时候是按 Java 的思路走的,所以看起来有点不伦不类,希望能抛砖引玉。

1. 接口定义

package ratelimit

import "time"

// 限流器接口
type Limiter interface {
    Acquire() error
    TryAcquire() bool
}

// 限流定义接口
type Limit interface {
    Name() string
    Key() string
    Period() time.Duration
    Count() int32
    LimitType() LimitType
}

// 支持 burst
type BurstLimit interface {
    Limit
    BurstCount() int32
}

// 分布式定义的 burst
type DistLimit interface {
    Limit
    ClusterNum() int32
}

type LimitType int32
const (
    CUSTOM LimitType = iota
    IP
)

Limiter 接口参考了 Google 的 guava 包里的 Limiter 实现。Acquire 接口是阻塞接口,其实还需要加上 context 来保证调用链安全,因为实际项目中并没有用到 Acquire 接口,所以没有实现完善;同理,超时时间的支持也可以通过添加新接口继承自 Limiter 接口来实现。TryAcquire 会立即返回。

Limit 抽象了一个限流定义,Key() 方法返回这个 Limit 的唯一标识,Name() 仅作辅助,Period() 表示周期,单位是秒,Count() 表示周期内的最大次数,LimitType()表示根据什么来做区分,如 IP,默认是 CUSTOM.

BurstLimit 提供突发的能力,一般是配合令牌桶算法。DistLimit 新增 ClusterNum() 方法,因为 mentor 要求分布式遇到错误的时候,需要退化为单机版本,退化的策略即是:2 节点总共 100QPS,如果出现分区,每个节点需要调整为各 50QPS

2. LocalCounterLimiter

package ratelimit

import (
    "errors"
    "fmt"
    "math"
    "sync"
    "sync/atomic"
    "time"
)

// todo timer 需要 stop
type localCounterLimiter struct {
    limit Limit

    limitCount int32 // 内部使用,对 limit.count 做了 <0 时的转换

    ticker *time.Ticker
    quit chan bool

    lock sync.Mutex
    newTerm *sync.Cond
    count int32
}

func (lim *localCounterLimiter) init() {
    lim.newTerm = sync.NewCond(&lim.lock)
    lim.limitCount = lim.limit.Count()

    if lim.limitCount < 0 {
        lim.limitCount = math.MaxInt32 // count 永远不会大于 limitCount,后面的写法保证溢出也没问题
    } else if lim.limitCount == 0  {
        // 禁止访问, 会无限阻塞
    } else {
        lim.ticker = time.NewTicker(lim.limit.Period())
        lim.quit = make(chan bool, 1)

        go func() {
            for {
                select {
                case <- lim.ticker.C:
                    fmt.Println("ticker .")
                    atomic.StoreInt32(&lim.count, 0)
                    lim.newTerm.Broadcast()

                    //lim.newTerm.L.Unlock()
                case <- lim.quit:
                    fmt.Println("work well .")
                    lim.ticker.Stop()
                    return
                }
            }
        }()
    }
}

// todo 需要机制来防止无限阻塞, 不超时也应该有个极限时间
func (lim *localCounterLimiter) Acquire() error {
    if lim.limitCount == 0 {
        return errors.New("rate limit is 0, infinity wait")
    }

    lim.newTerm.L.Lock()
    for lim.count >= lim.limitCount {
        // block instead of spinning
        lim.newTerm.Wait()
        //fmt.Println(count, lim.limitCount)
    }
    lim.count++
    lim.newTerm.L.Unlock()

    return nil
}

func (lim *localCounterLimiter) TryAcquire() bool {
    count := atomic.AddInt32(&lim.count, 1)
    if count > lim.limitCount {
        return false
    } else {
        return true
    }
}

代码很简单,就不多说了

3. LocalTokenBucketLimiter

golang 的官方库里提供了一个 ratelimiter,就是采用令牌桶的算法。所以这里并没有重复造轮子,直接代理了 ratelimiter。

package ratelimit

import (
    "context"
    "golang.org/x/time/rate"
    "math"
)

type localTokenBucketLimiter struct {
    limit Limit

    limiter *rate.Limiter // 直接复用令牌桶的
}

func (lim *localTokenBucketLimiter) init() {
    burstCount := lim.limit.Count()
    if burstLimit, ok := lim.limit.(BurstLimit); ok {
        burstCount = burstLimit.BurstCount()
    }

    count := lim.limit.Count()
    if count < 0 {
        count = math.MaxInt32
    }

    f := float64(count) / lim.limit.Period().Seconds()
    if f < 0 {
        f = float64(rate.Inf) // 无限
    } else if f == 0 {
        panic("为 0 的时候,底层实现有问题")
    }

    lim.limiter = rate.NewLimiter(rate.Limit(f), int(burstCount))
}

func (lim *localTokenBucketLimiter) Acquire() error {
    err := lim.limiter.Wait(context.TODO())
    return err
}

func (lim *localTokenBucketLimiter) TryAcquire() bool {
    return lim.limiter.Allow()
}

4. RedisCounterLimiter

package ratelimit

import (
    "math"
    "sync"
    "xg-go/log"
    "xg-go/xg/common"
)

type redisCounterLimiter struct {
    limit      DistLimit
    limitCount int32 // 内部使用,对 limit.count 做了 <0 时的转换

    redisClient *common.RedisClient

    once sync.Once // 退化为本地计数器的时候使用
    localLim Limiter

    //script string
}

func (lim *redisCounterLimiter) init() {
    lim.limitCount = lim.limit.Count()
    if lim.limitCount < 0 {
        lim.limitCount = math.MaxInt32
    }

    //lim.script = buildScript()
}

//func buildScript() string {
//  sb := strings.Builder{}
//
//  sb.WriteString("local c")
//  sb.WriteString("\nc = redis.call('get',KEYS[1])")
//  // 调用不超过最大值,则直接返回
//  sb.WriteString("\nif c and tonumber(c) > tonumber(ARGV[1]) then")
//  sb.WriteString("\nreturn c;")
//  sb.WriteString("\nend")
//  // 执行计算器自加
//  sb.WriteString("\nc = redis.call('incr',KEYS[1])")
//  sb.WriteString("\nif tonumber(c) == 1 then")
//  sb.WriteString("\nredis.call('expire',KEYS[1],ARGV[2])")
//  sb.WriteString("\nend")
//  sb.WriteString("\nif tonumber(c) == 1 then")
//  sb.WriteString("\nreturn c;")
//
//  return sb.String()
//}

func (lim *redisCounterLimiter) Acquire() error {
    panic("implement me")
}

func (lim *redisCounterLimiter) TryAcquire() (success bool) {
    defer func() {
        // 一般是 redis 连接断了,会触发空指针
        if err := recover(); err != nil {
            //log.Errorw("TryAcquire err", common.ERR, err)
            //success = lim.degradeTryAcquire()
            //return
            success = true
        }

        // 没有错误,判断是否开启了 local 如果开启了,把它停掉
        //if lim.localLim != nil {
        //  // stop 线程安全
        //  lim.localLim.Stop()
        //}
    }()

    count, err := lim.redisClient.IncrBy(lim.limit.Key(), 1)
    //panic("模拟 redis 出错")
    if err != nil {
        log.Errorw("TryAcquire err", common.ERR, err)
        panic(err)
    }

    // *2 是为了保留久一点,便于观察
    err = lim.redisClient.Expire(lim.limit.Key(), int(2 * lim.limit.Period().Seconds()))
    if err != nil {
        log.Errorw("TryAcquire error", common.ERR, err)
        panic(err)
    }

    // 业务正确的情况下 确认超限
    if int32(count) > lim.limitCount {
        return false
    }

    return true

    //keys := []string{lim.limit.Key()}
    //
    //log.Errorw("TryAcquire ", keys, lim.limit.Count(), lim.limit.Period().Seconds())
    //count, err := lim.redisClient.Eval(lim.script, keys, lim.limit.Count(), lim.limit.Period().Seconds())
    //if err != nil {
    //  log.Errorw("TryAcquire error", common.ERR, err)
    //  return false
    //}
    //
    //
    //typeName := reflect.TypeOf(count).Name()
    //log.Errorw(typeName)
    //
    //if count != nil && count.(int32) <= lim.limitCount {
    //
    //  return true
    //}
    //return false
}

func (lim *redisCounterLimiter) Stop() {
    // 判断是否开启了 local 如果开启了,把它停掉
    if lim.localLim != nil {
        // stop 线程安全
        lim.localLim.Stop()
    }
}

func (lim *redisCounterLimiter) degradeTryAcquire() bool {
    lim.once.Do(func() {
        count := lim.limit.Count() / lim.limit.ClusterNum()
        limit := LocalLimit {
            name: lim.limit.Name(),
            key: lim.limit.Key(),
            count: count,
            period: lim.limit.Period(),
            limitType: lim.limit.LimitType(),
        }

        lim.localLim = NewLimiter(&limit)
    })

    return lim.localLim.TryAcquire()
}

代码里回退的部分注释了,因为线上为了稳定,实习生的代码毕竟,所以先不跑。

本来原有的思路是直接用 lua 脚本在 redis 上保证原子操作,但是底层封装的库对于直接调 eval 跑的时候,会抛错,而且 source 是 go-redis 里面,赶 ddl 没有时间去 debug,所以只能用 incrBy + expire 分开来。

5. RedisTokenBucketLimiter

令牌桶的状态变量得放在一个 线程安全/一致 的地方,redis 是不二人选。但是令牌桶的算法核心是个延迟计算得到令牌数量,这个是一个很长的临界区,所以要么用分布式锁,要么直接利用 redis 的单线程以原子方式跑。一般业界是后者,即 lua 脚本维护令牌桶的状态变量、计算令牌。代码类似这种

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local intval = tonumber(ARGV[5])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2) * intval

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
if allowed then
  new_tokens = filled_tokens - requested
end

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed, new_tokens }

到此这篇关于go实现一个分布式限流器的方法步骤的文章就介绍到这了,更多相关go 分布式限流器内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

go实现一个分布式限流器的方法步骤

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

基于Redis实现分布式应用限流的方法

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。前几天在DD的公众号,看了一篇关于使用 瓜娃 实现单应用限流的方案 --》原文,参考《redis in action》 实
2023-05-30

Go语言开发实现分布式流式计算系统的方法与实践

Go语言是一种自由、开源的编程语言,它以其高效的并发模型和简洁的代码风格而广受开发者的喜爱。在分布式计算领域,Go语言也展现出了其强大的开发能力和适用性。本文将介绍使用Go语言开发实现分布式流式计算系统的方法与实践。一、分布式流式计算系统概
Go语言开发实现分布式流式计算系统的方法与实践
2023-11-20

实现一个响应式布局的原理与方法

页面响应式布局的原理与实现方法随着移动设备的普及和互联网的快速发展,越来越多的用户开始使用手机、平板等移动设备浏览网页。而传统的固定布局往往无法适应不同屏幕尺寸的设备,导致用户体验不佳。为了解决这个问题,响应式布局应运而生。响应式布局的
实现一个响应式布局的原理与方法
2024-01-29

阿里云一台跑两个服务器的实现方法与步骤

在云计算的环境中,一台服务器通常需要支持多个虚拟机或者容器来运行多个应用,以提高资源利用效率。本文将详细说明如何在阿里云上实现一台服务器跑两个服务器的操作步骤,以帮助读者更好地理解并使用云计算。正文:在阿里云上,一台服务器可以运行多个服务器的实例。这种操作方式被称为虚拟机的复制,即克隆虚拟机。克隆虚拟机不仅可以实
阿里云一台跑两个服务器的实现方法与步骤
2023-11-10

在Redis数据库中实现分布式速率限制的方法

问题 在许多应用中,对昂贵的资源的访问必须加以限制,此时速率限制是必不可少的。许多现代网络应用程序在多个进程和服务器上运行,状态需要被共享。一个理想的解决方案应该是高效、 快捷的,而不是依赖于被绑定到特定客户端的单个应用程序服务器(由于负载
2022-06-04

Spring Cloud分布式定时器之ShedLock的实现方法

这篇文章主要介绍了Spring Cloud分布式定时器之ShedLock的实现方法,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。ShedLockShedLock是一个在分布式
2023-06-08

Go语言开发实现分布式日志分析系统的方法与技巧

Go语言开发实现分布式日志分析系统的方法与技巧摘要:随着大数据时代的到来,日志分析成为了企业必不可少的一项工作。本文介绍了以Go语言为基础,开发实现分布式日志分析系统的方法与技巧。文章从系统架构设计、数据收集、分布式处理、数据存储与查询等方
Go语言开发实现分布式日志分析系统的方法与技巧
2023-11-20

Go语言开发实现分布式日志收集系统的方法与实践

随着互联网时代的到来,日志分析已经成为互联网公司的重要组成部分。日志的规模庞大,分散在多个服务器上,如何高效地收集并进行数据分析成为了互联网公司共同面对的问题。本文将介绍使用Go语言开发实现分布式日志收集系统的方法与实践。一、日志分析的重要
Go语言开发实现分布式日志收集系统的方法与实践
2023-11-20

Go语言开发实现分布式任务调度系统的方法与实践

Go语言开发实现分布式任务调度系统的方法与实践随着互联网的高速发展,大规模系统的任务调度成为了现代计算领域中的重要问题。传统的单机调度已经无法满足大规模任务的需求,而分布式任务调度系统的出现有效地解决了这一问题。本文将介绍如何使用Go语言开
Go语言开发实现分布式任务调度系统的方法与实践
2023-11-20

Go语言开发实现分布式消息队列系统的方法与实践

在当今互联网高并发和大规模数据处理的背景下,分布式消息队列系统作为一种重要的中间件技术日益受到关注。它可以有效地缓解系统压力,提高系统的可扩展性和可靠性。Go语言由于其并发模型、高性能和简洁的特点,在开发分布式消息队列系统中具有独特的优势。
Go语言开发实现分布式消息队列系统的方法与实践
2023-11-20

Golang 函数在分布式系统中实现数据一致性的方法

问题:如何在 go 中使用函数实现分布式系统数据一致性?答案:使用函数类型声明和使用函数:func(param_type_1 param_name_1, ..., param_type_n param_name_n) return_type
Golang 函数在分布式系统中实现数据一致性的方法
2024-04-19

Go语言开发实现分布式日志收集系统的方法与技巧

Go语言是近年来备受关注和推崇的一种编程语言,其简洁、高效、并发特性使其在分布式系统开发领域有着广泛的应用。本文将介绍使用Go语言开发实现分布式日志收集系统的方法与技巧。一、分布式日志收集系统的基本概念分布式日志收集系统主要用于收集多个服务
Go语言开发实现分布式日志收集系统的方法与技巧
2023-11-20

如何使用Python中的异步IO和协程实现一个高并发的分布式任务调度系统

如何使用Python中的异步IO和协程实现一个高并发的分布式任务调度系统在当今高速发展的信息时代,分布式系统变得越来越普遍。而高并发的任务调度系统也成为许多企业和组织中不可或缺的一部分。本文以Python为例,介绍了如何使用异步IO和协程来
2023-10-27

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录