我的编程空间,编程开发者的网络收藏夹
学习永远不晚

zookeeper的Leader选举机制是什么

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

zookeeper的Leader选举机制是什么

本篇内容主要讲解“zookeeper的Leader选举机制是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“zookeeper的Leader选举机制是什么”吧!

    zookeeper

    一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。

    01Leader选举机制

    Leader选举机制采用半数选举算法。

    每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。

    02Leader选举集群配置

    • 重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg

    • 修改zoo.cfg文件,修改值如下:

    【plain】zoo1.cfg文件内容:dataDir=/export/data/zookeeper-1clientPort=2181server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observerzoo2.cfg文件内容:dataDir=/export/data/zookeeper-2clientPort=2182server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observerzoo3.cfg文件内容:dataDir=/export/data/zookeeper-3clientPort=2183server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observerzoo4.cfg文件内容:dataDir=/export/data/zookeeper-4clientPort=2184server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observer
    • server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识

    • participant默认参与选举标识,可不写. observer不参与选举

    在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。

    • 启动三个zookeeper实例:

    • bin/zkServer.sh start conf/zoo1.cfg

    • bin/zkServer.sh start conf/zoo2.cfg

    • bin/zkServer.sh start conf/zoo3.cfg

    • 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举。

    03Leader选举流程

    zookeeper的Leader选举机制是什么

    图1 第一轮到第二轮投票流程

    前提:

    设定票据数据格式vote(sid,zxid,epoch)

    • sid是Server ID每台服务的唯一标识,是myid文件内容;

    • zxid是数据事务id号;

    • epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。

    按照顺序启动sid=1,sid=2节点

    第一轮投票:

    • sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;

    • sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;

    • sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);

    • sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;

    • 第一轮投票选举结束。

    第二轮投票:

    • sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;

    • sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;

    • sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;

    • sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader。

    这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。

    3.1 Leader选举采用多层队列架构

    zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。比如对某台机器发送不成功不会影响正常服务端的发送。

    zookeeper的Leader选举机制是什么

    图2 多层队列上下关系交互流程图

    04解析代码入口类

    通过查看zkServer.sh文件内容找到服务启动类:

    org.apache.zookeeper.server.quorum.QuorumPeerMain

    05选举流程代码解析

    • 加载配置文件QuorumPeerConfig.parse(path);

    针对 Leader选举关键配置信息如下:

    • 读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。下面遇到myid变量为当前节点自己sid标识。

      • 设置peerType当前应用是否参与选举

    • new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.

    【Java】public QuorumMaj(Properties props) throws ConfigException {        for (Entry<Object, Object> entry : props.entrySet()) {            String key = entry.getKey().toString();            String value = entry.getValue().toString();            //读取集群配置文件中的server.开头的应用实例配置信息            if (key.startsWith("server.")) {                int dot = key.indexOf('.');                long sid = Long.parseLong(key.substring(dot + 1));                QuorumServer qs = new QuorumServer(sid, value);                allMembers.put(Long.valueOf(sid), qs);                if (qs.type == LearnerType.PARTICIPANT)//应用实例绑定的角色为PARTICIPANT意为参与选举                    votingMembers.put(Long.valueOf(sid), qs);                else {                    //观察者成员                    observingMembers.put(Long.valueOf(sid), qs);                }            } else if (key.equals("version")) {                version = Long.parseLong(value, 16);            }        }        //过半基数        half = votingMembers.size() / 2;    }

    QuorumPeerMain.runFromConfig(config) 启动服务;

    QuorumPeer.startLeaderElection() 开启选举服务;

    • 设置当前选票new Vote(sid,zxid,epoch)

    【plain】synchronized public void startLeaderElection(){try {           if (getPeerState() == ServerState.LOOKING) {               //首轮:当前节点默认投票对象为自己               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());           }       } catch(IOException e) {           RuntimeException re = new RuntimeException(e.getMessage());           re.setStackTrace(e.getStackTrace());           throw re;       }//........}
    • 创建选举管理类:QuorumCnxnManager;

    • 初始化recvQueue<Message(sid,ByteBuffer)>接收投票队列(第二层传输队列);

    • 初始化queueSendMap<sid,queue>按sid发送投票队列(第二层传输队列);

    • 初始化senderWorkerMap<sid,SendWorker>发送投票工作线程容器,表示着与sid投票节点已连接;

    • 初始化选举监听线程类QuorumCnxnManager.Listener。

    【Java】//QuorumPeer.createCnxnManager()public QuorumCnxManager(QuorumPeer self,                        final long mySid,                        Map&lt;Long,QuorumPeer.QuorumServer&gt; view,                        QuorumAuthServer authServer,                        QuorumAuthLearner authLearner,                        int socketTimeout,                        boolean listenOnAllIPs,                        int quorumCnxnThreadsSize,                        boolean quorumSaslAuthEnabled) {    //接收投票队列(第二层传输队列)    this.recvQueue = new ArrayBlockingQueue&lt;Message&gt;(RECV_CAPACITY);    //按sid发送投票队列(第二层传输队列)    this.queueSendMap = new ConcurrentHashMap&lt;Long, ArrayBlockingQueue&lt;ByteBuffer&gt;&gt;();    //发送投票工作线程容器,表示着与sid投票节点已连接     this.senderWorkerMap = new ConcurrentHashMap&lt;Long, SendWorker&gt;();    this.lastMessageSent = new ConcurrentHashMap&lt;Long, ByteBuffer&gt;();    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");    if(cnxToValue != null){        this.cnxTO = Integer.parseInt(cnxToValue);    }    this.self = self;    this.mySid = mySid;    this.socketTimeout = socketTimeout;    this.view = view;    this.listenOnAllIPs = listenOnAllIPs;    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,            quorumSaslAuthEnabled);    // Starts listener thread that waits for connection requests     //创建选举监听线程 接收选举投票请求    listener = new Listener();    listener.setName("QuorumPeerListener");}//QuorumPeer.createElectionAlgorithmprotected Election createElectionAlgorithm(int electionAlgorithm){    Election le=null;    //TODO: use a factory rather than a switch    switch (electionAlgorithm) {    case 0:        le = new LeaderElection(this);        break;    case 1:        le = new AuthFastLeaderElection(this);        break;    case 2:        le = new AuthFastLeaderElection(this, true);        break;    case 3:        qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())        QuorumCnxManager.Listener listener = qcm.listener;        if(listener != null){            listener.start();//启动选举监听线程            FastLeaderElection fle = new FastLeaderElection(this, qcm);            fle.start();            le = fle;        } else {            LOG.error("Null listener when initializing cnx manager");        }        break;    default:        assert false;    }return le;}
    • 开启选举监听线程QuorumCnxnManager.Listener;

    • 创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap<sid,SendWorker>;

    • sid>self.sid才可以连接过来。

    【Java】//上面的listener.start()执行后,选择此方法public void run() {    int numRetries = 0;    InetSocketAddress addr;    Socket client = null;    while((!shutdown) && (numRetries < 3)){        try {            ss = new ServerSocket();            ss.setReuseAddress(true);            if (self.getQuorumListenOnAllIPs()) {                int port = self.getElectionAddress().getPort();                addr = new InetSocketAddress(port);            } else {                // Resolve hostname for this server in case the                // underlying ip address has changed.                self.recreateSocketAddresses(self.getId());                addr = self.getElectionAddress();            }            LOG.info("My election bind port: " + addr.toString());            setName(addr.toString());            ss.bind(addr);            while (!shutdown) {                client = ss.accept();                setSockOpts(client);                LOG.info("Received connection request "                        + client.getRemoteSocketAddress());                // Receive and handle the connection request                // asynchronously if the quorum sasl authentication is                // enabled. This is required because sasl server                // authentication process may take few seconds to finish,                // this may delay next peer connection requests.                if (quorumSaslAuthEnabled) {                    receiveConnectionAsync(client);                } else {//接收连接信息                    receiveConnection(client);                }                numRetries = 0;            }        } catch (IOException e) {            if (shutdown) {                break;            }            LOG.error("Exception while listening", e);            numRetries++;            try {                ss.close();                Thread.sleep(1000);            } catch (IOException ie) {                LOG.error("Error closing server socket", ie);            } catch (InterruptedException ie) {                LOG.error("Interrupted while sleeping. " +                    "Ignoring exception", ie);            }            closeSocket(client);        }    }    LOG.info("Leaving listener");    if (!shutdown) {        LOG.error("As I'm leaving the listener thread, "                + "I won't be able to participate in leader "                + "election any longer: "                + self.getElectionAddress());    } else if (ss != null) {        // Clean up for shutdown.        try {            ss.close();        } catch (IOException ie) {            // Don't log an error for shutdown.            LOG.debug("Error closing server socket", ie);        }    }}//代码执行路径:receiveConnection()->handleConnection(...)private void handleConnection(Socket sock, DataInputStream din)            throws IOException {//...省略     if (sid < self.getId()) {                        SendWorker sw = senderWorkerMap.get(sid);            if (sw != null) {                sw.finish();            }                        LOG.debug("Create new connection to server: {}", sid);            closeSocket(sock);            if (electionAddr != null) {                connectOne(sid, electionAddr);            } else {                connectOne(sid);            }        } else { // Otherwise start worker threads to receive data.            SendWorker sw = new SendWorker(sock, sid);            RecvWorker rw = new RecvWorker(sock, din, sid, sw);            sw.setRecv(rw);            SendWorker vsw = senderWorkerMap.get(sid);            if (vsw != null) {                vsw.finish();            }  //存储连接信息<sid,SendWorker>            senderWorkerMap.put(sid, sw);            queueSendMap.putIfAbsent(sid,                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));            sw.start();            rw.start();     }}
    • 创建FastLeaderElection快速选举服务;

    • 初始选票发送队列sendqueue(第一层队列)

    • 初始选票接收队列recvqueue(第一层队列)

    • 创建线程WorkerSender

    • 创建线程WorkerReceiver

    【Java】//FastLeaderElection.starterprivate void starter(QuorumPeer self, QuorumCnxManager manager) {    this.self = self;    proposedLeader = -1;    proposedZxid = -1;    //发送队列sendqueue(第一层队列)    sendqueue = new LinkedBlockingQueue<ToSend>();    //接收队列recvqueue(第一层队列)    recvqueue = new LinkedBlockingQueue<Notification>();    this.messenger = new Messenger(manager);}//new Messenger(manager)Messenger(QuorumCnxManager manager) {    //创建线程WorkerSender    this.ws = new WorkerSender(manager);    this.wsThread = new Thread(this.ws,            "WorkerSender[myid=" + self.getId() + "]");    this.wsThread.setDaemon(true);    //创建线程WorkerReceiver    this.wr = new WorkerReceiver(manager);    this.wrThread = new Thread(this.wr,            "WorkerReceiver[myid=" + self.getId() + "]");    this.wrThread.setDaemon(true);}
    • 开启WorkerSender和WorkerReceiver线程。

    WorkerSender线程自旋获取sendqueue第一层队列元素

    • sendqueue队列元素内容为相关选票信息详见ToSend类;

    • 首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;

    • 不相同将sendqueue队列元素转储到queueSendMap<sid,queue>第二层传输队列中。

    【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{//...  public void run() {    while (!stop) {        try {            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);            if(m == null) continue;  //将投票信息发送出去            process(m);        } catch (InterruptedException e) {            break;        }    }    LOG.info("WorkerSender is down");  }}//QuorumCnxManager#toSendpublic void toSend(Long sid, ByteBuffer b) {        if (this.mySid == sid) {         b.position(0);         addToRecvQueue(new Message(b.duplicate(), sid));            } else {                  ArrayBlockingQueue&lt;ByteBuffer&gt; bq = new ArrayBlockingQueue&lt;ByteBuffer&gt;(            SEND_CAPACITY);         ArrayBlockingQueue&lt;ByteBuffer&gt; oldq = queueSendMap.putIfAbsent(sid, bq);         //转储到queueSendMap&lt;sid,queue&gt;第二层传输队列中         if (oldq != null) {             addToSendQueue(oldq, b);         } else {             addToSendQueue(bq, b);         }         connectOne(sid);         }}

    WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。

    【Java】//WorkerReceiverpublic void run() {    Message response;    while (!stop) {      // Sleeps on receive      try {          //自旋获取recvQueue第二层传输队列元素          response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);          if(response == null) continue;          // The current protocol and two previous generations all send at least 28 bytes          if (response.buffer.capacity() &lt; 28) {              LOG.error("Got a short response: " + response.buffer.capacity());              continue;          }          //...  if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){         //第二层传输队列元素转存到recvqueue第一层队列中         recvqueue.offer(n);         //...      }    }//...}

    06选举核心逻辑

    • 启动线程QuorumPeer

    开始Leader选举投票makeLEStrategy().lookForLeader();

    sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。sendqueue队列由WorkerSender线程处理。

    【plain】//QuorunPeer.run//...try {   reconfigFlagClear();    if (shuttingDownLE) {       shuttingDownLE = false;       startLeaderElection();       }    //makeLEStrategy().lookForLeader() 发送投票    setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {    LOG.warn("Unexpected exception", e);    setPeerState(ServerState.LOOKING);}  //...//FastLeaderElection.lookLeaderpublic Vote lookForLeader() throws InterruptedException {//...  //向其他应用发送投票sendNotifications();//...}private void sendNotifications() {    //获取应用节点    for (long sid : self.getCurrentAndNextConfigVoters()) {        QuorumVerifier qv = self.getQuorumVerifier();        ToSend notmsg = new ToSend(ToSend.mType.notification,                proposedLeader,                proposedZxid,                logicalclock.get(),                QuorumPeer.ServerState.LOOKING,                sid,                proposedEpoch, qv.toString().getBytes());        if(LOG.isDebugEnabled()){            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +                  " (n.round), " + sid + " (recipient), " + self.getId() +                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");        }        //储存投票信息        sendqueue.offer(notmsg);    }}class WorkerSender extends ZooKeeperThread {    //...    public void run() {    while (!stop) {        try {//提取已储存的投票信息            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);            if(m == null) continue;            process(m);        } catch (InterruptedException e) {            break;        }    }    LOG.info("WorkerSender is down");  }//...}

    自旋recvqueue队列元素获取投票过来的选票信息:

    【Java】public Vote lookForLeader() throws InterruptedException {//...while ((self.getPeerState() == ServerState.LOOKING) &amp;&amp;        (!stop)){        //提取投递过来的选票信息    Notification n = recvqueue.poll(notTimeout,            TimeUnit.MILLISECONDS);if(n == null){    if(manager.haveDelivered()){        //已全部连接成功,并且前一轮投票都完成,需要再次发起投票        sendNotifications();    } else {        //如果未收到选票信息,manager.contentAll()自动连接其它socket节点        manager.connectAll();    }        int tmpTimeOut = notTimeout*2;    notTimeout = (tmpTimeOut &lt; maxNotificationInterval?            tmpTimeOut : maxNotificationInterval);    LOG.info("Notification time out: " + notTimeout);         }     //....    }  //...}
    【Java】//manager.connectAll()-&gt;connectOne(sid)-&gt;initiateConnection(...)-&gt;startConnection(...)private boolean startConnection(Socket sock, Long sid)        throws IOException {    DataOutputStream dout = null;    DataInputStream din = null;    try {        // Use BufferedOutputStream to reduce the number of IP packets. This is        // important for x-DC scenarios.        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());        dout = new DataOutputStream(buf);        // Sending id and challenge        // represents protocol version (in other words - message type)        dout.writeLong(PROTOCOL_VERSION);        dout.writeLong(self.getId());        String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();        byte[] addr_bytes = addr.getBytes();        dout.writeInt(addr_bytes.length);        dout.write(addr_bytes);        dout.flush();        din = new DataInputStream(                new BufferedInputStream(sock.getInputStream()));    } catch (IOException e) {        LOG.warn("Ignoring exception reading or writing challenge: ", e);        closeSocket(sock);        return false;    }    // authenticate learner    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);    if (qps != null) {        // TODO - investigate why reconfig makes qps null.        authLearner.authenticate(sock, qps.hostname);    }    // If lost the challenge, then drop the new connection    //保证集群中所有节点之间只有一个通道连接    if (sid &gt; self.getId()) {        LOG.info("Have smaller server identifier, so dropping the " +                "connection: (" + sid + ", " + self.getId() + ")");        closeSocket(sock);        // Otherwise proceed with the connection    } else {        SendWorker sw = new SendWorker(sock, sid);        RecvWorker rw = new RecvWorker(sock, din, sid, sw);        sw.setRecv(rw);        SendWorker vsw = senderWorkerMap.get(sid);        if(vsw != null)            vsw.finish();        senderWorkerMap.put(sid, sw);        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue&lt;ByteBuffer&gt;(                SEND_CAPACITY));        sw.start();        rw.start();        return true;    }    return false;}

    如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap<sid,SendWorker>中。对应第2步中的sid<self.sid逻辑,保证集群中所有节点之间只有一个通道连接。

    zookeeper的Leader选举机制是什么

    节点之间连接方式

    【Java】public Vote lookForLeader() throws InterruptedException {//...    if (n.electionEpoch > logicalclock.get()) {        //当前选举周期小于选票周期,重置recvset选票池        //大于当前周期更新当前选票信息,再次发送投票        logicalclock.set(n.electionEpoch);        recvset.clear();        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {            updateProposal(n.leader, n.zxid, n.peerEpoch);        } else {            updateProposal(getInitId(),                    getInitLastLoggedZxid(),                    getPeerEpoch());        }        sendNotifications();    } else if (n.electionEpoch < logicalclock.get()) {        if(LOG.isDebugEnabled()){            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"                    + Long.toHexString(n.electionEpoch)                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));        }        break;    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,            proposedLeader, proposedZxid, proposedEpoch)) {//相同选举周期        //接收的选票与当前选票PK成功后,替换当前选票        updateProposal(n.leader, n.zxid, n.peerEpoch);        sendNotifications();    }//...}

    在上代码中,自旋从recvqueue队列中获取到选票信息。开始进行选举:

    • 判断当前选票和接收过来的选票周期是否一致

    • 大于当前周期更新当前选票信息,再次发送投票

    • 周期相等:当前选票信息和接收的选票信息进行PK

    【Java】//接收的选票与当前选票PKprotected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));        if(self.getQuorumVerifier().getWeight(newId) == 0){            return false;        }                return ((newEpoch > curEpoch) ||                ((newEpoch == curEpoch) &&                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId)))));  }

    在上述代码中的totalOrderPredicate方法逻辑如下:

    • 竞选周期大于当前周期为true

    • 竞选周期相等,竞选zxid大于当前zxid为true

    • 竞选周期相等,竞选zxid等于当前zxid,竞选sid大于当前sid为true

    • 经过上述条件判断为true将当前选票信息替换为竞选成功的选票,同时再次将新的选票投出去。

    【Java】public Vote lookForLeader() throws InterruptedException {//...   //存储节点对应的选票信息    // key:选票来源sid  value:选票推举的Leader sid    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));    //半数选举开始    if (termPredicate(recvset,            new Vote(proposedLeader, proposedZxid,                    logicalclock.get(), proposedEpoch))) {        // Verify if there is any change in the proposed leader        while((n = recvqueue.poll(finalizeWait,                TimeUnit.MILLISECONDS)) != null){            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,                    proposedLeader, proposedZxid, proposedEpoch)){                recvqueue.put(n);                break;            }        }                if (n == null) {            //已选举出leader 更新当前节点是否为leader             self.setPeerState((proposedLeader == self.getId()) ?                    ServerState.LEADING: learningState());            Vote endVote = new Vote(proposedLeader,                    proposedZxid, proposedEpoch);            leaveInstance(endVote);            return endVote;        }    }//...}private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();    voteSet.addQuorumVerifier(self.getQuorumVerifier());    if (self.getLastSeenQuorumVerifier() != null            && self.getLastSeenQuorumVerifier().getVersion() > self                    .getQuorumVerifier().getVersion()) {        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());    }        //votes 来源于recvset 存储各个节点推举出来的选票信息    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {//选举出的sid和其它节点选择的sid相同存储到voteSet变量中。        if (vote.equals(entry.getValue())) {//保存推举出来的sid            voteSet.addAck(entry.getKey());        }    }    //判断选举出来的选票数量是否过半    return voteSet.hasAllQuorums();}//QuorumMaj#containsQuorumpublic boolean containsQuorum(Set<Long> ackSet) {    return (ackSet.size() > half);   }

    在上述代码中:recvset是存储每个sid推举的选票信息。

    第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1);

    第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。

    最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个。因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。

    • setPeerState更新当前节点角色;

    • proposedLeader选举出来的sid和自己sid相等,设置为Leader;

    • 上述条件不相等,设置为Follower或Observing;

    • 更新currentVote当前选票为Leader的选票vote(2,0,1)。

    到此,相信大家对“zookeeper的Leader选举机制是什么”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    zookeeper的Leader选举机制是什么

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档

    猜你喜欢

    zookeeper的Leader选举机制是什么

    本篇内容主要讲解“zookeeper的Leader选举机制是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“zookeeper的Leader选举机制是什么”吧!zookeeper一个分布式服务
    2023-07-05

    zookeeper的Leader选举机制源码解析

    这篇文章主要为大家介绍了zookeeper的Leader选举源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-05-14

    ZooKeeper中的Leader是如何选举出来的

    ZooKeeper中的Leader是通过选举算法来确定的。当一个ZooKeeper服务器(节点)启动时,它会尝试与其他ZooKeeper服务器建立连接,然后它们会相互通信以确定彼此的状态。在选举算法中,每个节点都有一个编号和一个逻辑时钟,节
    ZooKeeper中的Leader是如何选举出来的
    2024-03-07

    ZooKeeper的选举算法是什么

    ZooKeeper使用的选举算法是基于Paxos协议的Zab(ZooKeeper Atomic Broadcast)协议。在Zab协议中,ZooKeeper集群中的所有节点都会通过一个Leader选举过程来选举出一个节点作为“领导者”(Le
    ZooKeeper的选举算法是什么
    2024-04-09

    elasticsearch的master选举机制是什么

    Elasticsearch的主节点选举机制是通过Zen Discovery和Unicast Discovery两个插件来实现的。Zen Discovery插件是Elasticsearch的默认插件,它使用一种基于选举的机制来选举主节点。当
    2023-10-23

    zookeeper选举的源码过程是什么样的

    zookeeper选举的源码过程是什么样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。集群概述zookeper 在生产环境中通常都是通过集群方式来部署的,以保
    2023-06-15

    redis集群选举机制是什么

    Redis集群的选举机制是基于Raft算法的一种实现。在Redis集群中,每个节点都有可能成为领导者(leader),而其他节点则成为追随者(follower)。选举机制的目的是为了确保集群中只有一个领导者,以确保数据的一致性和可用性。在
    redis集群选举机制是什么
    2024-04-09

    redis从节点选举机制是什么

    Redis的从节点选举机制是通过主节点选举从节点的方式来实现的。当一个从节点与主节点连接时,它会发送一个SYNC命令来请求复制主节点的数据。主节点在收到SYNC命令后,会执行BGSAVE命令来创建一个RDB快照,并将快照发送给从节点。同时,
    2023-09-11

    ZooKeeper中的Watch机制是什么

    ZooKeeper中的Watch机制是一种事件监听机制,用于通知客户端关于特定节点的状态变化。当客户端对某个节点注册了Watch事件后,如果该节点的状态发生变化(例如节点的数据被更新、节点被创建或被删除等),ZooKeeper会通知客户端,
    ZooKeeper中的Watch机制是什么
    2024-03-06

    Zookeeper集群管理与选举方法是什么

    这篇文章主要讲解了“Zookeeper集群管理与选举方法是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Zookeeper集群管理与选举方法是什么”吧!  1.集群机器监控  这通常用于
    2023-06-02

    ZooKeeper的Watcher机制是基于什么原理的

    ZooKeeper的Watcher机制是基于发布/订阅模式的原理。在ZooKeeper中,客户端可以注册Watcher来监听指定节点的状态变化,当节点的状态发生变化时,ZooKeeper会通知注册了Watcher的客户端,使得客户端可以及时
    ZooKeeper的Watcher机制是基于什么原理的
    2024-03-08

    分布式锁要选择Zookeeper而不是Redis的原因是什么

    这篇文章给大家分享的是有关分布式锁要选择Zookeeper而不是Redis的原因是什么的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。在分布式的应用中,为了防止单点故障,保障高可用,通常会采用主从结构,当主节点挂掉
    2023-06-15

    Linux的Signal机制是什么

    这篇文章主要介绍了Linux的Signal机制是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Linux的Signal机制是什么文章都会有所收获,下面我们一起来看看吧。Signal机制在Linux中是一个非
    2023-06-27

    Java的SPI机制是什么

    本篇内容介绍了“Java的SPI机制是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!SPI的全名为Service Provider In
    2023-06-17

    MapReduce的Shuffle机制是什么

    这篇文章主要介绍“MapReduce的Shuffle机制是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“MapReduce的Shuffle机制是什么”文章能帮助大家解决问题。Shuffle过程,
    2023-06-27

    Golang的GC机制是什么

    这篇文章主要介绍“Golang的GC机制是什么”,在日常操作中,相信很多人在Golang的GC机制是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Golang的GC机制是什么”的疑惑有所帮助!接下来,请跟
    2023-07-05

    Java的ClassLoader机制是什么

    本篇内容介绍了“Java的ClassLoader机制是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!JVM在加载类的时候,都是通过Cla
    2023-06-17

    编程热搜

    • Python 学习之路 - Python
      一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
      Python 学习之路 - Python
    • chatgpt的中文全称是什么
      chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
      chatgpt的中文全称是什么
    • C/C++中extern函数使用详解
    • C/C++可变参数的使用
      可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
      C/C++可变参数的使用
    • css样式文件该放在哪里
    • php中数组下标必须是连续的吗
    • Python 3 教程
      Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
      Python 3 教程
    • Python pip包管理
      一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
      Python pip包管理
    • ubuntu如何重新编译内核
    • 改善Java代码之慎用java动态编译

    目录