“Zookeeper:分布式锁”的版本间差异
跳到导航
跳到搜索
无编辑摘要 |
|||
第45行: | 第45行: | ||
# 获取锁: | # 获取锁: | ||
## 每个线程都试图在“/exclusive_lock”节点下创建'''临时顺序节点'''; | ## 每个线程都试图在“/exclusive_lock”节点下创建'''临时顺序节点'''; | ||
## | ## 线程获取所有已创建的子节点列表,通过判断“自己是否是序号最小的节点”来确定是否获得锁; | ||
# 监听:所有没有获得锁的线程向比自己序号小的'''前一个节点'''注册 '''watcher''' 监听事件,以便重新争取获得锁。 | # 监听:所有没有获得锁的线程向比自己序号小的'''前一个节点'''注册 '''watcher''' 监听事件,以便重新争取获得锁。 | ||
2021年9月30日 (四) 23:28的版本
关于
在单体的应用开发场景中涉及并发同步的时候,大家往往采用 Synchronized(同步)或者其他同一个 JVM 内 Lock 机制来解决多线程间的同步问题。 在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题。这种跨机器的锁就是“分布式锁”。
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
Zookeeper分布式锁思路
根据网络和书籍上的资料,主要有以下三种方式来实现分布式锁:
- 非公平排他锁:利用 Zookeeper 同一目录下 Znode 的唯一性;
- 公平排他锁:利用 Zookeeper 顺序节点的递增有序;
- 公平读写锁:(同上)
此外,以上还用到了:会话失效时临时节点自动删除的特性,以及节点的监听机制等。
- 以上都可以有可重入的实现方式。
思路一:非公平排他锁
排他锁(Exclusive Locks),又被称为写锁或独占锁:
- 如果事务 T1 对数据对象 O1 加上排他锁,那么整个加锁期间,只允许事务 T1 对 O1 进行读取和更新操作,其他任何事务都不能进行读或写。
定义锁:
/exclusive_lock/lock
实现方式:利用 zookeeper 的同级节点的唯一性特性:
- 获取锁:每个线程都试图在“/exclusive_lock”节点下创建临时节点“/exclusive_lock/lock”,最终只有一个线程能创建成功,即获得了锁。
- 监听:所有没有获取到锁的线程在“/exclusive_lock”节点上注册一个子节点变更的 watcher 监听事件,以便重新争取获得锁。
- 竞争锁的线程不会进行排队,意味着此为“非公平锁”。
- 可能出现的问题:当锁被释放或者获得锁的线程宕机,所有监听“/exclusive_lock”子节点的线程都会作出反应,这样会给服务器带来巨大压力。(“羊群效应”)
思路二:公平排他锁
定义锁:
/exclusive_lock/[hostname]-序号
实现方式:
- 获取锁:
- 每个线程都试图在“/exclusive_lock”节点下创建临时顺序节点;
- 线程获取所有已创建的子节点列表,通过判断“自己是否是序号最小的节点”来确定是否获得锁;
- 监听:所有没有获得锁的线程向比自己序号小的前一个节点注册 watcher 监听事件,以便重新争取获得锁。
- 竞争锁的线程按照创建的临时顺序节点的序号进行排队,意味着此为“公平锁”。
- 每次的所释放只会由下一个临时顺序节点作出反应,故不会出现“羊群效应”。
思路三:公平读写锁
定义锁:
/shared_lock/[hostname]-请求类型W/R-序号
实现方式:
- 获取锁:
- 每个线程都试图在“/exclusive_lock”节点下创建临时顺序节点;
- 线程获取所有已创建的子节点列表,并判断获得锁:
- 读请求:如果所有比自己小的节点都是读请求,或者没有比自己序号小的节点,则可以获取读锁;
- 写请求:如果自己是序号最小的节点,则可以获取写锁。
- 监听:
- 读请求:向比自己序号小的前一个写请求节点注册 watcher 监听;
- 写请求:向比自己序号小的前一个节点注册 watcher 监听。
Zookeeper分布式锁的实现
以最经典的分布式锁:“可重入的公平锁”来实现。(思路二)
curator 提供的分布式锁
实际开发过程中,可以 curator 工具包封装的API(curator-recipes)帮助我们实现分布式锁。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>x.x.x</version>
</dependency>
curator 的几种锁方案
curator 的几种锁方案:
- InterProcessMutex:分布式可重入排它锁
- InterProcessSemaphoreMutex:分布式排它锁
- InterProcessReadWriteLock:分布式读写锁
示例
下面例子模拟 50 个线程使用重入排它锁 InterProcessMutex 同时争抢锁:
public class InterprocessLock { private static CuratorFramework getZkClient() { String zkServerAddress = "192.168.3.39:2181"; ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); CuratorFramework zkClient = CuratorFrameworkFactory.builder() .connectString(zkServerAddress) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); zkClient.start(); return zkClient; } public static void main(String[] args) { CuratorFramework zkClient = getZkClient(); String lockPath = "/lock"; InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); //模拟50个线程抢锁 for (int i = 0; i < 50; i++) { new Thread(new TestThread(i, lock)).start(); } } static class TestThread implements Runnable { private Integer threadFlag; private InterProcessMutex lock; public TestThread(Integer threadFlag, InterProcessMutex lock) { this.threadFlag = threadFlag; this.lock = lock; } @Override public void run() { try { lock.acquire(); System.out.println("第"+threadFlag+"线程获取到了锁"); //等到1秒后释放锁 Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); }finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
- 控制台每间隔一秒钟输出一条记录: