查看“Zookeeper:分布式事件监听”的源代码
←
Zookeeper:分布式事件监听
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Zookeeper]] __TOC__ == 关于 == 实现对ZooKeeper服务器端的事件监听,是客户端操作服务器的一项重点工作。在 Curator 的API中,事件监听有两种模式: # 一种是标准的观察者模式:通过'''Watcher监听器'''实现的; # 另一种是缓存监听模式:通过引入了一种“本地缓存视图”'''Cache机制'''去实现的。 二者的最大的不同在于: * Cache机制提供了反复注册的能力,而观察模式的Watcher监听器只能监听一次。 * 在类型上,Watcher监听器比较简单,只有一种;Cache事件监听的种类有3种:Path Cache,Node Cache 和 Tree Cache。 <pre> Cache机制监听,可以理解为一个本地缓存视图与远程 ZooKeeper 视图的对比过程。 简单来说,Cache 在客户端缓存了 ZNode 的各种状态,当感知到 Zk 集群的 ZNode 状态变化时,会触发事件,注册的监听器会处理这些事件。 虽然,Cache 是一种缓存机制,但是可以借助 Cache 实现实际的监听。 </pre> == Watcher监听器 == 在 ZooKeeper 中,Watcher 监听器的使用过程: # 定义一个 '''Watcher''' 接口的实例:使用 Watcher 接口的事件回调方法:“process(WatchedEvent event)”定义回调处理逻辑。如下: #: <syntaxhighlight lang="Java" highlight=""> //演示:定义一个监听器 Watcher w = new Watcher() { @Override public void process(WatchedEventwatchedEvent) { log.info("监听器watchedEvent:" + watchedEvent); } }; </syntaxhighlight> # 通过“实现了'''Watchable<T>'''接口”的构造者的“'''usingWatcher'''(Watcher w)”方法,为构造者设置 Watcher 监听器实例。如下: #: <syntaxhighlight lang="Java" highlight=""> //为GetDataBuilder实例设置监听器 byte[] content = client.getData().usingWatcher(w).forPath(workerPath); </syntaxhighlight> #* 可以分别通过 '''getData()'''、'''exists()'''、'''getChildren()''' 方法去设置监听器。 # 一个 Watcher 监听器在向服务器端完成注册后,当服务器端的一些事件触发了这个 Watcher,就会向注册过的客户端会话发送一个事件通知来实现分布式的通知功能。 #* 在 Curator 客户端收到服务器端的通知后,会封装一个通知事件“'''WatchedEvent'''”的实例,再传递给监听器的“process(WatchedEvent e)”回调方法。 示例:利用 Watcher 来对节点事件进行监听 : <syntaxhighlight lang="Java" highlight="34,41"> package com.crazymakercircle.zk.publishSubscribe; import com.crazymakercircle.zk.ZKclient; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.*; import org.apache.ZooKeeper.WatchedEvent; import org.apache.ZooKeeper.Watcher; import org.junit.Test; import java.io.UnsupportedEncodingException; /** * 客户端监听实践 **/ @Slf4j @Data public class ZkWatcherDemo { private String workerPath = "/test/listener/remoteNode"; private String subWorkerPath = "/test/listener/remoteNode/id-"; // 利用Watcher来对节点进行监听操作 @Test public void testWatcher() { CuratorFramework client = ZKclient.instance.getClient(); // 检查节点是否存在,没有则创建 boolean isExist = ZKclient.instance.isNodeExist(workerPath); if (!isExist) { ZKclient.instance.createNode(workerPath, null); } try { Watcher w = new Watcher() { @Override public void process(WatchedEventwatchedEvent) { System.out.println("监听到的变化 watchedEvent = " + watchedEvent); } }; byte[] content = client.getData().usingWatcher(w).forPath(workerPath); log.info("监听节点内容:" + new String(content)); // 第一次变更节点数据 client.setData().forPath(workerPath, "第1次更改内容".getBytes()); // 第二次变更节点数据 client.setData().forPath(workerPath, "第2次更改内容".getBytes()); Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } </syntaxhighlight> 结果: : <syntaxhighlight lang="bash" highlight=""> //….省略其他的输出 监听到的变化 watchedEvent = WatchedEventstate:SyncConnectedtype:NodeDataChanged path:/test/listener/node </syntaxhighlight> : 如上:虽然两次调用 setData 方法改变节点内容,但是监听器仅仅监听到了一个事件。即:“'''监听器的注册是一次性的'''”,当第二次改变节点内容时,注册已经失效,无法再次捕获节点的变动事件。 所以,Watcher 监听器不适用于节点的数据频繁变动或者节点频繁变动这样的业务场景,而是适用于一些特殊的、变动不频繁的场景,例如“会话超时”、“授权失败”等这样的特殊场景。 ---- === 关于“Watcher”、“Watchable<T>”、“WatchedEvent”、“WatcherEvent” === '''Watcher'''接口:用于表示一个标准的事件处理器,用来'''定义收到事件通知后相关的回调处理逻辑'''。 * 接口中包含两个内部枚举类:“'''KeeperState'''”(“通知状态”)和“'''EventType'''”(“事件类型”)。 : [[File:Watcher 接口中定义的通知状态和事件类型.jpg|600px]] '''Watchable<T>'''接口:定义了 usingWatcher(Watcher w) 用于为实现了此接口的构造者设置监视器。 * 实现了 Watchable<T> 接口的构造者,如:'''GetDataBuilder'''、'''GetChildrenBuilder''' 和 '''ExistsBuilder'''。(分别对应了前一节所述三个方法) * 在 Curator 中,Watchable<T> 接口的源代码如下: *: <syntaxhighlight lang="Java" highlight=""> package org.apache.curator.framework.api; import org.apache.ZooKeeper.Watcher; public interface Watchable<T> { T watched(); T usingWatcher(Watcher w); T usingWatcher(CuratorWatcher cw); } </syntaxhighlight> '''WatchedEvent'''通知事件:Curator 客户端将服务器端的通知(WatcherEvent)封装为一个“WatchedEvent”实例,再传递给监听器的“process(WatchedEvent e)”回调方法。 * WatchedEvent包含了三个基本属性: *# 通知状态(keeperState) *# 事件类型(EventType) *# 节点路径(path) ** WatchedEvent 中所用到的通知状态和事件类型定义在 '''Watcher''' 接口中。(如上图) * 关于“Watche'''d'''Event”与“Watche'''r'''Event”: <pre> WatcherEvent:从 ZooKeeper 服务器端直接通过网络传输传递过来的通知事件实例的类型; WatchedEvent:Curator 将服务器端的通知事件封装后的事件实例的类型;【WatchedEvent 类型没有实现序列化接口 java.io.Serializable,因此不能用于网络传输】 WatcherEvent 传输实例 和 Curator的 WatchedEvent 封装实例,在名称上基本上一样,只有一个字母之差,而且功能也是一样的,都表示的是同一个服务器端事件。 </pre> == Cache机制监听 ==
返回至“
Zookeeper:分布式事件监听
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息