Zookeeper--概述及应用

  ZookeeperHadoop 的分布式协调服务,起源于Google的Chubby。分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。Zookeeper 可以保证数据在Zookeeper 集群之间的数据的事务性一致。

总体概述

ZooKeeper总体架构

  1. 组成: ZooKeeper集群即多个Server节点其中有一个Leader的节点,和多个Follower组成
  2. 写请求: 当客户端Client执行写请求时,会发送到Leader节点上,然后Leader节点上数据变更会同步到集群中其他的Follower节点。Leader节点在接收到数据变更请求后,首先将变更写入本地磁盘,以作恢复之用。当所有的写请求持久化到磁盘以后,才会将变更应用到内存中。
  3. 协议: ZooKeeper使用了一种自定义的原子消息协议,在消息层的这种原子特性,保证了整个协调系统中的节点数据或状态的一致性和本地的ZooKeeper数据与Leader节点同步。
  4. 选举: 当Leader节点发生故障,消息层负责重新选举出Leader,继续作为协调服务集群的中心,处理客户端写请求。

数据模型

Zookeeper数据模型

  1. ZooKeeper数据模型的结构与Linux文件系统很类似,树结构,每个节点称做一个ZNode
  2. 每个ZNode都可以通过其路径唯一标识,每个ZNode上可存储少量数据(默认是1M, 可以通过配置修改, 通常不建议在ZNode上存储大量的数据)。
  3. 每个ZNode还拥有自身的一些信息,包括:数据、数据长度、创建时间、修改时间等Znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本。
  4. 每当Znode中的数据更新后它所维护的版本号将增加,每一个Znode的数据将被原子地读写。读操作将读取与Znode相关的所有数据,写操作将替换掉所有的数据。
  5. znode 的目录名可以自动编号,如 ZK_1 已经存在,再创建的话,将会自动命名为 ZK_2
  • 永久节点
    永久节点一经创建就永久保留了,就像我们在文件系统上创建一个普通文件,这个文件的生命周期跟创建它的应用没有任何关系。

  • 临时节点
    一个session回话创建临时节点后,该会话过期之后,临时节点就会被zookeeper自动删除。我们可以很好的利用该特性,做一些集群感知。

ZooKeeper特性

  1. 读、写(更新)模式
    ZooKeeper集群中,读可以从任意一个ZooKeeper Server读,这一点是保证ZooKeeper比较好的读性能的关键;写的请求会先Forwarder到Leader,然后由Leader来通过ZooKeeper中的原子广播协议,将请求广播给所有的Follower,Leader收到一半以上的写成功的Ack后,就认为该写成功了,就会将该写进行持久化,并告诉客户端写成功了。

  2. WAL和Snapshot
    和大多数分布式系统一样,ZooKeeper也有WAL(Write-Ahead-Log),对于每一个更新操作,ZooKeeper都会先写WAL, 然后再对内存中的数据做更新,然后向Client通知更新结果。另外,ZooKeeper还会定期将内存中的目录树进行Snapshot,落地到磁盘上,这个跟HDFS中的FSImage是比较类似的。这么做的主要目的,一当然是数据的持久化,二是加快重启之后的恢复速度,如果全部通过Replay WAL的形式恢复的话,会比较慢。

  3. FIFO
    对于每一个ZooKeeper客户端而言,所有的操作都是遵循FIFO顺序的,这一特性是由下面两个基本特性来保证的:一是ZooKeeper Client与Server之间的网络通信是基于TCP,TCP保证了Client/Server之间传输包的顺序;二是ZooKeeper Server执行客户端请求也是严格按照FIFO顺序的。

  4. Linearizability
    在ZooKeeper中,所有的更新操作都有严格的偏序关系,更新操作都是串行执行的,这一点是保证ZooKeeper功能正确性的关键。

Client API

  1. create(path, data, flags): 创建一个ZNode, path是其路径,data是要存储在该ZNode上的数据,flags常用的有: PERSISTEN,PERSISTENT_SEQUENTAIL, EPHEMERAL, EPHEMERAL_SEQUENTAIL。
  2. delete(path, version): 删除一个ZNode,可以通过version删除指定的版本, 如果version是-1的话,表示删除所有的版本。
  3. exists(path, watch): 判断指定ZNode是否存在,并设置是否Watch这个ZNode。这里如果要设置Watcher的话,Watcher是在创建ZooKeeper实例时指定的,如果要设置特定的Watcher的话,可以调用另一个重载版本的exists(path, watcher)。以下几个带watch参数的API也都类似。
  4. getData(path, watch): 读取指定ZNode上的数据,并设置是否watch这个ZNode。
  5. setData(path, watch): 更新指定ZNode的数据,并设置是否Watch这个ZNode。
  6. getChildren(path, watch): 获取指定ZNode的所有子ZNode的名字,并设置是否Watch这个ZNode。
  7. sync(path): 把所有在sync之前的更新操作都进行同步,达到每个请求都在半数以上的ZooKeeper Server上生效。path参数目前没有用。
  8. setAcl(path, acl): 设置指定ZNode的Acl信息。
  9. getAcl(path): 获取指定ZNode的Acl信息。

应用场景

Curator

  • Curator框架,一个流行的zookeeper的客户端,提供了一套高级的API,简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。下面示例代码将使用Curator。官方文档:http://curator.apache.org/

Curator

详细讲解可查看该博客http://supben.iteye.com/blog/2094077

示例代码

  • 示例代码: 创建节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public void createNode() throws Exception {

    //1. 指定zk集群的地址
    String connectString = "192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181";
    //1000 :代表是重试时间间隔 3:表示是重试次数
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    //2. 使用curator创建一个zk链接
    int sessionTimeoutMs = 5000;//这个值必须在4s--40s之间,表示是链接失效的时间
    int connectionTimeoutMs = 1000;//链接超时时间
    CuratorFramework client = CuratorFrameworkFactory.
    newClient(connectString, sessionTimeoutMs , connectionTimeoutMs , retryPolicy);

    //3. 启动链接
    client.start();

    InetAddress localHost = InetAddress.getLocalHost();
    String ip = localHost.getHostAddress();

    /**
    * 4. 创建节点
    * EPHEMERAL 临时节点
    * EPHEMERAL_SEQUENTIAL 临时有序
    * PERSISTENT 永久节点
    * PERSISTENT_SEQUENTIAL 永久有序
    */
    client.create()
    .creatingParentsIfNeeded()//如果父节点不存在,则创建,这时创建的父节点是永久节点
    .withMode(CreateMode.EPHEMERAL)//指定节点类型
    .withACL(Ids.OPEN_ACL_UNSAFE)//指定节点的权限信息
    .forPath("/spider/"+ip);//指定节点名称


    //这里只是避免创建临时节点时,程序一结束很快就消失了,看不了效果
    Thread.sleep(10000);

    //当创建一个临时节点时,如果程序结果,该节点会过sessionTimeoutMs的时间后消失!
    }
  • 示例代码:监视器,在这里主要举例一个zookeeper通过利用临时节点session失效特性做一个监控功能。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84

    /**
    * 创建一个监视器,这个监视器需要实现watcher接口
    * 接口中有一个process方法。
    * 当监视器发现监视的节点发生变化的时候,这个process方法会被调用
    *
    * 所以这个监视器是一个守护进程,也就是说一个永远不会停止的进程,类似于死循环
    * Created by xiaoxiaomo on 2016/5/5.
    */
    public class SpiderWatcher implements Watcher {

    CuratorFramework client ;
    List<String> chiList ;
    List<String> newChiList ;

    public SpiderWatcher() {

    //指定zk集群的地址
    String connectStr = "192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181";

    //1000 :代表是重试时间间隔 3:表示是重试次数
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3) ;

    //使用curator创建一个zk链接
    int sessionTimeoutMs = 3000 ; //这个值必须在4s--40s之间,表示是链接失效的时间
    int connectionTimeoutMs = 4000 ;//链接超时时间
    client = CuratorFrameworkFactory.
    newClient(connectStr, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);

    client.start(); //开启这个链接

    try {
    //使用SpiderWater监视Spider节点下面节点的所有变化
    //(想spider节点注册监视器,监视器需要重复注册)
    chiList = client.getChildren().usingWatcher(this).forPath("/spider");

    } catch (Exception e) {
    e.printStackTrace();
    }

    }

    @Override
    public void process(WatchedEvent event) {
    try {

    //重复注册监视器
    newChiList = client.getChildren().usingWatcher(this).forPath("/spider");
    for (String s : chiList) {
    if( !newChiList.contains(s) ){
    System.out.println("节点消失:"+s);
    //给管理员发送短信,或者邮件
    //发短信的话可以使用一些第三方平台 云片网
    //发邮件的话使用 javamail
    }
    }

    for (String s : chiList) {
    if( !chiList.contains(s) ){
    System.out.println("节点新增:"+s);
    }
    }

    this.chiList = newChiList ;

    } catch (Exception e) {
    e.printStackTrace();
    }

    // System.out.println("节点发生变化,"+event);
    }

    public void start(){
    //为了保证让这个方法一直运行
    while (true){
    ;
    }
    }

    public static void main(String[] args) {
    SpiderWatcher spiderWatcher = new SpiderWatcher();
    spiderWatcher.start();
    }
    }
  • 参考资料
    http://curator.apache.org/
    http://www.blogjava.net/BucketLi/archive/2010/12/21/341268.html
    http://blog.csdn.net/xinguan1267/article/details/38422149

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器