我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ设计之故障规避机制的示例分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ设计之故障规避机制的示例分析

这篇文章给大家分享的是有关RocketMQ设计之故障规避机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题。默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外

MQFaultStrategy类的:

public class MQFaultStrategy {    private final static InternalLogger log = ClientLogger.getLog();    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();    private boolean sendLatencyFaultEnable = false;    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};    public long[] getNotAvailableDuration() {        return notAvailableDuration;    }    public void setNotAvailableDuration(final long[] notAvailableDuration) {        this.notAvailableDuration = notAvailableDuration;    }    public long[] getLatencyMax() {        return latencyMax;    }    public void setLatencyMax(final long[] latencyMax) {        this.latencyMax = latencyMax;    }    public boolean isSendLatencyFaultEnable() {        return sendLatencyFaultEnable;    }    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {        this.sendLatencyFaultEnable = sendLatencyFaultEnable;    }    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        //是否开启故障延迟机制        if (this.sendLatencyFaultEnable) {            try {                int index = tpInfo.getSendWhichQueue().getAndIncrement();                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                    if (pos < 0)                        pos = 0;                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                    //判断Queue是否可用                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                            return mq;                    }                }                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);                if (writeQueueNums > 0) {                    final MessageQueue mq = tpInfo.selectOneMessageQueue();                    if (notBestBroker != null) {                        mq.setBrokerName(notBestBroker);                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                    }                    return mq;                } else {                    latencyFaultTolerance.remove(notBestBroker);                }            } catch (Exception e) {                log.error("Error occurred when selecting message queue", e);            }            return tpInfo.selectOneMessageQueue();        }        //默认轮询        return tpInfo.selectOneMessageQueue(lastBrokerName);    }    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {        if (this.sendLatencyFaultEnable) {            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);        }    }    private long computeNotAvailableDuration(final long currentLatency) {        for (int i = latencyMax.length - 1; i >= 0; i--) {            if (currentLatency >= latencyMax[i])                return this.notAvailableDuration[i];        }        return 0;    }}

在选择查找路由时,选择消息队列的关键步骤:

  • 先按轮询算法选择一个消息队列

  • 从故障列表判断该消息队列是否可用

LatencyFaultToleranceImpl中判断是否可用:

@Overridepublic boolean isAvailable(final String name) {    final FaultItem faultItem = this.faultItemTable.get(name);    if (faultItem != null) {        return faultItem.isAvailable();    }    return true;}public boolean isAvailable() {            return (System.currentTimeMillis() - startTimestamp) >= 0;        }
  • 判断是否在故障列表中,不在故障列表中代表可用。

  • 在故障列表中判断当前时间是否大于等于故障规避的开始时间startTimestamp

在消息发送结束后和发送出现异常时调用updateFaultItem()方法来更新故障列表,computeNotAvailableDuration()根据响应时间来计算故障周期时长,响应时间越长故障周期越长。网络异常、Broker异常、客户端异常都是固定响应时长30s,它们故障周期时长为10分钟。消息发送成功或线程中断异常响应时间在100毫秒以内,故障周期时长为0。

LatencyFaultToleranceImpl类的updateFaultItem方法:

@Overridepublic void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {    FaultItem old = this.faultItemTable.get(name);    if (null == old) {        final FaultItem faultItem = new FaultItem(name);        faultItem.setCurrentLatency(currentLatency);        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);        //加入故障列表        old = this.faultItemTable.putIfAbsent(name, faultItem);        if (old != null) {            old.setCurrentLatency(currentLatency);            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);        }    } else {        old.setCurrentLatency(currentLatency);        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);    }}

FaultItem存储Broker名称、响应时长、故障规避开始时间,最重要的是故障规避开始时间,用来判断Queue是否可用

感谢各位的阅读!关于“RocketMQ设计之故障规避机制的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RocketMQ设计之故障规避机制的示例分析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RocketMQ设计之故障规避机制的示例分析

这篇文章给大家分享的是有关RocketMQ设计之故障规避机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机
2023-06-29

RocketMQ设计之同步刷盘的示例分析

这篇文章主要介绍RocketMQ设计之同步刷盘的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。CommitLog的handleD
2023-06-29

Web登录认证类漏洞分析和安全验证机制设计的示例分析

本篇文章为大家展示了Web登录认证类漏洞分析防御总结和安全验证机制设计的示例分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。web登录认证方面,从子功能上可以划分为登录框登录、忘记密码(密码重置)
2023-06-17

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录