Python与ZooKeeper集群连接
由于项目的需要,需要学习Python客户端连接ZooKeeper集群,并实现创建临时节点、获得指定的路径下的信息、监听子节点变化的功能。
环境配置
ZooKeeper集群的安装可以参考http://blog.csdn.net/mrbcy/article/details/54767484
使用下面的命令安装kazoo
pip install kazoo
基本使用
这一部分可参考官方文档:http://kazoo.readthedocs.io/en/latest/basic_usage.htm
监听子节点变化
下面的代码实现了创建一个临时、顺序的节点,并且可以监听子节点的变化。
#-*- coding: utf-8 -*-
import time
from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch
class ValidatorDetector:
def __init__(self):
self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
self.validator_children_watcher = ChildrenWatch(client=self.zk,path='/mproxy/validators',func=self.validator_watcher_fun)
self.zk.start()
def validator_watcher_fun(self,children):
print "The children now are:", children
def create_node(self):
self.zk.create('/mproxy/validators/validator',b'validator_huabei_1',ephemeral=True,sequence=True,makepath=True)
def __del__(self):
self.zk.close()
if __name__ == '__main__':
detector = ValidatorDetector()
detector.create_node()
time.sleep(10)
ZooKeeper原生提供了监听节点变化及值的变化的API。关于这一部分可以参考http://blog.csdn.net/mrbcy/article/details/54790758。但是这些API只能生效一次,一旦被触发过一次以后就不会再触发了,除非再次注册。而kazoo则在这个基础上封装了更上层的API,可以持续的触发。这就是上面的ChildrenWatch,除此之外kazoo还封装了一个DataWatch,用于监听数据的变化。下面我们也会用到。
kazoo还实现了自动续订功能,使得在会话过期后我们不需要再次初始化ZooKeeper客户端(这里可以参考http://blog.csdn.net/mrbcy/article/details/55062713),也是非常方便的。
注册验证器
有了上面的知识就可以做一个注册类和一个监测类了。
#-*- coding: utf-8 -*-
import threading
import time
from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState
class InfoKeeper(threading.Thread):
def __init__(self,register):
threading.Thread.__init__(self)
self.register=register
def run(self):
time.sleep(0.25)
if self.register.zk_node is None:
print "create method has not been called"
return
check_result = self.register.zk.exists(self.register.validator_path)
if check_result is None:
# redo the regist
print "redo the regist"
self.register.regist()
else:
print "the path remain exists"
class ValidatorRegister:
def __init__(self):
self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
self.zk_node = None
self.validator_path = '/mproxy/validators/'
self.zk.add_listener(self.conn_state_watcher)
self.zk.start()
def __del__(self):
self.zk.close()
def regist(self):
self.zk_node = self.zk.create(self.validator_path + 'validator',bytes('validator_huabei_1'),ephemeral=True,sequence=True,makepath=True)
def close(self):
self.zk.stop()
self.zk.close()
def conn_state_watcher(self, state):
if state == KazooState.CONNECTED:
print "Now connected"
if self.zk_node is None:
print "create method has not been called"
return
info_keeper = InfoKeeper(self)
info_keeper.start()
elif state == KazooState.LOST:
print "Now lost"
else:
print "Now suspended"
监测类:
#-*- coding: utf-8 -*-
import time
from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch
class ValidatorDetector:
def __init__(self):
self.validator_path = '/mproxy/validators/'
self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
self.validator_children_watcher = ChildrenWatch(client=self.zk,path=self.validator_path,func=self.validator_watcher_fun)
self.zk.start()
def validator_watcher_fun(self,children):
for child in children:
validator_name = self.zk.get(path=self.validator_path + str(child))
print validator_name[0]
print "The children now are:", children
def __del__(self):
self.zk.close()
注册类这里稍微复杂了一点,做了一个在会话过期后重新注册的机制,如果会话过期,重新注册之前的注册信息。
监听子节点值的变化
嗯,这个需求仔细想过后可以通过监听子节点的变化来代替,所以暂时不实现了。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341