Zookeeper中的分布式协调

Watch机制和异步回调在分布式协调中的作用

ZooKeeper是一个为分布式应用设计的高性能协调服务,它通过提供一致性的数据注册、配置管理、命名服务、分布式同步等功能,帮助分布式系统中的各个组件协同工作。ZooKeeper中的watch机制和异步回调是实现分布式协调的两个关键特性。

Watch机制

Watch(观察者)机制允许客户端在ZooKeeper的节点上设置观察点。当这些节点上的数据变化或节点本身发生变化时(例如,节点创建、删除),设置了观察点的客户端会收到一个一次性的通知。这个机制非常适合用于实现配置更新通知、集群成员变化通知等场景。

使用Watch的步骤通常如下:

  1. 设置观察点:客户端在读取特定节点的数据或检查节点存在性时设置观察点。
  2. 接收通知:一旦被观察的节点发生了变化(数据变更、节点创建或删除等),客户端会收到一个通知。
  3. 采取行动:客户端在收到通知后,可以根据需要重新读取数据、更新本地状态或执行其他协调操作。

异步回调

异步回调机制允许客户端在不阻塞当前线程的情况下执行ZooKeeper操作。客户端可以发起一个异步操作,并提供一个回调对象。一旦操作完成,无论是成功还是失败,ZooKeeper都会调用这个回调对象,传递操作结果。这种机制非常适合于需要高性能和高并发处理的场景。

使用异步回调的步骤通常如下:

  1. 发起异步操作:客户端发起一个异步ZooKeeper操作,并提供一个实现了特定AsyncCallback接口的回调对象。
  2. 处理回调:操作完成后,ZooKeeper调用相应的回调方法,传递操作结果和客户端提供的上下文信息(如果有的话)。
  3. 执行后续逻辑:在回调方法中,根据操作的结果,客户端可以执行后续逻辑,如更新内部状态、发起新的ZooKeeper操作等。

结合使用Watch和回调进行分布式协调

在分布式系统中,结合使用Watch机制和异步回调可以有效地实现复杂的协调逻辑。例如,可以使用Watch机制来监控集群配置的变化,当配置发生变化时,使用异步操作读取新的配置信息,并通过回调方法更新应用状态。

这种模式不仅能够保证应用能及时响应配置变化,还能通过异步处理避免阻塞关键线程,从而提高系统的整体性能和响应能力。通过这种方式,ZooKeeper帮助分布式系统实现高效、可靠的协调。

实现一个简单的分布式协调

下面通过watch和回调机制来实现一个简单的分布式配置,也就是我们每个客户端都会向我们zookeeper服务器订阅也就是watch一个配置,其中配置文件我们使用MyConf类来表示,我们使用的是简单的字符串,后期这个类代表具体的配置。我们实时订阅配置信息,只要配置信息更改或删除,我们客户端可以快速响应。

首先我们的程序逻辑抽象出了两个类,分别是MyWatcherAndCallBackDefaultWatch。我们接下来会重点讲解为什么需要抽象MyWatcherAndCallBack类。

DefaultWatch

这个类的存在很简单,其实就是把我们初始连接zookeeper服务器的watcher抽象出来了,在这个项目中并不会使用。

MyWatcherAndCallBack

我们二话不说先上代码,可以看到这个类实现了很多功能,包括实现的接口功能和我们封装的一些逻辑操作。我们目前只需要关注实现的接口功能,分别是Watcher, AsyncCallback.StatCallback,AsyncCallback.DataCallback这三个类的实现方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* @author roxanne_waar
* @date 2024/3/26 18:36
* @description 自定义的watch和callback 解决了多层的嵌套问题
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback,AsyncCallback.DataCallback {
ZooKeeper zk;
CountDownLatch countDownLatch;
MyConf conf;

public MyConf getConf() {
return conf;
}

public void setConf(MyConf conf) {
this.conf = conf;
}

public CountDownLatch getCountDownLatch() {
return countDownLatch;
}

public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

public ZooKeeper getZk() {
return zk;
}

public void setZk(ZooKeeper zk) {
this.zk = zk;
}

@Override
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
switch (type) {
case None:
break;
case NodeCreated:
//说明从无到有
System.out.println("新建了配置,即将更新配置");
zk.getData("/config",this,this,"abc");
break;
case NodeDeleted:
//如果配置被删除了 我们需要重新等待
System.out.println("远程配置被删除,请耐心等待");
this.conf.setConf("");
this.countDownLatch = new CountDownLatch(1);

break;
case NodeDataChanged:
//如果配置更新了 我们也要更细你配制
System.out.println("远程配置更新");
zk.getData("/config",this,this,"abc");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}

@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
if(bytes != null){
conf.setConf(new String(bytes));
countDownLatch.countDown();
}

}

@Override
public void processResult(int i, String s, Object o, Stat stat) {
if(stat != null){
zk.getData("/config",this,this,"abc");
}
}

public void await() throws InterruptedException {
zk.exists("/config",this,this,"abc");
countDownLatch.await();
}
}

Watcher, AsyncCallback.StatCallback,AsyncCallback.DataCallback这三个类的实现方法分别是两个异步回调方法,和一个watch。其中我们如果是自己不封装这个类的话,我们单独写代码逻辑,那就是如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
zk.exists("/path", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 如果不存在之后的处理

}
}, new AsyncCallback.StatCallback() {
@Override
public void processResult(int i, String s, Object o, Stat stat) {
if(stat != null){
//如果存在我们就需要get
byte[] data = zk.getData("/path", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//判断逻辑 如果修改了或者删除会怎么办?
}
}, new DataCallback() {

@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
// 如果我们获得了需要更新配置
}
});
}
}
},"abc");

这还没有实现全部的逻辑,我们就需要疯狂的嵌套watch和回调函数,所以我们的MyWatcherAndCallBack类就是解决这个疯狂嵌套问题的。

理解了为什么有这个类之后我们就开始整理代码逻辑了,同样,我还是会先把所有代码放上来。

TestConf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @author roxanne_waar
* @date 2024/3/26 17:44
* @description 测试主程序
*/

public class TestConf {
ZooKeeper zk;

CountDownLatch countDownLatch = new CountDownLatch(1);

MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
@Before
public void getzk(){
zk = ZkUtils.getZk();
}

@Test
public void test()throws Exception{
watcherAndCallBack.setConf(new MyConf());
watcherAndCallBack.setZk(zk);
watcherAndCallBack.setCountDownLatch(countDownLatch);

watcherAndCallBack.await();
while(true){
if(watcherAndCallBack.conf.getConf().equals("")){
try {
watcherAndCallBack.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("配置:"+watcherAndCallBack.conf.getConf());
Thread.sleep(2000);

}
}

@After
public void releaseZk(){
try {
zk.close();
}catch (Exception e){
e.printStackTrace();
}
}
}

ZkUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @author roxanne_waar
* @date 2024/3/26 17:26
* @description zookeeper工具类 用来创建zookeeper对象
*/
public class ZkUtils {

// 默认在根目录下disCon进行操作
public static final String con = "192.168.93.133:2181,192.168.93.129:2181,192.168.93.132:2181/disCon";

public static Watcher dw = new DefaultWatch();
public static ZooKeeper zk;


public static ZooKeeper getZk(){
try{
zk = new ZooKeeper(con,3000,dw);
}catch (Exception e){
System.out.println("新建连接失败,错误内容如下:");
e.printStackTrace();
}
return zk;
}
}

MyConf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author roxanne_waar
* @date 2024/3/26 18:44
* @description 自定义配置
*/
public class MyConf {
String conf;

public String getConf() {
return conf;
}

public void setConf(String conf) {
this.conf = conf;
}
}

DefaultWatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* @author roxanne_waar
* @date 2024/3/26 17:31
* @description 默认新建连接的时候的监听器
*/
public class DefaultWatch implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
switch (state) {
case Unknown:
break;
case Disconnected:
System.out.println("zookeeper 客户端即将关闭连接");
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println("zookeeper 客户端新建连接");
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
}

逻辑

在这里分为几种情况,简单介绍一下:

  1. 开始没有配置,我们进入exits回调之后发现没有,就会进入await等待,直到这个节点被创建,我们通过watch来进行getdata和cutdown唤醒。
  2. 有配置,被修改了,有配置的时候我们是回调函数会进行修改配置,一旦被修改了,我们watch也会监听到,并且再次get修改数据。
  3. 有配置,被删除了。这个被删除的情况也会被watch反馈,我们新建countdown然后进入等待。