我的编程空间,编程开发者的网络收藏夹
学习永远不晚

ZooKeeper入门教程三分布式锁实现及完整运行源码

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

ZooKeeper入门教程三分布式锁实现及完整运行源码

ZooKeeper入门教程一简介与核心概念

ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用

1.0版本

首先我们先介绍一个简单的zookeeper实现分布式锁的思路:

用zookeeper中一个临时节点代表锁,比如在/exlusive_lock下创建临时子节点/exlusive_lock/lock。

  • 所有客户端争相创建此节点,但只有一个客户端创建成功。
  • 创建成功代表获取锁成功,此客户端执行业务逻辑
  • 未创建成功的客户端,监听/exlusive_lock变更
  • 获取锁的客户端执行完成后,删除/exlusive_lock/lock,表示锁被释放
  • 锁被释放后,其他监听/exlusive_lock变更的客户端得到通知,再次争相创建临时子节点/exlusive_lock/lock。此时相当于回到了第2步。

我们的程序按照上述逻辑直至抢占到锁,执行完业务逻辑。

上述是较为简单的分布式锁实现方式。能够应付一般使用场景,但存在着如下两个问题:

1、锁的获取顺序和最初客户端争抢顺序不一致,这不是一个公平锁。每次锁获取都是当次最先抢到锁的客户端。

2、羊群效应,所有没有抢到锁的客户端都会监听/exlusive_lock变更。当并发客户端很多的情况下,所有的客户端都会接到通知去争抢锁,此时就出现了羊群效应。

为了解决上面的问题,我们重新设计。

2.0版本

我们在2.0版本中,让每个客户端在/exlusive_lock下创建的临时节点为有序节点,这样每个客户端都在/exlusive_lock下有自己对应的锁节点,而序号排在最前面的节点,代表对应的客户端获取锁成功。排在后面的客户端监听自己前面一个节点,那么在他前序客户端执行完成后,他将得到通知,获得锁成功。逻辑修改如下:

  • 每个客户端往/exlusive_lock下创建有序临时节点/exlusive_lock/lock_。创建成功后/exlusive_lock下面会有每个客户端对应的节点,如/exlusive_lock/lock_000000001
  • 客户端取得/exlusive_lock下子节点,并进行排序,判断排在最前面的是否为自己。如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
  • 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
  • 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
  • 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

如此修改后,每个客户端只关心自己前序锁是否释放,所以每次只会有一个客户端得到通知。而且,所有客户端的执行顺序和最初锁创建的顺序是一致的。解决了1.0版本的两个问题。

接下来我们看看代码如何实现。

LockSample类

此类是分布式锁类,实现了2个分布式锁的相关方法:

1、获取锁

2、释放锁

主要程序逻辑围绕着这两个方法的实现,特别是获取锁的逻辑。我们先看一下该类的成员变量:

private ZooKeeper zkClient;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;

定义了zkClient,用来操作zookeeper。

锁的根路径,及自增节点的前缀。此处生产环境应该由客户端传入。

当前锁的路径。

构造方法

public LockSample() throws IOException {
    zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if(event.getState()== Event.KeeperState.Disconnected){
                System.out.println("失去连接");
 
            }
        }
    });
}

创建zkClient,同时创建了状态监听。此监听可以去掉,这里只是打印出失去连接状态。

获取锁实现

暴露出来的获取锁的方法为acquireLock(),逻辑很简单:

public  void acquireLock() throws InterruptedException, KeeperException {
    //创建锁节点
    createLock();
    //尝试获取锁
    attemptLock();
}

首先创建锁节点,然后尝试去取锁。真正的逻辑都在这两个方法中。

createLock()

先判断锁的根节点/Locks是否存在,不存在的话创建。然后在/Locks下创建有序临时节点,并设置当前的锁路径变量lockPath。

代码如下:

private void createLock() throws KeeperException, InterruptedException {
    //如果根节点不存在,则创建根节点
    Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
    if (stat == null) {
        zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
 
    // 创建EPHEMERAL_SEQUENTIAL类型节点
    String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
            Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
    this.lockPath=lockPath;
}

attemptLock()

这是最核心的方法,客户端尝试去获取锁,是对2.0版本逻辑的实现,这里就不再重复逻辑,直接看代码:

private void attemptLock() throws KeeperException, InterruptedException {
    // 获取Lock所有子节点,按照节点序号排序
    List<String> lockPaths = null;
    lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
    Collections.sort(lockPaths);
    int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
    // 如果lockPath是序号最小的节点,则获取锁
    if (index == 0) {
        System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
        return ;
    } else {
        // lockPath不是序号最小的节点,监听前一个节点
        String preLockPath = lockPaths.get(index - 1);
 
        Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
 
        // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
        if (stat == null) {
            attemptLock();
        } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
            System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
            synchronized (watcher) {
                watcher.wait();
            }
            attemptLock();
        }
    }
}

注意这一行代码

Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

我们在获取前一个节点的时候,同时设置了监听watcher。如果前锁存在,则阻塞主线程。

watcher定义代码如下:

private Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.getPath() + " 前锁释放");
        synchronized (this) {
            notifyAll();
        }
    }
};

watcher只是notifyAll,让主线程继续执行,以便再次调用attemptLock(),去尝试获取lock。如果没有异常情况的话,此时当前客户端应该能够成功获取锁。

释放锁实现

释放锁原语实现很简单,参照releaseLock()方法。代码如下:

public void releaseLock() throws KeeperException, InterruptedException {
    zkClient.delete(lockPath, -1);
    zkClient.close();
    System.out.println(" 锁释放:" + lockPath);
}

关于分布式锁的代码到此就讲解完了,我们再看下客户端如何使用它。

我们创建一个TicketSeller类,作为客户端来使用分布式锁。

 TicketSeller类

sell()

不带锁的业务逻辑方法,代码如下:

private void sell(){
    System.out.println("售票开始");
    // 线程随机休眠数毫秒,模拟现实中的费时操作
    int sleepMillis = (int) (Math.random() * 2000);
    try {
        //代表复杂逻辑执行了一段时间
        Thread.sleep(sleepMillis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("售票结束");
}

仅是为了演示,sleep了一段时间。

sellTicketWithLock()

此方法中,加锁后执行业务逻辑,代码如下:

public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
    LockSample lock = new LockSample();
    lock.acquireLock();
    sell();
    lock.releaseLock();
}

测试入口

接下来我们写一个main函数做测试:

public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
    TicketSeller ticketSeller = new TicketSeller();
    for(int i=0;i<1000;i++){
        ticketSeller.sellTicketWithLock();
    }
}

main函数中我们循环调用ticketSeller.sellTicketWithLock(),执行加锁后的卖票逻辑。

测试方法

1、先启动一个java程序运行,可以看到日志输出如下:

main 锁创建: /Locks/Lock_0000000391
main 锁获得, lockPath: /Locks/Lock_0000000391
售票开始
售票结束
 锁释放:/Locks/Lock_0000000391
main 锁创建: /Locks/Lock_0000000392
main 锁获得, lockPath: /Locks/Lock_0000000392
售票开始
售票结束
 锁释放:/Locks/Lock_0000000392
main 锁创建: /Locks/Lock_0000000393
main 锁获得, lockPath: /Locks/Lock_0000000393
售票开始
售票结束
 锁释放:/Locks/Lock_0000000393

可见每次执行都是按照锁的顺序执行,而且由于只有一个进程,并没有锁的争抢发生。

2、我们再启动一个同样的程序,锁的争抢此时发生了,可以看到双方的日志输出如下:

程序1:

main 锁获得, lockPath: /Locks/Lock_0000000471
售票开始
售票结束
 锁释放:/Locks/Lock_0000000471
main 锁创建: /Locks/Lock_0000000473
 等待前锁释放,prelocakPath:Lock_0000000472
/Locks/Lock_0000000472 前锁释放
main 锁获得, lockPath: /Locks/Lock_0000000473
售票开始
售票结束
 锁释放:/Locks/Lock_0000000473

可以看到Lock_0000000471执行完成后,该进程获取的锁为Lock_0000000473,这说明Lock_0000000472被另外一个进程创建了。此时Lock_0000000473在等待前锁释放。Lock_0000000472释放后,Lock_0000000473才获得锁,然后才执行业务逻辑。

我们再看程序2的日志:

main 锁获得, lockPath: /Locks/Lock_0000000472
售票开始
售票结束
 锁释放:/Locks/Lock_0000000472
main 锁创建: /Locks/Lock_0000000474
 等待前锁释放,prelocakPath:Lock_0000000473
/Locks/Lock_0000000473 前锁释放
main 锁获得, lockPath: /Locks/Lock_0000000474
售票开始
售票结束
 锁释放:/Locks/Lock_0000000474

可以看到,确实是进程2获取了Lock_0000000472。

zookeeper实现分布式锁就先讲到这。注意代码只做演示用,并不适合生产环境使用。

代码清单如下:

1、LockSample

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
 
import java.io.IOException;
import java.util.Collections;
import java.util.List;
 
public class LockSample {
 
    //ZooKeeper配置信息
    private ZooKeeper zkClient;
    private static final String LOCK_ROOT_PATH = "/Locks";
    private static final String LOCK_NODE_NAME = "Lock_";
    private String lockPath;
 
    // 监控lockPath的前一个节点的watcher
    private Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event.getPath() + " 前锁释放");
            synchronized (this) {
                notifyAll();
            }
 
        }
    };
 
    public LockSample() throws IOException {
        zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState()== Event.KeeperState.Disconnected){
                    System.out.println("失去连接");
 
                }
            }
        });
    }
 
    //获取锁的原语实现.
    public  void acquireLock() throws InterruptedException, KeeperException {
        //创建锁节点
        createLock();
        //尝试获取锁
        attemptLock();
    }
 
    //创建锁的原语实现。在lock节点下创建该线程的锁节点
    private void createLock() throws KeeperException, InterruptedException {
        //如果根节点不存在,则创建根节点
        Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
 
        // 创建EPHEMERAL_SEQUENTIAL类型节点
        String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
                Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
        this.lockPath=lockPath;
    }
 
    private void attemptLock() throws KeeperException, InterruptedException {
        // 获取Lock所有子节点,按照节点序号排序
        List<String> lockPaths = null;
 
        lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
 
        Collections.sort(lockPaths);
 
        int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
 
        // 如果lockPath是序号最小的节点,则获取锁
        if (index == 0) {
            System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
            return ;
        } else {
            // lockPath不是序号最小的节点,监控前一个节点
            String preLockPath = lockPaths.get(index - 1);
 
            Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
 
            // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
            if (stat == null) {
                attemptLock();
            } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
                System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
                synchronized (watcher) {
                    watcher.wait();
                }
                attemptLock();
            }
        }
    }
 
    //释放锁的原语实现
    public void releaseLock() throws KeeperException, InterruptedException {
        zkClient.delete(lockPath, -1);
        zkClient.close();
        System.out.println(" 锁释放:" + lockPath);
    }
 
 
}

2、TicketSeller

import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class TicketSeller {
    private void sell(){
        System.out.println("售票开始");
        // 线程随机休眠数毫秒,模拟现实中的费时操作
        int sleepMillis = (int) (Math.random() * 2000);
        try {
            //代表复杂逻辑执行了一段时间
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("售票结束");
    }
 
    public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
        LockSample lock = new LockSample();
        lock.acquireLock();
        sell();
        lock.releaseLock();
    }
 
    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
        TicketSeller ticketSeller = new TicketSeller();
        for(int i=0;i<1000;i++){
            ticketSeller.sellTicketWithLock();
 
        }
    }
}

以上就是ZooKeeper入门教程三分布式锁实现及完整运行源码的详细内容,更多关于ZooKeeper分布式锁实现源码的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

ZooKeeper入门教程三分布式锁实现及完整运行源码

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

ZooKeeper三分布式锁实现及完整运行的代码

本文小编为大家详细介绍“ZooKeeper三分布式锁实现及完整运行的代码”,内容详细,步骤清晰,细节处理妥当,希望这篇“ZooKeeper三分布式锁实现及完整运行的代码”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧
2023-06-29

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录