Zookeeper研究和应用

jopen 10年前

zookeeper简介

zookeeper是一个开源分布式的服务,它提供了分布式协作,分布式同步,配置管理等功能. 其实现的功能与google的chubby基本一致.zookeeper的官方网站已经写了一篇非常经典的概述性文章,请大家参阅:ZooKeeper: A Distributed Coordination Service for Distributed Applications
在此我仅花少量笔墨介绍下本文相关的内容。
在zookeeper的集群中,各个节点共有下面3种角色和4种状态:

  • 角色:leader,follower,observer
  • 状态:leading,following,observing,looking

除了observer和observing之外,其它的角色和状态与下面将要介绍的Paoxs算法中的角色与状态一一对应,我们将在下文中具体描述.
observer是zookeeper-3.3版本新添加的一个角色,在这里有相关的介绍. 他们的引入是为了解决zookeeper集群扩大后,由于网络可靠性下降可能导致的拜占庭将军问题. observer的行为在大多数情况下与follower完全一致, 但是他们不参加选举和投票, 而仅仅接受(observing)选举和投票的结果.

zookeeper实现了一个层次名字空间(hierarchal name space)的数据模型, 它特别象一个文件系统, 每个文件被称为znode, 一个znode除了自己包含一些数据外,还能拥有孩子节点.
存在下述的3种类型znode:

  • Persistent Nodes: 永久有效地节点,除非client显式的删除,否则一直存在
  • Ephemeral Nodes: 临时节点,仅在创建该节点client保持连接期间有效,一旦连接丢失,zookeeper会自动删除该节点
  • Sequence Nodes: 顺序节点,client申请创建该节点时,zk会自动在节点路径末尾添加递增序号,这种类型是实现分布式锁,分布式queue等特殊功能的关键

Zookeeper Watch 定义如下:

A watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes.

在我看来,watch可以理解为一个分布式的回调,当client关心的znodes发生变化时,zookeeper将会把消息传回到client,并导致client的消息处理函数得到调用.zk的任何一个读操作都能够设置watch,例如:getData(), getChildren(), and exists()
可以watch的event包括如下的二种:

  • KeeperState:Disconnected,SyncConnected,Expired
  • EventType:None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged

这些状态是很容易理解的. watch的实现只言片语没法说清楚,后面我可能会专门写一篇文章讲述这个实现.

Paoxs算法

说到zookeeper,我们不得不提起Paoxs算法Lesile Lamport.
Paoxs 算法是zookeeper的灵魂,这个算法是Leslie Lamport在1990年提出的一种基于消息传递的一致性算法.Paxos 算法解决的问题是一个分布式系统如何就某个值(决议)达成一致。一个典型的场景就是:”在zookeeper cluster中谁是leader?”。
该算法由Leslie于1990年在文章The Part-Time Parliament中首次提出,但是这篇文章相当的晦涩难懂(也有一些轶事,可以看文章链接中Leslie自己写的内容),于是,Lesilie在2001年写下了Paxos Made Simple.他对此解释道:

At the PODC 2001 conference, I got tired of everyone saying how difficult it was to understand the Paxos algorithm, published in [122]. Although people got so hung up in the pseudo-Greek names that they found the paper hard to understand, the algorithm itself is very simple. So, I cornered a couple of people at the conference and explained the algorithm to them orally, with no paper. When I got home, I wrote down the explanation as a short note, which I later revised based on comments from Fred Schneider and Butler Lampson. The current version is 13 pages long, and contains no formula more complicated than n1 > n2.

Paxos Made Simple的abstract只有一句话:

The Paxos algorithm, when presented in plain English, is very simple.

可见这位Lamport老兄是多么的有意思. 顺便说一句,这位老哥就是LaTex中的”La”.
在上文中是这样描述Paoxs算法执行过程的:

Phase 1.
(a) A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors.
(b) If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered proposal (if any) that it has accepted.
Phase 2.
(a) If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v, where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals.
(b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

这几乎就是Paxos的全部了.具体的执行过程举例可以在Zookeeper全解析――Paxos作为灵魂中找到,在此不再赘述.
Zookeeper 完全实现了Paoxs算法,zk cluster中每个节点都保持了一份完整的数据模型,当任何一个client通过某集群节点向集群发起读写请求时,该节点会向Leader节点发出投票 请求,如果投票通过(超过一半节点同意)则该请求被执行,否则该请求被驳回. 通过paoxs算法,zookeeper的保持了数据模型的一致性,同时保持了任何操作的原子性.

分布式选举

介绍完了Paoxs算法, 分布式选举几乎是顺理成章的, 因为分布式选举不过是Paoxs算法的一次或者若干次执行, 所不同的只是proposal内容为:”谁是Leader”.下面这两个图解释了zookeeper集群在正常工作和选举时各个节点状态的异同:

zookeeper状态示意图

zookeeper状态示意图

zookeeper采用org.apache.zookeeper.server.quorum.FastLeaderElection作为其缺省选举算法,关于这个算法的具体执行流程可以参考淘宝核心系统段飞同学的文章“paxos 实现”.或者也可以直接阅读源代码. zookeeper源代码量不大,结构清晰,注释充分,阅读体验超好~ 我就不在这里越俎代庖了.

zookeeper应用

拥有了zookeeper如此强大的分布式协作系统后,我们可以很容易的实现大量的分布式应用,包括了分布式锁,分布式队列,分布式Barrier,双阶段提交等等. 这些应用可以帮我们改进很多复杂系统的协作方式,将这些系统的实现变得更加优雅而高效.
鉴于篇幅,本文仅介绍分布式锁的实现.
利用了前文提到的sequence nodes可以非常容易的实现分布式锁. 实现分布式锁的基本步骤如下(这些步骤需要在所有需要锁的客户端执行):

  1. client调用create()创建名为”_locknode_/lock-”的节点,注意需要设置sequence和ephemeral属性
  2. client调用getChildren(“_locknode_”),注意不能设置watch,这样才能避免羊群效应
  3. 如果步骤1中创建的节点序号最低,则该client获得锁,开始执行其它程序
  4. client对lock-xxx中序号仅次于自己创建节点的那个节点调用exists(),并设置watch
  5. 如果exist()返回false(节点不存在)则回到步骤2,否则等待步骤4中的watch被触发并返回步骤2

分布式锁在zookeeper的源代码中已经有实现,可以参考org.apache.zookeeper.recipes.lock

下面是一个使用分布式锁的样例,这段程序摘自一个hadoop reduce的configure函数, 使用分布式锁的目的是确保一台机器上的所有reduce进程中,只有一个reduce进程会执行某些初始化代码. 同时其它reduce在总和初始化完成之前不会继续执行.


以下是代码片段:
class zkWatcher implements Watcher {
     //watch回调函数
    public void process(WatchedEvent event) {
         if (event.getType() == EventType.NodeCreated) {
            if (event.getPath() == "balbalbal.init_done"
            //如果回调信息是节点创建,且创建的节点是init成功节点,则触发latch
                  gcihInitLatch.countDown();
        } else if (event.getState() == KeeperState.SyncConnected) {
            //server连接成功,触发连接成功latch
            zkConnectedLatch.countDown();
         }
    }
}
public void configure(String conf) {
    try {
        //zookeeper服务器列表,节点间用,分隔
        String keepers = "zk_server0:port,zk_server1:port,zk_server2:port";
        String Init_Done = "/full-dump-gcih/"
                + InetAddress.getLocalHost().getHostName() + ".init_done";
        String HostName = InetAddress.getLocalHost().getHostName();
        // 初始化一个Watch
        zkWatcher zkw = new zkWatcher();
        //异步创建连接, 并设置zkw为watch回调
        ZooKeeper zk = new ZooKeeper(keepers, 5000, zkw);
        //等待zookeeper创建连接成功
        zkConnectedLatch.await();
        //创建分布式锁
        WriteLock gcih_lock = new WriteLock(zk, "/full-dump-gcih/" + HostName, null);
        //检测初始化成功标识是否存在,并设置watch
        if (null == zk.exists(Init_Done, true)) {
            // if the init_done node not exists we try to init
            if (gcih_lock.lock()) {
                //获取锁成功,初始化数据
                initializeData(conf);
                //创建初始化成功标识,注意这个标志是永久节点
                zk.create(Init_Done, null, Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT);
                //工作完成,释放锁
                gcih_lock.unlock();
            } else {
                //未获取锁,说明已经有reduce在做初始化了,释放锁
                gcih_lock.unlock();
                if (!gcihInitLatch.await(30, TimeUnit.MINUTES))
                    throw new IOException(
                            "Init UDP time out, critical error");
                else {
                    //latch成功返回,说明the one 初始化成功了
                    initializeData(null);
                }
            }
        } else {// if init_done exists we simply load data from gcih
            initializeData(null);
        }
     } catch (Exception e) {
        .....
    }
  }

多个reduce分别获取锁后,加锁节点的子节点信息如下所示


以下是引用片段:
[zk: localhost:2181(CONNECTED) 31] ls /full-dump-gcih/xxxxx.cm2
[x-84692699318388014-0000000001, x-84692699318387993-0000000000]

这些节点全部是Sequence+Ephemeral 属性的节点, 其中


以下是引用片段:
x-84692699318388014-000000000
name-zk_session_id-sequence_number

这个节点名称是org.apache.zookeeper.recipes.lock中使用的名称,可以根据需要自己重新实现相关代码,进而设计一个专用的锁.
关于Zookeeper更多的应用请参阅ZooKeeper Recipes and Solutions