“Zookeeper:分布式事件监听”的版本间差异
		
		
		
		
		
		跳到导航
		跳到搜索
		
				
		
		
	
| 第39行: | 第39行: | ||
示例:利用 Watcher 来对节点事件进行监听  | |||
<syntaxhighlight lang="Java" highlight="">  | <syntaxhighlight lang="Java" highlight="34,41">  | ||
package com.crazymakercircle.zk.publishSubscribe;  | |||
import com.crazymakercircle.zk.ZKclient;  | |||
import lombok.Data;  | |||
import lombok.extern.slf4j.Slf4j;  | |||
import org.apache.curator.framework.CuratorFramework;  | |||
import org.apache.curator.framework.recipes.cache.*;  | |||
import org.apache.ZooKeeper.WatchedEvent;  | |||
import org.apache.ZooKeeper.Watcher;  | |||
import org.junit.Test;  | |||
import java.io.UnsupportedEncodingException;  | |||
/**  | |||
* 客户端监听实践  | |||
**/  | |||
@Slf4j  | |||
@Data  | |||
public class ZkWatcherDemo {  | |||
	private String workerPath = "/test/listener/remoteNode";  | |||
	private String subWorkerPath = "/test/listener/remoteNode/id-";  | |||
	// 利用Watcher来对节点进行监听操作  | |||
	@Test  | |||
	public void testWatcher() {  | |||
		CuratorFramework client = ZKclient.instance.getClient();  | |||
		// 检查节点是否存在,没有则创建  | |||
		boolean isExist = ZKclient.instance.isNodeExist(workerPath);  | |||
		if (!isExist) {  | |||
			ZKclient.instance.createNode(workerPath, null);  | |||
		}  | |||
		try {  | |||
			Watcher w = new Watcher() {  | |||
					@Override  | |||
					public void process(WatchedEventwatchedEvent) {  | |||
						System.out.println("监听到的变化 watchedEvent = " + watchedEvent);  | |||
					}  | |||
				};  | |||
			byte[] content = client.getData().usingWatcher(w).forPath(workerPath);  | |||
			log.info("监听节点内容:" + new String(content));  | |||
			// 第一次变更节点数据  | |||
			client.setData().forPath(workerPath, "第1次更改内容".getBytes());  | |||
			// 第二次变更节点数据  | |||
			client.setData().forPath(workerPath, "第2次更改内容".getBytes());  | |||
			Thread.sleep(Integer.MAX_VALUE);  | |||
		} catch (InterruptedException e) {  | |||
			e.printStackTrace();  | |||
		} catch (Exception e) {  | |||
			e.printStackTrace();  | |||
		}  | |||
	}  | |||
}  | |||
</syntaxhighlight>  | |||
结果:  | |||
<syntaxhighlight lang="bash" highlight="">  | |||
//….省略其他的输出  | |||
监听到的变化 watchedEvent = WatchedEventstate:SyncConnectedtype:NodeDataChanged path:/test/listener/node  | |||
</syntaxhighlight>  | </syntaxhighlight>  | ||
如上,虽然两次调用 setData 方法改变节点内容,但是监听器仅仅监听到了一个事件。即:“'''监听器的注册是一次性的'''”,当第二次改变节点内容时,注册已经失效,无法再次捕获节点的变动事件。  | |||
所以,Watcher 监听器不适用于节点的数据频繁变动或者节点频繁变动这样的业务场景,而是适用于一些特殊的、变动不频繁的场景,例如“会话超时”、“授权失败”等这样的特殊场景。  | |||
=== 关于“Watcher”、“Watchable<T>”、“WatchedEvent”、“WatcherEvent” ===  | === 关于“Watcher”、“Watchable<T>”、“WatchedEvent”、“WatcherEvent” ===  | ||
2021年9月30日 (四) 00:11的版本
关于
实现对ZooKeeper服务器端的事件监听,是客户端操作服务器的一项重点工作。在 Curator 的API中,事件监听有两种模式:
- 一种是标准的观察者模式:通过Watcher监听器实现的;
 - 另一种是缓存监听模式:通过引入了一种“本地缓存视图”Cache机制去实现的。
- 可以理解为一个本地缓存视图与远程ZooKeeper视图的对比过程,简单来说,Cache在客户端缓存了ZNode的各种状态,当感知到Zk集群的ZNode状态变化时,会触发事件,注册的监听器会处理这些事件。
 - 虽然,Cache是一种缓存机制,但是可以借助Cache实现实际的监听。
 
 
二者的最大的不同在于:
- Cache机制提供了反复注册的能力,而观察模式的Watcher监听器只能监听一次。
 - 在类型上,Watcher监听器比较简单,只有一种。Cache事件监听的种类有3种:Path Cache,Node Cache 和 Tree Cache。
 
Watcher监听器
在 ZooKeeper 中,Watcher 监听器的使用过程:
- 定义一个 Watcher 接口的实例:使用 Watcher 接口的事件回调方法:“process(WatchedEvent event)”定义回调处理逻辑。如下:
//演示:定义一个监听器 Watcher w = new Watcher() { @Override public void process(WatchedEventwatchedEvent) { log.info("监听器watchedEvent:" + watchedEvent); } };
 - 通过“实现了Watchable<T>接口”的构造者的“usingWatcher(Watcher w)”方法,为构造者设置 Watcher 监听器实例。如下:
//为GetDataBuilder实例设置监听器 byte[] content = client.getData().usingWatcher(w).forPath(workerPath);
- 可以分别通过 getData()、exists()、getChildren() 方法去设置监听器。
 
 
一个 Watcher 监听器在向服务器端完成注册后,当服务器端的一些事件触发了这个 Watcher,就会向注册过的客户端会话发送一个事件通知来实现分布式的通知功能。
- 在 Curator 客户端收到服务器端的通知后,会封装一个通知事件“WatchedEvent”的实例,再传递给监听器的“process(WatchedEvent e)”回调方法。
 
示例:利用 Watcher 来对节点事件进行监听
package com.crazymakercircle.zk.publishSubscribe;
import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.ZooKeeper.WatchedEvent;
import org.apache.ZooKeeper.Watcher;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
/**
* 客户端监听实践
**/
@Slf4j
@Data
public class ZkWatcherDemo {
	private String workerPath = "/test/listener/remoteNode";
	private String subWorkerPath = "/test/listener/remoteNode/id-";
	
	// 利用Watcher来对节点进行监听操作
	@Test
	public void testWatcher() {
		CuratorFramework client = ZKclient.instance.getClient();
		
		// 检查节点是否存在,没有则创建
		boolean isExist = ZKclient.instance.isNodeExist(workerPath);
		if (!isExist) {
			ZKclient.instance.createNode(workerPath, null);
		}
		
		try {
			Watcher w = new Watcher() {
					@Override
					public void process(WatchedEventwatchedEvent) {
						System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
					}
				};
				
			byte[] content = client.getData().usingWatcher(w).forPath(workerPath);
			
			log.info("监听节点内容:" + new String(content));
			
			// 第一次变更节点数据
			client.setData().forPath(workerPath, "第1次更改内容".getBytes());
			
			// 第二次变更节点数据
			client.setData().forPath(workerPath, "第2次更改内容".getBytes());
			
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
结果:
//….省略其他的输出
监听到的变化 watchedEvent = WatchedEventstate:SyncConnectedtype:NodeDataChanged path:/test/listener/node
如上,虽然两次调用 setData 方法改变节点内容,但是监听器仅仅监听到了一个事件。即:“监听器的注册是一次性的”,当第二次改变节点内容时,注册已经失效,无法再次捕获节点的变动事件。
所以,Watcher 监听器不适用于节点的数据频繁变动或者节点频繁变动这样的业务场景,而是适用于一些特殊的、变动不频繁的场景,例如“会话超时”、“授权失败”等这样的特殊场景。
关于“Watcher”、“Watchable<T>”、“WatchedEvent”、“WatcherEvent”
Watcher接口:用于表示一个标准的事件处理器,用来定义收到事件通知后相关的回调处理逻辑。
- 接口中包含两个内部枚举类:“KeeperState”(“通知状态”)和“EventType”(“事件类型”)。
 
Watchable<T>接口:定义了 usingWatcher(Watcher w) 用于为实现了此接口的构造者设置监视器。
- 实现了 Watchable<T> 接口的构造者,如:GetDataBuilder、GetChildrenBuilder 和 ExistsBuilder。(分别对应了前一节所述三个方法)
 - 在 Curator 中,Watchable<T> 接口的源代码如下:
package org.apache.curator.framework.api; import org.apache.ZooKeeper.Watcher; public interface Watchable<T> { T watched(); T usingWatcher(Watcher w); T usingWatcher(CuratorWatcher cw); }
 
WatchedEvent通知事件:Curator 客户端将服务器端的通知(WatcherEvent)封装为一个“WatchedEvent”实例,再传递给监听器的“process(WatchedEvent e)”回调方法。
- WatchedEvent包含了三个基本属性:
- 通知状态(keeperState)
 - 事件类型(EventType)
 - 节点路径(path)
 
- WatchedEvent 中所用到的通知状态和事件类型定义在 Watcher 接口中。(如上图)
 
 - 关于“WatchedEvent”与“WatcherEvent”:
 
WatcherEvent:从 ZooKeeper 服务器端直接通过网络传输传递过来的通知事件实例的类型; WatchedEvent:Curator 将服务器端的通知事件封装后的事件实例的类型;【WatchedEvent 类型没有实现序列化接口 java.io.Serializable,因此不能用于网络传输】 WatcherEvent 传输实例 和 Curator的 WatchedEvent 封装实例,在名称上基本上一样,只有一个字母之差,而且功能也是一样的,都表示的是同一个服务器端事件。