“Zookeeper:watcher 事件机制原理”的版本间差异
跳到导航
跳到搜索
(建立内容为“category:Zookeeper == 关于 ==”的新页面) |
小 (Eijux移动页面Zookeeper:监视(Watches)至Zookeeper:watcher 事件机制原理,不留重定向) |
||
(未显示同一用户的4个中间版本) | |||
第1行: | 第1行: | ||
[[category:Zookeeper]] | [[category:Zookeeper]] | ||
__TOC__ | |||
== 关于 == | == 关于 == | ||
监听是一种简单的机制,使客户端收到关于 ZooKeeper 集合中的更改的通知: | |||
: 客户端可以在读取特定 znode 时设置监听(Watcher),Watcher 会向注册的客户端发送任何 znode 更改的通知。 | |||
* Znode 更改是“与znode相关的数据的修改”或“znode的子项中的更改”。 | |||
* Watcher 只触发一次就失效。(如果客户端想要再次通知,则必须通过另一个读取操作来完成) | |||
* 当连接会话过期时,客户端将与服务器断开连接,相关的watches也将被删除。 | |||
== 原理 == | |||
zookeeper 的 watcher 机制,可以分为四个过程: | |||
# 客户端注册 watcher; | |||
# 服务端处理 watcher; | |||
# 服务端触发 watcher 事件; | |||
# 客户端回调 watcher。 | |||
* 其中“客户端注册 watcher”有三种方式,调用客户端 API 可以分别通过 '''getData'''、'''exists'''、'''getChildren''' 实现。 | |||
以 exists 方法举例说明其原理: | |||
: <syntaxhighlight lang="java" highlight=""> | |||
public class WatcherDemo implements Watcher { | |||
static ZooKeeper zooKeeper; | |||
static { | |||
try { | |||
zooKeeper = new ZooKeeper("192.168.3.39:2181", 4000,new WatcherDemo()); | |||
} catch (IOException e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
@Override | |||
public void process(WatchedEvent event) { | |||
System.out.println("eventType:"+event.getType()); | |||
if(event.getType()==Event.EventType.NodeDataChanged){ | |||
try { | |||
zooKeeper.exists(event.getPath(),true); | |||
} catch (KeeperException e) { | |||
e.printStackTrace(); | |||
} catch (InterruptedException e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
} | |||
public static void main(String[] args) throws IOException, KeeperException, InterruptedException { | |||
String path="/watcher"; | |||
if(zooKeeper.exists(path,false)==null) { | |||
zooKeeper.create("/watcher", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); | |||
} | |||
Thread.sleep(1000); | |||
System.out.println("-----------"); | |||
// true表示使用zookeeper实例中配置的watcher | |||
Stat stat=zooKeeper.exists(path,true); | |||
System.in.read(); | |||
} | |||
} | |||
</syntaxhighlight> | |||
: 运行完程序,控制台显示: | |||
: [[File:Zookeeper:watcher机制:示例1.png|600px]] | |||
: 此时启动 zookeeper 命令行终端,查看并且删除 watcher 节点: | |||
: [[File:Zookeeper:watcher机制:示例2.png|600px]] | |||
: IDE 控制台输出,触发了节点删除事件: | |||
: [[File:Zookeeper:watcher机制:示例3.png|600px]] | |||
=== 流程及源码分析 === | |||
客户端发送请求给服务端是通过 TCP 长连接建立网络通道,底层默认是通过 '''java 的 NIO''' 方式,也可以配置 '''netty''' 实现方式。 | |||
: [[File:Zookeeper:watcher机制:客户端请求流程.png|600px]] | |||
注册 watcher 监听事件流程图: | |||
: [[File:Zookeeper:watcher机制:注册 watcher 监听事件流程.png|600px]] | |||
# '''客户端发送事件通知请求''': | |||
#: 在 Zookeeper 类调用 exists 方法时候,把创建事件监听封装到 request 对象中,watch 属性设置为 true,待服务端返回 response 后把监听事件封装到客户端的 ZKWatchManager 类中。 | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:客户端发送事件通知请求.png|500px]] | |||
# '''服务端处理 watcher 事件的请求:''' | |||
#: 服务端 NIOServerCnxn 类用来处理客户端发送过来的请求,最终调用到 FinalRequestProcessor,其中有一段源码添加客户端发送过来的 watcher 事件: | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:服务端处理 watcher 事件的请求1.png|500px]] | |||
#: 然后进入 statNode 方法,在 DataTree 类方法中添加 watcher 事件,并保存至 WatchManager 的 watchTable 与 watchTable 中。 | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:服务端处理 watcher 事件的请求2.png|500px]] | |||
# '''服务端触发 watcher 事件流程:''' | |||
#: 若服务端某个被监听的节点发生事务请求,服务端处理请求过程中调用 FinalRequestProcessor 类 processRequest 方法中的代码如下所示: | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:服务端触发 watcher 事件流程1.png|500px]] | |||
#: 删除调用链最终到 DataTree 类中删除节点分支的触发代码段: | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:服务端触发 watcher 事件流程2.png|500px]] | |||
#: 进入 WatchManager 类的 triggerWatch 方法: | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:服务端触发 watcher 事件流程3.png|500px]] | |||
#: 继续跟踪进入 NIOServerCnxn,构建了一个 xid 为 -1,zxid 为 -1 的 ReplyHeader 对象,然后再调用 sendResonpe 方法: | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:服务端触发 watcher 事件流程4.png|500px]] | |||
# '''客户端回调 watcher 事件:''' | |||
#: 客户端 SendThread 类 readResponse 方法接收服务端触发的事件通知,进入 xid 为 -1 流程,处理 Event 事件: | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:客户端回调 watcher 事件1.png|500px]] | |||
#: [[File:Zookeeper:watcher机制:流程及源码分析:客户端回调 watcher 事件2.png|500px]] |
2021年9月30日 (四) 19:01的最新版本
关于
监听是一种简单的机制,使客户端收到关于 ZooKeeper 集合中的更改的通知:
- 客户端可以在读取特定 znode 时设置监听(Watcher),Watcher 会向注册的客户端发送任何 znode 更改的通知。
- Znode 更改是“与znode相关的数据的修改”或“znode的子项中的更改”。
- Watcher 只触发一次就失效。(如果客户端想要再次通知,则必须通过另一个读取操作来完成)
- 当连接会话过期时,客户端将与服务器断开连接,相关的watches也将被删除。
原理
zookeeper 的 watcher 机制,可以分为四个过程:
- 客户端注册 watcher;
- 服务端处理 watcher;
- 服务端触发 watcher 事件;
- 客户端回调 watcher。
- 其中“客户端注册 watcher”有三种方式,调用客户端 API 可以分别通过 getData、exists、getChildren 实现。
以 exists 方法举例说明其原理:
public class WatcherDemo implements Watcher { static ZooKeeper zooKeeper; static { try { zooKeeper = new ZooKeeper("192.168.3.39:2181", 4000,new WatcherDemo()); } catch (IOException e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { System.out.println("eventType:"+event.getType()); if(event.getType()==Event.EventType.NodeDataChanged){ try { zooKeeper.exists(event.getPath(),true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { String path="/watcher"; if(zooKeeper.exists(path,false)==null) { zooKeeper.create("/watcher", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } Thread.sleep(1000); System.out.println("-----------"); // true表示使用zookeeper实例中配置的watcher Stat stat=zooKeeper.exists(path,true); System.in.read(); } }
- 运行完程序,控制台显示:
- 此时启动 zookeeper 命令行终端,查看并且删除 watcher 节点:
- IDE 控制台输出,触发了节点删除事件:
流程及源码分析
客户端发送请求给服务端是通过 TCP 长连接建立网络通道,底层默认是通过 java 的 NIO 方式,也可以配置 netty 实现方式。
注册 watcher 监听事件流程图:
- 客户端发送事件通知请求:
- 服务端处理 watcher 事件的请求:
- 服务端触发 watcher 事件流程:
- 客户端回调 watcher 事件: