“Zookeeper:分布式事件监听”的版本间差异
跳到导航
跳到搜索
(→关于) |
|||
第152行: | 第152行: | ||
== Cache机制监听 == | == Cache机制监听 == | ||
Watcher需要反复注册比较烦琐,所以,Curator 引入了 Cache 来监听 ZooKeeper 服务器端的事件。Cache 机制对 ZooKeeper 事件监听进行了封装,能够自动处理反复注册监听。 | |||
Cache 缓存机制的实现拥有一个系列的类型: | |||
# Node Cache:'''节点缓存''',可用于ZNode节点的监听; | |||
# Path Cache:'''子节点缓存''',可用于ZNode的子节点的监听; | |||
# Tree Cache:'''树缓存'''是Path Cache的增强,不光能监听子节点,还能监听 ZNode 节点自身。 | |||
=== Node Cache === | |||
Node Cache,用于监控节点的增加,删除和更新。 | |||
* note:如果在监听的时候 NodeCache 监听的节点为空(也就是说 ZNode 路径不存在),也是可以的。之后,如果创建了对应的节点,也是会触发事件从而回调 nodeChanged 方法。 | |||
使用方法: | |||
# 构造一个 '''NodeCache''' 缓存实例。其构造方法有两种: | |||
#: <syntaxhighlight lang="Java" highlight=""> | |||
// client:Curator的框架客户端 | |||
// path:监听节点的路径 | |||
NodeCache(CuratorFramework client, String path) | |||
// client:Curator的框架客户端 | |||
// path:监听节点的路径 | |||
// dataIsCompressed:是否对数据进行压缩 | |||
NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) | |||
</syntaxhighlight> | |||
# 构造一个 '''NodeCacheListener''' 监听器实例: | |||
#: <syntaxhighlight lang="Java" highlight=""> | |||
NodeCacheListenerlistener = new NodeCacheListener() { | |||
@Override | |||
public void nodeChanged() throws Exception { | |||
ChildDatachildData = nodeCache.getCurrentData(); | |||
log.info("ZNode节点状态改变, path={}", childData.getPath()); | |||
log.info("ZNode节点状态改变, data={}", | |||
new String(childData.getData(),"Utf-8")); | |||
log.info("ZNode节点状态改变, stat={}", childData.getStat()); | |||
} | |||
}; | |||
</syntaxhighlight> | |||
#* NodeCacheListener 接口的定义如下: | |||
#*: <syntaxhighlight lang="Java" highlight=""> | |||
// 只定义了一个简单的方法 nodeChanged,当节点变化时,这个方法就会被回调。 | |||
package org.apache.curator.framework.recipes.cache; | |||
public interface NodeCacheListener { | |||
void nodeChanged() throws Exception; | |||
} | |||
</syntaxhighlight> | |||
# 将 NodeCacheListener 实例'''注册'''到 NodeCache 缓存实例,使用缓存实例的“addListener”方法: | |||
#: <syntaxhighlight lang="Java" highlight=""> | |||
//启动节点的事件监听 | |||
nodeCache.getListenable().addListener(listener); | |||
</syntaxhighlight> | |||
# 使用缓存实例 nodeCache 的“start”方法启动节点的事件监听。该方法有两种方式: | |||
#: <syntaxhighlight lang="Java" highlight=""> | |||
// 启动节点 | |||
void start() | |||
// 启动节点 | |||
// buildInitial 代表着是否将该节点的数据立即进行缓存。(true:在start启动时立即调用NodeCache的getCurrentData方法,就能够得到对应节点的信息ChildData类) | |||
void start(boolean buildInitial) | |||
</syntaxhighlight> | |||
示例: | |||
<syntaxhighlight lang="Java" highlight=""> | |||
/** | |||
* 客户端监听实践 | |||
**/ | |||
@Slf4j | |||
@Data | |||
public class ZkWatcherDemo { | |||
private String workerPath = "/test/listener/remoteNode"; | |||
private String subWorkerPath = "/test/listener/remoteNode/id-"; | |||
/** | |||
* NodeCache节点缓存的监听 | |||
*/ | |||
@Test | |||
public void testNodeCache() { | |||
//检查节点是否存在,没有则创建 | |||
booleanisExist = ZKclient.instance.isNodeExist(workerPath); | |||
if (!isExist) { | |||
ZKclient.instance.createNode(workerPath, null); | |||
} | |||
CuratorFramework client = ZKclient.instance.getClient(); | |||
try { | |||
//构造 NodeCache 缓存实例 | |||
NodeCache nodeCache = new NodeCache(client, workerPath, false); | |||
//构造 NodeCacheListener 监听实例 | |||
NodeCacheListener listener = new NodeCacheListener() { | |||
@Override | |||
public void nodeChanged() throws Exception { | |||
ChildDatachildData = nodeCache.getCurrentData(); | |||
log.info("ZNode节点状态改变, path={}", childData.getPath()); | |||
log.info("ZNode节点状态改变, data={}", new | |||
String(childData.getData(), "Utf-8")); | |||
log.info("ZNode节点状态改变, stat={}", childData.getStat()); | |||
} | |||
}; | |||
//注册并启动节点的事件监听 | |||
nodeCache.getListenable().addListener(listener); | |||
nodeCache.start(); | |||
// 第1次变更节点数据 | |||
client.setData().forPath(workerPath, "第1次更改内容".getBytes()); | |||
Thread.sleep(1000); | |||
// 第2次变更节点数据 | |||
client.setData().forPath(workerPath, "第2次更改内容".getBytes()); | |||
Thread.sleep(1000); | |||
// 第3次变更节点数据 | |||
client.setData().forPath(workerPath, "第3次更改内容".getBytes()); | |||
Thread.sleep(1000); | |||
// 第4次变更节点数据 | |||
client.delete().forPath(workerPath); | |||
Thread.sleep(Integer.MAX_VALUE); | |||
} catch (Exception e) { | |||
log.error("创建NodeCache监听失败, path={}", workerPath); | |||
} | |||
} | |||
} | |||
</syntaxhighlight> | |||
=== Path Cache === | |||
=== Tree Cache === |
2021年9月30日 (四) 17:08的版本
关于
实现对ZooKeeper服务器端的事件监听,是客户端操作服务器的一项重点工作。在 Curator 的API中,事件监听有两种模式:
- 一种是标准的观察者模式:通过Watcher监听器实现的;
- 另一种是缓存监听模式:通过引入了一种“本地缓存视图”Cache机制去实现的。
二者的最大的不同在于:
- Cache机制提供了反复注册的能力,而观察模式的Watcher监听器只能监听一次。
- 在类型上,Watcher监听器比较简单,只有一种;Cache事件监听的种类有3种:Path Cache,Node Cache 和 Tree Cache。
Cache机制监听,可以理解为一个本地缓存视图与远程 ZooKeeper 视图的对比过程。 简单来说,Cache 在客户端缓存了 ZNode 的各种状态,当感知到 Zk 集群的 ZNode 状态变化时,会触发事件,注册的监听器会处理这些事件。 虽然,Cache 是一种缓存机制,但是可以借助 Cache 实现实际的监听。
Watcher监听器
在 ZooKeeper 中,Watcher 监听器的使用过程:
- 定义一个 Watcher 接口的实例:使用 Watcher 接口的事件回调方法:“process(WatchedEvent event)”定义回调处理逻辑。如下:
//演示:定义一个监听器 Watcher w = new Watcher() { @Override public void process(WatchedEventwatchedEvent) { log.info("监听器watchedEvent:" + watchedEvent); } };
- 通过“实现了Watchable<T>接口”的构造者的“usingWatcher(Watcher w)”方法,为构造者设置 Watcher 监听器实例。如下:
//为GetDataBuilder实例设置监听器 byte[] content = client.getData().usingWatcher(w).forPath(workerPath);
- 可以分别通过 getData()、exists()、getChildren() 方法去设置监听器。
- 一个 Watcher 监听器在向服务器端完成注册后,当服务器端的一些事件触发了这个 Watcher,就会向注册过的客户端会话发送一个事件通知来实现分布式的通知功能。
- 在 Curator 客户端收到服务器端的通知后,会封装一个通知事件“WatchedEvent”的实例,再传递给监听器的“process(WatchedEvent e)”回调方法。
示例:利用 Watcher 来对节点事件进行监听
package com.crazymakercircle.zk.publishSubscribe; import com.crazymakercircle.zk.ZKclient; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.*; import org.apache.ZooKeeper.WatchedEvent; import org.apache.ZooKeeper.Watcher; import org.junit.Test; import java.io.UnsupportedEncodingException; /** * 客户端监听实践 **/ @Slf4j @Data public class ZkWatcherDemo { private String workerPath = "/test/listener/remoteNode"; private String subWorkerPath = "/test/listener/remoteNode/id-"; // 利用Watcher来对节点进行监听操作 @Test public void testWatcher() { CuratorFramework client = ZKclient.instance.getClient(); // 检查节点是否存在,没有则创建 boolean isExist = ZKclient.instance.isNodeExist(workerPath); if (!isExist) { ZKclient.instance.createNode(workerPath, null); } try { Watcher w = new Watcher() { @Override public void process(WatchedEventwatchedEvent) { System.out.println("监听到的变化 watchedEvent = " + watchedEvent); } }; byte[] content = client.getData().usingWatcher(w).forPath(workerPath); log.info("监听节点内容:" + new String(content)); // 第一次变更节点数据 client.setData().forPath(workerPath, "第1次更改内容".getBytes()); // 第二次变更节点数据 client.setData().forPath(workerPath, "第2次更改内容".getBytes()); Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
结果:
//….省略其他的输出 监听到的变化 watchedEvent = WatchedEventstate:SyncConnectedtype:NodeDataChanged path:/test/listener/node
- 如上:虽然两次调用 setData 方法改变节点内容,但是监听器仅仅监听到了一个事件。即:“监听器的注册是一次性的”,当第二次改变节点内容时,注册已经失效,无法再次捕获节点的变动事件。
所以,Watcher 监听器不适用于节点的数据频繁变动或者节点频繁变动这样的业务场景,而是适用于一些特殊的、变动不频繁的场景,例如“会话超时”、“授权失败”等这样的特殊场景。
关于“Watcher”、“Watchable<T>”、“WatchedEvent”、“WatcherEvent”
Watcher接口:用于表示一个标准的事件处理器,用来定义收到事件通知后相关的回调处理逻辑。
- 接口中包含两个内部枚举类:“KeeperState”(“通知状态”)和“EventType”(“事件类型”)。
Watchable<T>接口:定义了 usingWatcher(Watcher w) 用于为实现了此接口的构造者设置监视器。
- 实现了 Watchable<T> 接口的构造者,如:GetDataBuilder、GetChildrenBuilder 和 ExistsBuilder。(分别对应了前一节所述三个方法)
- 在 Curator 中,Watchable<T> 接口的源代码如下:
package org.apache.curator.framework.api; import org.apache.ZooKeeper.Watcher; public interface Watchable<T> { T watched(); T usingWatcher(Watcher w); T usingWatcher(CuratorWatcher cw); }
WatchedEvent通知事件:Curator 客户端将服务器端的通知(WatcherEvent)封装为一个“WatchedEvent”实例,再传递给监听器的“process(WatchedEvent e)”回调方法。
- WatchedEvent包含了三个基本属性:
- 通知状态(keeperState)
- 事件类型(EventType)
- 节点路径(path)
- WatchedEvent 中所用到的通知状态和事件类型定义在 Watcher 接口中。(如上图)
- 关于“WatchedEvent”与“WatcherEvent”:
WatcherEvent:从 ZooKeeper 服务器端直接通过网络传输传递过来的通知事件实例的类型; WatchedEvent:Curator 将服务器端的通知事件封装后的事件实例的类型;【WatchedEvent 类型没有实现序列化接口 java.io.Serializable,因此不能用于网络传输】 WatcherEvent 传输实例 和 Curator的 WatchedEvent 封装实例,在名称上基本上一样,只有一个字母之差,而且功能也是一样的,都表示的是同一个服务器端事件。
Cache机制监听
Watcher需要反复注册比较烦琐,所以,Curator 引入了 Cache 来监听 ZooKeeper 服务器端的事件。Cache 机制对 ZooKeeper 事件监听进行了封装,能够自动处理反复注册监听。
Cache 缓存机制的实现拥有一个系列的类型:
- Node Cache:节点缓存,可用于ZNode节点的监听;
- Path Cache:子节点缓存,可用于ZNode的子节点的监听;
- Tree Cache:树缓存是Path Cache的增强,不光能监听子节点,还能监听 ZNode 节点自身。
Node Cache
Node Cache,用于监控节点的增加,删除和更新。
- note:如果在监听的时候 NodeCache 监听的节点为空(也就是说 ZNode 路径不存在),也是可以的。之后,如果创建了对应的节点,也是会触发事件从而回调 nodeChanged 方法。
使用方法:
- 构造一个 NodeCache 缓存实例。其构造方法有两种:
// client:Curator的框架客户端 // path:监听节点的路径 NodeCache(CuratorFramework client, String path) // client:Curator的框架客户端 // path:监听节点的路径 // dataIsCompressed:是否对数据进行压缩 NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
- 构造一个 NodeCacheListener 监听器实例:
NodeCacheListenerlistener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildDatachildData = nodeCache.getCurrentData(); log.info("ZNode节点状态改变, path={}", childData.getPath()); log.info("ZNode节点状态改变, data={}", new String(childData.getData(),"Utf-8")); log.info("ZNode节点状态改变, stat={}", childData.getStat()); } };
- NodeCacheListener 接口的定义如下:
// 只定义了一个简单的方法 nodeChanged,当节点变化时,这个方法就会被回调。 package org.apache.curator.framework.recipes.cache; public interface NodeCacheListener { void nodeChanged() throws Exception; }
- 将 NodeCacheListener 实例注册到 NodeCache 缓存实例,使用缓存实例的“addListener”方法:
//启动节点的事件监听 nodeCache.getListenable().addListener(listener);
- 使用缓存实例 nodeCache 的“start”方法启动节点的事件监听。该方法有两种方式:
// 启动节点 void start() // 启动节点 // buildInitial 代表着是否将该节点的数据立即进行缓存。(true:在start启动时立即调用NodeCache的getCurrentData方法,就能够得到对应节点的信息ChildData类) void start(boolean buildInitial)
示例:
/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
private String workerPath = "/test/listener/remoteNode";
private String subWorkerPath = "/test/listener/remoteNode/id-";
/**
* NodeCache节点缓存的监听
*/
@Test
public void testNodeCache() {
//检查节点是否存在,没有则创建
booleanisExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
CuratorFramework client = ZKclient.instance.getClient();
try {
//构造 NodeCache 缓存实例
NodeCache nodeCache = new NodeCache(client, workerPath, false);
//构造 NodeCacheListener 监听实例
NodeCacheListener listener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildDatachildData = nodeCache.getCurrentData();
log.info("ZNode节点状态改变, path={}", childData.getPath());
log.info("ZNode节点状态改变, data={}", new
String(childData.getData(), "Utf-8"));
log.info("ZNode节点状态改变, stat={}", childData.getStat());
}
};
//注册并启动节点的事件监听
nodeCache.getListenable().addListener(listener);
nodeCache.start();
// 第1次变更节点数据
client.setData().forPath(workerPath, "第1次更改内容".getBytes());
Thread.sleep(1000);
// 第2次变更节点数据
client.setData().forPath(workerPath, "第2次更改内容".getBytes());
Thread.sleep(1000);
// 第3次变更节点数据
client.setData().forPath(workerPath, "第3次更改内容".getBytes());
Thread.sleep(1000);
// 第4次变更节点数据
client.delete().forPath(workerPath);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
log.error("创建NodeCache监听失败, path={}", workerPath);
}
}
}