“Zookeeper:分布式锁”的版本间差异
跳到导航
跳到搜索
第425行: | 第425行: | ||
== curator 提供的分布式锁 == | == curator 提供的分布式锁 == | ||
实际开发过程中,可以 curator 工具包封装的API('''curator-recipes''' | 实际开发过程中,可以 curator 工具包封装的API('''curator-recipes''')帮助我们实现分布式锁: | ||
<syntaxhighlight lang="java" highlight=""> | <syntaxhighlight lang="java" highlight=""> | ||
<dependency> | <dependency> | ||
第436行: | 第434行: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
curator 的几种锁方案: | curator 的几种锁方案: | ||
# '''InterProcessMutex''':分布式可重入排它锁 | # '''InterProcessMutex''':分布式可重入排它锁 | ||
#: <syntaxhighlight lang="Java" highlight=""> | |||
@Test | |||
public void sharedReentrantLock() throws Exception { | |||
// 创建共享锁 | |||
final InterProcessLock lock = new InterProcessMutex(client, lockPath); | |||
final InterProcessLock lock2 = new InterProcessMutex(client2, lockPath); | |||
final CountDownLatch countDownLatch = new CountDownLatch(2); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
lock.acquire(); | |||
System.out.println("1获取锁==============="); | |||
// 测试锁重入 | |||
lock.acquire(); | |||
System.out.println("1再次获取锁==============="); | |||
Thread.sleep(5 * 1000); | |||
lock.release(); | |||
System.out.println("1释放锁==============="); | |||
lock.release(); | |||
System.out.println("1再次释放锁==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
lock2.acquire(); | |||
System.out.println("2获取锁==============="); | |||
// 测试锁重入 | |||
lock2.acquire(); | |||
System.out.println("2再次获取锁==============="); | |||
Thread.sleep(5 * 1000); | |||
lock2.release(); | |||
System.out.println("2释放锁==============="); | |||
lock2.release(); | |||
System.out.println("2再次释放锁==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
countDownLatch.await(); | |||
} | |||
</syntaxhighlight> | |||
#: <syntaxhighlight lang="xml" highlight=""> | |||
1获取锁=============== | |||
1再次获取锁=============== | |||
1释放锁=============== | |||
1再次释放锁=============== | |||
2获取锁=============== | |||
2再次获取锁=============== | |||
2释放锁=============== | |||
2再次释放锁=============== | |||
</syntaxhighlight> | |||
# '''InterProcessSemaphoreMutex''':分布式排它锁 | # '''InterProcessSemaphoreMutex''':分布式排它锁 | ||
#*(注意,不可重入的锁很容易在一些情况导致死锁) | |||
#: <syntaxhighlight lang="Java" highlight=""> | |||
@Test | |||
public void sharedLock() throws Exception { | |||
// 创建共享锁 | |||
final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath); | |||
final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
lock.acquire(); | |||
System.out.println("1获取锁==============="); | |||
// 锁不可重入 | |||
Thread.sleep(5 * 1000); | |||
lock.release(); | |||
System.out.println("1释放锁==============="); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
lock2.acquire(); | |||
System.out.println("2获取锁==============="); | |||
Thread.sleep(5 * 1000); | |||
lock2.release(); | |||
System.out.println("2释放锁==============="); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
Thread.sleep(20 * 1000); | |||
} | |||
</syntaxhighlight> | |||
#: <syntaxhighlight lang="xml" highlight=""> | |||
1获取锁=============== | |||
1释放锁=============== | |||
2获取锁=============== | |||
2释放锁=============== | |||
</syntaxhighlight> | |||
# '''InterProcessReadWriteLock''':分布式读写锁 | # '''InterProcessReadWriteLock''':分布式读写锁 | ||
#* 读锁和读锁不互斥,只要有写锁就互斥。 | |||
#: <syntaxhighlight lang="Java" highlight=""> | |||
@Test | |||
public void sharedReentrantReadWriteLock() throws Exception { | |||
// 创建共享可重入读写锁 | |||
final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, lockPath); | |||
final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath); | |||
// 获取读写锁(使用 InterProcessMutex 实现, 所以是可以重入的) | |||
final InterProcessLock readLock = locl1.readLock(); | |||
final InterProcessLock readLockw = lock2.readLock(); | |||
final CountDownLatch countDownLatch = new CountDownLatch(2); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
readLock.acquire(); | |||
System.out.println("1获取读锁==============="); | |||
// 测试锁重入 | |||
readLock.acquire(); | |||
System.out.println("1再次获取读锁==============="); | |||
Thread.sleep(5 * 1000); | |||
readLock.release(); | |||
System.out.println("1释放读锁==============="); | |||
readLock.release(); | |||
System.out.println("1再次释放读锁==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
Thread.sleep(500); | |||
readLockw.acquire(); | |||
System.out.println("2获取读锁==============="); | |||
// 测试锁重入 | |||
readLockw.acquire(); | |||
System.out.println("2再次获取读锁=============="); | |||
Thread.sleep(5 * 1000); | |||
readLockw.release(); | |||
System.out.println("2释放读锁==============="); | |||
readLockw.release(); | |||
System.out.println("2再次释放读锁==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
countDownLatch.await(); | |||
} | |||
</syntaxhighlight> | |||
#: <syntaxhighlight lang="xml" highlight=""> | |||
1获取读锁=============== | |||
1再次获取读锁=============== | |||
2获取读锁=============== | |||
2再次获取读锁============== | |||
1释放读锁=============== | |||
2释放读锁=============== | |||
1再次释放读锁=============== | |||
2再次释放读锁=============== | |||
</syntaxhighlight> | |||
#: (如上所示,读锁不是互斥的) | |||
# '''InterProcessMultiLock''':多重锁,将多个锁作为单个实体管理的容器 | |||
#: <syntaxhighlight lang="Java" highlight="9"> | |||
@Test | |||
public void multiLock() throws Exception { | |||
// 可重入锁 | |||
final InterProcessLock interProcessLock1 = new InterProcessMutex(client, lockPath); | |||
// 不可重入锁 | |||
final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client2, lockPath); | |||
// 创建多重锁对象 | |||
final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2)); | |||
final CountDownLatch countDownLatch = new CountDownLatch(1); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
// 获取参数集合中的所有锁 | |||
lock.acquire(); | |||
// 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入 | |||
System.out.println(lock.acquire(2, TimeUnit.SECONDS)); | |||
// interProcessLock1 是可重入锁, 所以可以继续获取锁 | |||
System.out.println(interProcessLock1.acquire(2, TimeUnit.SECONDS)); | |||
// interProcessLock2 是不可重入锁, 所以获取锁失败 | |||
System.out.println(interProcessLock2.acquire(2, TimeUnit.SECONDS)); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
countDownLatch.await(); | |||
} | |||
</syntaxhighlight> | |||
#: <syntaxhighlight lang="xml" highlight=""> | |||
false | |||
true | |||
false | |||
</syntaxhighlight> | |||
curator 还提供了信号量: | |||
: '''InterProcessSemaphoreV2''': | |||
<pre> | |||
InterProcessSemaphoreV2 实现了一个跨 jvm 的信号量。 | |||
主要工作原理是: | |||
acquire时创建一个临时顺序节点,如果创建成功且临时节点数小于等于 maxLeases 则说明信号量获取成功,否则 wait 等待,等待目录发生变化或计数改变时唤醒。 | |||
整个 acquire 过程持 InterProcessMutex 而互斥,因为其中带有等待,性能不会太高。实际使用时等待时间最好不要太长。 | |||
</pre> | |||
: <syntaxhighlight lang="Java" highlight=""> | |||
@Test | |||
public void semaphore() throws Exception { | |||
// 创建一个信号量, Curator 以公平锁的方式进行实现 | |||
final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 1); | |||
final CountDownLatch countDownLatch = new CountDownLatch(2); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
// 获取一个许可 | |||
Lease lease = semaphore.acquire(); | |||
logger.info("1获取读信号量==============="); | |||
Thread.sleep(5 * 1000); | |||
semaphore.returnLease(lease); | |||
logger.info("1释放读信号量==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
// 获取一个许可 | |||
Lease lease = semaphore.acquire(); | |||
logger.info("2获取读信号量==============="); | |||
Thread.sleep(5 * 1000); | |||
semaphore.returnLease(lease); | |||
logger.info("2释放读信号量==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
countDownLatch.await(); | |||
} | |||
</syntaxhighlight> | |||
: <syntaxhighlight lang="xml" highlight=""> | |||
2获取读信号量=============== | |||
2释放读信号量=============== | |||
1获取读信号量=============== | |||
1释放读信号量=============== | |||
</syntaxhighlight> | |||
* 当然可以一次获取多个信号量: | |||
*: <syntaxhighlight lang="Java" highlight=""> | |||
@Test | |||
public void semaphore() throws Exception { | |||
// 创建一个信号量, Curator 以公平锁的方式进行实现 | |||
final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 3); | |||
final CountDownLatch countDownLatch = new CountDownLatch(2); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
// 获取2个许可 | |||
Collection<Lease> acquire = semaphore.acquire(2); | |||
logger.info("1获取读信号量==============="); | |||
Thread.sleep(5 * 1000); | |||
semaphore.returnAll(acquire); | |||
logger.info("1释放读信号量==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
// 获取锁对象 | |||
try { | |||
// 获取1个许可 | |||
Collection<Lease> acquire = semaphore.acquire(1); | |||
logger.info("2获取读信号量==============="); | |||
Thread.sleep(5 * 1000); | |||
semaphore.returnAll(acquire); | |||
logger.info("2释放读信号量==============="); | |||
countDownLatch.countDown(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}).start(); | |||
countDownLatch.await(); | |||
} | |||
</syntaxhighlight> | |||
*: <syntaxhighlight lang="xml" highlight=""> | |||
2获取读信号量=============== | |||
1获取读信号量=============== | |||
2释放读信号量=============== | |||
1释放读信号量=============== | |||
</syntaxhighlight> | |||
=== 示例 === | === 示例 === | ||
第449行: | 第816行: | ||
String zkServerAddress = "192.168.3.39:2181"; | String zkServerAddress = "192.168.3.39:2181"; | ||
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); | ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); | ||
CuratorFramework zkClient = CuratorFrameworkFactory.builder() | CuratorFramework zkClient = CuratorFrameworkFactory.builder() | ||
.connectString(zkServerAddress) | .connectString(zkServerAddress) | ||
第455行: | 第823行: | ||
.retryPolicy(retryPolicy) | .retryPolicy(retryPolicy) | ||
.build(); | .build(); | ||
zkClient.start(); | zkClient.start(); | ||
return zkClient; | return zkClient; |
2021年10月1日 (五) 17:40的版本
关于
在单体的应用开发场景中涉及并发同步的时候,大家往往采用 Synchronized(同步)或者其他同一个 JVM 内 Lock 机制来解决多线程间的同步问题。 在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题。这种跨机器的锁就是“分布式锁”。
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
Zookeeper分布式锁思路
根据网络和书籍上的资料,主要有以下三种方式来实现分布式锁:
- 非公平排他锁:利用 Zookeeper 同一目录下 Znode 的唯一性;
- 公平排他锁:利用 Zookeeper 顺序节点的递增有序;
- 公平读写锁:(同上)
此外,以上还用到了:会话失效时临时节点自动删除的特性,以及节点的监听机制等。
- 以上都可以有可重入的实现方式。
思路一:非公平排他锁
排他锁(Exclusive Locks),又被称为写锁或独占锁:
- 如果事务 T1 对数据对象 O1 加上排他锁,那么整个加锁期间,只允许事务 T1 对 O1 进行读取和更新操作,其他任何事务都不能进行读或写。
定义锁:
/exclusive_lock/lock
实现方式:利用 zookeeper 的同级节点的唯一性特性:
- 获取锁:每个线程都试图在“/exclusive_lock”节点下创建临时节点“/exclusive_lock/lock”,最终只有一个线程能创建成功,即获得了锁。
- 监听:所有没有获取到锁的线程在“/exclusive_lock”节点上注册一个子节点变更的 watcher 监听事件,以便重新争取获得锁。
- 竞争锁的线程不会进行排队,意味着此为“非公平锁”。
- 可能出现的问题:当锁被释放或者获得锁的线程宕机,所有监听“/exclusive_lock”子节点的线程都会作出反应,这样会给服务器带来巨大压力。【“羊群效应”】
思路二:公平排他锁
定义锁:
/exclusive_lock/[hostname]-序号
实现方式:
- 获取锁:
- 每个线程都试图在“/exclusive_lock”节点下创建临时顺序节点;
- 线程获取所有已创建的子节点列表,通过判断“自己是否是序号最小的节点”来确定是否获得锁;
- 监听:所有没有获得锁的线程向比自己序号小的前一个节点注册 watcher 监听事件,以便重新争取获得锁。
- 竞争锁的线程按照创建的临时顺序节点的序号进行排队,意味着此为“公平锁”。
- 每次的所释放只会由下一个临时顺序节点作出反应,故不会出现“羊群效应”。
思路三:公平读写锁
定义锁:
/shared_lock/[hostname]-请求类型W/R-序号
实现方式:
- 获取锁:
- 每个线程都试图在“/exclusive_lock”节点下创建临时顺序节点;
- 线程获取所有已创建的子节点列表,并判断获是否得锁:
- 读请求:如果“所有比自己小的节点都是读请求”,或者“自己是序号最小的节点”,则可以获取读锁;
- 写请求:如果“自己是序号最小的节点”,则可以获取写锁。
- 监听:
- 读请求:向比自己序号小的前一个写请求节点注册 watcher 监听;
- 写请求:向比自己序号小的前一个节点注册 watcher 监听。
Zookeeper分布式锁的实现
以最经典的分布式锁:“可重入的公平锁”来实现。(思路二)
利用 ZooKeeper 的临时顺序节点:
- ZooKeeper 临时顺序节点的递增有序性可以确保锁的公平;
- 由于网络异常或者其他原因造成集群中占用锁的客户端失联时,该临时节点自动被删除,锁能够被有效释放;
- ZooKeeper的节点监听机制可以保障占有锁的传递有序而且高效;
- 等待锁的线程只需要监听前一个 ZNode 即可;
- ZooKeeper 的节点监听机制能避免羊群效应;
实现:
- 定义了一个锁的接口 Lock,仅仅两个抽象方法:一个加锁方法,一个解锁方法
package com.crazymakercircle.zk.distributedLock; /** * 锁的接口 **/ public interface Lock { /** * 加锁方法 * * @return 是否成功加锁 */ boolean lock(); /** * 解锁方法 * * @return 是否成功解锁 */ boolean unlock(); }
- 接口实现:
package com.crazymakercircle.zk.distributedLock; import com.crazymakercircle.zk.ZKclient; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.ZooKeeper.WatchedEvent; import org.apache.ZooKeeper.Watcher; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 分布式锁的实现 **/ @Slf4j public class ZkLock implements Lock { // Zk客户端 CuratorFramework client = null; // ZkLock的节点链接 private static final String ZK_PATH = "/test/lock"; private static final String LOCK_PREFIX = ZK_PATH + "/"; private static final long WAIT_TIME = 1000; // 创建的ZNode的全路径 private String locked_path = null; // 创建的ZNode的序号 private String locked_short_path = null; // 前一个ZNode的路径 private String prior_path = null; // 锁的重入计数器:加锁时+1,解锁时-1(为了线程安全,使用原子类型AtomicInteger,而非int) final AtomicInteger lockCount = new AtomicInteger(0); // 尝试加锁的线程 private Thread thread; // 构造函数:获取zk客户端 public ZkLock() { ZKclient.instance.init(); if (!ZKclient.instance.isNodeExist(ZK_PATH)) { ZKclient.instance.createNode(ZK_PATH, null); } client = ZKclient.instance.getClient(); } /*************************************** 加锁 ***************************************/ /** * 加锁的实现: * 1、重入锁的判断 * 2、尝试加锁:如果是第一个等待锁的子节点则成功,如果不是则循环等待判定 * * @return 是否加锁成功 */ @Override public boolean lock() { // 同步块用于可重入判断,确保同一线程可以重复加锁 synchronized (this) { if (lockCount.get() == 0) { // 初次加锁:1、将thread设置为当前线程;2、重入计数器+1 thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { // 如果锁已被获取,且不是当前线程持有,直接返回:获取锁失败 if (!thread.equals(Thread.currentThread())) { return false; } // 同一线程获取锁:不再执行加锁逻辑,重入计数器+1后直接返回:获取锁成功 lockCount.incrementAndGet(); return true; } } try { boolean locked = false; // 首先尝试着去加锁 locked = tryLock(); // 如果加锁成功,直接返回:获取锁成功 if (locked) { return true; } // 如果加锁失败,循环等待判定 while (!locked) { // 等待:监听前个节点的删除事件 await(); // 获取等待的子节点列表 List<String> waiters = getWaiters(); // 判断能否加锁:是则直接返回:获取锁成功 if (checkLocked(waiters)) { locked = true; } } return true; } catch (Exception e) { e.printStackTrace(); unlock(); } return false; } /** * 尝试加锁: * 1、创建临时顺序节点,并且保存自己的节点路径。 * 2、判断是否第一个节点:是则加锁成功,否则找到前一个ZNode节点,并保存其路径到prior_path。 * * @return 是否加锁成功 * @throws Exception 异常 */ private boolean tryLock() throws Exception { // 创建临时顺序ZNode节点 locked_path = ZKclient.instance.createEphemeralSeqNode(LOCK_PREFIX); if (null == locked_path) { throw new Exception("zk error"); } // 取得临时顺序ZNode节点的序号(后十位的序列数字) locked_short_path = getShortPath(locked_path); // 获取等待的子节点列表 List<String> waiters = getWaiters(); // 判断能否加锁:是则直接返回:获取锁成功 if (checkLocked(waiters)) { return true; } // 获取自己在等待子节点中的位置 int index = Collections.binarySearch(waiters, locked_short_path); if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了 throw new Exception("节点没有找到: " + locked_short_path); } // 自己不是第一个等待锁的子节点:保存前一个节点用于监听,返回尝试加锁失败 prior_path = ZK_PATH + "/" + waiters.get(index - 1); return false; } /** * 判断能否加锁:判断自己是否第一个等待的子节点 * * @param waiters 排队列表 * @return 成功状态 */ private boolean checkLocked(List<String> waiters) { // 节点按照编号,升序排列 Collections.sort(waiters); // 如果是第一个等待的子节点,代表自己已经获得了锁 if (locked_short_path.equals(waiters.get(0))) { log.info("成功地获取分布式锁,节点为{}", locked_short_path); return true; } return false; } /** * 等待:监听前个节点的删除事件 * * @throws Exception 可能会有Zk异常、网络异常 */ private void await() throws Exception { if (null == prior_path) { throw new Exception("prior_path error"); } final CountDownLatch latch = new CountDownLatch(1); // 监听方式一: Watcher 一次性订阅 Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听到的变化watchedEvent = " + watchedEvent); log.info("[WatchedEvent]节点删除"); latch.countDown(); } }; // 开始监听 client.getData().usingWatcher(w).forPath(prior_path); // 监听方式二:TreeCache 订阅 /* TreeCache treeCache = new TreeCache(client, prior_path); TreeCacheListener listener = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data != null) { switch (event.getType()) { case NODE_REMOVED: log.debug("[TreeCache]节点删除, path={}, data={}", data.getPath(), data.getData()); latch.countDown(); break; default: break; } } } }; //开始监听 treeCache.getListenable().addListener(listener); treeCache.start(); */ // 限时等待,最长等待时间为3s latch.await(WAIT_TIME, TimeUnit.SECONDS); } /*************************************** 释放锁 ***************************************/ /** * 释放锁:减少重入计数器 * 1、如果计数器!=0,直接返回,表示成功地释放了一次 * 2、如果计数器==0,移除Watchers监听器,并且删除创建的 ZNode 临时节点 * * @return 是否成功释放锁 */ @Override public boolean unlock() { // 只有加锁的线程能够解锁 if (!thread.equals(Thread.currentThread())) { return false; } // 减小重入计数器 int newLockCount = lockCount.decrementAndGet(); // 计数不能小于0 if (newLockCount < 0) { throw new IllegalMonitorStateException("计数不对: " + locked_path); } // 如果计数器不为0,直接返回 if (newLockCount != 0) { return true; } try { // 如果计数器为0,删除临时节点 if (ZKclient.instance.isNodeExist(locked_path)) { client.delete().forPath(locked_path); } } catch (Exception e) { e.printStackTrace(); return false; } return true; } }
- 以上实现的 ZLock 仅仅对应一个 ZNode 路径,也就是说,仅仅代表一把锁。如果需要代表不同的 ZNode 路径,还需要进行简单改造。
测试:
package com.crazymakercircle.zk.distributedLock;
import com.crazymakercircle.cocurrent.FutureTaskScheduler;
import com.crazymakercircle.zk.ZKclient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.junit.Test;
/**
* 测试分布式锁
**/
@Slf4j
public class ZkLockTester {
// 变量:需要锁来保护的公共资源
int count = 0;
/**
* 测试自定义分布式锁
*
* @throws InterruptedException异常
*/
@Test
public void testLock() throws InterruptedException {
// 10个并发任务
for (int i = 0; i< 10; i++) {
FutureTaskScheduler.add(() -> {
// 创建锁
ZkLock lock = new ZkLock();
lock.lock();
// 每条线程执行10次累加
for (int j = 0; j < 10; j++) {
// 公共的资源变量累加
count++;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("count = " + count);
// 释放锁
lock.unlock();
});
}
Thread.sleep(Integer.MAX_VALUE);
}
}
curator 提供的分布式锁
实际开发过程中,可以 curator 工具包封装的API(curator-recipes)帮助我们实现分布式锁:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>x.x.x</version>
</dependency>
curator 的几种锁方案:
- InterProcessMutex:分布式可重入排它锁
@Test public void sharedReentrantLock() throws Exception { // 创建共享锁 final InterProcessLock lock = new InterProcessMutex(client, lockPath); final InterProcessLock lock2 = new InterProcessMutex(client2, lockPath); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock.acquire(); System.out.println("1获取锁==============="); // 测试锁重入 lock.acquire(); System.out.println("1再次获取锁==============="); Thread.sleep(5 * 1000); lock.release(); System.out.println("1释放锁==============="); lock.release(); System.out.println("1再次释放锁==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock2.acquire(); System.out.println("2获取锁==============="); // 测试锁重入 lock2.acquire(); System.out.println("2再次获取锁==============="); Thread.sleep(5 * 1000); lock2.release(); System.out.println("2释放锁==============="); lock2.release(); System.out.println("2再次释放锁==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
1获取锁=============== 1再次获取锁=============== 1释放锁=============== 1再次释放锁=============== 2获取锁=============== 2再次获取锁=============== 2释放锁=============== 2再次释放锁===============
- InterProcessSemaphoreMutex:分布式排它锁
- (注意,不可重入的锁很容易在一些情况导致死锁)
@Test public void sharedLock() throws Exception { // 创建共享锁 final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath); final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock.acquire(); System.out.println("1获取锁==============="); // 锁不可重入 Thread.sleep(5 * 1000); lock.release(); System.out.println("1释放锁==============="); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock2.acquire(); System.out.println("2获取锁==============="); Thread.sleep(5 * 1000); lock2.release(); System.out.println("2释放锁==============="); } catch (Exception e) { e.printStackTrace(); } } }).start(); Thread.sleep(20 * 1000); }
1获取锁=============== 1释放锁=============== 2获取锁=============== 2释放锁===============
- InterProcessReadWriteLock:分布式读写锁
- 读锁和读锁不互斥,只要有写锁就互斥。
@Test public void sharedReentrantReadWriteLock() throws Exception { // 创建共享可重入读写锁 final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, lockPath); final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath); // 获取读写锁(使用 InterProcessMutex 实现, 所以是可以重入的) final InterProcessLock readLock = locl1.readLock(); final InterProcessLock readLockw = lock2.readLock(); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { readLock.acquire(); System.out.println("1获取读锁==============="); // 测试锁重入 readLock.acquire(); System.out.println("1再次获取读锁==============="); Thread.sleep(5 * 1000); readLock.release(); System.out.println("1释放读锁==============="); readLock.release(); System.out.println("1再次释放读锁==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { Thread.sleep(500); readLockw.acquire(); System.out.println("2获取读锁==============="); // 测试锁重入 readLockw.acquire(); System.out.println("2再次获取读锁=============="); Thread.sleep(5 * 1000); readLockw.release(); System.out.println("2释放读锁==============="); readLockw.release(); System.out.println("2再次释放读锁==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
1获取读锁=============== 1再次获取读锁=============== 2获取读锁=============== 2再次获取读锁============== 1释放读锁=============== 2释放读锁=============== 1再次释放读锁=============== 2再次释放读锁===============
- (如上所示,读锁不是互斥的)
- InterProcessMultiLock:多重锁,将多个锁作为单个实体管理的容器
@Test public void multiLock() throws Exception { // 可重入锁 final InterProcessLock interProcessLock1 = new InterProcessMutex(client, lockPath); // 不可重入锁 final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client2, lockPath); // 创建多重锁对象 final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2)); final CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { // 获取参数集合中的所有锁 lock.acquire(); // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入 System.out.println(lock.acquire(2, TimeUnit.SECONDS)); // interProcessLock1 是可重入锁, 所以可以继续获取锁 System.out.println(interProcessLock1.acquire(2, TimeUnit.SECONDS)); // interProcessLock2 是不可重入锁, 所以获取锁失败 System.out.println(interProcessLock2.acquire(2, TimeUnit.SECONDS)); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
false true false
curator 还提供了信号量:
- InterProcessSemaphoreV2:
InterProcessSemaphoreV2 实现了一个跨 jvm 的信号量。 主要工作原理是: acquire时创建一个临时顺序节点,如果创建成功且临时节点数小于等于 maxLeases 则说明信号量获取成功,否则 wait 等待,等待目录发生变化或计数改变时唤醒。 整个 acquire 过程持 InterProcessMutex 而互斥,因为其中带有等待,性能不会太高。实际使用时等待时间最好不要太长。
@Test public void semaphore() throws Exception { // 创建一个信号量, Curator 以公平锁的方式进行实现 final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 1); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { // 获取一个许可 Lease lease = semaphore.acquire(); logger.info("1获取读信号量==============="); Thread.sleep(5 * 1000); semaphore.returnLease(lease); logger.info("1释放读信号量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { // 获取一个许可 Lease lease = semaphore.acquire(); logger.info("2获取读信号量==============="); Thread.sleep(5 * 1000); semaphore.returnLease(lease); logger.info("2释放读信号量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
2获取读信号量=============== 2释放读信号量=============== 1获取读信号量=============== 1释放读信号量===============
- 当然可以一次获取多个信号量:
@Test public void semaphore() throws Exception { // 创建一个信号量, Curator 以公平锁的方式进行实现 final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 3); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { // 获取2个许可 Collection<Lease> acquire = semaphore.acquire(2); logger.info("1获取读信号量==============="); Thread.sleep(5 * 1000); semaphore.returnAll(acquire); logger.info("1释放读信号量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { // 获取1个许可 Collection<Lease> acquire = semaphore.acquire(1); logger.info("2获取读信号量==============="); Thread.sleep(5 * 1000); semaphore.returnAll(acquire); logger.info("2释放读信号量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
2获取读信号量=============== 1获取读信号量=============== 2释放读信号量=============== 1释放读信号量===============
示例
下面例子模拟 50 个线程使用重入排它锁 InterProcessMutex 同时争抢锁:
public class InterprocessLock { private static CuratorFramework getZkClient() { String zkServerAddress = "192.168.3.39:2181"; ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); CuratorFramework zkClient = CuratorFrameworkFactory.builder() .connectString(zkServerAddress) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); zkClient.start(); return zkClient; } public static void main(String[] args) { CuratorFramework zkClient = getZkClient(); String lockPath = "/lock"; InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); //模拟50个线程抢锁 for (int i = 0; i < 50; i++) { new Thread(new TestThread(i, lock)).start(); } } static class TestThread implements Runnable { private Integer threadFlag; private InterProcessMutex lock; public TestThread(Integer threadFlag, InterProcessMutex lock) { this.threadFlag = threadFlag; this.lock = lock; } @Override public void run() { try { lock.acquire(); System.out.println("第"+threadFlag+"线程获取到了锁"); //等到1秒后释放锁 Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); }finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
- 控制台每间隔一秒钟输出一条记录:
总结
ZooKeeper 分布式锁:
- 优点:ZooKeeper 分布式锁(如 InterProcessMutex),能有效地解决分布式问题,不可重入问题,使用起来也较为简单。
- 缺点:ZooKeeper 实现的分布式锁,性能并不太高(因为每次在创建锁和释放锁的过程中,都要动态创建、销毁暂时节点来实现锁功能)。
- 【ZooKeeper 中创建和删除节点只能通过 Leader(主)服务器来执行,然后 Leader 服务器还需要将数据同步到所有的Follower(从)服务器上】
目前分布式锁,比较成熟、主流的方案有两种:
- 基于 Redis 的分布式锁。适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
- 基于 ZooKeeper 的分布式锁。适用于高可靠(高可用),而并发量不是太高的场景。
总之,在高性能、高并发的应用场景下,不建议使用 ZooKeeper 的分布式锁。而由于 ZooKeeper 的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用 ZooKeeper 的分布式锁。