【精选】springBoot+Lua+redis 实现限量秒杀抢购模块 您所在的位置:网站首页 抢购脚本实现什么功能 【精选】springBoot+Lua+redis 实现限量秒杀抢购模块

【精选】springBoot+Lua+redis 实现限量秒杀抢购模块

2023-10-26 17:58| 来源: 网络整理| 查看: 265

springBoot+Lua+redis 实现限量秒杀抢购模块 | 惊鸿

基于Lua简单实现秒杀场景_小熙的博客-CSDN博客

【一】做一个秒杀系统【高并发减库存】

秒杀抢购可以说是在分布式环境下一个非常经典的案例。和普通的电商流程不同,秒杀有如下特点:

(1)低廉价格 (2)大幅推广 (3)瞬时售空 (4)一般是定时上架 (5)时间短、瞬时并发量高

里边有很多痛点比如:

1.高并发: 时间极短、 瞬间用户量大,而且用户会在开始前不断刷新页面,还会积累一大堆重复请求的问题,一瞬间的高QPS把系统或数据库直接打死,响应失败,导致与这个系统耦合的系统也GG,一挂挂一片。 2.链接暴露: 通过技术手段获取秒杀的url,然后在开始前通过这个url传要传递的参数来进行购买操作。 3.超卖: 你只有一百件商品,由于是高并发的问题,一起拿到了最后一件商品的信息,都认为还有,全卖出去了,最终卖了100+件,仓库里根本没这么多货。 4.恶意请求: 因为秒杀的价格比较低,有人会用脚本来秒杀,全给一个人买走了,他再转卖,或者同时以脚本操作多个账号一起秒杀。(就是常见的黄牛党)。

本文将使用jmeter工具模仿瞬时并发,基于redis来实现一个秒杀功能。没有实现前端页面。

一、系统分析

假设有一个需求:在某天的12点,开启秒杀,限量100件商品,每个用户会有唯一ID。

首先分析下, 秒杀系统为秒杀而设计,不同于一般的网购行为,参与秒杀活动的用户更关心的是如何能快速刷新商品页面,在秒杀开始的时候抢先进入下单页面并,而不是商品详情等用户体验细节,因此秒杀系统的页面设计应尽可能简单。

前端思路:

展示秒杀商品的页面, 页面上有一个秒杀活动开始的倒计时, 在准备阶段内用户会陆续打开这个秒杀的页面, 并且可能不停的刷新页面。

这里需要考虑两个问题:

第一个:资源静态化

秒杀页面的展示我们知道一个html页面还是比较大的,即使做了压缩,http头和内容的大小也可能高达数十K,加上其他的css, js,图片等资源,如果同时有几千万人参与一个商品的抢购,一般机房带宽也就只有1G~10G,网络带宽就极有可能成为瓶颈,所以这个页面上各类静态资源首先应分开存放,然后放到cdn节点上分散压力,由于CDN节点遍布全国各地,能缓冲掉绝大部分的压力。

第二个:时间同步

倒计时出于性能原因这个一般由js调用客户端本地时间,就有可能出现客户端时钟与服务器时钟不一致,另外服务器之间也是有可能出现时钟不一致。 客户端与服务器时钟不一致可以采用客户端定时和服务器同步时间,用于同步时间的接口由于不涉及到后端逻辑,只需要将当前web服务器的时间发送给客户端就可以了,这个接口可以只返回一小段json格式的数据,而且可以优化一下减少不必要cookie和其他http头的信息,所以数据量不会很大,一般来说网络不会成为瓶颈

第三个:操作控制

(1)产品层面,用户点击“购买”或者“下单”后,按钮置灰,禁止用户重复提交请求; (2)js层面,限制用户在x秒之内只能提交一次请求;

前端层的请求拦截,只能拦住小白用户(不过这也是98%的用户了),有点基础的程序员根本不会吃这一套,写个循环直接调用的http请求,怎么办?

(1)同一个uid,限制访问频度,做页面缓存,x秒内到达站点层的请求,均返回同一页面 (2)同一个item的查询,例如手机车次,做页面缓存,x秒内到达站点层的请求,均返回同一页面

如此限流,又有99%的流量会被拦截在站点层。

至此前端可以为服务端减轻很大的压力。接下来看后端设计思路:

后端思路:

针对如下几点做出解决方案:

提前知道了url:

可以对url进行加密如 使用随机字符串进行MD5加密,活动中点击秒杀按钮才返回url,和MD5加密字符拼接成完整url:

1

#PostMapping("/kill/{userid}/{md5}/url")

可以在服务网关层加过滤条件如After:

1 2 3 4 5 6 7 8 9

routes:   - id: 1                 # ...   - id: 2     uri: lb://kill-server       predicates:       - Path=/kill/exec       - After=2019-12-01T12:00:00.00+08:00[Asia/Shanghai]  # 在2019-12-01 12:00:00.000之后才可以访问

高并发:

nginx做负载均衡(一个tomcat可能只能抗住几百的的并发,nginx还可以对一些恶意ip做限制) 写请求,可以将商品总数做到缓存或队列中,每次只透过有限的写请求去数据层,如果均成功再放下一批,如果库存不够则请求全部返回“已售完” 读请求,cache集群来抗。 提前把数据库里的东西提前加载到redis来 。

超卖问题:

悲观锁、乐观锁、redis均可以解决。但是我个人更加倾向于使用redis解决,在大并发环境下对数据库的操作一定要慎之又慎,能cache的就不要访问库。

redis是单线程串行执行的 利用redis的单线程预减库存。比如商品有100件。那么我在redis存储一个k,v。如count:100。 每一个用户线程进来,count减1,等减到0的时候,全部拒绝剩下的请求。所以一定不会出现超卖的现象

恶意请求:

联合主键索引或者Redis Set数据结构均可解决。还是上面说的,更倾向于redis。

联合主键索引 :把用户id和商品id作为联合主键索引存储到数据库中,下次如果再想插入一样用户id和商品id的行,是会报错的。(主键索引的唯一性),事务中出现错误,支付操作自然也失败了。

Redis Set数据结构:要知道,set数据结构是不需重复的,我们可以巧妙地使用这点,建立一个的set结构,点击抢购就对此userID进行add。每次点击前在判断下是否存在次ID即可。

还有诸如MQ削峰解耦、数据库读写分离等一些列应对并发操作的手段在此不一一列举。

二、实现细节

新建一个服务,此处限定开始时间可以直接使用网关After配置,见上面代码。

因为需要整合lua,所以配置DefaultRedisScriptConfig类, 在应用上下文中配置好,避免在执行的时候重复创建脚本的SHA降低性能。

1 2 3 4 5 6 7 8 9 10 11 12

@Configuration public class DefaultRedisScriptConfig {     @Bean     public DefaultRedisScript redisScript() {         DefaultRedisScript redisScript = new DefaultRedisScript();         redisScript.setResultType(String.class);         redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/seckill.wlua")));         return redisScript;     }     }

seckill.wlua文件位置:

seckill.wlua 文件具体代码如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

local userid=KEYS[1]; local date=KEYS[2]; local kckey='Seckill:'..date..":kc"; local userKey='Seckill:'..date..":user"; local userExists=redis.call("sismember", userKey, userid); if tonumber(userExists)==1 then     return 2; end local num = redis.call("get", kckey); if tonumber(num)         RedisTemplate template = new RedisTemplate();             template.setConnectionFactory(lettuceConnectionFactory);         RedisSerializer stringSerializer = new StringRedisSerializer();         jackson2JsonRedisSerializer.setObjectMapper(om);         template.setKeySerializer(stringSerializer);         template.setValueSerializer(new GenericJackson2JsonRedisSerializer());         template.setHashKeySerializer(stringSerializer);         template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());         template.afterPropertiesSet();         return template;     }

Controller关键方法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25

/**      * 秒杀入口      * @param map 传入参数      * @return String      */     @PostMapping("/bf")     public String bf( @RequestBody Map map ){         String key = "bf:count";         Long increment = redisTemplate.opsForValue().increment(key, 1);         log.info(Thread.currentThread().getName() + " - 访问计数:" + increment );         JSONObject jo = new JSONObject();         try {             String res = seckillService.doSeckill(map.get("id"));             jo.put("message", res);             jo.put("success", true);         } catch (Exception e) {             e.printStackTrace();             jo.put("message", "系统繁忙");             jo.put("success", false);         }         return jo.toJSONString();     }

Service层关键方法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

    @Resource     private RedisTemplate redisTemplate;     @Resource     private DefaultRedisScript redisScript;     public String doSeckill( String userID ){         String date = DateUtil.format(new Date(),"yyyyMMdd");             Object res = redisTemplate.execute((RedisConnection connection) -; connection.eval(                 redisScript.getScriptAsString().getBytes(),                 ReturnType.INTEGER,                 2,                 userID.getBytes(),                 date.getBytes()         ));         long a = res == null ? -1 : (Long) res;         String message = "";         if( a == 1 ){             message = "抢购成功";         }else if( a== 0 ){             message = "商品已经被抢完啦";         }else {             message = "每个账号限购一次!请勿重复操作。";         }         log.info(message);         return message;     }

在单元测试保证每个方法都正常后,启动项目。

Jmiter配置 3秒2000线程 打过去。

这里post的请求参数定义一个随机uuid。

可以看到控制台疯狂输出:

注意:101是因为我提前在redis中放了一个测试id,所以第100次访问显示了101。

在计数超过100的时候都会显示抢完,此时在使用RedisDesktop观察数据:

用来存放已抢购userID的set的size为100。即100个用户。

再看商品,为0。抢完了。

验证是否只能抢一次,在已抢购userID的set中,随便复制一个id。

使用postMan来再次访问秒杀接口:

可以看到提示请勿重复操作。即实现了限制重复购买功能。

至此基于Boot+redis+lua实现的秒杀模块全部实现。 希望能对遇到相同问题的朋友有所帮助。

以上。

题记:秒杀,主要是在活动开始的一瞬间,服务器将要承受高并发,并且能够快速响应,反馈给用户是否抢购成功。所以必须缩短秒杀接口的处理时间集竟可能的减少处理逻辑,以提高TPS。

看过很多类似的博客,很多都是使用消息队列进行流量削峰,由于消息队列的异步性,并不能实时返回接口,然后前端轮询抢购结果。 也有的是在Nginx进行请求随机拦截,直接减少一大半的用户,也可以有效的减少并发量。 本文主要是结合Redis的Lua脚本实现,因为lua在redis中的原子性,和redis的单线程特性,可以有效防止超卖的现象发生,也可以充分利用Redis的高并发特性。

对于一般的秒杀活动,一般需要三个参数就够了,活动编码,抢购会员号,抢购数量,对于活动数量在活动开始前进行预热写入Redis即可。 java中调用redis的eval方法只需要传递三个参数和lua脚本即可

示例代码:

@RestController("/**") public class SeckillController{

@ResponseBody @GetMapping("/****") public String process(String activeId, String memberNum, Integer buyNum){                      // 可以将脚本内置到Redis中         String script = "............";         Object obj = jedis.eval(script, 3, activeId, memberNum, buyNum);         // TODO 解析返回值     } } 然后根据返回值判断即可。

lua示例脚本(定义为字符串即可):

## -1-库存不足  0-抢购数量大于可抢库存  1-抢购成功   # 通过活动编码获取活动库存数量 local num = redis.call('get', KEYS[1]) if num == 0 then     return -1 elseif num < KEYS[3] then     return 0 else     # 扣减活动库存     redis.call('decrby', KEYS[1], KEYS[3])     # 记录会员购买数量     redis.call('set', KEYS[2],  KEYS[3])     return 1 end 活动结束后可以通过定时任务进行消费,正式扣库存等动作。

应对秒杀场景,有很多处理方法和应对的架构,但数据库层最好隔离开,保护好数据库

全篇仅个人想法,如果存在不足之处,请多多指正,只有多说说自己的想法,得到大家的评点,才能从中有所收获,一步步提高自己。 ————————————————  

其实思路是跟思路一一样的,只是实现方式不一样,思路一是通过lua来实现原子性操作,思路二是通过锁的方式来实现原子性,就是说同一时间,只允许一个线程进入锁区域来操作库存记录。

由于高并发场景多数是集群环境,那么传统的sychronized关键字是不起作用的,这里就直接上分布式锁。

分布式锁的实现方式一般是通过redis或者zookeeper来实现,这里就使用redis来实现。

那么redis如何实现加锁呢?

加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间。

SET lock_key random_value NX PX 5000

这个random_value要求是唯一的,因为等到解除锁时,需要通过该值来判断是否删除,如果不是唯一,有可能存在误删除锁的情况。

解锁的过程就是将Key键删除。但也不能乱删。通过random_value来删除。

为了保证解锁操作的原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功。

lua代码如下:

1if redis.call('get',KEYS[1]) == ARGV[1] then 2 return redis.call('del',KEYS[1]) 3else 4 return 0 5end 6

 知道以上原理后,就可以通过java代码来实现了。

环境同思路一

先创建一个RedisLock工具类。这里就直接贴代码了,主要就是2个方法,加锁、解锁:

1package com.ming.seckillredistest.util; 2 3import org.springframework.beans.factory.annotation.Autowired; 4import org.springframework.data.redis.connection.RedisConnection; 5import org.springframework.data.redis.connection.ReturnType; 6import org.springframework.data.redis.core.RedisTemplate; 7import org.springframework.stereotype.Component; 8 9import java.util.concurrent.TimeUnit; 10 11@Component 12public class RedisLock { 13 private final long TIME_OUT=5000;//5秒钟后还未获取到锁设定为超时。 14 //释放锁的脚本, 15 private final String LUA_SCRIPT = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; 16 17 @Autowired 18 private RedisTemplate redisTemplate; 19 20 /** 21 * 加锁方法 22 * @param key 23 * @param value 24 * @return 25 */ 26 public boolean lock(String key,String value){ 27 //开始获取锁的时间 28 long startTime=System.currentTimeMillis(); 29 //如果获取不到锁一直阻塞,并尝试继续获取锁。 30 while (true){ 31 boolean result=redisTemplate.opsForValue().setIfAbsent(key,value,3000, TimeUnit.MILLISECONDS); 32 if(result){ 33 return true; 34 } 35 long tempTime=System.currentTimeMillis()-startTime; 36 if(tempTime>TIME_OUT){ 37 return false; 38 } 39 try { 40 Thread.sleep(5); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 } 45 } 46 public boolean unlock(String key,String value){ 47 String tempValue="\""+value+"\"";//由于config中对序列号的配置,这块需要做处理。 48 Object result = redisTemplate.execute((RedisConnection connection) -> connection.eval( 49 LUA_SCRIPT.getBytes(), 50 ReturnType.INTEGER, 51 1, 52 key.getBytes(), 53 tempValue.getBytes())); 54 55 if(result.toString().equals("0")){ 56 return false; 57 } 58 return true; 59 } 60} 61 62

锁完成了,就准备线程任务代码:

1 private class MyRunnable1 implements Runnable{ 2 @Override 3 public void run() { 4 String lockKey="202002172031"; 5 String lockValue= "202002172031001"; 6 boolean lockResult = redisLock.lock(lockKey, lockValue); 7 if(lockResult){ 8 Object value=redisTemplate.opsForValue().get("20200217"); 9 if(Integer.parseInt(value.toString())>0){ 10 redisTemplate.opsForValue().decrement("20200217"); 11 System.out.println("秒杀成功,生成订单"); 12 }else { 13 System.out.println("秒杀失败"); 14 } 15 boolean unlock = redisLock.unlock(lockKey, lockValue); 16 if(!unlock){ 17 System.out.println("释放锁失败!"); 18 } 19 }else { 20 System.out.println("锁超时,秒杀失败"); 21 } 22 } 23 } 24

最后,测试类调用:

1 @Test 2 public void test6(){ 3 ExecutorService executorService = Executors.newCachedThreadPool(); 4 //生成1000个线程来执行任务。 5 for(int i=0;i



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有