我的编程空间,编程开发者的网络收藏夹
学习永远不晚

ZooKeeper开发实际应用案例实战

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

ZooKeeper开发实际应用案例实战

ZooKeeper入门教程一简介与核心概念

ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用

ZooKeeper入门教程三分布式锁实现及完整运行源码

ZooKeeper框架教程Curator分布式锁实现及源码分析

前面几章,我们学习了zookeeper的概念和使用,并且分析了curator通过zookeeper实现分布式锁的源代码,我们已经熟知zookeeper协调分布式系统的方式,相信大家一定会思考自己的项目场景中是否有zookeeper的用武之地。没错,我们学习的最终目的是要去应用它。本章,我通过实际工作中的一个例子,讲解zookeeper是如何帮我解决分布式问题,以此引导大家发现自己系统中可以应用zookeeper的场景。真正把zookeeper使用起来!

项目背景介绍

首先给大家介绍一下本文描述项目的情况。这是一个检索网站,它让你能在几千万份复杂文档数据中检索出你所需要的文档数据。为了加快检索速度,项目的数据分布在100台机器的内存里,我们称之为数据服务器。除了数据,这100台机器上均部署着检索程序。这些server之外,还有数台给前端提供接口的搜索server,这些机器属一个集群,我们称之为检索服务器。当搜索请求过来时,他们负责把搜索请求转发到那100台机器,待所有机器返回结果后进行合并,最终返回给前端页面。结构如下图:

面临问题

网站上线之初,由于数据只有几百万,所以数据服务器只有10多台。是一个规模比较小的分布式系统,当时没有做分布式系统的协调,也能正常工作,偶尔出问题,马上解决。但是到了近期,机器增长到100台,网站几乎每天都会出现问题,导致整个分布式系统挂掉。问题原因如下:

数据服务器之前没有做分布式协调。对于检索服务器来说,并不知道哪些数据服务器还存活,所以检索服务器每次检索,都会等待100台机器返回结果。但假如100台数据服务中某一台死掉了,检索服务器也会长时间等待他的返回。这导致了检索服务器积累了大量的请求,最终被压垮。当所有的检索服务器都被压垮时,那么网站也就彻底不可用了。

问题的本质为检索服务器维护的数据服务器列表是静态不变的,不能感知数据服务器的上下线。

在10台数据服务器的时候,某一台机器出问题的概率很小。但当增长到100台服务器时,出问题的概率变成了10倍。所以才会导致网站几乎每天都要死掉一次。

由于一台机器的问题,导致100台机器的分布式系统不可用,这是极其不合理,也是无法忍受的。

之前此项目的数据和检索不由我负责。了解到此问题的时候,我觉得这个问题得立刻解决,否则不但用户体验差,而且开发和运维也要每天疲于系统维护,浪费了大量资源,但由于还有很多新的需求在开发,原来的团队也没时间去处理。今年我有机会来解决这个问题,当时正好刚刚研究完zookeeper,立刻想到这正是采用zookeeper的典型场景。

如何解决

我直接说方案,程序分为数据服务器和检索服务器两部分。

数据服务器:

1、每台数据服务器启动时候以临时节点的形式把自己注册到zookeeper的某节点下,如/data_servers。这样当某数据服务器死掉时,session断开链接,该节点被删除。

检索服务器:

1、启动时,加载/data_servers下所有子节点数据,获取了目前所有能提供服务的数据服务器列表,并且加载到内存中。

2、启动时,同时监听/data_servers节点,当新的数据server上线或者某个server下线时,获得通知,然后重新加载/data_servers下所有子节点数据,刷新内存中数据服务器列表。

通过以上方案,做到数据服务器上下线时,检索服务器能够动态感知。检索服务器在检索前,从内存中取得的数据服务器列表将是最新的、可用的。即使在刷新时间差内取到了掉线的数据服务器也没关系,最多影响本次查询,而不会拖垮整个集群。见下图:

代码讲解

捋清思路后,其实代码就比较简单了。数据服务器只需要启动的时候写zookeeper临时节点就好了,同时写入自己服务器的相关信息,比如ip、port之类。检索无服务器端会稍微复杂点,不过此处场景和zookeeper官方给的例子十分符合,所以我直接参考官方例子进行修改,实现起来也很简单。关于官方例子我写过两篇博文,可以参考学习:

zookeeper官方例子翻译:ZooKeeper官方文档之Java客户端开发案例翻译

zookeeper官方例子解读:ZooKeeper官方文档之Java案例解读

数据服务器

数据服务器程序十分简单,只会做一件事情:启动的时候,把自己以临时节点的形式注册到zookeeper。一旦服务器挂掉,zookeeper自动删除临时znode。

我们创建ServiceRegister.java实现Runnable,数据服务启动的时候,单独线程运行此代码,实现注册到zookeeper逻辑。维系和zookeeper的链接。

检索服务器

检索服务器,代码设计完全采用官方案例,所以详细的代码解读请参考上面提到的两篇文章,这里只做下简述。

代码有两个类DataMonitor和LoadSaidsExecutor。LoadSaidsExecutor是启动入口,他来启动DataMonitor监控zookeeper节点变化。DataMonitor负责监控,初次启动和发现变化时,调用LoadSaidsExecutor的方法来加载最新的数据服务器列表信息。

DataMonitor和LoadSaidsExecutor的工作流程如下:

Excutor把自己注册为DataMonitor的监听

DataMonitor实现watcher接口,并监听znode

znode变化时,触发DataMonitor的监听

回调回调中通过ZooKeeper.exist() 再次监听znode

上一步exist的回调方法中,调用监听自己的Executor,执行业务逻辑6

Executor启新的线程加载数据服务器信息到内存中

注意:图为以前文章配图。图里应该把6,7步改为文字描述的第6步。

检索服务启动的时候,单独线程运行LoadSaIdsExecutor。LoadSaIdsExecutor会阻塞线程,转为事件驱动。

总结

我们通过一个例子,展示了zookeeper在实际系统中的应用,通过zookeeper解决了分布式系统的问题。其实以上代码还有很大的优化空间。我能想到如下两点:

1、数据服务器会假死或者变慢,但和zk链接还在,并不会从zk中删除,但已经拖慢了集群的速度。解决此问题,我们可以在数据服务器中加入定时任务,通过定时跑真实业务查询,监控服务器状态,一旦达到设定的红线阈值,强制下线,而不是等到server彻底死掉。

2、检索服务器每个server都监控zookeeper同一个节点,在节点变化时会出现羊群效应。当然,检索服务器如果数量不多还好。其实检索服务器应该通过zookeeper做一个leader选举,只由leader去监控zookeeper节点变化,更新redis中的数据服务器列表缓存即可。

附:完整代码

数据服务端代码

ServiceRegister.java


public class ServiceRegister implements Runnable{
    private ZooKeeper zk;
    private static final String ZNODE = "/sas"; 
    private static final String SA_NODE_PREFIX = "sa_"; 
    private String hostName="localhost:2181"; 
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }
     public ServiceRegister() throws IOException {
        zk = new ZooKeeper(hostName, 10000,null);
    }
  
    @Override
    public void run() {
        try {
             createSaNode();
            synchronized (this) {
                wait();
            }
 
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    //测试用
    public static void main(String[] args){
        try {
            new ServiceRegister().run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //创建子节点
    private String createSaNode() throws KeeperException, InterruptedException {
        // 如果根节点不存在,则创建根节点
        Stat stat = zk.exists(ZNODE, false);
        if (stat == null) {
            zk.create(ZNODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
 
        String hostName = System.getenv("HOSTNAME");
        // 创建EPHEMERAL_SEQUENTIAL类型节点
        String saPath = zk.create(ZNODE + "/" + SA_NODE_PREFIX,
                hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        return saPath;
    }
}

检索服务端代码

DataMonitor.java


public class DataMonitor implements Watcher, AsyncCallback.ChildrenCallback { 
    ZooKeeper zk;
    String znode; 
    Watcher chainedWatcher; 
    boolean dead;
    DataMonitorListener listener;
     List<String> prevSaIds;
     public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;
        // 这是整个监控的真正开始,通过获取children节点开始。设置了本对象为监控对象,回调对象也是本对象。以后均是事件驱动。
        zk.getChildren(znode, true, this, null);
    }
 
    
    public interface DataMonitorListener {
        
        void changed(List<String> saIds);
 
        
        void closing(int rc);
    }
 
   
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // We are are being told that the state of the
            // connection has changed
            switch (event.getState()) {
                case SyncConnected:
                    // In this particular example we don't need to do anything
                    // here - watches are automatically re-registered with
                    // server and any watches triggered while the client was
                    // disconnected will be delivered (in order of course)
                    break;
                case Expired:
                    // It's all over
                    dead = true;
                    listener.closing(Code.SESSIONEXPIRED.intValue());
                    break;
            }
        } else {
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.getChildren(znode, true, this, null);
            }
        }
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }
 
    //拿到Children节点后的回调函数。
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        boolean exists;
        switch (rc) {
            case Code.Ok:
                exists = true;
                break;
            case Code.NoNode:
                exists = false;
                break;
            case Code.SessionExpired:
            case Code.NoAuth:
                dead = true;
                listener.closing(rc);
                return;
            default:
                // Retry errors
                zk.getChildren(znode, true, this, null);
                return;
        }
 
        List<String> saIds = null;
 
        //如果存在,再次查询到最新children,此时仅查询,不要设置监控了
        if (exists) {
            try {
                saIds = zk.getChildren(znode,null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
 
        //拿到最新saids后,通过listener(executor),加载Saids。
        if ((saIds == null && saIds != prevSaIds)
                || (saIds != null && !saIds.equals(prevSaIds))) {
            listener.changed(saIds);
            prevSaIds = saIds;
        }
    }
}

LoadSaIdsExecutor.java


public class LoadSaIdsExecutor
        implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
 
    private DataMonitor dm;
    private ZooKeeper zk;
    private static final String znode = "/sas"; 
    private String hostName="localhost:2181";
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }
 
        
    public LoadSaIdsExecutor() throws KeeperException, IOException {
        zk = new ZooKeeper(hostName, 300000, this);
        dm = new DataMonitor(zk, znode, null, this);
    }
 
    
    public static void main(String[] args) {
        try {
            new LoadSaIdsExecutor().run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void run() {
        try {
            synchronized (this) {
                while (!dm.dead) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }
    
    @Override
    public void process(WatchedEvent event) {
        dm.process(event);
    } 
    
    public void closing(int rc) {
        synchronized (this) {
            notifyAll();
        }
    }
 
    
    static class SaIdsLoader extends Thread {
         List<String> saIds = null; 
        //构造对象后直接启动线程
        public SaIdsLoader(List<String> saIds){
            this.saIds = saIds;
            start();
        }
 
        public void run() {
            System.out.println("------------加载开始------------");
            //业务处理的地方
            if(saIds!=null){
                saIds.forEach(id->{
                    System.out.println(id);
                });
            }
            System.out.println("------------加载结束------------");
        }
    }
 
    
    @Override
    public void changed(List<String> data) {
                new SaIdsLoader(data);
    }
}

以上就是ZooKeeper开发实际应用案例实战的详细内容,更多关于ZooKeeper开发应用案例的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

ZooKeeper开发实际应用案例实战

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

PHP开发缓存的实际应用案例分析

PHP开发缓存的实际应用案例分析引言:随着互联网的快速发展,网站的访问量大幅增加。为了提高网站的性能和响应速度,开发人员需要使用缓存来减少数据库查询,加快数据访问速度。本文将重点介绍PHP中缓存的实际应用案例,包括数据缓存和页面缓存,并提供
PHP开发缓存的实际应用案例分析
2023-11-07

PHP中封装性的实际应用案例

导言:封装性是面向对象编程中的重要原则之一,它指的是将类的数据和方法封装在一起,以实现数据的隐藏和保护。在PHP开发中,封装性的应用非常广泛,可以帮助我们创建更加可维护、可扩展和安全的代码。本文将通过具体的案例和代码示例,展示PHP中封装性
2023-10-21

Sphinx PHP 的实际案例与项目应用

引言:在当今互联网时代,随着信息量的爆炸式增长和用户需求的多样化,搜索引擎成为了我们获取所需信息的主要方式之一。为了满足这个需求,全文搜索引擎Sphinx应运而生。而结合PHP语言使用Sphinx,也成为了许多项目的选择。本文将以具体的案例
2023-10-21

C++ 可变参数的实际应用案例

可变参数函数允许函数接受任意数量的参数,可用于处理未知数量的输入。例如,可声明一个函数计算数组中最大值:声明可变参数函数 max,接收一个整型参数 num 和可变参数 ...。初始化 va_list 变量 args,接收可变参数。初始化最大
C++ 可变参数的实际应用案例
2024-04-19

Android多功能时钟开发案例(实战篇)

上一篇为大家介绍的是Android多功能时钟开发基础内容,大家可以回顾一下,Android多功能时钟开发案例(基础篇) 接下来进入实战,快点来学习吧。 一、时钟 在布局文件中我们看到,界面上只有一个TextView,这个TextView的作
2022-06-06

AndroidService开发应用实例

Android的服务是开发Android应用程序的重要组成部分。不同于活动Activity,服务是在后台运行,服务没有接口,生命周期也与活动Activity非常不同。通过使用服务我们可以实现一些后台操作,比如想从远程服务器加载一个网页等,下面来看看详细内容,需要的朋友可以参考下
2022-12-16

Golang应用实践:实战经验与案例分享

Golang应用实践:实战经验与案例分享近年来,作为一门快速发展且备受关注的编程语言,Golang在各个领域的应用越来越广泛。其简洁高效的设计让开发者能够快速地构建稳健的应用程序,同时其并发特性和内置的工具也为解决现代软件开发中的各种挑战
Golang应用实践:实战经验与案例分享
2024-03-05

Go后端开发:入门指南与实战案例

go 后端开发入门指南:安装并设置 go 语言环境。编写第一个后端服务,定义处理请求的函数并启动 http 服务器。集成 mysql 数据库,创建模型、连接数据库并执行查询。Go 后端开发:入门指南和实战案例引言Go 是一种现代、高性能
Go后端开发:入门指南与实战案例
2024-04-08

VUE移动端开发实战案例:用VUE打造一款热门移动应用

随着移动互联网的飞速发展,移动应用开发成为了一项热门的技术。本文将介绍如何使用VUE框架开发一款热门的移动应用,并提供一个实际的开发案例。
VUE移动端开发实战案例:用VUE打造一款热门移动应用
2024-02-27

springBoot+dubbo+zookeeper如何实现分布式开发应用

小编给大家分享一下springBoot+dubbo+zookeeper如何实现分布式开发应用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!环境搭建项目结构图:1.我们首先做好服务端pom.xml
2023-06-29

WebSocket在实时游戏开发中的应用案例

引言:随着网络技术的不断发展,实时游戏的需求也日益增长。传统的HTTP协议在实时游戏的场景下往往无法满足即时性和实时性的要求。而WebSocket作为一种新兴的通信协议,在实时游戏开发中得到了广泛应用。本文将以具体的案例和示例代码来探讨We
2023-10-21

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录