高并发业务代码中一个事务中同时使用select和update的场景深度剖析 | 您所在的位置:网站首页 › update更新select结果 › 高并发业务代码中一个事务中同时使用select和update的场景深度剖析 |
1.秒杀代码(最low实现):
//开启事务
@Transactional(rollbackFor = Exception.class)
public BaseResponse startSeckillWithLock(Long id, Long userId) {
try {
return seckill(id, userId);
} catch (Exception e) {
throw e;
} finally {
}
}
private BaseResponse seckill(Long id, Long userId) {
//获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id}
Integer number = seckillMapper.getNumberById(id);
if (number != null && number > 0) {
log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
//扣库存,sql为:UPDATE seckill SET number=number-1 WHERE id = #{id}
seckillMapper.deductNumberById(id);
//创建订单
SuccessKilledModel killed = new SuccessKilledModel();
killed.setSeckillId(id);
killed.setUserId(userId);
killed.setState((short)0);
successKilledMapper.insert(killed);
return BaseResponse.valueOfSuccess();
} else {
return BaseResponse.valueOfError(10010, "库存不足");
}
}
这种多个线程执行肯定会发生超卖的现象:然后第一种解决方案是加ReenTranlock程序锁如下: 2.秒杀加程序锁实现(会出现超卖一个的现象) @Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存,sql为:UPDATE seckill SET number=number-1 WHERE id = #{id} seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }启动一千个线程,库存为100,最终结果是个很奇妙的现象: 库存数居然是1,然后在程序里面打的日志是这样的: 然后就有个疑问:加了锁居然还是无法保证数据的原子性? 后面查阅了mysql的默认隔离机制可重复读底层实现以及@Transaction注解的底层实现,原来根本原因是这样的: 我们代码里面,lock.unlock的操作是写在方法里面的finally语句的, 这时候可能我们解锁了,@Transaction标注的方法事务还没提交, 这就导致了另外一个线程执行select语句的时候读的是未更新前的数据, 至于后面的库存数为什么又是依次递减:那是因为事务已经提交了,undo log 里面的快照version已经更新了,后面读的都是前面更新了的快照, 那肯定就是依次递减啦,所以肯定会发生超卖一个的现象mysql可重复读(RR)和读提交(RC)的隔离机制下,select语句用的是快照读,update,insert,delete默认使用当前读 3.mysql当前读和快照读底层当前读: select…lock in share mode (共享读锁) select…for update(排它锁) update , delete , insert(排它锁) 当前读, 读取的是最新版本, 并且对读取的记录加锁, 阻塞其他事务同时改动相同记录,避免出现安全问题。 例如,假设要update一条记录,但是另一个事务已经delete这条数据并且commit了,如果不加锁就会产生冲突。所以update的时候肯定要是当前读,得到最新的信息并且锁定相应的记录 快照读: 单纯的select操作,不包括上述 select … lock in share mode, select … for update Read Committed隔离级别:每次select都生成一个快照读。 Read Repeatable隔离级别:开启事务后第一个select语句才是快照读的地方,而不是一开启事务就快照读 快照读的实现方式,undolog和多版本并发控制 下图右侧绿色的是数据:一行数据记录,主键ID是10,name=‘Jack’,age=10, 被update更新set为name= ‘Tom’,age=23。 事务会先使用“排他锁”锁定改行,将该行当前的值复制到undo log中,然后再真正地修改当前行的值,最后填写事务的DB_TRX_ID,使用回滚指针DB_ROLL_PTR指向undo log中修改前的行DB_ROW_ID DB_TRX_ID: 6字节DB_TRX_ID字段,表示最后更新的事务id(update,delete,insert)。此外,删除在内部被视为更新,其中行中的特殊位被设置为将其标记为已软删除。 DB_ROLL_PTR: 7字节回滚指针,指向前一个版本的undolog记录,组成undo链表。如果更新了行,则撤消日志记录包含在更新行之前重建行内容所需的信息。 DB_ROW_ID: 6字节的DB_ROW_ID字段,包含一个随着新行插入而单调递增的行ID, 当由innodb自动产生聚集索引时,聚集索引会包括这个行ID的值,否则这个行ID不会出现在任何索引中。如果表中没有主键或合适的唯一索引, 也就是无法生成聚簇索引的时候, InnoDB会帮我们自动生成聚集索引, 聚簇索引会使用DB_ROW_ID的值来作为主键; 如果表中有主键或者合适的唯一索引, 那么聚簇索引中也就不会包含 DB_ROW_ID了 。 其它:insert undo log只在事务回滚时需要, 事务提交就可以删掉了。update undo log包括update 和 delete , 回滚和快照读 都需要 4.共享锁和排他锁(LOCK IN SHARE MODE和FOR UPDATE)共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁。而排它锁显得更加严格,不允许其他事务加共享锁或者排它锁,更加不允许其他事务修改加锁的行 5.解决方案1,select语句强制使用当前读(共享锁方式LOCK IN SHARE MODE) @Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }这种方式的原理: 方法中加了程序锁ReeTranlock,就是在执行完所有代码前只允许一个线程去跑 (但这里也没有保证unlock执行前去提交事务),事务中select语句加上LOCK IN SHARE MODE代码这行数据加了共享锁,然后下面执行了update语句这时候又给语句 加上了排他锁,排它锁的功能就是保证了其他事务既不能读也不能写这行数据,所以下 个事务进来执行SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE是要等待的,这时候就保证了获取的数据是最新的数据然后我们再测试一种死锁的现象,我们把程序锁去掉,程序如下: @Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { //lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { //lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }结果会报死锁的问题: 下面拿一个例子来解析一下产生死锁的原因,使用lock in share mode具有很高的风险,看下面的案例: session 1: set autocommit = 0; select * from tb_test where id = 1 lock in share mode;open session2: set autocommit = 0; select * from tb_test where id = 1 lock in share mode;这个时候两个session同时持有id = 1这行数据的共享锁。这个时候我们在session 1里面执行update操作: session 1: update tb_test set col1 = 'AAA' where id = 1;结果卡住了:这个时候session1必须等待session2退出事务或者等待直到锁超时: 锁超时的情况: ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction接着 我们再session2里面执行 session2: update tb_test set col1 = 'BBB' where id = 1;结果直接报错: ERROR 1213 (40001): Deadlock found when trying to get lock ; try restarting transaction这个时候mysql检测到会发生死锁,会中断当前事务该语句的执行,重新开启一个新的事务(应该就是相当于session2先退出事务,然后再开启一个事务)。 总结: Lock in share mode使用不当很容易照成死锁,就是在两个事务同时都有select 和update语句的时候,第一个事务select或者共享锁,第二个事务启动执行select 也获得共享锁,然后第一个事务执行update获取排他锁要等待第二个事务结束,因为 共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁, 接着第二个事务又加了排它锁,又要等待第一个事务解锁,就照成了一个相互等待 的现象 6.解决方案2,select语句强制使用当前读(排它锁方式FOR UPDATE)(最优解)代码: @Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse startSeckillWithLock(Long id, Long userId) { //lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { //lock.unlock(); } } private BaseResponse seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} FOR UPDATE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }这种方式没有加程序锁,用一个FOR UPDATE搞定 原理: for update会使用排它锁,排它锁不允许其他事务加共享锁或者排它锁,更加 不允许其他事务修改加锁的行,所以结合我们代码,select语句加了排它锁,那么 其他事务过来执行SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE是要等待的,直到我们第一个事务执行完了 update后提交了事务,这时候下一个事务的select语句才开始执行,这时候 快照里面的行数据已经是最新的了 7.解决方案3,加redis分布式锁(redisson实现)及aop实现(程序锁也可以)先解释为什么要用aop: 分布式锁内部都是Runtime.exe命令调用外部,肯定是异步的。分布式锁的释放只是 发了一个锁释放命令就算完活了。真正其作用的是下次获取锁的时候,要确保上次是 释放了的。然后如果加锁和解锁请求放在service层方法,那么如果锁释放完了,事务 有可能还没提交,这时候其他事务就已经执行select语句生成快照了,然后读的还是 历史的undo中的记录,所以就会发生超卖一条的现象,放在aop使用环绕通知就可以 在方法执行前和结束后进行加锁和释放锁,确保锁的释放是在事务提交之后redis配置: spring: redis: host: localhost port: 6379 password: timeout: 10000 database: 0 redisson: connectionPoolSize: 64 timeOut: 5000 connectionMinimumIdleSize: 10 package com.qsk.seckill.common.config; import com.qsk.seckill.common.properties.RedissonProperties; import com.qsk.seckill.common.utils.RedissonLockUtil; import org.apache.commons.lang3.StringUtils; import org. |
CopyRight 2018-2019 实验室设备网 版权所有 |