ZooKeeper(五) 使用Zookeeper有序临时节点实现分布式锁 您所在的位置:网站首页 zookeeper分布式调度 ZooKeeper(五) 使用Zookeeper有序临时节点实现分布式锁

ZooKeeper(五) 使用Zookeeper有序临时节点实现分布式锁

2024-07-12 21:42| 来源: 网络整理| 查看: 265

当使用zookeeper实现分布式锁时,当有新的请求需要进入需要同步加锁代码时,在zookeeper加锁代码中会在加锁的共同父节点下创建一个新的临时有需节点。创建完成后会获取加锁父节点下所有子节点。判断自己是否为最小的一个如果是则获得锁,进行执行加锁代码,执行完毕后删除当前临时节点。如果判断自己不是最小的一个节点时,则获取比自己小的最近的那个节点,并对其设置被删除监听。并且当之前节点不存在时直接获取锁执行加锁代码逻辑。当之前一个节点被删除时,当前节点获得锁,执行加锁逻辑代码,当执行完毕后,当前节点进行对当前节点删除。

代码实现:

pom.xmlapplication.properties springboot 启动类 MainApplication.javazkLock 实现类web测试类WebController.java

一、项目依赖 pom.xml

4.0.0 com.xiaohui zklockdemo 1.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.1.7.RELEASE ch.qos.logback logback-classic 1.2.3 org.apache.zookeeper zookeeper 3.4.7 org.slf4j slf4j-log4j12 org.springframework.boot spring-boot-starter-web org.slf4j slf4j-log4j12

二、SpringBoot配置文件application.properties 

server.port=8888

三、springboot 启动类 MainApplication.java

package com.xiaohui; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class MainApplication { public static void main(String[] args) { SpringApplication.run(MainApplication.class,args); } }

四、zookeeper 锁实现类ZkLock.java(重点)

package com.xiaohui.bean; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.springframework.stereotype.Component; import java.util.List; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class ZkLock implements Lock { //zk 客户端对象 private ZooKeeper zk; //zk的目录结构 根节点 private String root ="/locks"; //锁名称 private String lockName; //当前线程创建的序列node private ThreadLocal nodeId = new ThreadLocal(); //用来同步等待zkClient连接到了服务端 private CountDownLatch connectedSignal = new CountDownLatch(1); //超时时间 private final static int sessionTimeout = 3000; private final static byte[] data = new byte[0]; public ZkLock(String config, String lockName) { this.lockName = lockName; try{ //创建连接 zk = new ZooKeeper(config, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected){ //将count值减1 connectedSignal.countDown(); } } }); //[等待zk客户端链接上服务器后再继续执行]调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 connectedSignal.await(); Stat stat = zk.exists(root,false); if(null == stat){ //创建根节点 zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//持久的节点 } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException(); } } @Override public void lock() { try{ //创建临时子节点[当前节点路径] String curNodePath = zk.create(root+"/"+lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "--- created."); //取出所有子节点 List children = zk.getChildren(root, false); TreeSet set = new TreeSet(); for (String child : children) { System.out.println(curNodePath+"----------------------------child:"+child); set.add(root+"/"+child); } String smallNode = set.first(); if(curNodePath.equals(smallNode)){ //如果是最小的节点,则表示获取到锁 System.out.println(Thread.currentThread().getName()+" "+root+"/"+lockName+ "--- 获得锁."); this.nodeId.set(curNodePath); return; } //============此处如果有延时,上一个节点 在此刻被删除,自己最小缺无法实现监听============== String preNode = set.lower(curNodePath); CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode,new LockWatcher(latch));//注册监听 // 判断比自己小一个数的节点是否存在,如果不存在则无需等待解锁,同时注册监听 if(stat != null){ System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "等待"+preNode +" 解锁"); latch.await();//此处等待。。。 nodeId.set(curNodePath); latch = null; }else{ System.out.println(Thread.currentThread().getName()+" "+curNodePath+"上一个节点不存在?我直接获取的锁"); nodeId.set(curNodePath); latch = null; } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException(e); } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { try{ System.out.println(Thread.currentThread().getName()+" 解锁:"+nodeId.get()); if(null != nodeId){ zk.delete(nodeId.get(),-1); } nodeId.remove(); }catch (Exception e){ e.printStackTrace(); } } @Override public Condition newCondition() { return null; } //添加wacther 监听临时顺序节点的删除 class LockWatcher implements Watcher{ private CountDownLatch latch = null; public LockWatcher(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { if(event.getType() == Event.EventType.NodeDeleted){ latch.countDown(); } } } }

在该实现锁中主要使用了 CountDownLatch 实现等待、继续执行锁的效果。

五、web测试类WebController.java

package com.xiaohui.web; import com.xiaohui.bean.Stock; import com.xiaohui.bean.ZkLock; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class WebController { ZkLock zkLock = new ZkLock("172.18.230.184:2181","stock_zk1"); @GetMapping("/startReduce") public String startReduce(){ new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); try { Thread.sleep(1000); }catch (Exception e){ e.printStackTrace(); } System.out.println("Stock.count = " +Stock.count); return "set ok!"; } }

访问后效果如下:我们可以看到只有一个线程执行成功,符合预期效果。

Thread-15 /locks/stock_zk10000000032--- created. Thread-16 /locks/stock_zk10000000033--- created. Thread-11 /locks/stock_zk10000000034--- created. Thread-17 /locks/stock_zk10000000035--- created. Thread-13 /locks/stock_zk10000000037--- created. Thread-10 /locks/stock_zk10000000038--- created. Thread-12 /locks/stock_zk10000000036--- created. Thread-14 /locks/stock_zk10000000039--- created. Thread-15 /locks/stock_zk1--- 获得锁. Thread-10 /locks/stock_zk10000000038等待/locks/stock_zk10000000037 解锁 Thread-17 /locks/stock_zk10000000035等待/locks/stock_zk10000000034 解锁 Thread-11 /locks/stock_zk10000000034等待/locks/stock_zk10000000033 解锁 Thread-16 /locks/stock_zk10000000033等待/locks/stock_zk10000000032 解锁 Thread-12 /locks/stock_zk10000000036等待/locks/stock_zk10000000035 解锁 Thread-13 /locks/stock_zk10000000037等待/locks/stock_zk10000000036 解锁 Thread-14 /locks/stock_zk10000000039等待/locks/stock_zk10000000038 解锁 Thread-15下单:成功 Thread-15 解锁:/locks/stock_zk10000000032 Thread-16下单:失败 Thread-16 解锁:/locks/stock_zk10000000033 Thread-11下单:失败 Thread-11 解锁:/locks/stock_zk10000000034 Thread-17下单:失败 Thread-17 解锁:/locks/stock_zk10000000035 Thread-12下单:失败 Thread-12 解锁:/locks/stock_zk10000000036 Thread-13下单:失败 Thread-13 解锁:/locks/stock_zk10000000037 Thread-10下单:失败 Thread-10 解锁:/locks/stock_zk10000000038 Thread-14下单:失败 Thread-14 解锁:/locks/stock_zk10000000039 Stock.count = 0

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有