我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Pulsar升级自动化:一键搞定集群升级与测试

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Pulsar升级自动化:一键搞定集群升级与测试

背景

由于我在公司内部负责维护 Pulsar,需要时不时的升级 Pulsar 版本从而和社区保持一致。

而每次升级过程都需要做相同的步骤:

  • 安装一个新版本的集群
  • 触发功能性测试
  • 触发性能测试
  • 查看监控是否正常
  • 应用有无异常日志
  • 流量是否正常
  • 各个组件的内存占用是否正常
  • 写入延迟是否正常

命令行工具

以上的流程步骤最好是全部一键完成,我们只需要人工检测下监控是否正常即可。

于是我便写了一个命令行工具,执行流程如下:

pulsar-upgrade-cli -h                                                                                                  ok | at 10:33:18 
A cli app for upgrading Pulsar

Usage:
  pulsar-upgrade-cli [command]

Available Commands:
  completion  Generate the autocompletion script for the specified shell
  help        Help about any command
  install     install a target version
  scale       scale statefulSet of the cluster

Flags:
      --burst-limit int                 client-side default throttling limit (default 100)
      --debug                           enable verbose output
  -h, --help                            help for pulsar-upgrade-cli
      --kube-apiserver string           the address and the port for the Kubernetes API server
      --kube-as-group stringArray       group to impersonate for the operation, this flag can be repeated to specify multiple groups.
      --kube-as-user string             username to impersonate for the operation

真实使用的 example 如下:

pulsar-upgrade-cli install \                                                   
        --values ./charts/pulsar/values.yaml \
        --set namespace=pulsar-test \
        --set initialize=true \
        --debug \
        --test-case-schema=http \
        --test-case-host=127.0.0.1 \
        --test-case-port=9999 \
    pulsar-test ./charts/pulsar -n pulsar-test

它的安装命令非常类似于 helm,也是直接使用 helm 的 value.yaml 进行安装;只是在安装成功后(等待所有的 Pod 都处于 Running 状态)会再触发 test-case 测试,也就是请求一个 endpoint。

这个 endpoint 会在内部处理所有的功能测试和性能测试,具体细节就在后文分析。

同时还提供了一个 scale(扩、缩容) 命令,可以用修改集群规模:

# 缩容集群规模为0
./pulsar-upgrade-cli scale --replicase 0 -n pulsar-test
# 缩容为最小集群
./pulsar-upgrade-cli scale --replicase 1 -n pulsar-test
# 恢复为最满集群
./pulsar-upgrade-cli scale --replicase 2 -n pulsar-test

这个需求是因为我们的 Pulsar 测试集群部署在了一个 servless 的 kubernetes 集群里,它是按照使用量收费的,所以在我不需要的使用的时候可以通过这个命令将所有的副本数量修改为 0,从而减少使用成本。

当只需要做简单的功能测试时便回将集群修改为最小集群,将副本数修改为只可以提供服务即可。

而当需要做性能测试时就需要将集群修改为最高配置。

这样可以避免每次都安装新集群,同时也可以有效的减少测试成本。

实现原理

require (  
    github.com/spf13/cobra v1.6.1  
    github.com/spf13/pflag v1.0.5   
    helm.sh/helm/v3 v3.10.2
)

这个命令行工具本质上是参考了 helm 的命令行实现的,所有主要也是依赖了 helm 和 cobra。

下面以最主要的安装命令为例,核心的是以下的步骤:

  • 执行 helm 安装(这里是直接使用的 helm 的源码逻辑进行安装)
  • 等待所有的 Pod 成功运行
  • 触发 test-case 执行
  • 等待测试用例执行完毕
  • 检测是否需要卸载安装的集群
func (e *installEvent) FinishInstall(cfg *action.Configuration, name string) error {  
    bar.Increment()  
    bar.Finish()  
  
    clientSet, err := cfg.KubernetesClientSet()  
    if err != nil {  
       return err  
    }  
    ctx := context.Background()  
    ip, err := GetServiceExternalIp(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-proxy", name))  
    if err != nil {  
       return err  
    }  
  
    token, err := GetPulsarProxyToken(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-token-proxy-admin", name))  
    if err != nil {  
       return err  
    }  
    // trigger testcase  
    err = e.client.Trigger(context.Background(), ip, token)  
    return err  
}

这里的 FinishInstall 需要获取到新安装的 Pulsar 集群的 proxy IP 地址和鉴权所使用的 token(GetServiceExternalIp()/GetPulsarProxyToken())。

将这两个参数传递给 test-case 才可以构建出 pulsar-client.

这个命令的核心功能就是安装集群和触发测试,以及一些集群的基本运维能力。

测试框架

而关于这里的测试用例也有一些小伙伴咨询过,如何对 Pulsar 进行功能测试。

其实 Pulsar 源码中已经包含了几乎所有我们会使用到的测试代码,理论上只要新版本的官方镜像已经推送了那就是跑了所有的单测,质量是可以保证的。

那为什么还需要做功能测试呢?

其实很很简单,Pulsar 这类基础组件官方都有提供基准测试,但我们想要用于生产环境依然需要自己做压测得出一份属于自己环境下的性能测试报告。

根本目的是要看在自己的业务场景下是否可以满足(包括公司的软硬件,不同的业务代码)。

所以这里的功能测试代码有一个很重要的前提就是:需要使用真实的业务代码进行测试。

也就是业务在线上使用与 Pulsar 相关的代码需要参考功能测试里的代码实现,不然有些问题就无法在测试环节覆盖到。

这里我就踩过坑,因为在功能测试里用的是官方的 example 代码进行测试的,自然是没有问题;但业务在实际使用时,使用到了一个 Schema 的场景,并没有在功能测试里覆盖到(官方的测试用例里也没有😂),就导致升级到某个版本后业务功能无法正常使用(虽然用法确实是有问题),但应该在我测试阶段就暴露出来。

实现原理

以上是一个集群的功能测试报告,这里我只有 8 个测试场景(结合实际业务使用),考虑到未来可能会有新的测试用例,所以在设计这个测试框架时就得考虑到扩展性。

AbstractJobDefine job5 =  
        new FailoverConsumerTest(event, "故障转移消费测试", pulsarClient, 20, admin);  
CompletableFuture c5 = CompletableFuture.runAsync(job5::start, EXECUTOR);  
AbstractJobDefine job6 = new SchemaTest(event,"schema测试",pulsarClient,20,prestoService);  
CompletableFuture c6 = CompletableFuture.runAsync(job6::start, EXECUTOR);  
AbstractJobDefine job7 = new VlogsTest(event,"vlogs test",pulsarClient,20, vlogsUrl);  
CompletableFuture c7 = CompletableFuture.runAsync(job7::start, EXECUTOR);  
  
CompletableFuture all = CompletableFuture.allOf(c1, c2, c3, c4, c5, c6, c7);  
all.whenComplete((___, __) -> {  
    event.finishAll();  
    pulsarClient.closeAsync();  
    admin.close();  
}).get();

对外提供的 trigger 接口就不贴代码了,重点就是在这里构建测试任务,然后等待他们全部执行完毕。

@Data
public abstract class AbstractJobDefine {
    private Event event;
    private String jobName;
    private PulsarClient pulsarClient;

    private int timeout;

    private PulsarAdmin admin;

    public AbstractJobDefine(Event event, String jobName, PulsarClient pulsarClient, int timeout, PulsarAdmin admin) {
        this.event = event;
        this.jobName = jobName;
        this.pulsarClient = pulsarClient;
        this.timeout = timeout;
        this.admin = admin;
    }

    public void start() {
        event.addJob();
        try {
            CompletableFuture.runAsync(() -> {
                StopWatch watch = new StopWatch();
                try {
                    watch.start(jobName);
                    run(pulsarClient, admin);
                } catch (Exception e) {
                    event.oneException(this, e);
                } finally {
                    watch.stop();
                    event.finishOne(jobName, StrUtil.format("cost: {}s", watch.getTotalTimeSeconds()));
                }
            }, TestCase.EXECUTOR).get(timeout, TimeUnit.SECONDS);
        } catch (Exception e) {
            event.oneException(this, e);
        }
    }


    
    public abstract void run(PulsarClient pulsarClient, PulsarAdmin admin) throws Exception;
}

核心代码就是这个抽象的任务定义类,其中的 start 函数用于定义任务执行的模版:

  • 添加任务:具体实现是任务计数器+1
  • 开始计时
  • 执行抽血的 run 函数,具体实现交给子类
  • 异常时记录事件
  • 正常执行完毕后也记录事件

下面来看一个普通用例的实现情况:

就是重写了 run() 函数,然后在其中实现具体的测试用例,断言测试结果。

这样当我们需要再添加用例的时候只需要再新增一个子类实现即可。

同时还需要定义一个事件接口,用于处理一些关键的节点:

public interface Event {  
  
      
    void addJob();  
  
      
    TestCaseRuntimeResponse getRuntime();  
  
      
    void finishOne(String jobName, String finishCost);  
  
      
    void oneException(AbstractJobDefine jobDefine, Exception e);  
  
      
    void finishAll();  
}

其中 getRuntime 接口是用于在 cli 那边查询任务是否执行完毕的接口,只有任务执行完毕之后才能退出 cli。

监控指标

当这些任务运行完毕后我们需要重点查看应用客户端和 Pulsar broker 端是否有异常日志。

同时还需要观察一些关键的监控面板:

包含但不限于:

  • 消息吞吐量
  • broker 写入延迟
  • Bookkeeper 的写入、读取成功率,以及延迟。

当然还有 zookeeper 的运行情况也需要监控,限于篇幅就不一一粘贴了。

以上就是测试整个 Pulsar 集群的流程,当然还有一些需要优化的地方。

比如使用命令行还是有些不便,后续可能会切换到网页上就可以操作。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Pulsar升级自动化:一键搞定集群升级与测试

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Pulsar升级自动化:一键搞定集群升级与测试

安装命令非常类似于 Helm,也是直接使用 helm 的 value.yaml 进行安装;只是在安装成功后(等待所有的 Pod 都处于 Running 状态)会再触发 test-case 测试,也就是请求一个 endpoint。

自动化测试再升级,大模型与软件测试相结合

大模型+测试正掀起软件测试智能化的浪潮。通过大模型的智能化技术,软件测试可以实现更高效、更准确的测试过程,提高软件质量和可靠性。
软件测试2024-11-30

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录