我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ producer容错机制源码分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ producer容错机制源码分析

这篇文章主要介绍“RocketMQ producer容错机制源码分析”,在日常操作中,相信很多人在RocketMQ producer容错机制源码分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ producer容错机制源码分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1. 前言

这里有两个点,一个是关于消息的处理,一个是关于broker的处理,比如说发送消息到broker-a的broker失败了,我们可能下次就不想发送到这个broker-a,这就涉及到一个选择broker的问题,也就是选择MessageQueue的问题。

2. 失败重试

其实失败重试我们在介绍RocketMQ消息生产者发送消息的时候介绍过了,其实同步发送与异步发送都会失败重试的,比如说我发送一个消息,然后超时了,这时候在MQProducer层就会进行控制重试,默认是重试2次的,加上你发送那次,一共是发送3次,如果重试完还是有问题的话,这个时候就会抛出异常了。

我们来看下这一块的代码实现( DefaultMQProducerImpl 类sendDefaultImpl方法):

RocketMQ producer容错机制源码分析

这块其实就是用for循环实现的,其实不光RocketMQ,分布式远程调用框架Dubbo的失败重试也是用for循环实现的。

3. 延迟故障

我们都知道,在RocketMQ中一个topic其实是有多个MessageQueue这么一个概念的,然后这些MessageQueue可能对应着不同的broker name,比如说id是0和1的MessageQueue 对应的broker name是 broker-a ,然后id是2和3的MessageQueue对应的broker name 是broker-b

我们发送消息的时候,其实涉及到发送给哪个MessageQueue这么一个问题,当然我们可以在发送消息的时候指定这个MessageQueue,如果你不指定的话,RocketMQ就会根据MQFaultStrategy 这么一个策略类给选择出来一个MessageQueue。

我们先来看下是在哪里选择的,其实就是在我们重试的循环中: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

...// 重试发送for (; times < timesTotal; times++) {    String lastBrokerName = null == mq ? null : mq.getBrokerName();    // todo 选择message queue    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);    ...

我们可以看到,它会把topicPublishInfo 与 lastBrokerName 作为参数传进去,topicPublishInfo 里面其实就是那一堆MessageQueue, 然后这个lastBrokerName 是上次我们选择的那个broker name , 这个接着我们来看下这个selectOneMessageQueue实现:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    // todo    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}

可以看到它调用了MQFaultStrategy 这个类的selectOneMessageQueue 方法,我们接着进去:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    // 发送延迟故障启用,默认为false    if (this.sendLatencyFaultEnable) {        try {            // 获取一个index            int index = tpInfo.getSendWhichQueue().getAndIncrement();            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                if (pos < 0)                    pos = 0;                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                // 选取的这个broker是可用的 直接返回                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))                    return mq;            }            // 到这里 找了一圈 还是没有找到可用的broker            // todo 选择 距离可用时间最近的            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);            if (writeQueueNums > 0) {                final MessageQueue mq = tpInfo.selectOneMessageQueue();                if (notBestBroker != null) {                    mq.setBrokerName(notBestBroker);                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                }                return mq;            } else {                latencyFaultTolerance.remove(notBestBroker);            }        } catch (Exception e) {            log.error("Error occurred when selecting message queue", e);        }        return tpInfo.selectOneMessageQueue();    }    // todo    return tpInfo.selectOneMessageQueue(lastBrokerName);}

这种延迟故障策略其实是由sendLatencyFaultEnable来控制的,它默认是关闭的。

3.1 最普通的选择策略

我们先来看下最普通的选择策略,可以看到调用了TopicPublishInfo 的selectOneMessageQueue方法:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {    // 消息第一个发送的时候 还没有重试 也没有上一个brokerName    if (lastBrokerName == null) {        return selectOneMessageQueue();    } else {        // 这个 出现在重试的时候        for (int i = 0; i < this.messageQueueList.size(); i++) {            int index = this.sendWhichQueue.getAndIncrement();            int pos = Math.abs(index) % this.messageQueueList.size();            if (pos < 0)                pos = 0;            MessageQueue mq = this.messageQueueList.get(pos);            // 避开 上次发送的brokerName            if (!mq.getBrokerName().equals(lastBrokerName)) {                return mq;            }        }        // todo 到最后 没有避开  只能随机选一个        return selectOneMessageQueue();    }}

它这里里面分成了2部分,一个是没有 这个lastBroker的,也就是这个这个消息还没有被重试过,这是第一次发送这个消息,这个时候它的lastBrokerName就是null,然后他就会直接走selectOneMessageQueue 这个无参方法。

public MessageQueue selectOneMessageQueue() {    // 相当于 某个线程轮询    int index = this.sendWhichQueue.getAndIncrement();    int pos = Math.abs(index) % this.messageQueueList.size();    if (pos < 0)        pos = 0;    return this.messageQueueList.get(pos);}

先是获取这个index ,然后使用index % MessageQueue集合的大小获得一个MessageQueue集合值的一个下标(索引),这个index 其实某个线程内自增1的,这样就形成了某个线程内轮询的效果。这个样子的话,同步发送其实就是单线程的轮询,异步发送就是多个线程并发发送,然后某个线程内轮询,我们看下他这个单个线程自增1效果是怎样实现的。

public class ThreadLocalIndex {    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();    private final Random random = new Random();    public int getAndIncrement() {        Integer index = this.threadLocalIndex.get();        // 如果不存在就创建  然后设置到threadLocalIndex中        if (null == index) {            index = Math.abs(random.nextInt());            this.threadLocalIndex.set(index);        }        index = Math.abs(index + 1);        this.threadLocalIndex.set(index);        return index;    }}

可以看到这个sendWhichQueue 是用ThreadLocal实现的,然后这个样子就可以一个线程一个index,而且不会出现线程安全问题。

好了这里我们就把这个消息第一次发送时候MessageQueue看完了,然后我们再来看下它其他重试的时候是怎样选择的,也就是lastBrokerName不是null的时候:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {    // 消息第一个发送的时候 还没有重试 也没有上一个brokerName    if (lastBrokerName == null) {        return selectOneMessageQueue();    } else {        // 这个 出现在重试的时候        for (int i = 0; i < this.messageQueueList.size(); i++) {            int index = this.sendWhichQueue.getAndIncrement();            int pos = Math.abs(index) % this.messageQueueList.size();            if (pos < 0)                pos = 0;            MessageQueue mq = this.messageQueueList.get(pos);            // 避开 上次发送的brokerName            if (!mq.getBrokerName().equals(lastBrokerName)) {                return mq;            }        }        // todo 到最后 没有避开  只能随机选一个        return selectOneMessageQueue();    }}

这里其实就是选择一个不是lastBrokerName 的MessageQueue,可以看到它是循环 MessageQueue 集合大小数个,这样可能把所有的MessageQueue都看一遍,注意 这个循环只是起到选多少次的作用,具体的选择还是要走某线程轮询的那一套,到最后是在是选不出来了,也就是没有这一堆MessageQueue都是在lastBrokerName上的,只能调用selectOneMessageQueue轮询选一个了。

到这我们就把最普通的选择一个MessageQueue介绍完了。

3.2 延迟故障的实现

下面我们再来介绍下那个延迟故障的实现,这个其实就是根据你这个broker 的响应延迟时间的大小,来影响下次选择这个broker的权重,他不是绝对的,因为根据它这个规则是在找不出来的话,他就会使用那套普通选择算法来找个MessageQueue。

它是这样一个原理:

  • 在每次发送之后都收集一下它这次的一个响应延迟,比如我10点1分1秒200毫秒给broker-a了一个消息,然后到了10点1分1秒900毫秒的时候才收到broker-a 的一个sendResult也就是响应,这个时候他就是700ms的延迟,它会跟你就这个300ms的延迟找到一个时间范围,他就认为你这个broker-a 这个broker 在某个时间段内,比如说30s内是不可用的。然后下次选择的时候,他在第一轮会找那些可用的broker,找不到的话,就找那些上次不是这个broker的,还是找不到的话,他就绝望了,用最普通的方式,也就是上面说的那种轮询算法找一个MessageQueue出来。

接下来我们先来看下它的收集延迟的部分,是这个样子的,还是在这个失败重试里面,然后它会在响应后或者异常后面都加一行代码来收集这些延迟:

...// todo 进行发送sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// todo isolation 参数为false(看一下异常情况)this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);...

这是正常响应后的,注意它的isolation 参数,也就是隔离 是false,在看下异常的

...catch (RemotingException e) {    endTimestamp = System.currentTimeMillis();    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);    log.warn(msg.toString());    exception = e;    continue;}...

他这个isolation 参数就是true ,也就是需要隔离的意思。

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {    // todo    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);}

可以看到是调用了mqFaultStrategy 的updateFaultItem 方法:

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {    // 是否开启延迟故障容错    if (this.sendLatencyFaultEnable) {        // todo 计算不可用持续时间        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);        // todo 存储        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);    }}

先是判断是否开启了这个延迟故障的这么一个配置,默认是不启动的,但是你可以自己启动set下就可以了setSendLatencyFaultEnable(true)

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.setSendLatencyFaultEnable(true);

首先是计算这个它认为broker不可用的这么一个时间,参数就是你那个响应延迟,熔断的话就配置30000毫秒, 否则的话就是正常的那个响应时间

private long computeNotAvailableDuration(final long currentLatency) {    // latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};    // notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};    // 倒着遍历    for (int i = latencyMax.length - 1; i >= 0; i--) {        // 如果延迟大于某个时间,就返回对应服务不可用时间,可以看出来,响应延迟100ms以下是没有问题的        if (currentLatency >= latencyMax[i])            return this.notAvailableDuration[i];    }    return 0;}

他这个计算规则是这个样子的,他有两个数组,一个是响应延迟的,一个是不可使用的时间,两个排列都是从小到大的顺序,倒着先找响应延迟,如果你这个延迟大于某个时间,就找对应下标的不可使用的时间,比如说响应延迟700ms,这时候他就会找到30000ms不可使用时间。

计算完这个不可使用时间后接着调用了latencyFaultTolerance的updateFaultItem方法,这个方法其实就是用来存储的:

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {    // 从缓存中获取    FaultItem old = this.faultItemTable.get(name);    // 缓存没有的情况    if (null == old) {        final FaultItem faultItem = new FaultItem(name);        // 设置延迟        faultItem.setCurrentLatency(currentLatency);        // 设置启用时间        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);        // 设置faultItemTable 中        old = this.faultItemTable.putIfAbsent(name, faultItem);        // 如果已经有了,拿到 老的进行更新        if (old != null) {            old.setCurrentLatency(currentLatency);            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);        }    } else {        // 缓存中已经有了,直接拿老的进行更新        old.setCurrentLatency(currentLatency);        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);    }}

他有个faultItemTable 这个缓存,记录着 每个broker的FaultItem的项,这个FaultItem就是保存它能够使用的一个时间(当前时间戳+不可使用时间),其实这个方法就是做更新或者插入操作。

好了到这我们就把它这个收集响应延迟指标与计算可用时间这快就解析完了,再回头看下那个选择MessageQueue的方法:

RocketMQ producer容错机制源码分析

可以看到它先是找那种可用的,然后不是上一个broker的那个,如果好几轮下来没有找到的话就选择一个

public String pickOneAtLeast() {    // 将map中里面的放到tmpList 中    final Enumeration<FaultItem> elements = this.faultItemTable.elements();    List<FaultItem> tmpList = new LinkedList<FaultItem>();    while (elements.hasMoreElements()) {        final FaultItem faultItem = elements.nextElement();        tmpList.add(faultItem);    }    // 如果不是null    if (!tmpList.isEmpty()) {        // 洗牌算法        Collections.shuffle(tmpList);        // 排序        Collections.sort(tmpList);        final int half = tmpList.size() / 2;        // 没有 2台机器        if (half <= 0) {            // 选择第一个            return tmpList.get(0).getName();        } else {            // 有2台机器及以上,某个线程内随机选排在前半段的broker            final int i = this.whichItemWorst.getAndIncrement() % half;            return tmpList.get(i).getName();        }    }    return null;}

先是排序,然后将所有的broker/2 ,如果是小于等于0的话,说明就2个broker以下,选第一个,如果是2台以上,就轮询选一个

先来看下排序规则:

class FaultItem implements Comparable<FaultItem> {    // 条目唯一键,这里是brokerName    private final String name;    // todo currentLatency 和startTimestamp  被volatile修饰    // 本次消息发送的延迟时间    private volatile long currentLatency;    // 故障规避的开始时间    private volatile long startTimestamp;    public FaultItem(final String name) {        this.name = name;    }    @Override    public int compareTo(final FaultItem other) {        // 将能提供服务的放前面        if (this.isAvailable() != other.isAvailable()) {            if (this.isAvailable())                return -1;            if (other.isAvailable())                return 1;        }        // 找延迟低的 放前面        if (this.currentLatency < other.currentLatency)            return -1;        else if (this.currentLatency > other.currentLatency) {            return 1;        }        // 找最近能提供服务的  放前面        if (this.startTimestamp < other.startTimestamp)            return -1;        else if (this.startTimestamp > other.startTimestamp) {            return 1;        }        return 0;    }

它是把能提供服务的放前面,然后没有,就找那种延迟低的放前面,也没有的话就找最近能提供服务的放前头。 找到这个broker 之后然后根据这个broker name 获取写队列的个数,其实你这个写队列个数有几个,然后你这个broker对应的MessageQueue就有几个,如果write size >0的话,然后这个broker 不是null,就找一个mq,然后设置上它的broker name 与queue id

如果write<=0,直接移除这个broker对应FaultItem,最后实在是找不到就按照上面那种普通方法来找了。

到此,关于“RocketMQ producer容错机制源码分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RocketMQ producer容错机制源码分析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RocketMQ producer容错机制源码分析

这篇文章主要介绍“RocketMQ producer容错机制源码分析”,在日常操作中,相信很多人在RocketMQ producer容错机制源码分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketM
2023-07-05

RocketMQ producer容错机制源码解析

这篇文章主要为大家介绍了RocketMQ producer容错机制源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-03-19

RocketMQ producer同步发送和单向发送源码分析

这篇文章主要介绍“RocketMQ producer同步发送和单向发送源码分析”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ producer同步发送和单向发送源码分析”文章能帮助大
2023-07-05

RocketMQ源码分析之Broker过期消息清理机制

这篇文章主要为大家介绍了RocketMQ源码分析之Broker过期消息清理机制示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

DolphinScheduler容错Master源码分析

这篇文章主要为大家介绍了DolphinScheduler容错Master源码分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-02-03

RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

这篇文章主要为大家介绍了RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

如何进行HashMap扩容机制源码分析

这期内容当中小编将会给大家带来有关如何进行HashMap扩容机制源码分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。具体看源码之前,我们先简单的说一下HashMap的底层数据结构  1、HashMap底
2023-06-02

DolphinScheduler容错源码分析之Worker

这篇文章主要为大家介绍了DolphinScheduler容错源码分析之Worker,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-02-06

RocketMQ消息存储文件的加载与恢复机制源码分析

这篇文章主要介绍了RocketMQ源码分析之消息存储文件的加载与恢复机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

源码分析Android的消息机制

一、引言 ​Android消息机制主要指的是Handler的运行机制,是一块很有意思,也很有研究意义的内容。本文计划在较短的篇幅内,通过一定的源码,分析Android消息机制,并在结尾说点”题外话“,帮助我们理解消息机制在安卓应用中的作用。
2022-06-06

SpringCloud @RefreshScope刷新机制源码分析

今天小编给大家分享一下SpringCloud @RefreshScope刷新机制源码分析的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了
2023-07-05

Vue3响应式机制源码分析

本篇内容介绍了“Vue3响应式机制源码分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!什么是响应式响应式一直都是 Vue 的特色功能之一;
2023-07-06

【Android】事件分发机制源码解析

文章目录1. 分发顺序2.源码分析2.1 Activity中的分发流程dispatchTouchEventonTouchEvent总结2.2 ViewGroup中的分发流程dispatchTouchEventonInterceptTouch
2022-06-06

Android Handler消息派发机制源码分析

注:这里只是说一下sendmessage的一个过程,post就类似的 如果我们需要发送消息,会调用sendMessage方法public final boolean sendMessage(Message msg) {return send
2022-06-06

怎么用Hadoop源码分析心跳机制

这篇文章将为大家详细讲解有关怎么用Hadoop源码分析心跳机制,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。一.心跳机制1. hadoop集群是master/slave模式,master包括
2023-06-17

Android源码分析——ViewGroup的事件分发机制(二)

通过前一篇博客View的事件分发机制,从dispatchTouchEvent说起(一)的介绍相信大家对 Android View 事件的分发机制有了很深的理解。我们知道 Android 中 View 是存在于 Activity。 今天我们继
2022-06-06

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录