本文正在参与「金石方案 . 瓜分6万现金大奖」
锁是为了在多线程的场景中保证数据安全而添加的一种手法,Java中常用的有CountdownLatch,ReentrantLock等单运用中的锁,在现在处处都是分布式的场景需求下就不能满意了,所以就出现了分布式锁。
不同的物理节点有各自的线程,可是他们会拜访同一个资源,可是不允许同一时刻拜访,所以就有了分布式锁
例如
咱们能够经过数据库编写sql来完成分布式锁,可是这种在高并发下功能会出问题,
还有常用的redis完成分布式锁,这个是咱们用的最多的一种高功能高并发的完成方式。
今日介绍的一种是经过中间件zookeeper完成分布式锁,也是支撑高功能高并发的。
想想完成一个锁想到哪些要害点 ?
争抢锁:只要一个人能够获取锁
取得锁的节点挂了,暂时节点 会主动开释
取得锁的人,能够主动开释锁
锁被开释,删除 其他人怎样知道
主动轮训,监听心跳:存在延迟,节点多的状况话压力很大。
依据zk的节点看看是否满意锁
创立耐久化节点
zk是创立节点保存数据的,相同节点只允许创立一次,所以咱们能够经过成功创立节点完成获取锁的状况。
要害代码
zk.create("/lock", threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
结果:
如图只要一个线程获取锁,其他线程都出现异常:NodeExists for /lock ,能够保证同一时刻只要一个线程获取锁。然后取得锁的线程逻辑履行结束后应该删除锁。
存在的问题:假如线程溃散了,锁就无法开释了,最终导致死锁
耐久化节点不行,耐久化次序节点自然也不行了
创立暂时节点
暂时节点:在客户端断开衔接的时分就会主动删除
要害代码:
zk.create("/lock", threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
CreateMode.EPHEMERAL
意思便是创立暂时节点 ,也便是当线程溃散,无法主动开释锁的时分,会主动删除,防止死锁。
可是
还存在的问题:没有获取锁的线程会出现错误,则需要不断重试,经过死循环直到获取锁。
暂时节点+watch
watch:设置监听回调,当监听的节点或其子节点有变更,则会告诉客户端,可参考上一篇
要害代码
zk.create("/lock", threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.getChildren("/testLock",true, this,"ajisun");
getChildren
获取父节点/testlock
下的子节点并设置监听。当子节点/lock
被删除 就会触发回调,再次创立节点。
经过watch 防止运用死循环设置堵塞,看似还不错哦。
可是
还还存在问题:一切客户端都去监听同一个父节点,当锁开释的时分,也会告诉一切的客户端,带来的压力仍是很大。
创立暂时次序节点+watch
暂时次序节点:每个线程都会创立一个暂时且有序的节点,相互不抵触
代码如下,十个线程模仿十个客户端
public static void main(String[] args) throws KeeperException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zk = null;
try {
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testLock", 3000, new DefaultWatch().setCountDownLatch(countDownLatch));
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
CountDownLatch countDown = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
ZooKeeper finalZk = zk;
new Thread() {
@Override
public void run() {
String threadId = Thread.currentThread().getId() + "";
try {
finalZk.create("/lock", threadId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
countDown.countDown();
}
}.start();
}
try {
countDown.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> list = zk.getChildren("/", false);
list.forEach(s -> System.out.println(s));
}
CreateMode.EPHEMERAL_SEQUENTIAL
意思便是创立暂时次序节点。
输出节点如下
lock0000000110
lock0000000114
lock0000000113
lock0000000112
lock0000000111
lock0000000107
lock0000000106
lock0000000105
lock0000000109
lock0000000108
每次只要一个客户端能够加锁成功,假如一起有100个客户端,当其中一个开释锁后,告诉剩余99个客户端,然后99个客户端一起抢锁,其实只要一个会成功,剩余的98个仅仅陪跑的,做无用功,白白浪费系统资源。
既然每次只要一个会加锁成功,当一个客户端开释锁的时分,只告诉一个客户端不就能够了吗。
怎样做到呢?
便是用到暂时次序节点这个特色,不在监听父节点,而是监听前一个节点。
首要创立节点成功后,获取父节点下的一切子节点,因为各个节点是有次序,能够依照从小到大的次序排列后,然后判断自己的节点是不是最小的,假如是则获取锁,不是则监听前一个节点。
要害代码如下
public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
ZooKeeper zk;
String threadId;
CountDownLatch cc = new CountDownLatch(1);
String pathName;
// set/get省掉
public void tryLock() {
try {
// 创立暂时有序的锁
zk.create("/lock", threadId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");
cc.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void unLock() {
try {
zk.delete(pathName, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
// 假如第一个锁开释了,只要第二个收到回调事情
// 假如是其他的挂了,对应的后一个也能收到告诉
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zk.getChildren("/", false, this, "bcd");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
// create call back
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (name != null) {
System.out.println(threadId + "create node:" + name);
pathName = name;
// 获取一切创立的目录,即参与锁争夺的线程
zk.getChildren("/", false, this, "bcd");
}
}
/**
* getChildren call back
* pathName= /lock00000000003
* children=[lock0000000002,lock0000000008,lock0000000005,lock0000000003]
* 进入这个回调之后 阐明已经创立节点成功,能够看的已经创立的一切节点
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
Collections.sort(children);
// 所在位置
int i = children.indexOf(pathName.substring(1));
// 判断是不是第一个
if (i == 0) {
System.out.println(threadId + " first");
cc.countDown();
} else {
// 不是第一个则监控前一个是否存在,假如前一个删除了需要回调我这个session
zk.exists("/" + children.get(i - 1), this, this, "xyz");
}
}
/**
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
}
调用方:
public void lock() {
for (int i = 0; i < 10; i++) {
new Thread() {
@Override
public void run() {
WatchCallBack watchCallBack = new WatchCallBack();
watchCallBack.setZk(zk);
String threadId = Thread.currentThread().getId() + "";
watchCallBack.setThreadId(threadId);
// 抢锁
watchCallBack.tryLock();
//干活
System.out.println(threadId + " working......");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//开释锁
watchCallBack.unLock();
}
}.start();
}
}
总结总结
zk是经过暂时节点,防止死锁问题(session消失,节点消失,锁开释)
经过次序节点,完成阻塞功能(暂时次序的节点数据)。
经过watch,watch前一个节点,最小的取得锁,一旦最小的锁开释,zk只会给下一个节点回调。防止了抢锁带来的不必要的损耗和压力。
我是纪先生,用输出倒逼输入而继续学习,继续分享技能系列文章,以及全网值得保藏好文,欢迎关注大众号,做一个继续生长的技能人。