添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
老实的牛腩  ·  @SuppressLint("NotifyD ...·  1 年前    · 
睡不着的跑步鞋  ·  python 函数备注 ...·  2 年前    · 
潇洒的课本  ·  A malware : ...·  2 年前    · 
python&大数据
首发于 python&大数据

python操作zookeeper

1.前言

zookeeper是一个用于维护配置信息、命名、提供分布式同步和提供组服务。它自身是高可用的,只要宕机节点不达到半数,zookeeper服务都不会离线。zookeeper为实现分布式锁,分布式栅栏,分布式队列,安全的配置存储交换,在线状态监控,选举提供了坚实的基础。

在hadoop环境中,zookeeper被广泛应用。hadoop 高可用是依赖zookeeper的实现的。Hbase,storm,kafka都强依赖zookeeper,没有zookeeper根本都运行不起来。阿里的微服务治理框架dubbo也是依赖zookeeper的。

python访问zookeeper使用的的模块是kazoo。

本文主要介绍kazoo操作zookeeper,并不会讨论zookeeper的原理和使用方法。

2.环境

  1. zookeeper版本:3.4.5-cdh5.14.0
  2. kazoo 2.4.0

3.示例代码

3.1 连接到zookeeper并获取节点及数据

#!/usr/bin/python
# -*- coding: UTF-8 -*-
Created on 2018.2.27
@author: laofeng
@note: kazoo 访问  zookeeper
import sys
from kazoo.client import KazooClient,KazooState
import logging
logging.basicConfig(
    level=logging.DEBUG
    ,stream=sys.stdout
    ,format='%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')
#创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    hosts='172.16.21.23:2181,172.16.21.24:2181,172.16.21.25:2181'
    ,timeout=10.0  #连接超时时间
    , logger=logging #传一个日志对象进行,方便 输出debug日志
#开始心跳
zk.start()
#获取根节点数据和状态
data, stat = zk.get('/')
print data   #这行没有输出,‘/’根节点,并没有数据
print stat   
这个是stat的输出:
ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=8448, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295036257L)
ZnodeState的属性列表:
czxid : 创建这个节点时的zxid
mzxid : 修改这个节点时的zxid
ctime : 创建时间
mtime : 修改时间
version : 数据被修改的次数
cversion: 子节点被修改的次数
aversion: acl被改变的次数
ephemeralOwner:临时节点创建的用户,如果不是临时节点值为0
dataLength:节点数据长度
numChildren:子节点的数量
pzxid:子节点被修改的zxid
#获取根节点的所有子节点,返回的是一个列表,只有子节点的名称
children = zk.get_children("/");
print children
#下面是根节点的返回值
#[u'rmstore', u'kazoo', u'yarn-leader-election', u'zookeeper']
#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

3.2 遍历所有子节点的函数,

#!/usr/bin/python
# -*- coding: UTF-8 -*-
Created on 2018.2.27
@author: laofeng
@note: kazoo 访问  zookeeper
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from kazoo.client import KazooClient,KazooState
import logging
logging.basicConfig(
    level=logging.DEBUG
    ,stream=sys.stdout
    ,format='%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')
#递归遍历所有节点的子节点函数,_zk是KazooClient的对象,node是节点名称字符串,func是回调函数
def zk_walk(_zk, node, func):
    data, stat = _zk.get(node)
    children = _zk.get_children(node)
    func(node, data, stat, children);
    if len(children) > 0:
        for sub in children:
            sub_node = ''
            if node != '/':
                sub_node = node + '/' + sub
            else:
                sub_node = '/' + sub
            zk_walk(_zk, sub_node, func)
#测试zk_walk的打印回调函数,只是把所有数据都打印出来
def printZNode(node,  data, stat, children):
    print("node  : " + node)
    print("data  : " + str(data))
    print("stat  : " + str(stat))
    print("child : " + str(children))
    print '\n'
#创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    hosts='172.16.21.23:2181,172.16.21.24:2181,172.16.21.25:2181'
    ,timeout=10.0  #连接超时间
#开始心跳
zk.start()
#遍历谋个节点的所有子节点
zk_walk(zk, '/', printZNode)
#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()


打印结果:因为有二进制数据,所以一些乱码在

node  : /
data  : 
stat  : ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=8622, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295037672L)
child : [u'rmstore', u'kazoo', u'yarn-leader-election', u'zookeeper']
node  : /rmstore
data  : None
stat  : ZnodeStat(czxid=4295013289L, mzxid=4295013289L, ctime=1519892893473L, mtime=1519892893473L, version=0, cversion=1, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=1, pzxid=4295013291L)
child : [u'ZKRMStateRoot']
node  : /rmstore/ZKRMStateRoot
data  : None
stat  : ZnodeStat(czxid=4295013291L, mzxid=4295013291L, ctime=1519892893488L, mtime=1519892893488L, version=0, cversion=2573, aversion=2, ephemeralOwner=0, dataLength=0, numChildren=5, pzxid=4295034705L)
child : [u'AMRMTokenSecretManagerRoot', u'RMAppRoot', u'EpochNode', u'RMVersionNode', u'RMDTSecretManagerRoot']
node  : /rmstore/ZKRMStateRoot/AMRMTokenSecretManagerRoot
data  : 
••�ˣ������•••:
���•3
stat  : ZnodeStat(czxid=4295013299L, mzxid=4295013303L, ctime=1519892893729L, mtime=1519892893882L, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=23, numChildren=0, pzxid=4295013299L)
child : []
node  : /rmstore/ZKRMStateRoot/RMAppRoot
data  : None
stat  : ZnodeStat(czxid=4295013294L, mzxid=4295013294L, ctime=1519892893677L, mtime=1519892893677L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=4295013294L)
child : []
node  : /rmstore/ZKRMStateRoot/EpochNode
data  : ••
stat  : ZnodeStat(czxid=4295013302L, mzxid=4295034703L, ctime=1519892893824L, mtime=1519969389707L, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=2, numChildren=0, pzxid=4295013302L)
child : []
node  : /rmstore/ZKRMStateRoot/RMVersionNode
data  : ••••
stat  : ZnodeStat(czxid=4295013301L, mzxid=4295013301L, ctime=1519892893761L, mtime=1519892893761L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=4, numChildren=0, pzxid=4295013301L)
child : []
node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot
data  : None
stat  : ZnodeStat(czxid=4295013295L, mzxid=4295013295L, ctime=1519892893687L, mtime=1519892893687L, version=0, cversion=3, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=3, pzxid=4295013298L)
child : [u'RMDTSequentialNumber', u'RMDTMasterKeysRoot', u'RMDelegationTokensRoot']
node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTSequentialNumber
data  : None
stat  : ZnodeStat(czxid=4295013298L, mzxid=4295013298L, ctime=1519892893721L, mtime=1519892893721L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=4295013298L)
child : []
node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot
data  : None
stat  : ZnodeStat(czxid=4295013296L, mzxid=4295013296L, ctime=1519892893696L, mtime=1519892893696L, version=0, cversion=4, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295034705L)
child : [u'DelegationKey_1', u'DelegationKey_3', u'DelegationKey_2', u'DelegationKey_4']
node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_1
data  : •�•b    ��••"��>L��
stat  : ZnodeStat(czxid=4295013304L, mzxid=4295013304L, ctime=1519892893907L, mtime=1519892893907L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295013304L)
child : []
node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_3
data  : •�•b•o,�•#���•�V�
stat  : ZnodeStat(czxid=4295034704L, mzxid=4295034704L, ctime=1519969389826L, mtime=1519969389826L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295034704L)
child : []
node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_2
data  : •�•b    ���•••jd••��
stat  : ZnodeStat(czxid=4295013305L, mzxid=4295013305L, ctime=1519892893929L, mtime=1519892893929L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295013305L)
child : []
node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_4
data  : •�•b•o-••wM��@�W



3.3 监控节点的变化

两种方式,使用注解和在代码中调用

3.3.1 使用注解的方式

@zk.DataWatch("/kazoo") #当节点kazoo的数据变化时这个函数会被调用
def watch_node(data, stat):
    #如果节点被删除这个函数也会被调用,但是data和stat都是None
    if stat and data:
        print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
    else:
        print "节点已经被删除"
@zk.ChildrenWatch("/kazoo") #观察子节点的变化
def watch_children(children):
    print("Children are now: %s" % children)
# Above function called immediately, and from then on
        

3.3.2编程的方式监听节点变化

def monitor(event):
    print type(event)
    print event
#当rm_node产生某些变化时会出发这个函数调用,但是只能出发一次    
if not zk.exists(rm_node, monitor):
    zk.create(rm_node, "123",  makepath=True)
#输出信息如下,event的类型和属性
<class 'kazoo.protocol.states.WatchedEvent'>
WatchedEvent(type='CHANGED', state='CONNECTED', path=u'/kazoo')
'''


4.其他API

创建节点,删除节点,修改节点的数据,事务api

'''
create 节点
递归创建所有层级的节点,只能设置acl,不能设置data
zk.ensure_path("/my/favorite")
创建节点并设置data
zk.create("/my/favorite/node", b"a value")
    zk.exists()
    zk.get()
    zk.get_children()
    zk.set()
    zk.delete()
重试&自定义重试
retry,可以多次重复执行一个方法,直到成功,这个函数真是赞
result = zk.retry(zk.get, "/path/to/node")
from kazoo.retry import KazooRetry
kr = KazooRetry(max_tries=3, ignore_expire=False)
result = kr(client.get, "/some/path")