我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Zookeeper笔记

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Zookeeper笔记

为什么要使用Zookeeper

dubbo需要一个注册中心,而Zookeeper是我们在使用Dubbo是官方推荐的注册中心
在这里插入图片描述
在这里插入图片描述

Zookeeper介绍

在这里插入图片描述
Zookeeper的集群机制
Zookeeper是为了其他分布式程序提供服务的,所以不能随便就挂了。Zookeeper的集群机制采取的是半数存活机制。也就是整个集群节点中有半数以上的节点存活,那么整个集群环境可用。这也是为什么说zk的集群最好是奇数个节点。

zk的作用

序号功能
1为别的分布式程序服务的
2本身就是一个分布式程序
3主从协调 服务器节点动态上下线 统一配置管理 分布式共享锁 统一名称服务
4管理(存储,读取)用户程序提交的数据 并为用户程序提供数据节点监听服务

Zookeeper 节点的三种角色

Leader
Leader是Zookeeper集群工作的核心,其主要工作是:

  1. 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
  2. 集群内部各服务器的调度者

Follower
Follower是zookeeper集群的跟随者,主要工作是:

  1. 处理客户端非事务性请求(读取数据),转发事务请求给Leader服务器
  2. 参与事务请求Proposal的投票
  3. 参与Leader选举投票

Observer
Observer充当观察者的角色,观察Zookeeper集群的最小状态变化并将这些状态同步过来,其对于非事务请求可以独立处理,对于事务请求,会转给Leader节点进行处理。Observer不会参与投票,包括事务请求Proposal的投票和Leader选举投票

###Zookeeper 节点的四种状态
在知道了 Zookeeper 中有三种角色后,我们可能会问: Zookeeper 是如何知道自己目前是什么角色的呢?

在 ZAB 协议中定义:通过自身的状态来区分自己的角色的,在运行期间各个进程可能出现以下四种状态之一:

  • Looking:不确定Leader状态,该状态下的服务器认为当前集群中没有Leader,会发起Leader选举
  • Following:跟随者状态,表明当前服务器角色是Follower,并且它知道Leader是谁
  • Leading:领导者状态,表明当前服务器角色是Leader,它会维护与Follower间的心跳
  • Observing:观察者状态,表明当前服务器角色是Observer,与Follower唯一的不同在于不参与选举,也不参与集群写操作时的投票

在组成 ZAB 协议的所有进程启动的时候,初始化状态都是 LOOKING 状态,此时进程组中不存在 Leader,选举之后才有,在进行选举成功后,就进入消息广播模式,此时 Zookeeper 集群中的角色状态就不再是 LOOKING 状态。

集群环境准备

节点的映射关系

每个节点设置相应的ip和主机名的映射关系,方便集群环境的部署
修改hosts配置文件中的信息
在这里插入图片描述

配置免密登录
生成公钥和私钥

ssh-keygen

输入命令后根据提示,四次回车即可
在这里插入图片描述

发送公钥给需要免密登录的节点

ssh-copy-id zk01ssh-copy-id zk02ssh-copy-id zk03

在这里插入图片描述
节点和节点发送文件通过scp命令实现

scp -r b.txt bobo01:/root/

在这里插入图片描述
关闭防火墙
查看防火墙状态

firewall-cmd --state

停止防火墙

systemctl stop firewall.service

禁止开机启动

systemctl disable firewall.service

Zookeeper的选举机制

Leader主要作用是保证分布式数据一致性,即每个节点的存储的数据同步。

服务器初始化时Leader选举

Zookeeper由于自身的性质,一般建议选取奇数个节点进行搭建分布式服务器集群。以3个节点组成的服务器集群为例,说明服务器初始化时的选举过程。启动第一台安装Zookeeper的节点时,无法单独进行选举,启动第二台时,两节点之间进行通信,开始选举Leader。

  1. 每个Server投出一票。第一次他们都投给自己作为Leader,投票内容未(SID,ZXID)。
    SID为Server的id,即启动ZK时配置文件中的myid;
    ZXID为事务id,为节点的更新程序,ZXID越大,代表Server对ZK节点的操作越新。由于服务器初始化,
    每个Sever上的Znode为0,所以Server1投的票为(1,0),Server2为(2,0)。两Server将各自投票发给集群中其他机器。
  2. 每个Server接收来自其他Server的投票。集群中的每个Server先判断投票的有效性,如检查是不是本轮的投票,是不是来Looking状态的服务器投的票。
  3. 对投票结果进行处理。处理规则为:
    • 首先对比ZXID。ZXID大的服务器优先作为Leader
    • 若ZXID系统,如初始化时,每个Server的ZXID都是0
    • 就会比较sid即myid,myid大的选出来做Leader。
      首次选举对于Server而言,他接受到的投票为(2,0),因为自身的票为(1,0),所以此时它会选举Server2为Leader,
      将自己的更新为(2,0)。而Server2收到的投票为Server1的(1,0)由于比他自己小,
      Server2的投票不变。Server1和Server2再次将票投出,投出的票都为(2,0)
  4. 统计投票。每次投票后,服务器都会统计投票信息,如果判定某个Server有过半的票数,俺么该Server就是Leader。首次投票对于Server1和Server2而言,统计出已经有两台机器接收了(2,0)的投票信息,此时认为选出了Leader。
  5. 改变服务器的状态。当确定了Leader之后,每个Server更新自己的状态,
    Leader将状态更新为Leading,Follower将状态更新为Following。
    image.png
    在这里插入图片描述

服务器运行期间的Leader选举

ZK运行期间,如果有新的Server加入,或非Leader节点挂了,那么Leader会同步数据给新的Server或寻找其他备用Server替代宕机的Server。若Leader宕机,此时集群暂停对外服务,开始在内部选举新的Leader。假设当前集群中有Server1、Server2、Server3三台服务器,Server2为当前集群的Leader,由于意外情况,Server2宕机了,便开始进入选举状态。过程如下

1 变更状态。其他非Observer服务器将自己的状态改变成Looking,开始进入Leader选举。
2. 每个Server投出1张票(myid,ZXID),由于集群运行过,所以每个Server的ZXID可能不同。
假设Server1的ZXID为145,Server3的为122,第一轮投票中,Server1和Server3都投自己,
票分别为(1,145)、(3,122),将自己的票发送给集群中所有机器。
3. 每个Server接收接收来自其他Server的投票,接下来的步骤与初始化时相同。

恢复模式的两个原则

当集群正在启动过程中,或 Leader 与超过半数的主机断连后,集群就进入了恢复模式。对于要恢复的数据状态需要遵循两个原则。
已被处理过的消息不能丢失
被丢弃的消息(原Leader未提交的事务)不能再现

1.已被处理过的消息不能丢失(部分follower提交了)
当 Leader 收到超过半数 Follower 的 ACKs 后,就向各个 Follower 广播 COMMIT 消息, 批准各个 Server 执行该写操作事务。当各个 Server 在接收到 Leader 的 COMMIT 消息后就会在本地执行该写操作,然后会向客户端响应写操作成功。

但是如果在非全部 Follower 收到 COMMIT 消息之前 Leader 就挂了,这将导致一种后 果:部分 Server 已经执行了该事务,而部分 Server 尚未收到 COMMIT 消息,所以其并没有执行该事务。当新的 Leader 被选举出,集群经过恢复模式后需要保证所有 Server 上都执行 了那些已经被部分 Server 执行过的事务。

2.被 Leader 提出的但没有被提交的消息不能再现(无follower提交)
当新事务在 Leader 上已经通过,其已经将该事务更新到了本地,但所有 Follower 还都没有收到 COMMIT 之前,Leader 宕机了(比前面叙述的宕机更早),此时,所有 Follower 根本 就不知道该 Proposal 的存在。当新的 Leader 选举出来,整个集群进入正常服务状态后,之 前挂了的 Leader 主机重新启动并注册成为了 Follower。

若那个别人根本不知道的 Proposal 还保留在那个主机,那么其数据就会比其它主机多出了内容,导致整个系统状态的不一致。所以,该 Proposa 应该被丢弃。类似这样应该被丢弃的事务,是不能再次出现在集群中的, 应该被清除。

myid与zxid

myid : 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID

例如,某个 Zookeeper 集群包含三台服务器,在配置文件中,server.后面的数据即为 myid
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

zxid : 类似于 RDBMS 中的事务ID,用于标识一个 Proposal ID,为了保证顺序性,ZXID 必须单调递增.

ZAB协议

ZAB(Zookeeper Atomic Broadcast)协议是Apache Zookeeper分布式协调服务中使用的一种原子广播协议。Zookeeper是一个开源的、高性能的分布式协调服务,用于管理分布式系统中的配置信息、命名服务、分布式锁等。ZAB协议是Zookeeper的核心协议,通过确保Zookeeper集群中所有服务器之间的事务广播顺序,它实现了Zookeeper数据的一致性

Zab 协议核心
Zookeeper采用ZAB协议的核心就是只要一台服务器提交了Proposal(事务),就要确保所有服务器最终都能正确提交Proposal,这也是CAP/BASE最终一致性的体现。(选举的核心

从大体上看,Zab 协议的原理可细分为四个阶段:

  1. 选举(Leader Election)
  2. 发现(Discovery)
  3. 同步(Synchronization)
  4. 广播(Broadcast)
    但是在 ZooKeeper 的实现中,将发现和同步两部分内容合成数据恢复一部分,所以按实现划分可以分为三个阶段:(选举的核心
  5. Leader选举(Fast Leader Election)
  6. 数据恢复(Recovery Phase)
  7. 广播(Broadcast)

Zab 协议按照功能的不同,有两种模式:一种是消息广播模式,另一种是崩溃恢复模式。
image.png
消息广播模式用于处理客户端的请求,崩溃恢复模式用于在节点意外崩溃时能够快速恢复,继续对外提供服务,让集群达成高可用状态。

1) 消息广播模式

  • 在Zookeeper集群中数据副本的传递策略就是采用消息广播模式,Zookeeper中的数据副本同步方式与2PC方式相似但却不同,2PC是要求协调者必须等待所有参与者全部反馈ACK确认消息后,再发送commit消息,要求所有参与者要么全成功要么全失败,2PC方式会产生严重的阻塞问题。
  • 而Zookeeper中Leader等待Follower的ACK反馈是指:只要半数以上的Follower成功反馈即可,不需要收到全部的Follower反馈。
    image.png

Zookeeper 中广播消息步骤:

  • 客户端发起一个写操作请求
  • Leader服务器处理客户端请求后将请求转为Proposal(事务),同时为每个Proposal分配一个全局唯一ID,即ZXID
  • Leader服务器与每个Follower之间都有一个队列,Leader将消息发送到该队列
  • Follower机器从队列中取出消息处理完(写入本地事务日志中)后,向LEader服务器发送ACK确认
  • Leader服务器收到半数以上的Follower的ACK后,即可以发送Commit
  • Leader向所有的Follower服务器发送Commit消息

崩溃恢复模式

一旦 Leader 服务器出现崩溃或者由于网络原因导致 Leader 服务器失去了与过半 Follower 的联系,那么就会进入崩溃恢复模式。

Zookeeper 集群中为保证任何进程能够顺序执行,只能是 Leader 服务器接收写请求,其他服务器接收到客户端的写请求,也会转发至 Leader 服务器进行处理。

Zab 协议崩溃恢复需满足以下2个请求:

  • 确保已经被 Leader 提交的 proposal 必须最终被所有的 Follower 服务器提交
  • 确保丢弃已经被 Leader 提出的但没有被提交的 Proposal(事务)

也就是新选举出来的 Leader 不能包含未提交的 Proposal(事务),必须都是已经提交了的 Proposal 的 Follower 服务器节点,新选举出来的 Leader 节点中含有最高的 ZXID,所以,在 Leader 选举时,将 ZXID 作为每个 Follower 投票时的信息依据。这样做的好处是避免了 Leader 服务器检查 Proposal 的提交和丢弃工作。(具体看上面的恢复规则)

Zookeeper是不是强一致性

写操作时是强一致性,读操作则不是强一致性
强一致性

  • 数据的强一致性指的是:一个外部的客户端,去请求一个分布式系统时,一旦这个数据被允许查询,那么任何一个被要求存储该数据的节点上,在任何时候都能查询到这份数据,且数据内容要求一模一样。

顺序一致性

  • 写入操作顺序一致性:当多个客户端同时向ZooKeeper进行写入操作时,这些写入操作会被按照客户端发起的顺序逐个执行。ZooKeeper通过强制所有写入操作都经过Leader节点进行协调,保证了写入操作的顺序一致性(Leader节点与从节点写入的数据一定是一致的)。
  • 读取数据顺序一致性:无论客户端连接到哪个ZooKeeper节点,它们在读取数据时都会获得相同的数据视图。

Zookeeper官方定义:顺序一致性。(Sequential consistency)

  • zookeeper 并不能保证所有客户端在某一时刻读到的数据是一致的,客户端还是有可能读到旧数据的,但可以使用sync()方法向leader节点同步最新的已提交数据。
    在这里插入图片描述
    已上图为例,如果一个zk集群有10000台节点,当进行写入的时候,如果已经有6K个节点写入成功,zk就认为本次写请求成功。但是这时候如果一个客户端读取的刚好是另外4K个节点的数据,那么读取到的就是旧的过期数据。

在zk的官方文档中对此有解释,地址在:https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html

ZK保证了保持长链接的单个客户端,总是能得到自己保持链接的机器上的最新值,这个值不一定是Leader的最新值。如果该机器挂了,会连接到zxid不小于当前节点的机器,从而保证不会读到更旧的值。

Zookeeper顺序一致性怎么做到的?

Leader 发送 proposal 时,其实会为每个 Follower 创建一个队列,都往各自的队列中发送 proposal。
image.png

Zookeeper客户端使用

配置Zookeeper的环境变量
为了简化我们每次操作Zookeeper而不用进入到Zookeeper的安装目录,我们可以将Zookeeper的安装信息配置到系统的环境变量中

vim /etc/profile

添加的内容

export ZOOKEPPER_HOME=/opt/zookeeperexport PATH=$PATH:$ZOOKEEPER_HOME/bin

执行source命令

source /etc/profile

我们就可以在节点的任意位置操作Zookeeper了,通过scp命令将profile文件发送到其他几个节点上

scp /etc/profile zk02:/etc/

客户端连接
通过bin目录下的zkCli.sh 命令连接即可

zkCli.sh

在这里插入图片描述
zkCli.sh默认连接的是当前节点的Zookeeper节点,如果我们要连接其他节点执行如下命令即可

zkCli.sh -timeout 5000 -server zk02:2181

数据操作

Zookeeper的数据结构

  1. 层次化的目录结构,命名符合常规文件系统规范
  2. 每个节点在Zookeeper中叫做znode,并且有一个唯一的路径标识
  3. 节点znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
  4. 客户端应用可以在节点上设置监听器
    在这里插入图片描述

节点类型

1).znode有两种类型:

短暂性(ephemeral)(断开连接自己删除)
持久性(persistent)(断开连接不删除)

2).znode有四种形式的目录节点(默认是persistent)如下

序号节点类型描述
1PERSISTENT持久节点
2PERSISTENT_SEQUENTIAL持久有序节点(顺序节点)
3EPHEMERAL短暂节点 (临时节点)
4EPHEMERAL_SEQUENTIAL短暂有序节点 (临时顺序节点)

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,有父节点维护在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

常用命令
Zookeeper作为Dubbo的注册中心用来保存我们各个服务的节点信息,显示Zookeeper是可以实现输出的存储操作的,我们来看下Zookeeper中存储操作的基本命令

ls
​ ls用来查看某个节点下的子节点信息
在这里插入图片描述
增强的命令,查看节点下的子节点及当前节点的属性信息 ls2或者 ls -s 命令
在这里插入图片描述
create
​ 创建节点信息
在这里插入图片描述
get

​ get命令用来查看节点的数据
在这里插入图片描述
如果要查看节点的属性信息那么我们可以通过get -s 来实现

delete
​ delete只能删除没有子节点的节点要删除非空节点可以通过 rmr 或者 deleteall 命令实现
在这里插入图片描述
set

​ set命令可以用来修改节点的内容。
在这里插入图片描述
事件监听
监听某个节点的数据内容变化,通过get命令 带 -w 参数即可,在3.4版本的Zookeeper中是通过 get path watch 来说实现监控的
在这里插入图片描述
然后我们在其他节点上修改app1节点的数据,会触监听事件

Zookeeper Java API使用

pom

<dependencies><dependency>        <groupId>org.apache.zookeepergroupId>        <artifactId>zookeeperartifactId>        <version>3.5.9version>    dependency>    <dependency>        <groupId>com.github.sgroschupfgroupId>        <artifactId>zkclientartifactId>        <version>0.1version>    dependency>    <dependency>        <groupId>junitgroupId>        <artifactId>junitartifactId>        <version>4.12version>    dependency>dependencies>

连接ZK服务,并监听节点变化

@Slf4jpublic class ConfigCenter {    private final static  String CONNECT_STR="192.168.40.243:2181";    private final static Integer  SESSION_TIMEOUT=30*1000;    private static ZooKeeper zooKeeper=null;    private static CountDownLatch countDownLatch=new CountDownLatch(1);    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {        zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {            @Override            public void process(WatchedEvent event) {                if (event.getType()== Event.EventType.None                        && event.getState() == Event.KeeperState.SyncConnected){                    log.info("连接已建立");                    countDownLatch.countDown();                }            }        });        countDownLatch.await();        MyConfig myConfig = new MyConfig();        myConfig.setKey("anykey");        myConfig.setName("anyName");        ObjectMapper objectMapper=new ObjectMapper();        byte[] bytes = objectMapper.writeValueAsBytes(myConfig);        String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        Watcher watcher = new Watcher() {            @SneakyThrows            @Override            public void process(WatchedEvent event) {                if (event.getType()== Event.EventType.NodeDataChanged                        && event.getPath()!=null && event.getPath().equals("/myconfig")){                    log.info(" PATH:{}  发生了数据变化" ,event.getPath());                    //循环监听                    //监听结束后,重新设置                    byte[] data = zooKeeper.getData("/myconfig", this, null);                    MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);                    log.info("数据发生变化: {}",newConfig);                }            }        };        byte[] data = zooKeeper.getData("/myconfig", watcher, null);        MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);        log.info("原始数据: {}", originalMyConfig);//        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);    }

在这里插入图片描述
standalone版本

@Slf4jpublic abstract class  StandaloneBase {    private static final String CONNECT_STR="192.168.109.200:2181";    private static final int SESSION_TIMEOUT=30 * 1000;    private static ZooKeeper zooKeeper =null;    private static CountDownLatch countDownLatch = new CountDownLatch(1);    private Watcher watcher =new Watcher() {        @Override        public void process(WatchedEvent event) {            if (event.getState() == Event.KeeperState.SyncConnected                    && event.getType()== Event.EventType.None){                countDownLatch.countDown();                log.info("连接建立");            }        }    };    @Before    public void init(){        try {            log.info(" start to connect to zookeeper server: {}",getConnectStr());             zooKeeper=new ZooKeeper(getConnectStr(), getSessionTimeout(), watcher);            log.info(" 连接中...");            countDownLatch.await();        } catch (IOException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }    public static ZooKeeper getZooKeeper() {        return zooKeeper;    }    @After    public void test(){        try {            TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    protected String getConnectStr(){        return CONNECT_STR;    }    protected int getSessionTimeout() {        return SESSION_TIMEOUT;    }}
    private String first_node = "/first-node";    @Test    public void testCreate() throws KeeperException, InterruptedException {        ZooKeeper zooKeeper = getZooKeeper();        String s = zooKeeper.create(first_node, "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);        log.info("Create:{}",s);    }@Test    public void testGetData(){        Watcher watcher = new Watcher() {            @Override            public void process(WatchedEvent event) {                      if (event.getPath()!=null && event.getPath().equals(first_node)  && event.getType()== Event.EventType.NodeDataChanged){                          log.info(" PATH: {}  发现变化",first_node);                          try {  byte[] data = getZooKeeper().getData(first_node, this, null);  log.info(" data: {}",new String(data));                          } catch (KeeperException e) {  e.printStackTrace();                          } catch (InterruptedException e) {  e.printStackTrace();                          }                      }            }        };        try {            byte[] data = getZooKeeper().getData(first_node, watcher, null);  //            log.info(" data: {}",new String(data));        } catch (KeeperException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }        @Test    public void  asyncTest(){          String userId="xxx";          getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> {              Thread thread = Thread.currentThread();              log.info(" Thread Name: {},   rc:{}, path:{}, ctx:{}, data:{}, stat:{}",thread.getName(),rc, path, ctx, data, stat);        },"test");        log.info(" over .");    }        @Test    public void exist() throws  Exception{        ZooKeeper zooKeeper = getZooKeeper();        // true表示的是使用Zookeeper中的watch        Stat stat = zooKeeper.exists(first_node, true);        if(stat != null){            System.out.println("节点存在"+ stat.getNumChildren());        }else{            System.out.println("节点不存在 ....");        }    }        @Test    public void getChildrens() throws Exception{    ZooKeeper zooKeeper = getZooKeeper();        List<String> childrens = zooKeeper.getChildren(first_node, true);        for (String children : childrens) {            // System.out.println(children);            // 获取子节点中的数据            byte[] data = zooKeeper.getData(first_node+"/" + children, false, null);            System.out.println(children+":" + new String(data));        }    }        @Test    public void setData() throws Exception{        // -1 不指定版本 自动维护        Stat stat = zooKeeper.setData(first_node+"/a1", "666666".getBytes(), -1);        System.out.println(stat);        //  指定版本 自动维护//ZooKeeper zooKeeper = getZooKeeper();        //Stat stat = new Stat();        //byte[] data = zooKeeper.getData(first_node, false, stat);        //int version = stat.getVersion();//版本号        //zooKeeper.setData(first_node, "third".getBytes(), version);    }        @Test    public void deleteNode() throws Exception{        zooKeeper.delete(first_node,-1);            }

事件监听处理

    @Test    public void nodeChildrenChange() throws Exception{        List<String> list = zooKeeper.getChildren("/app1", new Watcher() {                        @Override            public void process(WatchedEvent watchedEvent) {                System.out.println("--->"+ watchedEvent.getType());            }        });        for (String s : list) {            System.out.println(s);        }        Thread.sleep(Integer.MAX_VALUE);    }        @Test    public void nodeDataChanged() throws Exception{        byte[] data = zooKeeper.getData("/app1/a1", new Watcher() {            @Override            public void process(WatchedEvent watchedEvent) {                System.out.println("--->" + watchedEvent.getType());            }        }, null);        System.out.println("--->" + new String(data));        Thread.sleep(Integer.MAX_VALUE);    }

Curator

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目
是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使
用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、
分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工
作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper
处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简
单,不但减少了开发时间,而且增强了程序的可靠性。

yaml

<dependency>    <groupId>org.apache.curatorgroupId>    <artifactId>curator-recipesartifactId>    <version>5.0.0version>    <exclusions>        <exclusion>            <groupId>org.apache.zookeepergroupId>            <artifactId>zookeeperartifactId>        exclusion>    exclusions>dependency><dependency>    <groupId>org.apache.curatorgroupId>    <artifactId>curator-x-discoveryartifactId>    <version>5.0.0version>    <exclusions>        <exclusion>            <groupId>org.apache.zookeepergroupId>            <artifactId>zookeeperartifactId>        exclusion>    exclusions>dependency><dependency>    <groupId>org.apache.zookeepergroupId>    <artifactId>zookeeperartifactId>    <version>3.5.8version>dependency>

父类

@Slf4jpublic abstract  class CuratorStandaloneBase {    private static final String CONNECT_STR = "192.168.109.200:2181";    private static final int sessionTimeoutMs = 60*1000;    private static final int connectionTimeoutMs = 5000;    private static CuratorFramework curatorFramework;    @Before    public void init() {        RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);        curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr())                .retryPolicy(retryPolicy)                .sessionTimeoutMs(sessionTimeoutMs)                .connectionTimeoutMs(connectionTimeoutMs)                .canBeReadOnly(true)                .build();        curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {            if (newState == ConnectionState.CONNECTED) {                log.info("连接成功!");            }        });        log.info("连接中......");        curatorFramework.start();    }    public void createIfNeed(String path) throws Exception {        Stat stat = curatorFramework.checkExists().forPath(path);        if (stat==null){            String s = curatorFramework.create().forPath(path);            log.info("path {} created! ",s);        }    }    public static CuratorFramework getCuratorFramework() {        return curatorFramework;    }       protected   String getConnectStr(){        return CONNECT_STR;    }}
// 递归创建子节点    @Test    public void testCreateWithParent() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String pathWithParent = "/node-parent/sub-node-1";        String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);        log.info("curator create node :{}  successfully.", path);    }    // protection 模式,防止由于异常原因,导致僵尸节点    @Test    public void testCreate() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String forPath = curatorFramework                .create()                .withProtection()                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).                        forPath("/curator-node", "some-data".getBytes());        log.info("curator create node :{}  successfully.", forPath);    }    @Test    public void testGetData() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        byte[] bytes = curatorFramework.getData().forPath("/curator-node");        log.info("get data from  node :{}  successfully.", new String(bytes));    }    @Test    public void testSetData() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());        byte[] bytes = curatorFramework.getData().forPath("/curator-node");        log.info("get data from  node /curator-node :{}  successfully.", new String(bytes));    }    @Test    public void testDelete() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String pathWithParent = "/node-parent";        curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);    }    @Test    public void testListChildren() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String pathWithParent = "/discovery/example";        List<String> strings = curatorFramework.getChildren().forPath(pathWithParent);        strings.forEach(System.out::println);    }//线程池方式    @Test    public void testThreadPool() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        ExecutorService executorService = Executors.newSingleThreadExecutor();        String ZK_NODE="/zk-node";        curatorFramework.getData().inBackground((client, event) -> {            log.info(" background: {}", event);        },executorService).forPath(ZK_NODE);     }

Watch机制

ZooKeeper的Watch机制是它的一个重要特性,它允许客户端在ZooKeeper节点发生变化时得到通知。通过Watch机制,客户端可以设置对指定节点的监视,并在节点发生变化(数据更新、节点删除、子节点变化等)时,ZooKeeper会通知相关的客户端。(相当与redis分布式锁中的watchDog机制,是分布式锁中十分重要的机制,用于监听比自己小的节点

Watch机制的主要特点包括:

一次性触发:一旦Watch被设置在一个节点上,并且该节点发生了监视的事件(例如数据更新),那么Watch就会触发通知。一次触发后,Watch就会失效,客户端需要重新设置Watch以继续监视节点。

轻量级通知:ZooKeeper的Watch机制是轻量级的,因为Watch只是一个通知,不包含实际数据。客户端收到Watch通知后,可以根据需要再次向ZooKeeper请求节点的最新数据。

顺序性:Watch通知是有序的。也就是说,如果多个Watch被设置在一个节点上,当该节点发生变化时,通知的顺序是确定的,保证了客户端可以按照特定的顺序处理通知。

一次性回调:客户端收到Watch通知后,需要进行一次性回调来处理事件。在处理通知的过程中,如果发生了新的变化,之前的Watch不会再次触发。

需要注意的是,Watch机制并不保证强一致性。由于网络延迟或其他因素,Watch通知可能有一定的延迟,客户端可能会收到旧的数据变更通知。因此,客户端在处理Watch通知时,需要谨慎处理,并考虑可能出现的数据不一致情况。

Watch机制是ZooKeeper实现分布式协作的关键机制之一。通过Watch,客户端可以实时获取ZooKeeper节点的变化,从而在分布式系统中做出相应的处理,实现高效的协作和协调。

ZK分布式锁

创建节点,判断顺序号是否是最小的
在这里插入图片描述
在这里插入图片描述

创建永久节点,在普通节点下创建临时顺序节点,节点之间按顺序依次监听(通过watch机制),当拿到锁的节点处理完事务后,释放锁,后一个节点监听到前一个节点释放锁后,立刻申请获得锁,以此类推
过程解析

  • 第一部分:客户端在Zookeeper集群创建临时顺序节点
  • 第二部分:判断节点是否是当前最小的节点,如果是,获取锁,反之,监听前一个节点
    原生方式实现Zookeeper的分布式锁
public class DistributedLock {    private String connectString = "192.168.58.100:2181";    private ZooKeeper client;    private CountDownLatch countDownLatch = new CountDownLatch(1);    private CountDownLatch waitLatch = new CountDownLatch(1);    //当前节点    private String currentNode;    //要等待的节点    private String waitPath;        //1.连接Zookeeper    public DistributedLock() throws Exception {        client = new ZooKeeper(connectString, 300000, new Watcher() {            @Override            public void process(WatchedEvent watchedEvent) {                //连接上zk  释放                if(watchedEvent.getState() == Event.KeeperState.SyncConnected){                    countDownLatch.countDown();                }                // waitLatch 需要释放,节点被删除,并且是前一个节点                if(watchedEvent.getType() == Event.EventType.NodeDeleted &&                watchedEvent.getPath().equals(waitPath)){                    waitLatch.countDown();                }            }        });        //zk 连接成功,再往下走        countDownLatch.await();        //2.判断节点是否存在        Stat stat = client.exists("/locks", false);        if(stat == null){            //创建根节点            client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        }    }    //3.加锁    public void zkLock() throws KeeperException, InterruptedException {        //创建临时顺序节点        currentNode = client.create("/locks/" + "seq-" ,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);        //判断创建的节点是否是最小序号节点,如果是 就获取锁,不是监听前一个节点        List<String> children = client.getChildren("/locks", false);        //如果集合中只有一个元素,可以直接获取锁        if(children.size() == 1){            return;        }else{            //先排序            Collections.sort(children);            //获取节点名称            String nodeName = currentNode.substring("/locks/".length());            //获取节点名称 在集合的位置            int index = children.indexOf(nodeName);            if(index == -1){                System.out.println("数据异常");            }else if(index == 0){                return;            }else{                //需要监听前一个节点的变化                waitPath = "/locks/" + children.get(index - 1);                client.getData(waitPath,true,null);                //等待监听执行                waitLatch.await();                return;            }        }    }    //解锁    public void unZkLock() throws KeeperException, InterruptedException {        //删除节点        client.delete(currentNode,-1);    }}

Nginx代理转发
在这里插入图片描述

Curator框架实现分布式锁

实现思路
image.png

InterProcessMutex介绍

Apache Curator 内置了分布式锁的实现: InterProcessMutex

  • InterProcessMutex有两个构造方法
public InterProcessMutex(CuratorFramework client, String path){    this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){    this(client, path, LOCK_NAME, 1, driver);}
  • 参数说明如下
参数说明
clientcurator中zk客户端对象
path抢锁路径,同一个锁path需一致
driver可自定义lock驱动实现分布式锁
  • 主要方法
//获取锁,若失败则阻塞等待直到成功,支持重入public void acquire() throws Exception  //超时获取锁,超时失败public boolean acquire(long time, TimeUnit unit) throws Exception  //释放锁public void release() throws Exception
  • 注意点,调用acquire()方法后需相应调用release()来释放锁

代码实现
配置类

@Configurationpublic class CuratorCfg {    //Curator初始化    @Bean(initMethod = "start")    public CuratorFramework curatorFramework(){        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.40.240:2181", retryPolicy);//CuratorFramework build = CuratorFrameworkFactory.builder()//                .connectString("192.168.40.240:2181")//                .sessionTimeoutMs(60*1000)//                .connectionTimeoutMs(60*1000)//                .retryPolicy(new ExponentialBackoffRetry(1000,3))//                .build();        return client;    }}

业务代码

@RestControllerpublic class TestController {    @Autowired    private OrderService orderService;    @Value("${server.port}")    private String port;        public static final String product = "/product_";    @Autowired    CuratorFramework curatorFramework;    @PostMapping("/stock/deduct")    public Object reduceStock(Integer id) throws Exception {        InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, product + id);//互斥锁        try {            // ...            interProcessMutex.acquire();//加锁            orderService.reduceStock(id);        } catch (Exception e) {            if (e instanceof RuntimeException) {                throw e;            }        }finally {            interProcessMutex.release();//解锁        }        return "ok:" + port;    }}

全局异常处理类

@ControllerAdvicepublic class ExceptionHandlerController {    @ExceptionHandler    @ResponseStatus(value = HttpStatus.BAD_REQUEST)    @ResponseBody    public Object exceptionHandler(RuntimeException e){        Map<String,Object> result=new HashMap<>(  );        result.put( "status","error" );        result.put( "message",e.getMessage() );        return result;    }}

ZK注册中心

项目较小的话会考虑使用ZK做注册中心,原理:使用临时节点

spring.application.name=product-center1#zookeeper 连接地址spring.cloud.zookeeper.connect-string=192.168.40.240:2181#将本服务注册到zookeeper,如果不希望直接被发现可以配置为false,默认为truespring.cloud.zookeeper.discovery.register=truespring.cloud.zookeeper.session-timeout=30000

来源地址:https://blog.csdn.net/Forbidden_City/article/details/132010388

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Zookeeper笔记

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

2023-09-08

Zookeeper学习笔记 --- Py

zookeeper给python提供了几种API具体代码请参考我的Github。1.引用kazoo libAPI DOC:http://kazoo.readthedocs.io/en/latest/install.htmlCode:# -*
2023-01-31

linux/centos zookeeper 使用记录

配置cfg 下载zookeeper-3.4.14.tar.gz负责到centos服务器解压 /xxx/zookeeper-3.4.14/conf/下创建zoo.cfg文件并配置以下属性,/bsoft/zookeeperdata/目录先预
2023-08-30

django笔记

装饰器:require_http_method()当然你可以在视图函数内丢弃那些期望之外的请求(比如上例中的POST /report/1234/), 但更简单的做法是使用Django预置的require_http_methods(metho
2023-01-30

Python笔记

列表[List] 元组(truple) 字典{dict}# 列表中的每个元素都分配一个数字 - 它的位置,或索引,第一个索引是0,第二个索引是1,依此类推# list comprehension[i for i in range(10)]#
2023-01-31

redis笔记-

前言为了避免单点故障,我们需要将数据复制多份部署在多台不同的服务器上,即使有一台服务器出现故障其他服务器依然可以继续提供服务作用:数据备份扩展读性能(读写分离)复制方式:全量复制部分复制 实现方式 1、一主二扑 A(B、C) 一个Master两个Slave2、
redis笔记-
2021-09-21

mysql笔记-

用单表接近一千万数据(四列:int,两个varchar,日期时间)测,发现几个效率几乎一致count(0)=count(1)=count(数字)=count(*)>count(primary key)>count(column) 根据相关解释:1. count
mysql笔记-
2021-01-21
2024-04-02

SpringCloud笔记

2023年最新笔记,全文约 3 万字,蕴含 Spring Cloud 常用组件 Nacos、OpenFeign、Seata、Sentinel 等 〇、简介 什么是Spring Cloud? ​ Spring Cloud是一系列框架
2023-08-23

Linux笔记

一。基础思想 一切皆文件。 两条权限原则: 权限分组原则权限最小原则 su是切换用户,而sudo则是用root权限执行某操作( 普通用户+sudo=安全) Linux目录 系统只存在一颗文件树、从==/==开始,所有的文件都
2023-08-20
2023-09-26

Redis 笔记

Redis常用管理命令# dbsize 返回当前数据库 key 的数量。# info 返回当前 redis 服务器状态和一些统计信息。# monitor 实时监听并返回redis服务器接收到的所有请求信息。# shutdown 把数据同步保存到磁盘上,并关闭r
Redis 笔记
2020-04-14
2023-09-05

MySQL笔记

目录著名数据库(了解)概念组成(了解即可)注意点DDL操作操作数据库数据类型(列类型)操作表DML(Data Manipulation Language)DCL(Data Control Language)DQL(Data Query Language)编码备
MySQL笔记
2019-03-29

PostgreSQL笔记

一.PostgreSQL简介PostgreSQL数据库一种关系型数据库。是当前世界上最先进的开源关系型数据库。PostgreSQL使用的是一种客户端/服务器的模式。一次PostgreSQL会话由以下相关进程组成:1.postgres:一个服务器进程(该进程管理
PostgreSQL笔记
2021-03-05

redis笔记

//redis类型介绍//获取数据$lotterylRedis = Redis::getInstance()->redisGet($key);//有效期时间$key = "Kaijang:lottery:frequency:simplehtml:" . $lo
redis笔记
2022-02-07

SQLite3 笔记

SQLite insert插入一行:插入一组:使用select结果插入多行插入到新表:多行插入到新表,一步到位:非常有用的临时表:sqlite update更新一条记录:update 注意 约束sqlite delete约束:唯一约束uni
2023-01-31

python3 笔记

.1# 数的定义:#格式化:format ---字符串拼接%d,%f,%s 格式化输出str()、int()、float()强转类型函数#age=int(input('请输入您的年龄:'))#print ( age-1)#强转化的函数例子#
2023-01-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录