基于Redis+Lua实现分布式锁模拟秒杀扣减库存业务(非常详细,良心解析) 您所在的位置:网站首页 redis处理秒杀 基于Redis+Lua实现分布式锁模拟秒杀扣减库存业务(非常详细,良心解析)

基于Redis+Lua实现分布式锁模拟秒杀扣减库存业务(非常详细,良心解析)

2024-07-05 07:51| 来源: 网络整理| 查看: 265

最近和几个小伙伴聊了聊基于Redis的分布式锁实现秒杀扣减库存业务的一些技术细节,刚好最近钻研了一段时间,本篇内容通过1个详细的案例,把这个实现方案作个记录,当做自己对知识的总结积累,同时也欢迎广大开发者朋友一起交流,学习,大家可以留言讨论,原创写作不易,请勿喷,如果觉得有用,不要忘了关注点赞哦。

本案例我们通过以下6个部分来讲解基于Redis+Lua实现分布式锁的详细过程,案例背景是模拟秒杀扣减库存的经典业务:

1、什么是分布式锁 ?

要说起分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。

线程锁:主要用来给方法、代码块加锁。当某个方法或代码块使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是不同线程之间对于同一JVM共享其内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。

进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。

分布式锁:当多个进程(多个应用程序)不在同一个系统中,比如微服务部署多节点,就要用分布式锁控制多个进程对资源的访问。

2、什么场景下需要分布式锁 ?

随着互联网的高速发展,一些电商系统承载的业务体量也在大大增加,那么企业为了应对巨大的业务体量,很多时候会用到微服务,单个服务会部署多个节点,以此来处理瞬间激增的用户请求,比如秒杀促销的时候,某个界面一下子有10000人同时抢购,那么这10000个人的请求会按照特定的负载均衡策略,把10000个用户请求路由到不同的多个节点去处理,大大减轻了单台应用处理业务的能力,提升了效率,提升了用户的体验度。那么这个时候就要用到分布式锁去满足技术方案。

3、用了分布式锁会带来什么好处 ?

接着上面的举例,如果没用到分布式锁,那么很可能出出现,用户1下单的代码过程中,还没等用户1处理完订单数据,用户2的请求进入到用户1下单的线程中了,这2个线程在同1个应用中执行,是共享的同1个JVM内存,那么可能会出现订单数据错乱,最终导致订单业务完全发生重大严重错误。但是如果采用了分布式锁,并且使用得当的情况下完全可以避免这个问题。这时就凸显分布式锁的重要作用了。

4、实现分布式锁可以采用哪些方案实现 ?

目前行业中分布式锁实现众所周知的主要有3种方案: 1.采用数据库的事务锁

2.采用Zookeeper框架

3.基于redis实现

目前主流的比较成熟并且比较大众化的方案就是基于redis实现,当然再组合上Lua,实现分布式锁实现过程会更简单,功能更强大。本案例笔者就以springboot+Jedis+redis+Lua通过具体的例子详细讲解分布式锁的实现过程和思路。

5、分布式锁需要满足的4个的必要条件 互斥性。在任意时刻,只有一个客户端线程能持有锁。不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也要保证后续其他客户端能加锁。具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。 6、代码实现

(1)、引入依赖

首先我们要通过Maven引入Jedis开源组件,在pom.xml文件加入下面的代码:

redis.clients jedis 2.9.0

(2)、正确加锁方式

/** * @author guobh * @date 2020年8月27日 上午11:10:50 * @version 1.0 * @since jdk1.8 * @description */ @Slf4j @Component public class RedisPool { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; // 锁的过期时间 private static int EXPIRE_TIME = 500; private static JedisPool pool;//jedis连接池对象 private static int maxTotal = 20;//最大连接数 private static int maxIdle = 10;//最大空闲连接数 private static int minIdle = 5;//最小空闲连接数 private static boolean testOnBorrow = true;//在取连接时测试连接的可用性 private static boolean testOnReturn = false;//再还连接时不测试连接的可用性 static { initPool();//初始化连接池 } public static Jedis getJedis(){ return pool.getResource(); } public static void close(Jedis jedis){ jedis.close(); } @Autowired public static JedisPool getPool(String host,int port) { JedisPoolConfig config = new JedisPoolConfig();//连接池配置类 config.setMaxTotal(maxTotal); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setTestOnBorrow(testOnBorrow); config.setTestOnReturn(testOnReturn); config.setBlockWhenExhausted(true); return new JedisPool(config, host, port); } private static void initPool(){ JedisPoolConfig config = new JedisPoolConfig();//连接池配置类 config.setMaxTotal(maxTotal); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setTestOnBorrow(testOnBorrow); config.setTestOnReturn(testOnReturn); config.setBlockWhenExhausted(true); pool = new JedisPool(config, "XXXXXXXX", 6379, 5000, "*******"); } //加锁方法 //加锁之后返回锁的持有者(锁的value使用唯一时间戳标志每个客户端,保证只有锁的持有者才可以释放锁) public static String lock(Jedis jedis, String key,Long waitEnd,String requestId) { try { // 1秒内数次加锁如果失败,则不断请求重新获取锁,超过1秒还没能加锁,就加锁失败(为了每个线程拥有公平的机会获取锁) while (System.currentTimeMillis() < waitEnd) {// 1秒类不断尝试加锁(加锁之后返回锁的持有者) String result = jedis.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, EXPIRE_TIME); if (LOCK_SUCCESS.equals(result)) { return requestId; } } } catch (Exception ex) { log.error("lock error", ex); } return null; } }

(3)、加锁方法代码解读

我们加锁关键的1行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:

第一个为key,我们一般使用某1业务的唯一属性来当做作为key加锁,因为key是唯一的。

第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用时间戳,UUID等等生成。

第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

第五个为time,与第四个参数相呼应,代表key的过期时间,单位是毫秒

      总的来说,执行上面的set()方法就只会导致两种结果:

     1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,返回锁的持有者客户端。

     2. 对于当前key已有锁存在,不做任何操作。

     心细的朋友就会发现,我们的加锁代码满足我们描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动释放锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识(锁的持有者),那么在客户端在解锁的时候就可以进行校验是否是同一个客户端,同1个客户端只能释放自己的锁,而不能释放别的客户端的锁。由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。

(4)、错误的加锁代码解析

比较常见的错误示例就是使用jedis.setnx()和jedis.expire()这2个操作组合实现加锁,代码如下:

public static void tryLock1(Jedis jedis, String lockKey, String requestId, int expireTime) { Long result = jedis.setnx(lockKey, requestId); if (result == 1) { // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁 jedis.expire(lockKey, expireTime); } }

setnx()方法作用就是SET IF NOT EXIST,expire()方法就是给锁加一个过期时间。乍一看好像和前面的set()方法结果一样,然而由于这是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。网上之所以有人这样实现,是因为低版本的jedis并不支持多参数的set()方法。

(5)、正确的释放锁

private static final Long RELEASE_SUCCESS = 1L; /** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean unLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; }

可以看到,我们解锁只需要两行代码就搞定了!第一行代码,我们写了一个简单的Lua脚本代码,第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。

那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述多个操作是原子性的。简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个整体命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。是不是瞬间感觉高大上。

(6)、错误的释放锁代码解析

最常见的解锁代码就是直接使用jedis.del()方法删除锁,这种不先判断锁的拥有者而直接解锁的方式,会导致不管客户端有没有加锁,任何客户端都可以随时进行解锁,即使这把锁不是它的,或者即使没有加锁。

public static void wrongReleaseLock(Jedis jedis, String lockKey) { jedis.del(lockKey); }  

(7)、实战案例代码

public static void main(String[] args) { //使用固定线程数为4 的线程池处理并发请求 ExecutorService pool1 = new ThreadPoolExecutor(4, 10, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue(),Executors.defaultThreadFactory()); //10个人抢购编码为101的共计5个商品 for(int i=0;io.priority?-1:1; } public ThreadTask(String key,String userCode,Long num){ this.userCode = userCode; this.num = num; this.key = key; } private static String STOCK_KEY_PREFIX = "sku-"; @Override public void run() { //让线程阻塞,使后续任务进入缓存队列 System.out.println("当前系统线程:"+Thread.currentThread().getName()); Jedis jedis = RedisPool.getJedis(); Long waitEnd = System.currentTimeMillis() + WAIT_TIME; String bb = jedis.get("sku-101"); String value = RedisPool.lock(jedis,key,waitEnd,userCode);//加锁成功,获取锁的持有者 String lockKey = null; if(!StringUtils.isBlank(value) && value.equals(userCode)){//加锁成功后查询库存 try { //synchronized (key){ lockKey = key; Long stock = Long.valueOf(jedis.get(STOCK_KEY_PREFIX+lockKey)); if( stock >= num){ String script = "return redis.call('incrby', KEYS[1], ARGV[1])"; Object eval = jedis.eval(script,Lists.newArrayList(STOCK_KEY_PREFIX+lockKey), Lists.newArrayList(String.valueOf(0-num))); if(null != eval && Integer.valueOf(String.valueOf(eval)) >= 0){ System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】成功,库存剩余:"+String.valueOf(eval)); } }else{ System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】失败,库存剩余:"+stock.toString()); } Thread.sleep(100);//这里为了更真实的模拟多线程并发,这里线程获取到锁后,线程休眠100ms //} } catch (Exception e) { // TODO: handle exception } finally { System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】的线程释放锁"); RedisPool.unLock(lockKey,userCode);//处理完扣减库存业务释放锁,把抢购这件商品的机会留给其余用户 } }else{ System.out.println("当前用户【"+userCode+"】抢购商品【"+key+"】的线程加锁失败,未抢购到,请再试"); } } } 执行上面main方法,无论执行几次,打印出如下日志,可见都不会出现超卖现象,并且加锁和加锁的都是同1个客户端,每个用户加锁并且只能释放自己的锁。 当前系统线程:pool-1-thread-1 当前系统线程:pool-1-thread-3 当前系统线程:pool-1-thread-4 当前系统线程:pool-1-thread-2 用户【20】秒杀商品【101】成功,库存剩余:4 用户【20】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-2 用户【10】秒杀商品【101】成功,库存剩余:3 用户【10】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-1 用户【50】秒杀商品【101】成功,库存剩余:2 用户【50】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-2 用户【40】秒杀商品【101】成功,库存剩余:1 用户【40】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-4 当前用户【30】抢购商品【101】的线程加锁失败,未抢购到,请再试 当前系统线程:pool-1-thread-3 用户【70】秒杀商品【101】成功,库存剩余:0 用户【70】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-1 用户【110】秒杀商品【101】失败,库存剩余:0 用户【110】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-2 用户【190】秒杀商品【101】失败,库存剩余:0 用户【190】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-4 用户【490】秒杀商品【101】失败,库存剩余:0 用户【490】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-2 当前用户【350】抢购商品【101】的线程加锁失败,未抢购到,请再试 当前系统线程:pool-1-thread-3 用户【480】秒杀商品【101】失败,库存剩余:0 用户【480】秒杀商品【101】的线程释放锁 当前用户【500】抢购商品【101】的线程加锁失败,未抢购到,请再试 当前系统线程:pool-1-thread-4 当前系统线程:pool-1-thread-1 用户【460】秒杀商品【101】失败,库存剩余:0 用户【460】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-3 用户【450】秒杀商品【101】失败,库存剩余:0 用户【450】秒杀商品【101】的线程释放锁 当前系统线程:pool-1-thread-4 用户【420】秒杀商品【101】失败,库存剩余:0 用户【420】秒杀商品【101】的线程释放锁 当前用户【470】抢购商品【101】的线程加锁失败,未抢购到,请再试 当前系统线程:pool-1-thread-2 当前系统线程:pool-1-thread-4 用户【410】秒杀商品【101】失败,库存剩余:0 用户【410】秒杀商品【101】的线程释放锁 当前用户【440】抢购商品【101】的线程加锁失败,未抢购到,请再试 当前系统线程:pool-1-thread-1 用户【430】秒杀商品【101】失败,库存剩余:0 用户【430】秒杀商品【101】的线程释放锁 用户【400】秒杀商品【101】失败,库存剩余:0 用户【400】秒杀商品【101】的线程释放锁 总结

本文主要介绍了使用Java代码正确实现Redis分布式锁,对于加锁和解锁也分别给出了两个比较经典的错误示例。其实想要通过Redis实现分布式锁并不难,只要保证能满足上面可靠性里的四个必要条件。本案例讲解到这来,欢迎大家交流

 

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有