Zookeeper中的分布式协调 Watch机制和异步回调在分布式协调中的作用 ZooKeeper是一个为分布式应用设计的高性能协调服务,它通过提供一致性的数据注册、配置管理、命名服务、分布式同步等功能,帮助分布式系统中的各个组件协同工作。ZooKeeper中的watch机制和异步回调是实现分布式协调的两个关键特性。
Watch机制 Watch(观察者)机制允许客户端在ZooKeeper的节点上设置观察点。当这些节点上的数据变化或节点本身发生变化时(例如,节点创建、删除),设置了观察点的客户端会收到一个一次性的通知。这个机制非常适合用于实现配置更新通知、集群成员变化通知等场景。
使用Watch的步骤通常如下:
设置观察点 :客户端在读取特定节点的数据或检查节点存在性时设置观察点。
接收通知 :一旦被观察的节点发生了变化(数据变更、节点创建或删除等),客户端会收到一个通知。
采取行动 :客户端在收到通知后,可以根据需要重新读取数据、更新本地状态或执行其他协调操作。
异步回调 异步回调机制允许客户端在不阻塞当前线程的情况下执行ZooKeeper操作。客户端可以发起一个异步操作,并提供一个回调对象。一旦操作完成,无论是成功还是失败,ZooKeeper都会调用这个回调对象,传递操作结果。这种机制非常适合于需要高性能和高并发处理的场景。
使用异步回调的步骤通常如下:
发起异步操作 :客户端发起一个异步ZooKeeper操作,并提供一个实现了特定AsyncCallback
接口的回调对象。
处理回调 :操作完成后,ZooKeeper调用相应的回调方法,传递操作结果和客户端提供的上下文信息(如果有的话)。
执行后续逻辑 :在回调方法中,根据操作的结果,客户端可以执行后续逻辑,如更新内部状态、发起新的ZooKeeper操作等。
结合使用Watch和回调进行分布式协调 在分布式系统中,结合使用Watch机制和异步回调可以有效地实现复杂的协调逻辑。例如,可以使用Watch机制来监控集群配置的变化,当配置发生变化时,使用异步操作读取新的配置信息,并通过回调方法更新应用状态。
这种模式不仅能够保证应用能及时响应配置变化,还能通过异步处理避免阻塞关键线程,从而提高系统的整体性能和响应能力。通过这种方式,ZooKeeper帮助分布式系统实现高效、可靠的协调。
实现一个简单的分布式协调 下面通过watch和回调机制来实现一个简单的分布式配置,也就是我们每个客户端都会向我们zookeeper服务器订阅也就是watch一个配置,其中配置文件我们使用MyConf类来表示,我们使用的是简单的字符串,后期这个类代表具体的配置。我们实时订阅配置信息,只要配置信息更改或删除,我们客户端可以快速响应。
首先我们的程序逻辑抽象出了两个类,分别是MyWatcherAndCallBack
和DefaultWatch
。我们接下来会重点讲解为什么需要抽象MyWatcherAndCallBack
类。
DefaultWatch
这个类的存在很简单,其实就是把我们初始连接zookeeper服务器的watcher抽象出来了,在这个项目中并不会使用。
MyWatcherAndCallBack
我们二话不说先上代码,可以看到这个类实现了很多功能,包括实现的接口功能和我们封装的一些逻辑操作。我们目前只需要关注实现的接口功能,分别是Watcher
, AsyncCallback.StatCallback
,AsyncCallback.DataCallback
这三个类的实现方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 public class MyWatcherAndCallBack implements Watcher , AsyncCallback.StatCallback,AsyncCallback.DataCallback { ZooKeeper zk; CountDownLatch countDownLatch; MyConf conf; public MyConf getConf () { return conf; } public void setConf (MyConf conf) { this .conf = conf; } public CountDownLatch getCountDownLatch () { return countDownLatch; } public void setCountDownLatch (CountDownLatch countDownLatch) { this .countDownLatch = countDownLatch; } public ZooKeeper getZk () { return zk; } public void setZk (ZooKeeper zk) { this .zk = zk; } @Override public void process (WatchedEvent watchedEvent) { Event.EventType type = watchedEvent.getType(); switch (type) { case None: break ; case NodeCreated: System.out.println("新建了配置,即将更新配置" ); zk.getData("/config" ,this ,this ,"abc" ); break ; case NodeDeleted: System.out.println("远程配置被删除,请耐心等待" ); this .conf.setConf("" ); this .countDownLatch = new CountDownLatch (1 ); break ; case NodeDataChanged: System.out.println("远程配置更新" ); zk.getData("/config" ,this ,this ,"abc" ); break ; case NodeChildrenChanged: break ; case DataWatchRemoved: break ; case ChildWatchRemoved: break ; case PersistentWatchRemoved: break ; } } @Override public void processResult (int i, String s, Object o, byte [] bytes, Stat stat) { if (bytes != null ){ conf.setConf(new String (bytes)); countDownLatch.countDown(); } } @Override public void processResult (int i, String s, Object o, Stat stat) { if (stat != null ){ zk.getData("/config" ,this ,this ,"abc" ); } } public void await () throws InterruptedException { zk.exists("/config" ,this ,this ,"abc" ); countDownLatch.await(); } }
Watcher
, AsyncCallback.StatCallback
,AsyncCallback.DataCallback
这三个类的实现方法分别是两个异步回调方法,和一个watch。其中我们如果是自己不封装这个类的话,我们单独写代码逻辑,那就是如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 zk.exists("/path" , new Watcher () { @Override public void process (WatchedEvent watchedEvent) { } }, new AsyncCallback .StatCallback() { @Override public void processResult (int i, String s, Object o, Stat stat) { if (stat != null ){ byte [] data = zk.getData("/path" , new Watcher () { @Override public void process (WatchedEvent watchedEvent) { } }, new DataCallback () { @Override public void processResult (int i, String s, Object o, byte [] bytes, Stat stat) { } }); } } },"abc" );
这还没有实现全部的逻辑,我们就需要疯狂的嵌套watch和回调函数,所以我们的MyWatcherAndCallBack
类就是解决这个疯狂嵌套问题的。
理解了为什么有这个类之后我们就开始整理代码逻辑了,同样,我还是会先把所有代码放上来。
TestConf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class TestConf { ZooKeeper zk; CountDownLatch countDownLatch = new CountDownLatch (1 ); MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack (); @Before public void getzk () { zk = ZkUtils.getZk(); } @Test public void test () throws Exception{ watcherAndCallBack.setConf(new MyConf ()); watcherAndCallBack.setZk(zk); watcherAndCallBack.setCountDownLatch(countDownLatch); watcherAndCallBack.await(); while (true ){ if (watcherAndCallBack.conf.getConf().equals("" )){ try { watcherAndCallBack.await(); } catch (InterruptedException e) { throw new RuntimeException (e); } } System.out.println("配置:" +watcherAndCallBack.conf.getConf()); Thread.sleep(2000 ); } } @After public void releaseZk () { try { zk.close(); }catch (Exception e){ e.printStackTrace(); } } }
ZkUtils
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ZkUtils { public static final String con = "192.168.93.133:2181,192.168.93.129:2181,192.168.93.132:2181/disCon" ; public static Watcher dw = new DefaultWatch (); public static ZooKeeper zk; public static ZooKeeper getZk () { try { zk = new ZooKeeper (con,3000 ,dw); }catch (Exception e){ System.out.println("新建连接失败,错误内容如下:" ); e.printStackTrace(); } return zk; } }
MyConf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class MyConf { String conf; public String getConf () { return conf; } public void setConf (String conf) { this .conf = conf; } }
DefaultWatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class DefaultWatch implements Watcher { @Override public void process (WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); switch (state) { case Unknown: break ; case Disconnected: System.out.println("zookeeper 客户端即将关闭连接" ); break ; case NoSyncConnected: break ; case SyncConnected: System.out.println("zookeeper 客户端新建连接" ); break ; case AuthFailed: break ; case ConnectedReadOnly: break ; case SaslAuthenticated: break ; case Expired: break ; case Closed: break ; } } }
逻辑 在这里分为几种情况,简单介绍一下:
开始没有配置,我们进入exits
回调之后发现没有,就会进入await
等待,直到这个节点被创建,我们通过watch来进行getdata
和cutdown唤醒。
有配置,被修改了,有配置的时候我们是回调函数会进行修改配置,一旦被修改了,我们watch也会监听到,并且再次get修改数据。
有配置,被删除了。这个被删除的情况也会被watch反馈,我们新建countdown然后进入等待。