查看“Zookeeper:分布式锁”的源代码
←
Zookeeper:分布式锁
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Zookeeper]] == 关于 == <pre> 在单体的应用开发场景中涉及并发同步的时候,大家往往采用 Synchronized(同步)或者其他同一个 JVM 内 Lock 机制来解决多线程间的同步问题。 在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题。这种跨机器的锁就是“分布式锁”。 </pre> 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。 == Zookeeper分布式锁思路 == 根据网络和书籍上的资料,主要有以下三种方式来实现分布式锁: # 非公平排他锁:利用 Zookeeper 同一目录下 Znode 的唯一性; # 公平排他锁:利用 Zookeeper 顺序节点的递增有序; # 公平读写锁:(同上) 此外,以上还用到了:会话失效时临时节点自动删除的特性,以及节点的监听机制等。 * 以上都可以有可重入的实现方式。 === 思路一:非公平排他锁 === '''排他锁'''(Exclusive Locks),又被称为'''写锁'''或'''独占锁''': : 如果事务 T1 对数据对象 O1 加上排他锁,那么整个加锁期间,只允许事务 T1 对 O1 进行读取和更新操作,其他任何事务都不能进行读或写。 定义锁: '''<syntaxhighlight lang="java" highlight=""> /exclusive_lock/lock </syntaxhighlight>''' 实现方式:利用 zookeeper 的'''同级节点的唯一性'''特性: # 获取锁:每个线程都试图在“/exclusive_lock”节点下创建'''临时节点'''“/exclusive_lock/lock”,最终只有一个线程能创建成功,即获得了锁。 # 监听:所有没有获取到锁的线程在“/exclusive_lock”节点上注册一个子节点变更的 '''watcher''' 监听事件,以便重新争取获得锁。 * 竞争锁的线程不会进行排队,意味着此为“非公平锁”。 * 可能出现的问题:当锁被释放或者获得锁的线程宕机,所有监听“/exclusive_lock”子节点的线程都会作出反应,这样会给服务器带来巨大压力。【“'''羊群效应'''”】 === 思路二:公平排他锁 === 定义锁: '''<syntaxhighlight lang="java" highlight=""> /exclusive_lock/[hostname]-序号 </syntaxhighlight>''' 实现方式: # 获取锁: ## 每个线程都试图在“/exclusive_lock”节点下创建'''临时顺序节点'''; ## 线程获取所有已创建的子节点列表,通过判断“自己是否是序号最小的节点”来确定是否获得锁; # 监听:所有没有获得锁的线程向比自己序号小的'''前一个节点'''注册 '''watcher''' 监听事件,以便重新争取获得锁。 * 竞争锁的线程按照创建的临时顺序节点的序号进行排队,意味着此为“公平锁”。 * 每次的所释放只会由下一个临时顺序节点作出反应,故不会出现“羊群效应”。 === 思路三:公平读写锁 === 定义锁: '''<syntaxhighlight lang="java" highlight=""> /shared_lock/[hostname]-请求类型W/R-序号 </syntaxhighlight>''' 实现方式: # 获取锁: ## 每个线程都试图在“/exclusive_lock”节点下创建'''临时顺序节点'''; ## 线程获取所有已创建的子节点列表,并判断获是否得锁: ##* 读请求:如果“所有比自己小的节点都是读请求”,或者“自己是序号最小的节点”,则可以获取读锁; ##* 写请求:如果“自己是序号最小的节点”,则可以获取写锁。 # 监听: #* 读请求:向比自己序号小的'''前一个写请求节点'''注册 '''watcher''' 监听; #* 写请求:向比自己序号小的'''前一个节点'''注册 '''watcher''' 监听。 == Zookeeper分布式锁的实现 == 以最经典的分布式锁:“可重入的公平锁”来实现。(思路二) 利用 ZooKeeper 的'''临时顺序节点''': # ZooKeeper 临时顺序节点的递增有序性可以确保锁的公平; #* 由于网络异常或者其他原因造成集群中占用锁的客户端失联时,该临时节点自动被删除,锁能够被有效释放; # ZooKeeper的节点监听机制可以保障占有锁的传递有序而且高效; #* 等待锁的线程只需要监听前一个 ZNode 即可; #* ZooKeeper 的节点监听机制能避免羊群效应; 实现: # 定义了一个锁的接口 Lock,仅仅两个抽象方法:一个加锁方法,一个解锁方法 #: <syntaxhighlight lang="Java" highlight=""> package com.crazymakercircle.zk.distributedLock; /** * 锁的接口 **/ public interface Lock { /** * 加锁方法 * * @return 是否成功加锁 */ boolean lock(); /** * 解锁方法 * * @return 是否成功解锁 */ boolean unlock(); } </syntaxhighlight> # 接口实现: #: <syntaxhighlight lang="Java" highlight=""> package com.crazymakercircle.zk.distributedLock; import com.crazymakercircle.zk.ZKclient; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.ZooKeeper.WatchedEvent; import org.apache.ZooKeeper.Watcher; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 分布式锁的实现 **/ @Slf4j public class ZkLock implements Lock { // Zk客户端 CuratorFramework client = null; // ZkLock的节点链接 private static final String ZK_PATH = "/test/lock"; private static final String LOCK_PREFIX = ZK_PATH + "/"; private static final long WAIT_TIME = 1000; // 创建的ZNode的全路径 private String locked_path = null; // 创建的ZNode的序号 private String locked_short_path = null; // 前一个ZNode的路径 private String prior_path = null; // 锁的重入计数器:加锁时+1,解锁时-1(为了线程安全,使用原子类型AtomicInteger,而非int) final AtomicInteger lockCount = new AtomicInteger(0); // 尝试加锁的线程 private Thread thread; // 构造函数:获取zk客户端 public ZkLock() { ZKclient.instance.init(); if (!ZKclient.instance.isNodeExist(ZK_PATH)) { ZKclient.instance.createNode(ZK_PATH, null); } client = ZKclient.instance.getClient(); } /*************************************** 加锁 ***************************************/ /** * 加锁的实现: * 1、重入锁的判断 * 2、尝试加锁:如果是第一个等待锁的子节点则成功,如果不是则循环等待判定 * * @return 是否加锁成功 */ @Override public boolean lock() { // 同步块用于可重入判断,确保同一线程可以重复加锁 synchronized (this) { if (lockCount.get() == 0) { // 初次加锁:1、将thread设置为当前线程;2、重入计数器+1 thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { // 如果锁已被获取,且不是当前线程持有,直接返回:获取锁失败 if (!thread.equals(Thread.currentThread())) { return false; } // 同一线程获取锁:不再执行加锁逻辑,重入计数器+1后直接返回:获取锁成功 lockCount.incrementAndGet(); return true; } } try { boolean locked = false; // 首先尝试着去加锁 locked = tryLock(); // 如果加锁成功,直接返回:获取锁成功 if (locked) { return true; } // 如果加锁失败,循环等待判定 while (!locked) { // 等待:监听前个节点的删除事件 await(); // 获取等待的子节点列表 List<String> waiters = getWaiters(); // 判断能否加锁:是则直接返回:获取锁成功 if (checkLocked(waiters)) { locked = true; } } return true; } catch (Exception e) { e.printStackTrace(); unlock(); } return false; } /** * 尝试加锁: * 1、创建临时顺序节点,并且保存自己的节点路径。 * 2、判断是否第一个节点:是则加锁成功,否则找到前一个ZNode节点,并保存其路径到prior_path。 * * @return 是否加锁成功 * @throws Exception 异常 */ private boolean tryLock() throws Exception { // 创建临时顺序ZNode节点 locked_path = ZKclient.instance.createEphemeralSeqNode(LOCK_PREFIX); if (null == locked_path) { throw new Exception("zk error"); } // 取得临时顺序ZNode节点的序号(后十位的序列数字) locked_short_path = getShortPath(locked_path); // 获取等待的子节点列表 List<String> waiters = getWaiters(); // 判断能否加锁:是则直接返回:获取锁成功 if (checkLocked(waiters)) { return true; } // 获取自己在等待子节点中的位置 int index = Collections.binarySearch(waiters, locked_short_path); if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了 throw new Exception("节点没有找到: " + locked_short_path); } // 自己不是第一个等待锁的子节点:保存前一个节点用于监听,返回尝试加锁失败 prior_path = ZK_PATH + "/" + waiters.get(index - 1); return false; } /** * 判断能否加锁:判断自己是否第一个等待的子节点 * * @param waiters 排队列表 * @return 成功状态 */ private boolean checkLocked(List<String> waiters) { // 节点按照编号,升序排列 Collections.sort(waiters); // 如果是第一个等待的子节点,代表自己已经获得了锁 if (locked_short_path.equals(waiters.get(0))) { log.info("成功地获取分布式锁,节点为{}", locked_short_path); return true; } return false; } /** * 等待:监听前个节点的删除事件 * * @throws Exception 可能会有Zk异常、网络异常 */ private void await() throws Exception { if (null == prior_path) { throw new Exception("prior_path error"); } final CountDownLatch latch = new CountDownLatch(1); // 监听方式一: Watcher 一次性订阅 Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听到的变化watchedEvent = " + watchedEvent); log.info("[WatchedEvent]节点删除"); latch.countDown(); } }; // 开始监听 client.getData().usingWatcher(w).forPath(prior_path); // 监听方式二:TreeCache 订阅 /* TreeCache treeCache = new TreeCache(client, prior_path); TreeCacheListener listener = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data != null) { switch (event.getType()) { case NODE_REMOVED: log.debug("[TreeCache]节点删除, path={}, data={}", data.getPath(), data.getData()); latch.countDown(); break; default: break; } } } }; //开始监听 treeCache.getListenable().addListener(listener); treeCache.start(); */ // 限时等待,最长等待时间为3s latch.await(WAIT_TIME, TimeUnit.SECONDS); } /*************************************** 释放锁 ***************************************/ /** * 释放锁:减少重入计数器 * 1、如果计数器!=0,直接返回,表示成功地释放了一次 * 2、如果计数器==0,移除Watchers监听器,并且删除创建的 ZNode 临时节点 * * @return 是否成功释放锁 */ @Override public boolean unlock() { // 只有加锁的线程能够解锁 if (!thread.equals(Thread.currentThread())) { return false; } // 减小重入计数器 int newLockCount = lockCount.decrementAndGet(); // 计数不能小于0 if (newLockCount < 0) { throw new IllegalMonitorStateException("计数不对: " + locked_path); } // 如果计数器不为0,直接返回 if (newLockCount != 0) { return true; } try { // 如果计数器为0,删除临时节点 if (ZKclient.instance.isNodeExist(locked_path)) { client.delete().forPath(locked_path); } } catch (Exception e) { e.printStackTrace(); return false; } return true; } } </syntaxhighlight> * 以上实现的 ZLock 仅仅对应一个 ZNode 路径,也就是说,仅仅代表一把锁。如果需要代表不同的 ZNode 路径,还需要进行简单改造。 测试: <syntaxhighlight lang="Java" highlight=""> package com.crazymakercircle.zk.distributedLock; import com.crazymakercircle.cocurrent.FutureTaskScheduler; import com.crazymakercircle.zk.ZKclient; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.junit.Test; /** * 测试分布式锁 **/ @Slf4j public class ZkLockTester { // 变量:需要锁来保护的公共资源 int count = 0; /** * 测试自定义分布式锁 * * @throws InterruptedException异常 */ @Test public void testLock() throws InterruptedException { // 10个并发任务 for (int i = 0; i< 10; i++) { FutureTaskScheduler.add(() -> { // 创建锁 ZkLock lock = new ZkLock(); lock.lock(); // 每条线程执行10次累加 for (int j = 0; j < 10; j++) { // 公共的资源变量累加 count++; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("count = " + count); // 释放锁 lock.unlock(); }); } Thread.sleep(Integer.MAX_VALUE); } } </syntaxhighlight> == curator 提供的分布式锁 == 实际开发过程中,可以 curator 工具包封装的API('''curator-recipes''')帮助我们实现分布式锁。 <syntaxhighlight lang="java" highlight=""> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>x.x.x</version> </dependency> </syntaxhighlight> === curator 的几种锁方案 === curator 的几种锁方案: # '''InterProcessMutex''':分布式可重入排它锁 # '''InterProcessSemaphoreMutex''':分布式排它锁 # '''InterProcessReadWriteLock''':分布式读写锁 === 示例 === 下面例子模拟 50 个线程使用重入排它锁 InterProcessMutex 同时争抢锁: : <syntaxhighlight lang="java" highlight=""> public class InterprocessLock { private static CuratorFramework getZkClient() { String zkServerAddress = "192.168.3.39:2181"; ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); CuratorFramework zkClient = CuratorFrameworkFactory.builder() .connectString(zkServerAddress) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); zkClient.start(); return zkClient; } public static void main(String[] args) { CuratorFramework zkClient = getZkClient(); String lockPath = "/lock"; InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); //模拟50个线程抢锁 for (int i = 0; i < 50; i++) { new Thread(new TestThread(i, lock)).start(); } } static class TestThread implements Runnable { private Integer threadFlag; private InterProcessMutex lock; public TestThread(Integer threadFlag, InterProcessMutex lock) { this.threadFlag = threadFlag; this.lock = lock; } @Override public void run() { try { lock.acquire(); System.out.println("第"+threadFlag+"线程获取到了锁"); //等到1秒后释放锁 Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); }finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } } </syntaxhighlight> : 控制台每间隔一秒钟输出一条记录: : [[File:Zookeeper:分布式锁:curator:InterProcessMutex示例.png|600px]] == 总结 == ZooKeeper 分布式锁: * 优点:ZooKeeper 分布式锁(如 InterProcessMutex),能有效地解决分布式问题,不可重入问题,使用起来也较为简单。 * 缺点:ZooKeeper 实现的分布式锁,性能并不太高(因为每次在创建锁和释放锁的过程中,都要动态创建、销毁暂时节点来实现锁功能)。 *: 【ZooKeeper 中创建和删除节点只能通过 Leader(主)服务器来执行,然后 Leader 服务器还需要将数据同步到所有的Follower(从)服务器上】 目前分布式锁,比较成熟、主流的方案有两种: # 基于 Redis 的分布式锁。适用于'''并发量很大'''、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。 # 基于 ZooKeeper 的分布式锁。适用于'''高可靠'''(高可用),而并发量不是太高的场景。 总之,在高性能、高并发的应用场景下,不建议使用 ZooKeeper 的分布式锁。而由于 ZooKeeper 的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用 ZooKeeper 的分布式锁。
返回至“
Zookeeper:分布式锁
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息