查看“Zookeeper:分布式锁”的源代码
←
Zookeeper:分布式锁
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Zookeeper]] == 关于 == <pre> 在单体的应用开发场景中涉及并发同步的时候,大家往往采用 Synchronized(同步)或者其他同一个 JVM 内 Lock 机制来解决多线程间的同步问题。 在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题。这种跨机器的锁就是“分布式锁”。 </pre> 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。 == Zookeeper分布式锁思路 == 根据网络和书籍上的资料,主要有以下三种方式来实现分布式锁: # 非公平排他锁:利用 Zookeeper 同一目录下 Znode 的唯一性; # 公平排他锁:利用 Zookeeper 顺序节点的递增有序; # 公平读写锁:(同上) 此外,以上还用到了:会话失效时临时节点自动删除的特性,以及节点的监听机制等。 * 以上都可以有可重入的实现方式。 === 思路一:非公平排他锁 === '''排他锁'''(Exclusive Locks),又被称为'''写锁'''或'''独占锁''': : 如果事务 T1 对数据对象 O1 加上排他锁,那么整个加锁期间,只允许事务 T1 对 O1 进行读取和更新操作,其他任何事务都不能进行读或写。 定义锁: '''<syntaxhighlight lang="java" highlight=""> /exclusive_lock/lock </syntaxhighlight>''' 实现方式:利用 zookeeper 的'''同级节点的唯一性'''特性: # 获取锁:每个线程都试图在“/exclusive_lock”节点下创建'''临时节点'''“/exclusive_lock/lock”,最终只有一个线程能创建成功,即获得了锁。 # 监听:所有没有获取到锁的线程在“/exclusive_lock”节点上注册一个子节点变更的 '''watcher''' 监听事件,以便重新争取获得锁。 * 竞争锁的线程不会进行排队,意味着此为“非公平锁”。 * 可能出现的问题:当锁被释放或者获得锁的线程宕机,所有监听“/exclusive_lock”子节点的线程都会作出反应,这样会给服务器带来巨大压力。(“羊群效应”) === 思路二:公平排他锁 === 定义锁: '''<syntaxhighlight lang="java" highlight=""> /exclusive_lock/[hostname]-序号 </syntaxhighlight>''' 实现方式: # 获取锁: ## 每个线程都试图在“/exclusive_lock”节点下创建'''临时顺序节点'''; ## 线程获取所有已创建的子节点列表,通过判断“自己是否是序号最小的节点”来确定是否获得锁; # 监听:所有没有获得锁的线程向比自己序号小的'''前一个节点'''注册 '''watcher''' 监听事件,以便重新争取获得锁。 * 竞争锁的线程按照创建的临时顺序节点的序号进行排队,意味着此为“公平锁”。 * 每次的所释放只会由下一个临时顺序节点作出反应,故不会出现“羊群效应”。 === 思路三:公平读写锁 === 定义锁: '''<syntaxhighlight lang="java" highlight=""> /shared_lock/[hostname]-请求类型W/R-序号 </syntaxhighlight>''' 实现方式: # 获取锁: ## 每个线程都试图在“/exclusive_lock”节点下创建'''临时顺序节点'''; ## 线程获取所有已创建的子节点列表,并判断获得锁: ##* 读请求:如果所有比自己小的节点都是读请求,或者没有比自己序号小的节点,则可以获取读锁; ##* 写请求:如果自己是序号最小的节点,则可以获取写锁。 # 监听: #* 读请求:向比自己序号小的'''前一个写请求节点'''注册 '''watcher''' 监听; #* 写请求:向比自己序号小的'''前一个节点'''注册 '''watcher''' 监听。 == Zookeeper分布式锁的实现 == 以最经典的分布式锁:“可重入的公平锁”来实现。(思路二) == curator 提供的分布式锁 == 实际开发过程中,可以 curator 工具包封装的API('''curator-recipes''')帮助我们实现分布式锁。 <syntaxhighlight lang="java" highlight=""> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>x.x.x</version> </dependency> </syntaxhighlight> === curator 的几种锁方案 === curator 的几种锁方案: # '''InterProcessMutex''':分布式可重入排它锁 # '''InterProcessSemaphoreMutex''':分布式排它锁 # '''InterProcessReadWriteLock''':分布式读写锁 === 示例 === 下面例子模拟 50 个线程使用重入排它锁 InterProcessMutex 同时争抢锁: : <syntaxhighlight lang="java" highlight=""> public class InterprocessLock { private static CuratorFramework getZkClient() { String zkServerAddress = "192.168.3.39:2181"; ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); CuratorFramework zkClient = CuratorFrameworkFactory.builder() .connectString(zkServerAddress) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); zkClient.start(); return zkClient; } public static void main(String[] args) { CuratorFramework zkClient = getZkClient(); String lockPath = "/lock"; InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); //模拟50个线程抢锁 for (int i = 0; i < 50; i++) { new Thread(new TestThread(i, lock)).start(); } } static class TestThread implements Runnable { private Integer threadFlag; private InterProcessMutex lock; public TestThread(Integer threadFlag, InterProcessMutex lock) { this.threadFlag = threadFlag; this.lock = lock; } @Override public void run() { try { lock.acquire(); System.out.println("第"+threadFlag+"线程获取到了锁"); //等到1秒后释放锁 Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); }finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } } </syntaxhighlight> : 控制台每间隔一秒钟输出一条记录: : [[File:Zookeeper:分布式锁:curator:InterProcessMutex示例.png|600px]]
返回至“
Zookeeper:分布式锁
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息