高并发业务代码中一个事务中同时使用select和update的场景深度剖析 您所在的位置:网站首页 update更新select结果 高并发业务代码中一个事务中同时使用select和update的场景深度剖析

高并发业务代码中一个事务中同时使用select和update的场景深度剖析

2024-07-11 23:38| 来源: 网络整理| 查看: 265

1.秒杀代码(最low实现): //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存,sql为:UPDATE seckill SET number=number-1 WHERE id = #{id} seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }

这种多个线程执行肯定会发生超卖的现象:然后第一种解决方案是加ReenTranlock程序锁如下:

2.秒杀加程序锁实现(会出现超卖一个的现象) @Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存,sql为:UPDATE seckill SET number=number-1 WHERE id = #{id} seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }

启动一千个线程,库存为100,最终结果是个很奇妙的现象: 在这里插入图片描述

库存数居然是1,然后在程序里面打的日志是这样的:

在这里插入图片描述

然后就有个疑问:加了锁居然还是无法保证数据的原子性?

后面查阅了mysql的默认隔离机制可重复读底层实现以及@Transaction注解的底层实现,原来根本原因是这样的:

我们代码里面,lock.unlock的操作是写在方法里面的finally语句的, 这时候可能我们解锁了,@Transaction标注的方法事务还没提交, 这就导致了另外一个线程执行select语句的时候读的是未更新前的数据, 至于后面的库存数为什么又是依次递减:那是因为事务已经提交了,undo log 里面的快照version已经更新了,后面读的都是前面更新了的快照, 那肯定就是依次递减啦,所以肯定会发生超卖一个的现象

mysql可重复读(RR)和读提交(RC)的隔离机制下,select语句用的是快照读,update,insert,delete默认使用当前读

3.mysql当前读和快照读底层

当前读:

select…lock in share mode (共享读锁)   select…for update(排它锁)   update , delete , insert(排它锁)

当前读, 读取的是最新版本, 并且对读取的记录加锁, 阻塞其他事务同时改动相同记录,避免出现安全问题。

例如,假设要update一条记录,但是另一个事务已经delete这条数据并且commit了,如果不加锁就会产生冲突。所以update的时候肯定要是当前读,得到最新的信息并且锁定相应的记录

快照读:

单纯的select操作,不包括上述 select … lock in share mode, select … for update

Read Committed隔离级别:每次select都生成一个快照读。

Read Repeatable隔离级别:开启事务后第一个select语句才是快照读的地方,而不是一开启事务就快照读

快照读的实现方式,undolog和多版本并发控制

下图右侧绿色的是数据:一行数据记录,主键ID是10,name=‘Jack’,age=10, 被update更新set为name= ‘Tom’,age=23。

事务会先使用“排他锁”锁定改行,将该行当前的值复制到undo log中,然后再真正地修改当前行的值,最后填写事务的DB_TRX_ID,使用回滚指针DB_ROLL_PTR指向undo log中修改前的行DB_ROW_ID

在这里插入图片描述

DB_TRX_ID: 6字节DB_TRX_ID字段,表示最后更新的事务id(update,delete,insert)。此外,删除在内部被视为更新,其中行中的特殊位被设置为将其标记为已软删除。

DB_ROLL_PTR: 7字节回滚指针,指向前一个版本的undolog记录,组成undo链表。如果更新了行,则撤消日志记录包含在更新行之前重建行内容所需的信息。 DB_ROW_ID: 6字节的DB_ROW_ID字段,包含一个随着新行插入而单调递增的行ID, 当由innodb自动产生聚集索引时,聚集索引会包括这个行ID的值,否则这个行ID不会出现在任何索引中。如果表中没有主键或合适的唯一索引, 也就是无法生成聚簇索引的时候, InnoDB会帮我们自动生成聚集索引, 聚簇索引会使用DB_ROW_ID的值来作为主键; 如果表中有主键或者合适的唯一索引, 那么聚簇索引中也就不会包含 DB_ROW_ID了 。

其它:insert undo log只在事务回滚时需要, 事务提交就可以删掉了。update undo log包括update 和 delete , 回滚和快照读 都需要

4.共享锁和排他锁(LOCK IN SHARE MODE和FOR UPDATE)

共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁。而排它锁显得更加严格,不允许其他事务加共享锁或者排它锁,更加不允许其他事务修改加锁的行

5.解决方案1,select语句强制使用当前读(共享锁方式LOCK IN SHARE MODE) @Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }

这种方式的原理:

方法中加了程序锁ReeTranlock,就是在执行完所有代码前只允许一个线程去跑 (但这里也没有保证unlock执行前去提交事务),事务中select语句加上LOCK IN SHARE MODE代码这行数据加了共享锁,然后下面执行了update语句这时候又给语句 加上了排他锁,排它锁的功能就是保证了其他事务既不能读也不能写这行数据,所以下 个事务进来执行SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE是要等待的,这时候就保证了获取的数据是最新的数据

然后我们再测试一种死锁的现象,我们把程序锁去掉,程序如下:

@Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { //lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { //lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }

结果会报死锁的问题:

下面拿一个例子来解析一下产生死锁的原因,使用lock in share mode具有很高的风险,看下面的案例:

session 1:

set autocommit = 0; select * from tb_test where id = 1 lock in share mode;

open session2:

set autocommit = 0; select * from tb_test where id = 1 lock in share mode;

这个时候两个session同时持有id = 1这行数据的共享锁。这个时候我们在session 1里面执行update操作:

session 1:

update tb_test set col1 = 'AAA' where id = 1;

结果卡住了:这个时候session1必须等待session2退出事务或者等待直到锁超时:

锁超时的情况: ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction

接着 我们再session2里面执行

session2:

update tb_test set col1 = 'BBB' where id = 1;

结果直接报错:

ERROR 1213 (40001): Deadlock found when trying to get lock ; try restarting transaction

这个时候mysql检测到会发生死锁,会中断当前事务该语句的执行,重新开启一个新的事务(应该就是相当于session2先退出事务,然后再开启一个事务)。

总结:

Lock in share mode使用不当很容易照成死锁,就是在两个事务同时都有select 和update语句的时候,第一个事务select或者共享锁,第二个事务启动执行select 也获得共享锁,然后第一个事务执行update获取排他锁要等待第二个事务结束,因为 共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁, 接着第二个事务又加了排它锁,又要等待第一个事务解锁,就照成了一个相互等待 的现象 6.解决方案2,select语句强制使用当前读(排它锁方式FOR UPDATE)(最优解)

代码:

@Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { //lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { //lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} FOR UPDATE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }

这种方式没有加程序锁,用一个FOR UPDATE搞定

原理:

for update会使用排它锁,排它锁不允许其他事务加共享锁或者排它锁,更加 不允许其他事务修改加锁的行,所以结合我们代码,select语句加了排它锁,那么 其他事务过来执行SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE是要等待的,直到我们第一个事务执行完了 update后提交了事务,这时候下一个事务的select语句才开始执行,这时候 快照里面的行数据已经是最新的了 7.解决方案3,加redis分布式锁(redisson实现)及aop实现(程序锁也可以)

先解释为什么要用aop:

分布式锁内部都是Runtime.exe命令调用外部,肯定是异步的。分布式锁的释放只是 发了一个锁释放命令就算完活了。真正其作用的是下次获取锁的时候,要确保上次是 释放了的。然后如果加锁和解锁请求放在service层方法,那么如果锁释放完了,事务 有可能还没提交,这时候其他事务就已经执行select语句生成快照了,然后读的还是 历史的undo中的记录,所以就会发生超卖一条的现象,放在aop使用环绕通知就可以 在方法执行前和结束后进行加锁和释放锁,确保锁的释放是在事务提交之后

redis配置:

spring: redis: host: localhost port: 6379 password: timeout: 10000 database: 0 redisson: connectionPoolSize: 64 timeOut: 5000 connectionMinimumIdleSize: 10 package com.qsk.seckill.common.config; import com.qsk.seckill.common.properties.RedissonProperties; import com.qsk.seckill.common.utils.RedissonLockUtil; import org.apache.commons.lang3.StringUtils; import org.


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有