Zookeeper--概述及应用
Zookeeper 是 Hadoop
的分布式协调服务,起源于Google的Chubby。分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。Zookeeper 可以保证数据在Zookeeper 集群之间的数据的事务性一致。
总体概述
- 组成: ZooKeeper集群即多个Server节点,其中有一个Leader的节点,和多个Follower组成。
- 写请求: 当客户端Client执行写请求时,会发送到Leader节点上,然后Leader节点上数据变更会同步到集群中其他的Follower节点。Leader节点在接收到数据变更请求后,首先将变更写入本地磁盘,以作恢复之用。当所有的写请求持久化到磁盘以后,才会将变更应用到内存中。
- 协议: ZooKeeper使用了一种自定义的原子消息协议,在消息层的这种原子特性,保证了整个协调系统中的节点数据或状态的一致性和本地的ZooKeeper数据与Leader节点同步。
- 选举: 当Leader节点发生故障,消息层负责重新选举出Leader,继续作为协调服务集群的中心,处理客户端写请求。
数据模型
- ZooKeeper数据模型的结构与Linux文件系统很类似,树结构,每个节点称做一个ZNode。
- 每个ZNode都可以通过其路径唯一标识,每个ZNode上可存储少量数据(默认是1M, 可以通过配置修改, 通常不建议在ZNode上存储大量的数据)。
- 每个ZNode还拥有自身的一些信息,包括:数据、数据长度、创建时间、修改时间等Znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本。
- 每当Znode中的数据更新后它所维护的版本号将增加,每一个Znode的数据将被原子地读写。读操作将读取与Znode相关的所有数据,写操作将替换掉所有的数据。
- znode 的目录名可以自动编号,如 ZK_1 已经存在,再创建的话,将会自动命名为 ZK_2
永久节点
永久节点一经创建就永久保留了,就像我们在文件系统上创建一个普通文件,这个文件的生命周期跟创建它的应用没有任何关系。临时节点
一个session回话创建临时节点后,该会话过期之后,临时节点就会被zookeeper自动删除。我们可以很好的利用该特性,做一些集群感知。
ZooKeeper特性
读、写(更新)模式
在ZooKeeper集群
中,读可以从任意一个ZooKeeper Server读,这一点是保证ZooKeeper比较好的读性能的关键;写的请求会先Forwarder到Leader,然后由Leader来通过ZooKeeper中的原子广播协议,将请求广播给所有的Follower,Leader收到一半以上的写成功的Ack后,就认为该写成功了,就会将该写进行持久化,并告诉客户端写成功了。WAL和Snapshot
和大多数分布式系统一样,ZooKeeper也有WAL(Write-Ahead-Log),对于每一个更新操作,ZooKeeper都会先写WAL, 然后再对内存中的数据做更新,然后向Client通知更新结果。另外,ZooKeeper还会定期将内存中的目录树进行Snapshot,落地到磁盘上,这个跟HDFS中的FSImage是比较类似的。这么做的主要目的,一当然是数据的持久化,二是加快重启之后的恢复速度,如果全部通过Replay WAL的形式恢复的话,会比较慢。FIFO
对于每一个ZooKeeper客户端而言,所有的操作都是遵循FIFO顺序的,这一特性是由下面两个基本特性来保证的:一是ZooKeeper Client与Server之间的网络通信是基于TCP,TCP保证了Client/Server之间传输包的顺序;二是ZooKeeper Server执行客户端请求也是严格按照FIFO顺序的。Linearizability
在ZooKeeper中,所有的更新操作都有严格的偏序关系,更新操作都是串行执行的,这一点是保证ZooKeeper功能正确性的关键。
Client API
- create(path, data, flags): 创建一个ZNode, path是其路径,data是要存储在该ZNode上的数据,flags常用的有: PERSISTEN,PERSISTENT_SEQUENTAIL, EPHEMERAL, EPHEMERAL_SEQUENTAIL。
- delete(path, version): 删除一个ZNode,可以通过version删除指定的版本, 如果version是-1的话,表示删除所有的版本。
- exists(path, watch): 判断指定ZNode是否存在,并设置是否Watch这个ZNode。这里如果要设置Watcher的话,Watcher是在创建ZooKeeper实例时指定的,如果要设置特定的Watcher的话,可以调用另一个重载版本的exists(path, watcher)。以下几个带watch参数的API也都类似。
- getData(path, watch): 读取指定ZNode上的数据,并设置是否watch这个ZNode。
- setData(path, watch): 更新指定ZNode的数据,并设置是否Watch这个ZNode。
- getChildren(path, watch): 获取指定ZNode的所有子ZNode的名字,并设置是否Watch这个ZNode。
- sync(path): 把所有在sync之前的更新操作都进行同步,达到每个请求都在半数以上的ZooKeeper Server上生效。path参数目前没有用。
- setAcl(path, acl): 设置指定ZNode的Acl信息。
- getAcl(path): 获取指定ZNode的Acl信息。
应用场景
Curator
- Curator框架,一个流行的zookeeper的客户端,提供了一套高级的API,简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。下面示例代码将使用Curator。官方文档:http://curator.apache.org/
详细讲解可查看该博客http://supben.iteye.com/blog/2094077
示例代码
示例代码: 创建节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public void createNode() throws Exception {
//1. 指定zk集群的地址
String connectString = "192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181";
//1000 :代表是重试时间间隔 3:表示是重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//2. 使用curator创建一个zk链接
int sessionTimeoutMs = 5000;//这个值必须在4s--40s之间,表示是链接失效的时间
int connectionTimeoutMs = 1000;//链接超时时间
CuratorFramework client = CuratorFrameworkFactory.
newClient(connectString, sessionTimeoutMs , connectionTimeoutMs , retryPolicy);
//3. 启动链接
client.start();
InetAddress localHost = InetAddress.getLocalHost();
String ip = localHost.getHostAddress();
/**
* 4. 创建节点
* EPHEMERAL 临时节点
* EPHEMERAL_SEQUENTIAL 临时有序
* PERSISTENT 永久节点
* PERSISTENT_SEQUENTIAL 永久有序
*/
client.create()
.creatingParentsIfNeeded()//如果父节点不存在,则创建,这时创建的父节点是永久节点
.withMode(CreateMode.EPHEMERAL)//指定节点类型
.withACL(Ids.OPEN_ACL_UNSAFE)//指定节点的权限信息
.forPath("/spider/"+ip);//指定节点名称
//这里只是避免创建临时节点时,程序一结束很快就消失了,看不了效果
Thread.sleep(10000);
//当创建一个临时节点时,如果程序结果,该节点会过sessionTimeoutMs的时间后消失!
}示例代码:监视器,在这里主要举例一个zookeeper通过利用临时节点session失效特性做一个监控功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 创建一个监视器,这个监视器需要实现watcher接口
* 接口中有一个process方法。
* 当监视器发现监视的节点发生变化的时候,这个process方法会被调用
*
* 所以这个监视器是一个守护进程,也就是说一个永远不会停止的进程,类似于死循环
* Created by xiaoxiaomo on 2016/5/5.
*/
public class SpiderWatcher implements Watcher {
CuratorFramework client ;
List<String> chiList ;
List<String> newChiList ;
public SpiderWatcher() {
//指定zk集群的地址
String connectStr = "192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181";
//1000 :代表是重试时间间隔 3:表示是重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3) ;
//使用curator创建一个zk链接
int sessionTimeoutMs = 3000 ; //这个值必须在4s--40s之间,表示是链接失效的时间
int connectionTimeoutMs = 4000 ;//链接超时时间
client = CuratorFrameworkFactory.
newClient(connectStr, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
client.start(); //开启这个链接
try {
//使用SpiderWater监视Spider节点下面节点的所有变化
//(想spider节点注册监视器,监视器需要重复注册)
chiList = client.getChildren().usingWatcher(this).forPath("/spider");
} catch (Exception e) {
e.printStackTrace();
}
}
public void process(WatchedEvent event) {
try {
//重复注册监视器
newChiList = client.getChildren().usingWatcher(this).forPath("/spider");
for (String s : chiList) {
if( !newChiList.contains(s) ){
System.out.println("节点消失:"+s);
//给管理员发送短信,或者邮件
//发短信的话可以使用一些第三方平台 云片网
//发邮件的话使用 javamail
}
}
for (String s : chiList) {
if( !chiList.contains(s) ){
System.out.println("节点新增:"+s);
}
}
this.chiList = newChiList ;
} catch (Exception e) {
e.printStackTrace();
}
// System.out.println("节点发生变化,"+event);
}
public void start(){
//为了保证让这个方法一直运行
while (true){
;
}
}
public static void main(String[] args) {
SpiderWatcher spiderWatcher = new SpiderWatcher();
spiderWatcher.start();
}
}参考资料
http://curator.apache.org/
http://www.blogjava.net/BucketLi/archive/2010/12/21/341268.html
http://blog.csdn.net/xinguan1267/article/details/38422149