查看“Zookeeper:Java API”的源代码
←
Zookeeper:Java API
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Zookeeper]] __TOC__ == 关于 == 与ZooKeeper集合进行交互的应用程序称为 '''ZooKeeper客户端'''或简称'''客户端'''。ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。 可供选择的Java客户端API有: # ZooKeeper官方的Java客户端API; # ZkClient; # '''Curator'''; == Zookeeper API(官方API) == ZooKeeper 官方API的核心部分是'''ZooKeeper类'''(“org.apache.zookeeper.ZooKeeper”),它提供了基本的操作。例如,“创建会话”、“创建节点”、“读取节点”、“更新数据”、“删除节点”和“检查节点是否存在”等。 === 创建连接 === ZooKeeper 类通过其构造函数提供 connect 功能。构造函数的签名如下 : '''<syntaxhighlight lang="java" highlight=""> ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher) </syntaxhighlight>''' * connectionString - ZooKeeper集合主机。 * sessionTimeout - 会话超时(以毫秒为单位)。 * watcher - 实现“监视器”界面的对象。ZooKeeper集合通过监视器对象返回连接状态。 示例: <syntaxhighlight lang="java" highlight=""> // import java classes import java.io.IOException; import java.util.concurrent.CountDownLatch; // import zookeeper classes import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class ZooKeeperConnection { // declare zookeeper instance to access ZooKeeper ensemble private ZooKeeper zoo; final CountDownLatch connectedSignal = new CountDownLatch(1); // Method to connect zookeeper ensemble. public ZooKeeper connect(String host) throws IOException,InterruptedException { zoo = new ZooKeeper(host, 5000, new Watcher() { public void process(WatchedEvent we) { if (we.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(); return zoo; } // Method to disconnect from zookeeper server public void close() throws InterruptedException { zoo.close(); } } </syntaxhighlight> === 创建ZNode === '''<syntaxhighlight lang="java" highlight=""> create(String path, byte[] data, List<ACL> acl, CreateMode createMode) </syntaxhighlight>''' * path - Znode 路径。例如,/myapp1,/myapp2,/myapp1/mydata1,myapp2/mydata1/myanothersubdata * data - 要存储在指定 znode 路径中的数据 * acl - 要创建的节点的访问控制列表。ZooKeeper API提供了一个静态接口 '''ZooDefs.Ids''' 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开znode的acl列表。 * createMode - 节点的类型,即临时,顺序或两者。这是一个'''枚举'''。 示例: <syntaxhighlight lang="java" highlight=""> public class ZKCreate { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static void createNode() { String path = "/MyFirstZnode"; byte[] data = "My first zookeeper app".getBytes(); try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (Exception e) { System.out.println(e.getMessage()); } finally { conn.close(); } } } </syntaxhighlight> === 检查ZNode的存在 === 如果指定的znode存在,则返回一个znode的'''元数据'''('''Stat'''); '''<syntaxhighlight lang="java" highlight=""> exists(String path, boolean watcher) </syntaxhighlight>''' * path- Znode路径 * watcher - 布尔值,用于指定是否监视指定的 znode 示例: <syntaxhighlight lang="java" highlight=""> public class ZKExists { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static void existsNode() { String path = "/MyFirstZnode"; // Assign znode to the specified path try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = zk.exists(path, true); if(stat != null) { System.out.println("Node exists and the node version is " + stat.getVersion()); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } finally { conn.close(); } } } </syntaxhighlight> <syntaxhighlight lang="java" highlight=""> Node exists and the node version is 1. </syntaxhighlight> === 读取ZNode === 获取附加在指定znode中的数据及其状态: '''<syntaxhighlight lang="java" highlight=""> getData(String path, Watcher watcher, Stat stat) </syntaxhighlight>''' * path - Znode路径。 * watcher - 监视器类型的回调函数。当指定的znode的数据改变时,ZooKeeper集合将通过监视器回调进行通知。这是一次性通知。 * stat - 返回znode的元数据。 示例: <syntaxhighlight lang="java" highlight=""> public class ZKGetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static void readNode() { String path = "/MyFirstZnode"; final CountDownLatch connectedSignal = new CountDownLatch(1); try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = zk.exists(path,true); if(stat != null) { byte[] b = zk.getData(path, new Watcher() { public void process(WatchedEvent we) { if (we.getType() == Event.EventType.None) { switch(we.getState()) { case Expired: connectedSignal.countDown(); break; } } else { String path = "/MyFirstZnode"; try { byte[] bn = zk.getData(path, false, null); String data = new String(bn, "UTF-8"); System.out.println(data); connectedSignal.countDown(); } catch(Exception ex) { System.out.println(ex.getMessage()); } } } }, null); String data = new String(b, "UTF-8"); System.out.println(data); connectedSignal.await(); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } finally { conn.close(); } } } </syntaxhighlight> === 更新ZNode === 修改指定znode中附加的数据: '''<syntaxhighlight lang="java" highlight=""> setData(String path, byte[] data, int version) </syntaxhighlight>''' * path- Znode路径 * data - 要存储在指定znode路径中的数据。 * version- znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本号。 示例: <syntaxhighlight lang="java" highlight=""> public class ZKSetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static void updateNode() { String path= "/MyFirstZnode"; byte[] data = "Success".getBytes(); try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); zk.setData(path, data, zk.exists(path,true).getVersion()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { conn.close(); } } } </syntaxhighlight> === 获取ZNode子节点 === 获取特定znode的所有子节点: '''<syntaxhighlight lang="java" highlight=""> getChildren(String path, Watcher watcher) </syntaxhighlight>''' * path - Znode路径。 * watcher - 监视器类型的回调函数。当指定的znode被删除或znode下的子节点被创建/删除时,ZooKeeper集合将进行通知。这是一次性通知。 示例: <syntaxhighlight lang="java" highlight=""> public class ZKGetChildren { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static void getNodeChildren() { String path = "/MyFirstZnode"; try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = zk.exists(path,true); if(stat!= null) { //“getChildren" method- get all the children of znode.It has two args, path and watch List <String> children = zk.getChildren(path, false); for(int i = 0; i < children.size(); i++) System.out.println(children.get(i)); //Print children's } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } finally { conn.close(); } } } </syntaxhighlight> === 删除ZNode === '''<syntaxhighlight lang="java" highlight=""> delete(String path, int version) </syntaxhighlight>''' * path - Znode路径。 * version - znode的当前版本。 示例: <syntaxhighlight lang="java" highlight=""> public class ZKDelete { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static void deleteNode() { String path = "/MyFirstZnode"; try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); zk.delete(path, zk.exists(path,true).getVersion()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { conn.close(); } } } </syntaxhighlight> === 总结 === 使用ZooKeeper API,应用程序可以连接,交互,操作数据,协调,最后断开与ZooKeeper集合的连接。 * ZooKeeper API具有丰富的功能,以简单和安全的方式获得ZooKeeper集合的所有功能。 * ZooKeeper API提供'''同步'''和'''异步'''方法。【???】 不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下: # ZooKeeper的'''Watcher监测是一次性的''',每次触发之后都需要重新进行注册。 # 会话超时之后'''没有实现重连机制'''。 # '''异常处理烦琐''',ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。 # 仅提供了简单的 byte[] 数组类型的接口,'''没有提供 Java POJO 级别的序列化数据处理接口'''。 # 创建节点时如果抛出异常,'''需要自行检查节点是否存在'''。 # '''无法实现级联删除'''。 总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。可供选择的Java客户端API之二,即第三方开源客户端API,主要有'''ZkClient'''和'''Curator'''。 == ZkClient == ZkClient 是一个开源客户端,在ZooKeeper原生API接口的基础上进行了包装,更便于开发人员使用。 : ZkClient客户端在一些著名的互联网开源项目中得到了应用,例如,阿里的分布式Dubbo框架对它进行了无缝集成。 ZkClient解决了ZooKeeper原生API接口的很多问题。例如,ZkClient提供了更加简洁的API,实现了'''会话超时重连'''、'''反复注册Watcher'''等问题。 虽然ZkClient对原生API进行了封装,但也有它自身的不足之处,具体如下: # ZkClient社区不活跃,文档不够完善,几乎没有参考文档。 # 异常处理简化(抛出RuntimeException)。 # 重试机制比较难用。 # 没有提供各种使用场景的参考实现。 示例:使用ZkClient客户端创建ZNode节点 <syntaxhighlight lang="Java" highlight=""> ZkClient zkClient = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer()); //根节点路径 String PATH = "/test"; //判断是否存在 boolean rootExists = zkClient.exists(PATH); //如果存在,获取地址列表 if(!rootExists){ zkClient.createPersistent(PATH); } String zkPath = "/test/node-1"; boolean serviceExists = zkClient.exists(zkPath); if(!serviceExists){ zkClient.createPersistent(zkPath); } </syntaxhighlight> == Curator == <pre> Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。 “Guava is to Java that Curator to ZooKeeper”——Patrixck Hunt </pre> Curator 是 Netflix 公司开源的一套ZooKeeper客户端框架,其优点如下: # 和 ZkClient 一样它解决了非常底层的细节开发工作,包括'''连接'''、'''重连'''、'''反复注册Watcher'''的问题以及'''NodeExistsException异常'''等。 # 还提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如'''Recipe'''、'''共享锁服务'''、'''Master选举机制'''和'''分布式计算器'''等, # 另外,Curator还提供了一套非常优雅的'''链式调用'''API,与ZkClient客户端API相比,Curator的API优雅太多了,以创建ZNode节点为例,让大家实际对比一下。 示例:使用Curator客户端创建ZNode节点 <syntaxhighlight lang="Java" highlight=""> CuratorFramework client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); String zkPath = "/test/node-1"; client.create().withMode(mode).forPath(zkPath); </syntaxhighlight> === 环境准备 === Curator 包含了以下几个包: # '''curator-framework''':是对ZooKeeper的底层API的一些封装。 # '''curator-client''':提供了一些客户端的操作,例如重试策略等。 # '''curator-recipes''':封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。 Maven 依赖: <syntaxhighlight lang="xml" highlight=""> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.ZooKeeper</groupId> <artifactId>ZooKeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.ZooKeeper</groupId> <artifactId>ZooKeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.ZooKeeper</groupId> <artifactId>ZooKeeper</artifactId> </exclusion> </exclusions> </dependency> </syntaxhighlight> === Curator === ---- Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架,解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 Watcher 和 NodeExistsException 异常等。 * curator 相关参考链接:[http://curator.apache.org/ http://curator.apache.org/]。 Curator 包含了几个包: * '''curator-framework''':对 zookeeper 的底层 api 的一些封装。 * '''curator-client''':提供一些客户端的操作,例如重试策略等。 * '''curator-recipes''':封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。【!!!】 示例: : <syntaxhighlight lang="java" highlight=""> public class CuratorDemo { public static void main(String[] args) throws Exception { CuratorFramework curatorFramework = CuratorFrameworkFactory. builder().connectString("192.168.3.33:2181," + "192.168.3.35:2181,192.168.3.37:2181"). sessionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000,3)). namespace("").build(); curatorFramework.start(); Stat stat=new Stat(); //查询节点数据 byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/runoob"); System.out.println(new String(bytes)); curatorFramework.close(); } } </syntaxhighlight> : 上一步设置了 /runoob 节点值,所以控制台输出。 : [[File:Zookeeper:Java客户端:Curator示例.png|800px]]
返回至“
Zookeeper:Java API
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息