我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ broker启动流程是什么

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ broker启动流程是什么

这篇文章主要介绍“RocketMQ broker启动流程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ broker启动流程是什么”文章能帮助大家解决问题。

    1. 启动入口

    本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

    前面我们已经分析完了NameServerproducer,从本文开始,我们将分析Broker

    broker的启动类为org.apache.rocketmq.broker.BrokerStartup,代码如下:

    public class BrokerStartup {    ...    public static void main(String[] args) {        start(createBrokerController(args));    }    ...}

    main()方法中,仅有一行代码,这行代码包含了两个操作:

    • createBrokerController(...):创建BrokerController

    • start(...):启动Broker

    接下来我们就来分析这两个操作。

    2. 创建BrokerController

    创建BrokerController的方法为BrokerStartup#createBrokerController,代码如下:

    public static BrokerController createBrokerController(String[] args) {    ...    try {        //解析命令行参数        Options options = ServerUtil.buildCommandlineOptions(new Options());        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),            new PosixParser());        if (null == commandLine) {            System.exit(-1);        }        // 处理配置        final BrokerConfig brokerConfig = new BrokerConfig();        final NettyServerConfig nettyServerConfig = new NettyServerConfig();        final NettyClientConfig nettyClientConfig = new NettyClientConfig();        // tls安全相关        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));        // 配置端口        nettyServerConfig.setListenPort(10911);        // 消息存储的配置        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();        ...        // 将命令行中的配置设置到brokerConfig对象中        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);        // 检查环境变量:ROCKETMQ_HOME        if (null == brokerConfig.getRocketmqHome()) {            System.out.printf("Please set the %s variable in your environment to match                 the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);            System.exit(-2);        }        //省略一些配置        ...        // 创建 brokerController        final BrokerController controller = new BrokerController(            brokerConfig,            nettyServerConfig,            nettyClientConfig,            messageStoreConfig);        controller.getConfiguration().registerConfig(properties);        // 初始化        boolean initResult = controller.initialize();        if (!initResult) {            controller.shutdown();            System.exit(-3);        }        // 关闭钩子,在关闭前处理一些操作        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {            private volatile boolean hasShutdown = false;            private AtomicInteger shutdownTimes = new AtomicInteger(0);            @Override            public void run() {                synchronized (this) {                    if (!this.hasShutdown) {                        ...                        // 这里会发一条注销消息给nameServer                        controller.shutdown();                        ...                    }                }            }        }, "ShutdownHook"));        return controller;    } catch (Throwable e) {        e.printStackTrace();        System.exit(-1);    }    return null;}

    这个方法的代码有点长,但功能并不多,总的来说就三个功能:

    • 处理配置:主要是处理nettyServerConfignettyClientConfig的配置,这块就是一些配置解析的操作,处理方式与NameServer很类似,这里就不多说了。

    • 创建及初始化controller:调用方法controller.initialize(),这块内容我们后面分析。

    • 注册关闭钩子:调用Runtime.getRuntime().addShutdownHook(...),可以在jvm进程关闭前进行一些操作。

    2.1 controller实例化

    BrokerController的创建及初始化是在BrokerStartup#createBrokerController方法中进行,我们先来看看它的构造方法:

    public BrokerController(    final BrokerConfig brokerConfig,    final NettyServerConfig nettyServerConfig,    final NettyClientConfig nettyClientConfig,    final MessageStoreConfig messageStoreConfig) {    // 4个核心配置信息    this.brokerConfig = brokerConfig;    this.nettyServerConfig = nettyServerConfig;    this.nettyClientConfig = nettyClientConfig;    this.messageStoreConfig = messageStoreConfig;    // 管理consumer消费消息的offset    this.consumerOffsetManager = new ConsumerOffsetManager(this);    // 管理topic配置    this.topicConfigManager = new TopicConfigManager(this);    // 处理 consumer 拉消息请求的    this.pullMessageProcessor = new PullMessageProcessor(this);    this.pullRequestHoldService = new PullRequestHoldService(this);    // 消息送达的监听器    this.messageArrivingListener         = new NotifyMessageArrivingListener(this.pullRequestHoldService);    ...    // 往外发消息的组件    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);    ...}

    BrokerController的构造方法很长,基本都是一些赋值操作,代码中已列出关键项,这些包括:

    • 核心配置赋值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四个配置

    • ConsumerOffsetManager:管理consumer消费消息位置的偏移量,偏移量表示消费者组消费该topic消息的位置,后面再消费时,就从该位置后消费,避免重复消费消息,也避免了漏消费消息。

    • topicConfigManagertopic配置管理器,就是用来管理topic配置的,如topic名称,topic队列数量

    • pullMessageProcessor:消息处理器,用来处理消费者拉消息

    • messageArrivingListener:消息送达的监听器,当生产者的消息送达时,由该监听器监听

    • brokerOuterAPI:往外发消息的组件,如向NameServer发送注册/注销消息

    以上这些组件的用处,这里先混个脸熟,我们后面再分析。

    2.2 初始化controller

    我们再来看看初始化操作,方法为BrokerController#initialize

    public boolean initialize() throws CloneNotSupportedException {    // 加载配置文件中的配置    boolean result = this.topicConfigManager.load();    result = result && this.consumerOffsetManager.load();    result = result && this.subscriptionGroupManager.load();    result = result && this.consumerFilterManager.load();    if (result) {        try {            // 消息存储管理组件,管理磁盘上的消息            this.messageStore =                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,                     this.messageArrivingListener, this.brokerConfig);            // 启用了DLeger,就创建DLeger相关组件            if (messageStoreConfig.isEnableDLegerCommitLog()) {                ...            }            // broker统计组件            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);            //load plugin            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,                 brokerStatsManager, messageArrivingListener, brokerConfig);            this.messageStore = MessageStoreFactory.build(context, this.messageStore);            this.messageStore.getDispatcherList().addFirst(                new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));        } catch (IOException e) {            result = false;            log.error("Failed to initialize", e);        }    }    // 加载磁盘上的记录,如commitLog写入的位置、消费者主题/队列的信息    result = result && this.messageStore.load();    if (result) {        // 处理 nettyServer        this.remotingServer = new NettyRemotingServer(            this.nettyServerConfig, this.clientHousekeepingService);        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);        this.fastRemotingServer = new NettyRemotingServer(            fastConfig, this.clientHousekeepingService);        // 创建线程池start... 这里会创建多种类型的线程池        ...        // 处理consumer pull操作的线程池        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(            this.brokerConfig.getPullMessageThreadPoolNums(),            this.brokerConfig.getPullMessageThreadPoolNums(),            1000 * 60,            TimeUnit.MILLISECONDS,            this.pullThreadPoolQueue,            new ThreadFactoryImpl("PullMessageThread_"));        ...        // 创建线程池end...        // 注册处理器        this.registerProcessor();        // 启动定时任务start... 这里会启动好多的定时任务        ...        // 定时将consumer消费到的offset进行持久化操作,即将数据保存到磁盘上        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {            @Override            public void run() {                try {                    BrokerController.this.consumerOffsetManager.persist();                } catch (Throwable e) {                    log.error("schedule persist consumerOffset error.", e);                }            }        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);        ...        // 启动定时任务end...        ...        // 开启 DLeger 的一些操作        if (!messageStoreConfig.isEnableDLegerCommitLog()) {            ...        }        // 处理tls配置        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {            ...        }        // 初始化一些操作        initialTransaction();        initialAcl();        initialRpcHooks();    }    return result;}

    这个还是很长,关键部分都做了注释,该方法所做的工作如下:

    • 加载配置文件中的配置

    • 赋值与初始化操作

    • 创建线程池

    • 注册处理器

    • 启动定时任务

    这里我们来看下注册处理器的操作this.registerProcessor():

    2.2.1 注册处理器:BrokerController#registerProcessor

    this.registerProcessor()实际调用的方法是BrokerController#registerProcessor,代码如下:

    public void registerProcessor() {        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);    sendProcessor.registerSendMessageHook(sendMessageHookList);    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,         this.sendMessageExecutor);    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,          this.sendMessageExecutor);    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,         this.sendMessageExecutor);    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,         this.sendMessageExecutor);    ...        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,         this.pullMessageExecutor);    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);        ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);    ...}

    这个方法里注册了许许多多的处理器,这里仅列出了与消息相关的内容,如发送消息、回复消息、拉取消息等,后面在处理producer/consumer的消息时,就会用到这些处理器,这里先不展开分析。

    2.2.2 remotingServer注册处理器:NettyRemotingServer#registerProcessor

    我们来看下remotingServer注册处理器的操作,方法为NettyRemotingServer#registerProcessor

    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {    ...    @Override    public void registerProcessor(int requestCode, NettyRequestProcessor processor,             ExecutorService executor) {        ExecutorService executorThis = executor;        if (null == executor) {            executorThis = this.publicExecutor;        }        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor,                 ExecutorService>(processor, executorThis);        // 注册到processorTable 中        this.processorTable.put(requestCode, pair);    }    ...}

    最终,这些处理器注册到了processorTable中,它是NettyRemotingAbstract的成员变量,定义如下:

    HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>

    这是一个hashMap的结构,keycodevaluePair,该类中有两个成员变量:NettyRequestProcessorExecutorServicecodeNettyRequestProcessor的映射关系就是在hashMap里存储的。

    2.3 注册关闭钩子:Runtime.getRuntime().addShutdownHook(...)

    接着我们来看看注册关闭钩子的操作:

    // 关闭钩子,在关闭前处理一些操作Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {    private volatile boolean hasShutdown = false;    private AtomicInteger shutdownTimes = new AtomicInteger(0);    @Override    public void run() {        synchronized (this) {            if (!this.hasShutdown) {                ...                // 这里会发一条注销消息给nameServer                controller.shutdown();                ...            }        }    }}, "ShutdownHook"));

    跟进BrokerController#shutdown方法:

    public void shutdown() {    // 调用各组件的shutdown方法    ...    // 发送注销消息到NameServer    this.unregisterBrokerAll();    ...    // 持久化consumer的消费偏移量    this.consumerOffsetManager.persist();    // 又是调用各组件的shutdown方法    ...

    这个方法里会调用各组件的shutdown()方法、发送注销消息给NameServer、持久化consumer的消费偏移量,这里我们主要看发送注销消息的方法BrokerController#unregisterBrokerAll:

    private void unregisterBrokerAll() {    // 发送一条注销消息给nameServer    this.brokerOuterAPI.unregisterBrokerAll(        this.brokerConfig.getBrokerClusterName(),        this.getBrokerAddr(),        this.brokerConfig.getBrokerName(),        this.brokerConfig.getBrokerId());}

    继续进入BrokerOuterAPI#unregisterBrokerAll

    public void unregisterBrokerAll(    final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId) {    // 获取所有的 nameServer,遍历发送注销消息    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();    if (nameServerAddressList != null) {        for (String namesrvAddr : nameServerAddressList) {            try {                this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);                log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);            } catch (Exception e) {                log.warn("unregisterBroker Exception, {}", namesrvAddr, e);            }        }    }}

    这个方法里,会获取到所有的nameServer,然后逐个发送注销消息,继续进入BrokerOuterAPI#unregisterBroker方法:

    public void unregisterBroker(    final String namesrvAddr,    final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,         InterruptedException, MQBrokerException {    UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();    requestHeader.setBrokerAddr(brokerAddr);    requestHeader.setBrokerId(brokerId);    requestHeader.setBrokerName(brokerName);    requestHeader.setClusterName(clusterName);    // 发送的注销消息:RequestCode.UNREGISTER_BROKER    RemotingCommand request = RemotingCommand.createRequestCommand(            c, requestHeader);    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);    assert response != null;    switch (response.getCode()) {        case ResponseCode.SUCCESS: {            return;        }        default:            break;    }    throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);}

    最终调用的是RemotingClient#invokeSync进行消息发送,请求codeRequestCode.UNREGISTER_BROKER,这就与NameServer接收broker的注销消息对应上了。

    3. 启动Broker:start(...)

    我们再来看看Broker的启动流程,处理方法为BrokerController#start

    public void start() throws Exception {    // 启动各组件    // 启动消息存储相关组件    if (this.messageStore != null) {        this.messageStore.start();    }    // 启动 remotingServer,其实就是启动一个netty服务,用来接收producer传来的消息    if (this.remotingServer != null) {        this.remotingServer.start();    }    ...    // broker对外发放消息的组件,向nameServer上报存活消息时使用了它,也是一个netty服务    if (this.brokerOuterAPI != null) {        this.brokerOuterAPI.start();    }    ...    // broker 核心的心跳注册任务    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            try {                BrokerController.this.registerBrokerAll(true, false,                     brokerConfig.isForceRegister());            } catch (Throwable e) {                log.error("registerBrokerAll Exception", e);            }        }        // brokerConfig.getRegisterNameServerPeriod() 值为 1000 * 30,最终计算得到默认30秒执行一次    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),             TimeUnit.MILLISECONDS);    ...}

    这个方法主要就是启动各组件了,这里列出了几大重要组件的启动:

    • messageStore:消息存储组件,在这个组件里,会启动消息存储相关的线程,如消息的投递操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等

    • remotingServernetty服务,用来接收请求消息,如producer发送过来的消息

    • brokerOuterAPI:也是一个netty服务,用来对外发送消息,如向nameServer上报心跳消息

    • 启动定时任务:brokernameServer发送注册消息

    这里我们重点来看定时任务是如何发送心跳发送的。

    处理注册消息发送的时间间隔如下:

    Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

    这行代码看着长,但意思就一句话:时间间隔可以自行配置,但不能小于10s,不能大于60s,默认是30s.

    处理消息注册的方法为BrokerController#registerBrokerAll(...),代码如下:

    public synchronized void registerBrokerAll(final boolean checkOrderConfig,         boolean oneway, boolean forceRegister) {    TopicConfigSerializeWrapper topicConfigWrapper             = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();    // 处理topic相关配置    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {        ...    }    // 这里会判断是否需要进行注册    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),        this.getBrokerAddr(),        this.brokerConfig.getBrokerName(),        this.brokerConfig.getBrokerId(),        this.brokerConfig.getRegisterBrokerTimeoutMills())) {        // 进行注册操作            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);    }}

    这个方法就是用来处理注册操作的,不过注册前会先验证下是否需要注册,验证是否需要注册的方法为BrokerController#needRegister, 代码如下:

    private boolean needRegister(final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId,    final int timeoutMills) {    TopicConfigSerializeWrapper topicConfigWrapper         = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();    // 判断是否需要进行注册    List&lt;Boolean&gt; changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName,         brokerId, topicConfigWrapper, timeoutMills);    // 有一个发生了变化,就表示需要注册了        boolean needRegister = false;    for (Boolean changed : changeList) {        if (changed) {            needRegister = true;            break;        }    }    return needRegister;}

    这个方法调用了brokerOuterAPI.needRegister(...)来判断broker是否发生了变化,只要一个NameServer上发生了变化,就说明需要进行注册操作。

    brokerOuterAPI.needRegister(...)是如何判断broker是否发生了变化的呢?继续跟进BrokerOuterAPI#needRegister

    public List<Boolean> needRegister(    final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId,    final TopicConfigSerializeWrapper topicConfigWrapper,    final int timeoutMills) {    final List<Boolean> changedList = new CopyOnWriteArrayList<>();    // 获取所有的 nameServer    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());        // 遍历所有的nameServer,逐一发送请求        for (final String namesrvAddr : nameServerAddressList) {            brokerOuterExecutor.execute(new Runnable() {                @Override                public void run() {                    try {                        QueryDataVersionRequestHeader requestHeader                             = new QueryDataVersionRequestHeader();                        ...                        // 向nameServer发送消息,命令是 RequestCode.QUERY_DATA_VERSION                        RemotingCommand request = RemotingCommand                            .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);                        // 把当前的 DataVersion 发到 nameServer                             request.setBody(topicConfigWrapper.getDataVersion().encode());                        // 发请求到nameServer                        RemotingCommand response = remotingClient                            .invokeSync(namesrvAddr, request, timeoutMills);                        DataVersion nameServerDataVersion = null;                        Boolean changed = false;                        switch (response.getCode()) {                            case ResponseCode.SUCCESS: {                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =                                  (QueryDataVersionResponseHeader) response                                  .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);                                changed = queryDataVersionResponseHeader.getChanged();                                byte[] body = response.getBody();                                if (body != null) {                                    // 拿到 DataVersion                                    nameServerDataVersion = DataVersion.decode(body, D                                        ataVersion.class);                                    // 这里是判断的关键                                    if (!topicConfigWrapper.getDataVersion()                                        .equals(nameServerDataVersion)) {                                        changed = true;                                    }                                }                                if (changed == null || changed) {                                    changedList.add(Boolean.TRUE);                                }                            }                            default:                                break;                        }                        ...                    } catch (Exception e) {                        ...                    } finally {                        countDownLatch.countDown();                    }                }            });        }        try {            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);        } catch (InterruptedException e) {            log.error("query dataversion from nameserver countDownLatch await Exception", e);        }    }    return changedList;}

    这个方法里,先是遍历所有的nameServer,向每个nameServer都发送一条codeRequestCode.QUERY_DATA_VERSION的参数,参数为当前brokerDataVersion,当nameServer收到消息后,就返回nameServer中保存的、与当前broker对应的DataVersion,当两者版本不相等时,就表明当前broker发生了变化,需要重新注册。

    DataVersion是个啥呢?它的部分代码如下:

    public class DataVersion extends RemotingSerializable {    // 时间戳    private long timestamp = System.currentTimeMillis();    // 计数器,可以理解为最近的版本号    private AtomicLong counter = new AtomicLong(0);    public void nextVersion() {        this.timestamp = System.currentTimeMillis();        this.counter.incrementAndGet();    }        @Override    public boolean equals(final Object o) {        if (this == o)            return true;        if (o == null || getClass() != o.getClass())            return false;        final DataVersion that = (DataVersion) o;        if (timestamp != that.timestamp) {            return false;        }        if (counter != null && that.counter != null) {            return counter.longValue() == that.counter.longValue();        }        return (null == counter) && (null == that.counter);    }    ...}

    DataVersionequals()方法来看,只有当timestampcounter都相等时,两个DataVersion对象才相等。那这两个值会在哪里被修改呢?从DataVersion#nextVersion方法的调用情况来看,引起这两个值的变化主要有两种:

    • broker 上新创建了一个 topic

    • topic的发了的变化

    在这两种情况下,DataVersion#nextVersion方法被调用,从而引起DataVersion的改变。DataVersion改变了,就表明当前broker需要向nameServer注册了。

    让我们再回到BrokerController#registerBrokerAll(...)方法:

    public synchronized void registerBrokerAll(final boolean checkOrderConfig,         boolean oneway, boolean forceRegister) {    ...    // 这里会判断是否需要进行注册    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),        this.getBrokerAddr(),        this.brokerConfig.getBrokerName(),        this.brokerConfig.getBrokerId(),        this.brokerConfig.getRegisterBrokerTimeoutMills())) {        // 进行注册操作            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);    }}

    处理注册的方法为BrokerController#doRegisterBrokerAll,稍微看下它的流程:

    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,        TopicConfigSerializeWrapper topicConfigWrapper) {    // 注册    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(        this.brokerConfig.getBrokerClusterName(),        this.getBrokerAddr(),        this.brokerConfig.getBrokerName(),        this.brokerConfig.getBrokerId(),        this.getHAServerAddr(),        // 这个对象里就包含了当前broker的版本信息        topicConfigWrapper,        this.filterServerManager.buildNewFilterServerList(),        oneway,        this.brokerConfig.getRegisterBrokerTimeoutMills(),        this.brokerConfig.isCompressedRegister());    ...}

    继续跟下去,最终调用的是BrokerOuterAPI#registerBroker方法:

    private RegisterBrokerResult registerBroker(    final String namesrvAddr,    final boolean oneway,    final int timeoutMills,    final RegisterBrokerRequestHeader requestHeader,    final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException,     RemotingSendRequestException, RemotingTimeoutException, InterruptedException {    // 构建请求    RemotingCommand request = RemotingCommand        .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);    request.setBody(body);    // 处理发送操作:sendOneWay    if (oneway) {        try {            // 注册操作            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);        } catch (RemotingTooMuchRequestException e) {            // Ignore        }        return null;        ...    }    ....}

    所以,所谓的注册操作,就是当nameServer发送一条codeRequestCode.REGISTER_BROKER的消息,消息里会带上当前brokertopic信息、版本号等。

    关于“RocketMQ broker启动流程是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    RocketMQ broker启动流程是什么

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档

    猜你喜欢

    RocketMQ broker启动流程是什么

    这篇文章主要介绍“RocketMQ broker启动流程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ broker启动流程是什么”文章能帮助大家解决问题。1. 启动入口本系列
    2023-07-05

    RocketMQ源码解析broker 启动流程

    这篇文章主要为大家介绍了RocketMQ源码解析broker启动流程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-03-23

    RocketMQ broker消息投递流程处理PULL_MESSAGE请求的方法是什么

    这篇文章主要介绍“RocketMQ broker消息投递流程处理PULL_MESSAGE请求的方法是什么”,在日常操作中,相信很多人在RocketMQ broker消息投递流程处理PULL_MESSAGE请求的方法是什么问题上存在疑惑,小编
    2023-07-05

    android启动流程是什么

    Android启动流程是指从手机开机到系统完全启动的过程。具体的流程如下:1. 电源按下:当用户按下电源键时,电源管理芯片会向处理器发送一个启动信号。2. 启动引导加载程序(Bootloader):处理器接收到启动信号后,会从内存中的固定地
    2023-10-11

    springboot启动流程是什么

    Spring Boot 启动流程如下:1. 初始化应用程序上下文:Spring Boot 应用程序启动时,首先会创建一个 Spring 应用程序上下文(ApplicationContext)对象。该上下文对象是整个应用程序的核心,它包含了应
    2023-05-17

    activity启动流程是什么

    Activity启动流程是指在Android应用中启动Activity的一系列操作。它包括以下步骤:1. 调用startActivity()方法:通过调用startActivity()方法,向系统发出启动Activity的请求。2. 系统检
    2023-09-11

    android activity启动流程是什么

    Android Activity的启动流程如下:1. 调用`startActivity()`方法或者`startActivityForResult()`方法启动目标Activity。2. 系统会检查启动目标Activity是否存在,并且是否
    2023-08-08

    android launcher启动流程是什么

    Android Launcher的启动流程如下:1. 用户点击设备上的Home按钮或者通过其他方式启动Launcher应用。2. 系统会检查是否有其他应用正在运行,如果有,则会将其置于后台。3. Launcher应用会被激活并开始启动。4.
    2023-10-20

    Go程序的启动流程是什么

    这篇文章主要讲解了“Go程序的启动流程是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Go程序的启动流程是什么”吧!Go 引导阶段查找入口首先编译上文提到的示例程序:$ GOFLAGS=
    2023-06-15

    Spring容器启动流程是什么

    本篇内容介绍了“Spring容器启动流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!源码解析考虑到直接看源码是一个非常枯燥无味的过程
    2023-06-15

    Android framework ATMS启动流程是什么

    这篇文章主要介绍“Android framework ATMS启动流程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Android framework ATMS启动流程是什么”文章能帮助大家解
    2023-07-05

    启动CentOS系统的流程是什么

    本篇内容主要讲解“启动CentOS系统的流程是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“启动CentOS系统的流程是什么”吧!当我们按下开机键后,系统背后的秘密我们是否了解呢?这里,我带
    2023-06-10

    Linux开机启动的流程是什么

    这篇文章主要讲解了“Linux开机启动的流程是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Linux开机启动的流程是什么”吧!Linux开机分为以下6个步骤,分别是BIOS, MBR,
    2023-06-27

    SpringBoot中WEB的启动流程是什么

    这篇文章主要介绍“SpringBoot中WEB的启动流程是什么”,在日常操作中,相信很多人在SpringBoot中WEB的启动流程是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot中WE
    2023-06-29

    Android Service启动绑定流程是什么

    这篇文章主要介绍了Android Service启动绑定流程是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Android Service启动绑定流程是什么文章都会有所收获,下面我们一起来看看吧。一、Ser
    2023-07-05

    Android应用程序的启动流程是什么

    本篇内容介绍了“Android应用程序的启动流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!应用进程的启动流程本文基于Android
    2023-07-05

    RocketMQ的事务消息发送流程是什么

    本篇内容介绍了“RocketMQ的事务消息发送流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!事务消息发送流程半消息实现了分布式环境
    2023-07-05

    Android广播Broadcast的启动流程是什么

    这篇“Android广播Broadcast的启动流程是什么”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Android广播B
    2023-07-05

    Linux系统启动的引导流程是什么

    本篇内容介绍了“Linux系统启动的引导流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成! LINUX是自由开源软件,在LINUX里
    2023-06-12

    编程热搜

    • Python 学习之路 - Python
      一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
      Python 学习之路 - Python
    • chatgpt的中文全称是什么
      chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
      chatgpt的中文全称是什么
    • C/C++中extern函数使用详解
    • C/C++可变参数的使用
      可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
      C/C++可变参数的使用
    • css样式文件该放在哪里
    • php中数组下标必须是连续的吗
    • Python 3 教程
      Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
      Python 3 教程
    • Python pip包管理
      一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
      Python pip包管理
    • ubuntu如何重新编译内核
    • 改善Java代码之慎用java动态编译

    目录