查看“Zookeeper:Java API”的源代码
←
Zookeeper:Java API
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Zookeeper]] __TOC__ == 关于 == Java 客户端包括两种方式: # 原生 API; # '''Curator'''; === Java 客户端搭建 === 使用的 IDE 为 IntelliJ IDEA,创建一个 maven 工程,命名为 zookeeper-demo,并且引入如下依赖,可以自行在maven中央仓库选择合适的版本: <syntaxhighlight lang="xml" highlight=""> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> </syntaxhighlight> maven 工程目录结构: : [[File:Zookeeper:Java客户端:maven工程目录结构.png|400px]] === 原生 API(Zookeeper API) === ---- ZooKeeper 有一个绑定 Java 和 C 的官方API。Zookeeper 社区为大多数语言(.NET,python等)提供非官方API。 使用ZooKeeper API,应用程序可以连接,交互,操作数据,协调,最后断开与ZooKeeper集合的连接。 * ZooKeeper API具有丰富的功能,以简单和安全的方式获得ZooKeeper集合的所有功能。 * ZooKeeper API提供'''同步'''和'''异步'''方法。 ZooKeeper集合和ZooKeeper API在各个方面都完全相辅相成,对开发人员有很大的帮助。 ==== ZooKeeper API的基础知识 ==== 与ZooKeeper集合进行交互的应用程序称为 '''ZooKeeper客户端'''或简称'''客户端'''。 * '''Znode''' 是ZooKeeper集合的核心组件,ZooKeeper API提供了一小组方法使用ZooKeeper集合来操纵znode的所有细节。 客户端应该遵循以步骤,与ZooKeeper集合进行清晰和干净的交互: # 连接到ZooKeeper集合。ZooKeeper集合为客户端分配会话ID。 # 定期向服务器发送心跳。否则,ZooKeeper集合将过期会话ID,客户端需要重新连接。 # 只要会话ID处于活动状态,就可以获取/设置znode。 # 所有任务完成后,断开与ZooKeeper集合的连接。如果客户端长时间不活动,则ZooKeeper集合将自动断开客户端。 ==== org.apache.zookeeper.ZooKeeper ==== ZooKeeper API的核心部分是'''ZooKeeper类'''。它提供了在其构造函数中连接ZooKeeper集合的选项,并具有以下方法: * '''connect''':连接到 ZooKeeper集合; * '''create''':创建 znode; * '''exists''':检查 znode 是否存在及其信息; * '''getData''':从特定的 znode 获取数据; * '''setData''':在特定的 znode 中设置数据; * '''getChildren''':获取特定 znode 中的所有子节点; * '''delete''':删除特定的 znode 及其所有子项; * '''close''':关闭连接; 示例: # '''连接到 ZooKeeper 集合''': #: ZooKeeper 类通过其构造函数提供connect功能。构造函数的签名如下 : #: '''<syntaxhighlight lang="java" highlight=""> ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher) </syntaxhighlight>''' #* connectionString - ZooKeeper集合主机。 #* sessionTimeout - 会话超时(以毫秒为单位)。 #* watcher - 实现“监视器”界面的对象。ZooKeeper集合通过监视器对象返回连接状态。 #:: <syntaxhighlight lang="java" highlight=""> // import java classes import java.io.IOException; import java.util.concurrent.CountDownLatch; // import zookeeper classes import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class ZooKeeperConnection { // declare zookeeper instance to access ZooKeeper ensemble private ZooKeeper zoo; final CountDownLatch connectedSignal = new CountDownLatch(1); // Method to connect zookeeper ensemble. public ZooKeeper connect(String host) throws IOException,InterruptedException { zoo = new ZooKeeper(host,5000,new Watcher() { public void process(WatchedEvent we) { if (we.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(); return zoo; } // Method to disconnect from zookeeper server public void close() throws InterruptedException { zoo.close(); } } </syntaxhighlight> # '''create 方法''': #: ZooKeeper类提供了在ZooKeeper集合中创建一个新的 znode 的 create 方法。create 方法的签名如下: #: '''<syntaxhighlight lang="java" highlight=""> create(String path, byte[] data, List<ACL> acl, CreateMode createMode) </syntaxhighlight>''' #* path - Znode 路径。例如,/myapp1,/myapp2,/myapp1/mydata1,myapp2/mydata1/myanothersubdata #* data - 要存储在指定 znode 路径中的数据 #* acl - 要创建的节点的访问控制列表。ZooKeeper API提供了一个静态接口 '''ZooDefs.Ids''' 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开znode的acl列表。 #* createMode - 节点的类型,即临时,顺序或两者。这是一个'''枚举'''。 #:: <syntaxhighlight lang="java" highlight=""> import java.io.IOException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; public class ZKCreate { // create static instance for zookeeper class. private static ZooKeeper zk; // create static instance for ZooKeeperConnection class. private static ZooKeeperConnection conn; // Method to create znode in zookeeper ensemble public static void create(String path, byte[] data) throws KeeperException,InterruptedException { zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public static void main(String[] args) { // znode path String path = "/MyFirstZnode"; // Assign path to znode // data in byte array byte[] data = "My first zookeeper app".getBytes(); // Declare data try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); create(path, data); // Create the data to the specified path conn.close(); } catch (Exception e) { System.out.println(e.getMessage()); //Catch error message } } } </syntaxhighlight> # '''exists 方法''': #: ZooKeeper类提供了 exists 方法来检查znode的存在。如果指定的znode存在,则返回一个znode的'''元数据'''('''Stat''')。exists 方法的签名如下: #: '''<syntaxhighlight lang="java" highlight=""> exists(String path, boolean watcher) </syntaxhighlight>''' #* path- Znode路径 #* watcher - 布尔值,用于指定是否监视指定的 znode #:: <syntaxhighlight lang="java" highlight=""> import java.io.IOException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKExists { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path, true); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; // Assign znode to the specified path try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); // Stat checks the path of the znode if(stat != null) { System.out.println("Node exists and the node version is " + stat.getVersion()); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); // Catches error messages } } } </syntaxhighlight> #:: <syntaxhighlight lang="java" highlight=""> Node exists and the node version is 1. </syntaxhighlight> # '''getData 方法''': #: ZooKeeper类提供 getData 方法来获取附加在指定znode中的数据及其状态。 getData 方法的签名如下: #: '''<syntaxhighlight lang="java" highlight=""> getData(String path, Watcher watcher, Stat stat) </syntaxhighlight>''' #* path - Znode路径。 #* watcher - 监视器类型的回调函数。当指定的znode的数据改变时,ZooKeeper集合将通过监视器回调进行通知。这是一次性通知。 #* stat - 返回znode的元数据。 #:: <syntaxhighlight lang="java" highlight=""> import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKGetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path,true); } public static void main(String[] args) throws InterruptedException, KeeperException { String path = "/MyFirstZnode"; final CountDownLatch connectedSignal = new CountDownLatch(1); try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); if(stat != null) { byte[] b = zk.getData(path, new Watcher() { public void process(WatchedEvent we) { if (we.getType() == Event.EventType.None) { switch(we.getState()) { case Expired: connectedSignal.countDown(); break; } } else { String path = "/MyFirstZnode"; try { byte[] bn = zk.getData(path, false, null); String data = new String(bn, "UTF-8"); System.out.println(data); connectedSignal.countDown(); } catch(Exception ex) { System.out.println(ex.getMessage()); } } } }, null); String data = new String(b, "UTF-8"); System.out.println(data); connectedSignal.await(); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } } } </syntaxhighlight> # '''setData 方法''': #: ZooKeeper类提供 setData 方法来修改指定znode中附加的数据。 setData 方法的签名如下: #: '''<syntaxhighlight lang="java" highlight=""> setData(String path, byte[] data, int version) </syntaxhighlight>''' #* path- Znode路径 #* data - 要存储在指定znode路径中的数据。 #* version- znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本号。 #:: <syntaxhighlight lang="java" highlight=""> import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import java.io.IOException; public class ZKSetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to update the data in a znode. Similar to getData but without watcher. public static void update(String path, byte[] data) throws KeeperException,InterruptedException { zk.setData(path, data, zk.exists(path,true).getVersion()); } public static void main(String[] args) throws InterruptedException,KeeperException { String path= "/MyFirstZnode"; byte[] data = "Success".getBytes(); //Assign data which is to be updated. try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); update(path, data); // Update znode data to the specified path } catch(Exception e) { System.out.println(e.getMessage()); } } } </syntaxhighlight> # '''getChildren 方法''': #: ZooKeeper类提供 getChildren 方法来获取特定znode的所有子节点。 getChildren 方法的签名如下: #: '''<syntaxhighlight lang="java" highlight=""> getChildren(String path, Watcher watcher) </syntaxhighlight>''' #* path - Znode路径。 #* watcher - 监视器类型的回调函数。当指定的znode被删除或znode下的子节点被创建/删除时,ZooKeeper集合将进行通知。这是一次性通知。 #:: <syntaxhighlight lang="java" highlight=""> import java.io.IOException; import java.util.*; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKGetChildren { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path,true); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; // Assign path to the znode try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); // Stat checks the path if(stat!= null) { //“getChildren" method- get all the children of znode.It has two args, path and watch List <String> children = zk.getChildren(path, false); for(int i = 0; i < children.size(); i++) System.out.println(children.get(i)); //Print children's } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } } } </syntaxhighlight> # '''delete 方法''': #: ZooKeeper类提供了 delete 方法来删除指定的znode。 delete 方法的签名如下: #: '''<syntaxhighlight lang="java" highlight=""> delete(String path, int version) </syntaxhighlight>''' #* path - Znode路径。 #* version - znode的当前版本。 #:: <syntaxhighlight lang="java" highlight=""> import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; public class ZKDelete { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static void delete(String path) throws KeeperException,InterruptedException { zk.delete(path,zk.exists(path,true).getVersion()); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; //Assign path to the znode try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); delete(path); //delete the node with the specified path } catch(Exception e) { System.out.println(e.getMessage()); // catches error messages } } } </syntaxhighlight> === Curator === ---- Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架,解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 Watcher 和 NodeExistsException 异常等。 * curator 相关参考链接:[http://curator.apache.org/ http://curator.apache.org/]。 Curator 包含了几个包: * '''curator-framework''':对 zookeeper 的底层 api 的一些封装。 * '''curator-client''':提供一些客户端的操作,例如重试策略等。 * '''curator-recipes''':封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。【!!!】 示例: : <syntaxhighlight lang="java" highlight=""> public class CuratorDemo { public static void main(String[] args) throws Exception { CuratorFramework curatorFramework = CuratorFrameworkFactory. builder().connectString("192.168.3.33:2181," + "192.168.3.35:2181,192.168.3.37:2181"). sessionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000,3)). namespace("").build(); curatorFramework.start(); Stat stat=new Stat(); //查询节点数据 byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/runoob"); System.out.println(new String(bytes)); curatorFramework.close(); } } </syntaxhighlight> : 上一步设置了 /runoob 节点值,所以控制台输出。 : [[File:Zookeeper:Java客户端:Curator示例.png|800px]]
返回至“
Zookeeper:Java API
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息