“Zookeeper:分布式事件监听”的版本间差异

来自Wikioe
跳到导航 跳到搜索
 
(未显示同一用户的15个中间版本)
第23行: 第23行:
== Watcher监听器 ==
== Watcher监听器 ==
在 ZooKeeper 中,Watcher 监听器的使用过程:
在 ZooKeeper 中,Watcher 监听器的使用过程:
# 定义一个 '''Watcher''' 接口的实例:使用 Watcher 接口的事件回调方法:“process(WatchedEvent event)”定义回调处理逻辑。如下:
# 定义一个 '''Watcher''' 接口的实例:使用 Watcher 接口的事件回调方法:“'''process(WatchedEvent event)'''”定义回调处理逻辑。如下:
#: <syntaxhighlight lang="Java" highlight="">
#: <syntaxhighlight lang="Java" highlight="">
   //演示:定义一个监听器
   //演示:定义一个监听器
第38行: 第38行:
   byte[] content = client.getData().usingWatcher(w).forPath(workerPath);
   byte[] content = client.getData().usingWatcher(w).forPath(workerPath);
</syntaxhighlight>
</syntaxhighlight>
#* 可以分别通过 '''getData()'''、'''exists()'''、'''getChildren()''' 方法去设置监听器。
#* 可以分别通过 '''getData()'''、'''exists()'''、'''getChildren()''' 方法(对应的构造器“GetDataBuilder”、“GetChildrenBuilder”和“ExistsBuilder”实现了“Watchable<T>”接口)去设置监听器。
# 一个 Watcher 监听器在向服务器端完成注册后,当服务器端的一些事件触发了这个 Watcher,就会向注册过的客户端会话发送一个事件通知来实现分布式的通知功能。
# 一个 Watcher 监听器在向服务器端完成注册后,当服务器端的一些事件触发了这个 Watcher,就会向注册过的客户端会话发送一个事件通知来实现分布式的通知功能。
#* 在 Curator 客户端收到服务器端的通知后,会封装一个通知事件“'''WatchedEvent'''”的实例,再传递给监听器的“process(WatchedEvent e)”回调方法。
#* 在 Curator 客户端收到服务器端的通知后,会封装一个通知事件“'''WatchedEvent'''”的实例,再传递给监听器的“process(WatchedEvent e)”回调方法。
第123行: 第123行:




'''Watchable<T>'''接口:定义了 usingWatcher(Watcher w) 用于为实现了此接口的构造者设置监视器。
'''Watchable<T>'''接口:定义了“'''usingWatcher(Watcher w)'''”用于为实现了此接口的构造者设置监视器。
* 实现了 Watchable<T> 接口的构造者,如:'''GetDataBuilder'''、'''GetChildrenBuilder''' 和 '''ExistsBuilder'''。(分别对应了前一节所述三个方法)
* 实现了 Watchable<T> 接口的构造者,如:'''GetDataBuilder'''、'''GetChildrenBuilder''' 和 '''ExistsBuilder'''。(分别对应了前一节所述三个方法)
* 在 Curator 中,Watchable<T> 接口的源代码如下:
* 在 Curator 中,Watchable<T> 接口的源代码如下:
第150行: 第150行:
WatcherEvent 传输实例 和 Curator的 WatchedEvent 封装实例,在名称上基本上一样,只有一个字母之差,而且功能也是一样的,都表示的是同一个服务器端事件。
WatcherEvent 传输实例 和 Curator的 WatchedEvent 封装实例,在名称上基本上一样,只有一个字母之差,而且功能也是一样的,都表示的是同一个服务器端事件。
</pre>
</pre>
=== watcher 原理 ===
见:“'''[[Zookeeper:watcher 事件机制原理]]'''”


== Cache机制监听 ==
== Cache机制监听 ==
Watcher需要反复注册比较烦琐,所以,'''Curator''' 引入了 Cache 来监听 ZooKeeper 服务器端的事件。Cache 机制对 ZooKeeper 事件监听进行了封装,能够自动处理反复注册监听。
Cache 缓存机制的实现拥有一个系列的类型:
# Node Cache:'''节点缓存''',可用于ZNode节点的监听;
# Path Cache:'''子节点缓存''',可用于ZNode的子节点的监听;
# Tree Cache:'''树缓存'''是Path Cache的增强,不光能监听子节点,还能监听 ZNode 节点自身。
<pre>
TreeCacheEvent 的事件类型:
    “NODE_ADDED:对应于节点的增加。
    “NODE_UPDATED:对应于节点的修改。
    “NODE_REMOVED:对应于节点的删除。
   
PathChildrenCacheEvent 的事件类型:
   “CHILD_ADDED”:对应于子节点的增加。
    “CHILD_UPDATED”:对应子于节点的修改。
    “CHILD_REMOVED”:对应子于节点的删除。
</pre>
=== Node Cache(NodeCache) ===
----
Node Cache,用于监控节点的增加,删除和更新。
* note:如果在监听的时候 NodeCache 监听的节点为空(也就是说 ZNode 路径不存在),也是可以的。之后,如果创建了对应的节点,也是会触发事件从而回调 nodeChanged 方法。
使用方法:
# 构造一个 '''NodeCache''' 缓存实例。其构造方法有两种:
#: <syntaxhighlight lang="Java" highlight="">
// client:Curator的框架客户端
// path:监听节点的路径
NodeCache(CuratorFramework client, String path)
// client:Curator的框架客户端
// path:监听节点的路径
// dataIsCompressed:是否对数据进行压缩
NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
</syntaxhighlight>
# 构造一个 '''NodeCacheListener''' 监听器实例:
#: <syntaxhighlight lang="Java" highlight="">
NodeCacheListenerlistener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData childData = nodeCache.getCurrentData();
log.info("ZNode节点状态改变, path={}", childData.getPath());
log.info("ZNode节点状态改变, data={}",
new String(childData.getData(),"Utf-8"));
log.info("ZNode节点状态改变, stat={}", childData.getStat());
}
};
</syntaxhighlight>
#* NodeCacheListener 接口的定义如下:(只定义了一个简单的方法 '''nodeChanged''',当节点变化时,这个方法就会被回调)
#*: <syntaxhighlight lang="Java" highlight="">
package org.apache.curator.framework.recipes.cache;
public interface NodeCacheListener {
  void nodeChanged() throws Exception;
}
</syntaxhighlight>
# 将 NodeCacheListener 实例'''注册'''到 NodeCache 缓存实例,使用缓存实例的“addListener”方法:
#: <syntaxhighlight lang="Java" highlight="">
//启动节点的事件监听
nodeCache.getListenable().addListener(listener);
</syntaxhighlight>
# 使用缓存实例 nodeCache 的“start”方法启动节点的事件监听。该方法有两种方式:
#: <syntaxhighlight lang="Java" highlight="">
// 启动节点
void start()
// 启动节点
// buildInitial 代表着是否将该节点的数据立即进行缓存。(true:在start启动时立即调用NodeCache的getCurrentData方法,就能够得到对应节点的信息ChildData类)
void start(boolean buildInitial)
</syntaxhighlight>
示例:
<syntaxhighlight lang="Java" highlight="">
package com.crazymakercircle.zk.publishSubscribe;
import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
private String workerPath = "/test/listener/remoteNode";
private String subWorkerPath = "/test/listener/remoteNode/id-";
/**
* NodeCache节点缓存的监听
*/
@Test
public void testNodeCache() {
//检查节点是否存在,没有则创建
boolean isExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
CuratorFramework client = ZKclient.instance.getClient();
try {
// 构造 NodeCache 缓存实例
NodeCache nodeCache = new NodeCache(client, workerPath, false);
// 构造 NodeCacheListener 监听实例
NodeCacheListener listener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData childData = nodeCache.getCurrentData();
log.info("ZNode节点状态改变, path={}", childData.getPath());
log.info("ZNode节点状态改变, data={}", new String(childData.getData(), "Utf-8"));
log.info("ZNode节点状态改变, stat={}", childData.getStat());
}
};
// 注册节点的事件监听
nodeCache.getListenable().addListener(listener);
// 启动节点监听
nodeCache.start();
// 第1次变更节点数据
client.setData().forPath(workerPath, "第1次更改内容".getBytes());
Thread.sleep(1000);
// 第2次变更节点数据
client.setData().forPath(workerPath, "第2次更改内容".getBytes());
Thread.sleep(1000);
// 第3次变更节点数据
client.setData().forPath(workerPath, "第3次更改内容".getBytes());
Thread.sleep(1000);
// 第4次变更节点数据
client.delete().forPath(workerPath);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
log.error("创建NodeCache监听失败, path={}", workerPath);
}
}
}
</syntaxhighlight>
结果:
<syntaxhighlight lang="xml" highlight="">
//…省略前两次的输出
- ZNode节点状态改变, path=/test/listener/node
- ZNode节点状态改变, data=第3次更改内容
- ZNode节点状态改变, stat=17179869191,
...
</syntaxhighlight>
=== Path Cache(PathChildrenCache) ===
----
PathChildrenCache 子节点缓存用于子节点的监听,监控当前节点的子节点被创建、更新或者删除:
* 只能监听子节点,监听不到当前节点。
* 不能递归监听,子节点下的子节点不能递归监控。
使用方法:
# 构造一个 '''PathChildrenCache''' 缓存实例。其构造方法有多个重载版本:
#: <syntaxhighlight lang="Java" highlight="">
//重载版本一
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
//重载版本二
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
//重载版本三
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
//重载版本四
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
</syntaxhighlight>
#* client:Curator框架的客户端;
#* path:监听节点的路径;
#* cacheData:是否把节点的内容缓存起来。如果为true,那么接收到节点列表变更事件的同时会将获得节点内容;
#* dataIsCompressed:是否对节点数据进行压缩;
#* threadFactory:示线程池工厂,当PathChildrenCache内部需要启动新的线程执行时,使用该线程池工厂来创建线程;
#* executorService:和threadFactory参数差不多,表示通过传入的线程池或者线程工厂来异步处理监听事件。
# 构造一个 '''PathChildrenCacheListener''' 监听器实例:
#: <syntaxhighlight lang="Java" highlight="">
PathChildrenCacheListener listener = new PathChildrenCacheListener(){
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
try {
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
log.info("子节点增加, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8"));
break;
case CHILD_UPDATED:
log.info("子节点更新, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8"));
break;
case CHILD_REMOVED:
log.info("子节点删除, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
</syntaxhighlight>
#* PathChildrenCacheListener 接口的定义如下(只定义了一个简单的方法'''childEvent''',当子节点有变化时,这个方法就会被回调):
#*: <syntaxhighlight lang="Java" highlight="">
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
public interface PathChildrenCacheListener {
void childEvent(CuratorFramework client, PathChildrenCacheEvent e) throws Exception;
}
</syntaxhighlight>
# 将 PathChildrenCacheListener 实例注册到 PathChildrenCache 缓存实例,使用缓存实例的“addListener”方法;
# 调用缓存实例 PathChildrenCache 的 “start” 方法,启动节点的事件监听;
#: <syntaxhighlight lang="Java" highlight="">
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
</syntaxhighlight>
#* start 方法可以传入启动的模式,定义在“'''PathChildrenCache.StartMode'''”枚举中,具体如下:
#*# “'''BUILD_INITIAL_CACHE'''”模式:启动时同步初始化cache。(表示创建cache后就从服务器提取对应的数据)
#*# “'''POST_INITIALIZED_EVENT'''”模式:启动时异步初始化cache。(表示创建cache后从服务器提取对应的数据,完成后触发“PathChildrenCacheEvent.Type#INITIALIZED”事件,cache中Listener会收到该事件的通知;)
#*# “'''NORMAL'''”模式:启动时异步初始化cache。(完成后不会发出通知)
示例:
<syntaxhighlight lang="Java" highlight="">
package com.crazymakercircle.zk.publishSubscribe;
import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
private String workerPath = "/test/listener/remoteNode";
private String subWorkerPath = "/test/listener/remoteNode/id-";
/**
* 子节点监听
*/
@Test
public void testPathChildrenCache() {
//检查节点是否存在,没有则创建
boolean isExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
CuratorFramework client = ZKclient.instance.getClient();
try {
PathChildrenCache cache = new PathChildrenCache(client, workerPath, true);
PathChildrenCacheListener listener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
try {
ChildData data = event.getData();
switch (event.getType()) {
//子节点增加
case CHILD_ADDED:
log.info("子节点增加, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8"));
break;
//子节点更新
case CHILD_UPDATED:
log.info("子节点更新, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8"));
break;
//子节点删除
case CHILD_REMOVED:
log.info("子节点删除, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (
UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
//增加监听器
cache.getListenable().addListener(listener);
//设置启动模式
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
Thread.sleep(1000);
//创建3个子节点
for (int i = 0; i< 3; i++) {
ZKclient.instance.createNode(subWorkerPath + i, null);
}
Thread.sleep(1000);
//删除3个子节点
for (int i = 0; i< 3; i++) {
ZKclient.instance.deleteNode(subWorkerPath + i);
}
} catch (Exception e) {
log.error("PathCache监听失败, path=", workerPath);
}
}
}
</syntaxhighlight>
结果:
<syntaxhighlight lang="xml" highlight="">
- 子节点增加, path=/test/listener/node/id-0, data=to set content
- 子节点增加, path=/test/listener/node/id-2, data=to set content
- 子节点增加, path=/test/listener/node/id-1, data=to set content
......
- 子节点删除, path=/test/listener/node/id-2, data=to set content
- 子节点删除, path=/test/listener/node/id-0, data=to set content
- 子节点删除, path=/test/listener/node/id-1, data=to set content
</syntaxhighlight>
=== Tree Cache(TreeCache) ===
----
Tree Cache可以看作是 Node Cache 和 Path Cache 的合体。Tree Cache 不光能监听子节点,还能监听节点自身。
使用方法:
# 构造一个 '''TreeCache''' 缓存实例。其构造方法有两种:(一般情况下,使用 TreeCache 的第一个构造函数即可)
#: <syntaxhighlight lang="Java" highlight="">
//TreeCache构造器之一
TreeCache(CuratorFramework client, String path)
//TreeCache构造器之二
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)
</syntaxhighlight>
#* dataIsCompressed:表示是否对数据进行压缩;
#* maxDepth:表示缓存的层次深度,默认为整数最大值。
#* executorService:表示监听的执行线程池,默认会创建一个单一线程的线程池。
#* createParentNodes:表示是否创建父亲节点,默认为false。
# 构造一个 '''TreeCacheListener''' 监听器实例:
#: <syntaxhighlight lang="Java" highlight="">
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client,TreeCacheEvent event) {
try {
ChildData data = event.getData();
if (data == null) {
log.info("数据为空");
return;
}
switch (event.getType()) {
case NODE_ADDED:
log.info("[TreeCache]节点增加, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_UPDATED:
log.info("[TreeCache]节点更新, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_REMOVED:
log.info("[TreeCache]节点删除, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
</syntaxhighlight>
#* TreeCacheListener 接口的定义如下:(只定义了一个简单的方法'''childEvent''',当子节点有变化时,这个方法就会被回调)
#*: <syntaxhighlight lang="Java" highlight="">
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
public interface TreeCacheListener {
  void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;
}
</syntaxhighlight>
# 将 TreeCacheListener 实例'''注册'''到 TreeCache 缓存实例,使用缓存实例的“addListener”方法;
# 调用缓存实例 TreeCache 的 “start” 方法,启动节点的事件监听;
示例:
<syntaxhighlight lang="Java" highlight="">
package com.crazymakercircle.zk.publishSubscribe;
import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
private String workerPath = "/test/listener/remoteNode";
private String subWorkerPath = "/test/listener/remoteNode/id-";
/**
* Tree Cache不光能监听子节点,还能监听节点自身
*/
@Test
public void testTreeCache() {
//检查节点是否存在,没有则创建
boolean isExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
CuratorFramework client = ZKclient.instance.getClient();
try {
// 构造 TreeCache 缓存实例
TreeCache treeCache = new TreeCache(client, workerPath);
// 构造 TreeCacheListener 监听实例
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client,TreeCacheEvent event) {
try {
ChildData data = event.getData();
if (data == null) {
log.info("数据为空");
return;
}
switch (event.getType()) {
case NODE_ADDED:
log.info("[TreeCache]节点增加, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_UPDATED:
log.info("[TreeCache]节点更新, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_REMOVED:
log.info("[TreeCache]节点删除, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
// 设置监听器
treeCache.getListenable().addListener(listener);
// 启动缓存视图
treeCache.start();
Thread.sleep(1000);
// 创建3个子节点
for (int i = 0; i< 3; i++) {
ZKclient.instance.createNode(subWorkerPath + i, null);
}
Thread.sleep(1000);
// 删除3个子节点
for (int i = 0; i< 3; i++) {
ZKclient.instance.deleteNode(subWorkerPath + i);
}
Thread.sleep(1000);
// 删除当前节点
ZKclient.instance.deleteNode(workerPath);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
log.error("PathCache监听失败, path=", workerPath);
}
}
}
</syntaxhighlight>
结果:
<syntaxhighlight lang="xml" highlight="">
- [TreeCache]节点增加, path=/test/listener/node, data=to set content
- [TreeCache]节点增加, path=/test/listener/node/id-0, data=to set content
- [TreeCache]节点增加, path=/test/listener/node/id-1, data=to set content
- [TreeCache]节点增加, path=/test/listener/node/id-2, data=to set content
- [TreeCache]节点删除, path=/test/listener/node/id-2, data=to set content
- [TreeCache]节点删除, path=/test/listener/node/id-1, data=to set content
- [TreeCache]节点删除, path=/test/listener/node/id-0, data=to set content
- [TreeCache]节点删除, path=/test/listener/node, data=to set content
</syntaxhighlight>

2021年9月30日 (四) 19:02的最新版本


关于

实现对ZooKeeper服务器端的事件监听,是客户端操作服务器的一项重点工作。在 Curator 的API中,事件监听有两种模式:

  1. 一种是标准的观察者模式:通过Watcher监听器实现的;
  2. 另一种是缓存监听模式:通过引入了一种“本地缓存视图”Cache机制去实现的。


二者的最大的不同在于:

  • Cache机制提供了反复注册的能力,而观察模式的Watcher监听器只能监听一次。
  • 在类型上,Watcher监听器比较简单,只有一种;Cache事件监听的种类有3种:Path Cache,Node Cache 和 Tree Cache。


Cache机制监听,可以理解为一个本地缓存视图与远程 ZooKeeper 视图的对比过程。
    简单来说,Cache 在客户端缓存了 ZNode 的各种状态,当感知到 Zk 集群的 ZNode 状态变化时,会触发事件,注册的监听器会处理这些事件。

虽然,Cache 是一种缓存机制,但是可以借助 Cache 实现实际的监听。

Watcher监听器

在 ZooKeeper 中,Watcher 监听器的使用过程:

  1. 定义一个 Watcher 接口的实例:使用 Watcher 接口的事件回调方法:“process(WatchedEvent event)”定义回调处理逻辑。如下:
       //演示:定义一个监听器
       Watcher w = new Watcher() {
          @Override
          public void process(WatchedEventwatchedEvent) {
             log.info("监听器watchedEvent:" + watchedEvent);
          }
       };
    
  2. 通过“实现了Watchable<T>接口”的构造者的“usingWatcher(Watcher w)”方法,为构造者设置 Watcher 监听器实例。如下:
       //为GetDataBuilder实例设置监听器
       byte[] content = client.getData().usingWatcher(w).forPath(workerPath);
    
    • 可以分别通过 getData()exists()getChildren() 方法(对应的构造器“GetDataBuilder”、“GetChildrenBuilder”和“ExistsBuilder”实现了“Watchable<T>”接口)去设置监听器。
  3. 一个 Watcher 监听器在向服务器端完成注册后,当服务器端的一些事件触发了这个 Watcher,就会向注册过的客户端会话发送一个事件通知来实现分布式的通知功能。
    • 在 Curator 客户端收到服务器端的通知后,会封装一个通知事件“WatchedEvent”的实例,再传递给监听器的“process(WatchedEvent e)”回调方法。


示例:利用 Watcher 来对节点事件进行监听

package com.crazymakercircle.zk.publishSubscribe;

import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;
import java.io.UnsupportedEncodingException;

/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
	private String workerPath = "/test/listener/remoteNode";
	private String subWorkerPath = "/test/listener/remoteNode/id-";
	
	// 利用Watcher来对节点进行监听操作
	@Test
	public void testWatcher() {
		CuratorFramework client = ZKclient.instance.getClient();
		
		// 检查节点是否存在,没有则创建
		boolean isExist = ZKclient.instance.isNodeExist(workerPath);
		if (!isExist) {
			ZKclient.instance.createNode(workerPath, null);
		}
		
		try {
			Watcher w = new Watcher() {
					@Override
					public void process(WatchedEventwatchedEvent) {
						System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
					}
				};
				
			byte[] content = client.getData().usingWatcher(w).forPath(workerPath);
			
			log.info("监听节点内容:" + new String(content));
			
			// 第一次变更节点数据
			client.setData().forPath(workerPath, "第1次更改内容".getBytes());
			
			// 第二次变更节点数据
			client.setData().forPath(workerPath, "第2次更改内容".getBytes());
			
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

结果:

//….省略其他的输出
监听到的变化 watchedEvent = WatchedEventstate:SyncConnectedtype:NodeDataChanged path:/test/listener/node
如上:虽然两次调用 setData 方法改变节点内容,但是监听器仅仅监听到了一个事件。即:“监听器的注册是一次性的”,当第二次改变节点内容时,注册已经失效,无法再次捕获节点的变动事件。


所以,Watcher 监听器不适用于节点的数据频繁变动或者节点频繁变动这样的业务场景,而是适用于一些特殊的、变动不频繁的场景,例如“会话超时”、“授权失败”等这样的特殊场景。


关于“Watcher”、“Watchable<T>”、“WatchedEvent”、“WatcherEvent”

Watcher接口:用于表示一个标准的事件处理器,用来定义收到事件通知后相关的回调处理逻辑

  • 接口中包含两个内部枚举类:“KeeperState”(“通知状态”)和“EventType”(“事件类型”)。
Watcher 接口中定义的通知状态和事件类型.jpg


Watchable<T>接口:定义了“usingWatcher(Watcher w)”用于为实现了此接口的构造者设置监视器。

  • 实现了 Watchable<T> 接口的构造者,如:GetDataBuilderGetChildrenBuilderExistsBuilder。(分别对应了前一节所述三个方法)
  • 在 Curator 中,Watchable<T> 接口的源代码如下:
    package org.apache.curator.framework.api;
    import org.apache.ZooKeeper.Watcher;
    public interface Watchable<T> {
       T watched();
       T usingWatcher(Watcher w);
       T usingWatcher(CuratorWatcher cw);
    }
    


WatchedEvent通知事件:Curator 客户端将服务器端的通知(WatcherEvent)封装为一个“WatchedEvent”实例,再传递给监听器的“process(WatchedEvent e)”回调方法。

  • WatchedEvent包含了三个基本属性:
    1. 通知状态(keeperState)
    2. 事件类型(EventType)
    3. 节点路径(path)
    • WatchedEvent 中所用到的通知状态和事件类型定义在 Watcher 接口中。(如上图)
  • 关于“WatchedEvent”与“WatcherEvent”:
WatcherEvent:从 ZooKeeper 服务器端直接通过网络传输传递过来的通知事件实例的类型;
WatchedEvent:Curator 将服务器端的通知事件封装后的事件实例的类型;【WatchedEvent 类型没有实现序列化接口 java.io.Serializable,因此不能用于网络传输】

WatcherEvent 传输实例 和 Curator的 WatchedEvent 封装实例,在名称上基本上一样,只有一个字母之差,而且功能也是一样的,都表示的是同一个服务器端事件。

watcher 原理

见:“Zookeeper:watcher 事件机制原理

Cache机制监听

Watcher需要反复注册比较烦琐,所以,Curator 引入了 Cache 来监听 ZooKeeper 服务器端的事件。Cache 机制对 ZooKeeper 事件监听进行了封装,能够自动处理反复注册监听。


Cache 缓存机制的实现拥有一个系列的类型:

  1. Node Cache:节点缓存,可用于ZNode节点的监听;
  2. Path Cache:子节点缓存,可用于ZNode的子节点的监听;
  3. Tree Cache:树缓存是Path Cache的增强,不光能监听子节点,还能监听 ZNode 节点自身。


TreeCacheEvent 的事件类型:
    “NODE_ADDED:对应于节点的增加。
    “NODE_UPDATED:对应于节点的修改。
    “NODE_REMOVED:对应于节点的删除。
    
PathChildrenCacheEvent 的事件类型:
    “CHILD_ADDED”:对应于子节点的增加。
    “CHILD_UPDATED”:对应子于节点的修改。
    “CHILD_REMOVED”:对应子于节点的删除。


Node Cache(NodeCache)


Node Cache,用于监控节点的增加,删除和更新。

  • note:如果在监听的时候 NodeCache 监听的节点为空(也就是说 ZNode 路径不存在),也是可以的。之后,如果创建了对应的节点,也是会触发事件从而回调 nodeChanged 方法。


使用方法:

  1. 构造一个 NodeCache 缓存实例。其构造方法有两种:
    // client:Curator的框架客户端
    // path:监听节点的路径
    NodeCache(CuratorFramework client, String path)
    
    // client:Curator的框架客户端
    // path:监听节点的路径
    // dataIsCompressed:是否对数据进行压缩
    NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
    
  2. 构造一个 NodeCacheListener 监听器实例:
    NodeCacheListenerlistener = new NodeCacheListener() {
    	@Override
    	public void nodeChanged() throws Exception {
    		ChildData childData = nodeCache.getCurrentData();
    		
    		log.info("ZNode节点状态改变, path={}", childData.getPath());
    		log.info("ZNode节点状态改变, data={}",
    		new String(childData.getData(),"Utf-8"));
    		log.info("ZNode节点状态改变, stat={}", childData.getStat());
    	}
    };
    
    • NodeCacheListener 接口的定义如下:(只定义了一个简单的方法 nodeChanged,当节点变化时,这个方法就会被回调)
      package org.apache.curator.framework.recipes.cache;
      public interface NodeCacheListener {
         void nodeChanged() throws Exception;
      }
      
  3. 将 NodeCacheListener 实例注册到 NodeCache 缓存实例,使用缓存实例的“addListener”方法:
    //启动节点的事件监听
    nodeCache.getListenable().addListener(listener);
    
  4. 使用缓存实例 nodeCache 的“start”方法启动节点的事件监听。该方法有两种方式:
    // 启动节点
    void start()
    
    // 启动节点
    // buildInitial 代表着是否将该节点的数据立即进行缓存。(true:在start启动时立即调用NodeCache的getCurrentData方法,就能够得到对应节点的信息ChildData类)
    void start(boolean buildInitial)
    


示例:

package com.crazymakercircle.zk.publishSubscribe;

import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;

import java.io.UnsupportedEncodingException;

/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
	private String workerPath = "/test/listener/remoteNode";
	private String subWorkerPath = "/test/listener/remoteNode/id-";
	/**
	* NodeCache节点缓存的监听
	*/
	@Test
	public void testNodeCache() {
		//检查节点是否存在,没有则创建
		boolean isExist = ZKclient.instance.isNodeExist(workerPath);
		if (!isExist) {
			ZKclient.instance.createNode(workerPath, null);
		}
	
		CuratorFramework client = ZKclient.instance.getClient();
		
		try {
			// 构造 NodeCache 缓存实例
			NodeCache nodeCache = new NodeCache(client, workerPath, false);
			// 构造 NodeCacheListener 监听实例
			NodeCacheListener listener = new NodeCacheListener() {
					@Override
					public void nodeChanged() throws Exception {
						ChildData childData = nodeCache.getCurrentData();
						log.info("ZNode节点状态改变, path={}", childData.getPath());
						log.info("ZNode节点状态改变, data={}", new String(childData.getData(), "Utf-8"));
						log.info("ZNode节点状态改变, stat={}", childData.getStat());
					}
				};
				
			// 注册节点的事件监听
			nodeCache.getListenable().addListener(listener);
			// 启动节点监听
			nodeCache.start();
			
			// 第1次变更节点数据
			client.setData().forPath(workerPath, "第1次更改内容".getBytes());
			Thread.sleep(1000);
			// 第2次变更节点数据
			client.setData().forPath(workerPath, "第2次更改内容".getBytes());
			Thread.sleep(1000);
			// 第3次变更节点数据
			client.setData().forPath(workerPath, "第3次更改内容".getBytes());
			Thread.sleep(1000);
			// 第4次变更节点数据
			client.delete().forPath(workerPath);
			Thread.sleep(Integer.MAX_VALUE);
		} catch (Exception e) {
			log.error("创建NodeCache监听失败, path={}", workerPath);
		}
	}
}

结果:

//…省略前两次的输出
- ZNode节点状态改变, path=/test/listener/node
- ZNode节点状态改变, data=第3次更改内容
- ZNode节点状态改变, stat=17179869191,
...

Path Cache(PathChildrenCache)


PathChildrenCache 子节点缓存用于子节点的监听,监控当前节点的子节点被创建、更新或者删除:

  • 只能监听子节点,监听不到当前节点。
  • 不能递归监听,子节点下的子节点不能递归监控。


使用方法:

  1. 构造一个 PathChildrenCache 缓存实例。其构造方法有多个重载版本:
    //重载版本一
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
    
    //重载版本二
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
    
    //重载版本三
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
    
    //重载版本四
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
    
    • client:Curator框架的客户端;
    • path:监听节点的路径;
    • cacheData:是否把节点的内容缓存起来。如果为true,那么接收到节点列表变更事件的同时会将获得节点内容;
    • dataIsCompressed:是否对节点数据进行压缩;
    • threadFactory:示线程池工厂,当PathChildrenCache内部需要启动新的线程执行时,使用该线程池工厂来创建线程;
    • executorService:和threadFactory参数差不多,表示通过传入的线程池或者线程工厂来异步处理监听事件。
  2. 构造一个 PathChildrenCacheListener 监听器实例:
    PathChildrenCacheListener listener = new PathChildrenCacheListener(){
    		@Override
    		public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
    			try {
    				ChildData data = event.getData();
    				switch (event.getType()) {
    					case CHILD_ADDED:
    						log.info("子节点增加, path={}, data={}", data.getPath(),	new String(data.getData(), "UTF-8"));
    						break;
    					case CHILD_UPDATED:
    						log.info("子节点更新, path={}, data={}", data.getPath(),	new String(data.getData(), "UTF-8"));
    						break;
    					case CHILD_REMOVED:
    						log.info("子节点删除, path={}, data={}", data.getPath(),	new String(data.getData(), "UTF-8"));
    						break;
    					default:
    						break;
    				}
    			} catch (UnsupportedEncodingException e) {
    				e.printStackTrace();
    			}
    		}
    	};
    
    • PathChildrenCacheListener 接口的定义如下(只定义了一个简单的方法childEvent,当子节点有变化时,这个方法就会被回调):
      package org.apache.curator.framework.recipes.cache;
      import org.apache.curator.framework.CuratorFramework;
      public interface PathChildrenCacheListener {
      	void childEvent(CuratorFramework client, PathChildrenCacheEvent e) throws Exception;
      }
      
  3. 将 PathChildrenCacheListener 实例注册到 PathChildrenCache 缓存实例,使用缓存实例的“addListener”方法;
  4. 调用缓存实例 PathChildrenCache 的 “start” 方法,启动节点的事件监听;
    cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    
    • start 方法可以传入启动的模式,定义在“PathChildrenCache.StartMode”枚举中,具体如下:
      1. BUILD_INITIAL_CACHE”模式:启动时同步初始化cache。(表示创建cache后就从服务器提取对应的数据)
      2. POST_INITIALIZED_EVENT”模式:启动时异步初始化cache。(表示创建cache后从服务器提取对应的数据,完成后触发“PathChildrenCacheEvent.Type#INITIALIZED”事件,cache中Listener会收到该事件的通知;)
      3. NORMAL”模式:启动时异步初始化cache。(完成后不会发出通知)


示例:

package com.crazymakercircle.zk.publishSubscribe;

import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;

import java.io.UnsupportedEncodingException;

/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
	private String workerPath = "/test/listener/remoteNode";
	private String subWorkerPath = "/test/listener/remoteNode/id-";
	
	/**
	* 子节点监听
	*/
	@Test
	public void testPathChildrenCache() {
		//检查节点是否存在,没有则创建
		boolean isExist = ZKclient.instance.isNodeExist(workerPath);
		if (!isExist) {
			ZKclient.instance.createNode(workerPath, null);
		}
		
		CuratorFramework client = ZKclient.instance.getClient();
		
		try {
			PathChildrenCache cache = new PathChildrenCache(client, workerPath, true);
			
			PathChildrenCacheListener listener = new PathChildrenCacheListener() {
				@Override
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
					try {
						ChildData data = event.getData();
						switch (event.getType()) {
							//子节点增加
							case CHILD_ADDED:
								log.info("子节点增加, path={}, data={}", data.getPath(), new String(data.getData(), "UTF-8"));
								break;
							//子节点更新
							case CHILD_UPDATED:
								log.info("子节点更新, path={}, data={}",	data.getPath(), new String(data.getData(), "UTF-8"));
								break;
							//子节点删除
							case CHILD_REMOVED:
								log.info("子节点删除, path={}, data={}",	data.getPath(),	new String(data.getData(), "UTF-8"));
								break;
							default:
								break;
						}
					} catch (
						UnsupportedEncodingException e) {
						e.printStackTrace();
					}
				}
			};
		
			//增加监听器
			cache.getListenable().addListener(listener);
			//设置启动模式
			cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
			Thread.sleep(1000);
			
			//创建3个子节点
			for (int i = 0; i< 3; i++) {
				ZKclient.instance.createNode(subWorkerPath + i, null);
			}
			Thread.sleep(1000);
			
			//删除3个子节点
			for (int i = 0; i< 3; i++) {
				ZKclient.instance.deleteNode(subWorkerPath + i);
			}
		} catch (Exception e) {
			log.error("PathCache监听失败, path=", workerPath);
		}
	}
}

结果:

- 子节点增加, path=/test/listener/node/id-0, data=to set content
- 子节点增加, path=/test/listener/node/id-2, data=to set content
- 子节点增加, path=/test/listener/node/id-1, data=to set content
......
- 子节点删除, path=/test/listener/node/id-2, data=to set content
- 子节点删除, path=/test/listener/node/id-0, data=to set content
- 子节点删除, path=/test/listener/node/id-1, data=to set content

Tree Cache(TreeCache)


Tree Cache可以看作是 Node Cache 和 Path Cache 的合体。Tree Cache 不光能监听子节点,还能监听节点自身。


使用方法:

  1. 构造一个 TreeCache 缓存实例。其构造方法有两种:(一般情况下,使用 TreeCache 的第一个构造函数即可)
    //TreeCache构造器之一
    TreeCache(CuratorFramework client, String path)
    
    //TreeCache构造器之二
    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)
    
    • dataIsCompressed:表示是否对数据进行压缩;
    • maxDepth:表示缓存的层次深度,默认为整数最大值。
    • executorService:表示监听的执行线程池,默认会创建一个单一线程的线程池。
    • createParentNodes:表示是否创建父亲节点,默认为false。
  2. 构造一个 TreeCacheListener 监听器实例:
    TreeCacheListener listener = new TreeCacheListener() {
    	@Override
    	public void childEvent(CuratorFramework client,TreeCacheEvent event) {
    		try {
    			ChildData data = event.getData();
    			if (data == null) {
    				log.info("数据为空");
    				return;
    			}
    			switch (event.getType()) {
    				case NODE_ADDED:
    					log.info("[TreeCache]节点增加, path={}, data={}",
    					data.getPath(), new String(data.getData(), "UTF-8"));
    					break;
    				case NODE_UPDATED:
    					log.info("[TreeCache]节点更新, path={}, data={}",
    					data.getPath(), new String(data.getData(), "UTF-8"));
    					break;
    				case NODE_REMOVED:
    					log.info("[TreeCache]节点删除, path={}, data={}",
    					data.getPath(), new String(data.getData(), "UTF-8"));
    					break;
    				default:
    					break;
    			}
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    	}
    };
    
    • TreeCacheListener 接口的定义如下:(只定义了一个简单的方法childEvent,当子节点有变化时,这个方法就会被回调)
      package org.apache.curator.framework.recipes.cache;
      import org.apache.curator.framework.CuratorFramework;
      public interface TreeCacheListener {
         void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;
      }
      
  3. 将 TreeCacheListener 实例注册到 TreeCache 缓存实例,使用缓存实例的“addListener”方法;
  4. 调用缓存实例 TreeCache 的 “start” 方法,启动节点的事件监听;


示例:

package com.crazymakercircle.zk.publishSubscribe;

import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;

import java.io.UnsupportedEncodingException;

/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
	private String workerPath = "/test/listener/remoteNode";
	private String subWorkerPath = "/test/listener/remoteNode/id-";
	
	/**
	* Tree Cache不光能监听子节点,还能监听节点自身
	*/
	@Test
	public void testTreeCache() {
		//检查节点是否存在,没有则创建
		boolean isExist = ZKclient.instance.isNodeExist(workerPath);
		if (!isExist) {
			ZKclient.instance.createNode(workerPath, null);
		}
		
		CuratorFramework client = ZKclient.instance.getClient();
	
		try {
			// 构造 TreeCache 缓存实例
			TreeCache treeCache = new TreeCache(client, workerPath);
			
			// 构造 TreeCacheListener 监听实例
			TreeCacheListener listener = new TreeCacheListener() {
				@Override
				public void childEvent(CuratorFramework client,TreeCacheEvent event) {
					try {
						ChildData data = event.getData();
						if (data == null) {
							log.info("数据为空");
							return;
						}
						switch (event.getType()) {
							case NODE_ADDED:
								log.info("[TreeCache]节点增加, path={}, data={}",
								data.getPath(), new String(data.getData(), "UTF-8"));
								break;
							case NODE_UPDATED:
								log.info("[TreeCache]节点更新, path={}, data={}",
								data.getPath(), new String(data.getData(), "UTF-8"));
								break;
							case NODE_REMOVED:
								log.info("[TreeCache]节点删除, path={}, data={}",
								data.getPath(), new String(data.getData(), "UTF-8"));
								break;
							default:
								break;
						}
					} catch (UnsupportedEncodingException e) {
						e.printStackTrace();
					}
				}
			};
			
			// 设置监听器
			treeCache.getListenable().addListener(listener);
			// 启动缓存视图
			treeCache.start();
			Thread.sleep(1000);
			
			
			// 创建3个子节点
			for (int i = 0; i< 3; i++) {
				ZKclient.instance.createNode(subWorkerPath + i, null);
			}
			Thread.sleep(1000);
			
			// 删除3个子节点
			for (int i = 0; i< 3; i++) {
				ZKclient.instance.deleteNode(subWorkerPath + i);
			}
			Thread.sleep(1000);
			
			// 删除当前节点
			ZKclient.instance.deleteNode(workerPath);
			Thread.sleep(Integer.MAX_VALUE);
		} catch (Exception e) {
			log.error("PathCache监听失败, path=", workerPath);
		}
	}
}

结果:

- [TreeCache]节点增加, path=/test/listener/node, data=to set content

- [TreeCache]节点增加, path=/test/listener/node/id-0, data=to set content
- [TreeCache]节点增加, path=/test/listener/node/id-1, data=to set content
- [TreeCache]节点增加, path=/test/listener/node/id-2, data=to set content

- [TreeCache]节点删除, path=/test/listener/node/id-2, data=to set content
- [TreeCache]节点删除, path=/test/listener/node/id-1, data=to set content
- [TreeCache]节点删除, path=/test/listener/node/id-0, data=to set content

- [TreeCache]节点删除, path=/test/listener/node, data=to set content