分布式事务
问题的引入
再来回顾一下我们的秒杀下单接口

从秒杀下单业务逻辑来讲,秒杀下单要么成功,要么失败,不存在中间状态。这就要求:
- 扣减库存 和 秒杀商品的订单以信息的保存 要么全部成功,要么全部失败
- 所以,扣减库存 和 秒杀商品的订单以及订单邮寄信息的保存 的实现,应该实现为一个事物
但是,回顾一下我们秒杀下单的实现代码,我们能够实现,让秒杀服务对数据库的访问(扣减秒杀商品库存),以及订单服务对数据库的访问(保存秒杀订单信息),成为一个数据库事物吗? 当然不行

那么如何让秒杀服务对数据库的访问,以及订单服务对数据库的访问成为一个事务呢? 需要使用分布式事务
分布式系统理论
CAP理论
在讲解分布式事务之前,我们必须首先了解分布式系统中的一个基本理论,CAP理论。这个定理里描述了一个分布式的系统中,以下三个特性最多只能同时满足其中两个
- C(consistentcy):一致性。代表数据在任何时刻、任何分布式节点中所看到的都是符合预期的
- A(Availibality): 可用性。代表系统不间断地提供服务的能力
- P(Partition Tolerance): 分区容忍性。代表分布式环境中部分节点因网络原因而彼此失联后,即与其他节点形成“网络分区”时,系统仍能正常地提供服务的能力。
接下来,我们以两台数据库服务器所组成的数据库服务器集群来说明,这三种特性对分布式系统来说将意味着什么。

在这个简单的系统中,用户通过数据库客户端访问数据库服务器,针对用户的写操作,数据库服务器1和数据库服务器2之间可以相互同步数据。我们希望不管用户访问数据库服务器1还是数据库服务器2,它们之间都可以正确的同步数据的修改,从而使两个数据库服务器中的数据一致。但是,在实际运行时,可能会发生以下的问题:
- 如果用户对数据库服务器1对数据的修改,没有被及时同步给数据库服务器2,那么就会导致,用户通过不同的客户端访问数据库,看到的数据不相同,此即为一致性问题
- 如果由于要把对一个数据库服务器中的数据修改同步给另外一个数据库服务器,必须暂时停止用户对数据库服务器的访问,直到数据同步完成在重新恢复,将可能导致用户在下一次访问数据库时,因数据库服务器之间正在同步数据,而无法访问数据库,此即为可用性问题
- 如果由于数据库服务器之间的网络问题,导致数据库服务器之间无法相互同步数据,此时我们是否继续允许数据库服务器继续正常运行,并且能被访问。此即为分区容忍性问题。
接下来,我们讨论一下在一个分布式系统中,C,A,P这三个指标之间的关系:
- 如果放弃分区容忍性(CA without P):意味着我们将假设节点之间通信永远是可靠的。永远可靠的通信在分布式系统中必定不成立的。所以分区容忍性在分布式系统中不可能排除。
- 如果放弃可用性(CP without A):意味着我们将假设一旦网络发生分区,节点之间的信息同步时间可以无限制地延长,直到网络故障恢复。这就意味着在这个过程中,系统不能被外界正常的访问。
- 如果放弃一致性(AP without C):意味着我们将假设一旦发生分区,节点之间所提供的数据可能不一致,用户访问到的数据也可能有不一致的情况。
所以,CAP理论的结论是:一个分布式的系统中CAP三个特性最多只能同时满足其中两个

BASE理论
有了CAP理论之后,2008年由eBay的工程师提出了BASE理论,可以把它看作是CAP理论的延伸:
- 基本可用(Basically Available):基本可用指的是系统保证核心功能的可用性。
- 软状态(Soft State):系统的状态不需要实时保持一致,即系统中的数据可以存在一段时间的不一致
- 最终一致性(Eventually Consistent):虽然系统在某一时间点可能不保证数据的严格一致性,但承诺在未来某个时间点,所有的数据最终会变得一致
分布式事务
在微服务架构中,分布式事务是指不同的服务对不同数据库的访问成为一个事务。根据对数据一致性的要求不同,分布式事务又分为了两类:
- 刚性事务:像本地事务一样,追求事务的ACID特性,即追求数据的强一致性。比如2PC等
- 柔性事务:不在追求事务的ACID特性,转而追求数据的最终一致性。比如,TCC事务,消息型事务等
刚性事务
为了解决分布式事务的一致性问题,X/Open组织提出了一套名为X/Open XA(XA 是 eXtended Architecture 的缩写)的处理事务规范,被称为2PC,又叫做二阶段提交协议 它将整个事务处理的过程定义为两个阶段:
-
准备阶段:又叫作投票阶段,在这一阶段,事务管理器询问事务的所有参与者,发送要执行的SQL语句,并询问是否准备好提交(此时,SQL语句在数据库的本地事务中执行,但不提交),参与者如果已经准备好提交则回复 Prepared,否则回复 Non-Prepared
-
提交阶段:又叫作执行阶段,事务管理器如果在上一阶段收到所有事务参与者回复的 Prepared 消息,则向所有参与者发送 Commit 指令,所有参与者立即执行提交操作;否则,任意一个参与者回复了 Non-Prepared 消息,或任意一个参与者超时未回复,协调者向所有参与者发送 Rollback 指令,参与者立即执行回滚操作

两段式提交原理简单,并不难实现,但有几个非常显著的缺点:
-
资源锁定问题:从事务的第一阶段开始,参与事务的资源就会被当前事务锁定,直到事务执行完毕。
-
单点问题:一旦协调者(事务管理器)宕机的话,所有参与者都会受到影响。如果协调者一直没有恢复,没有正常发送 Commit 或者 Rollback 的指令,那所有参与者都必须一直等待。
-
性能问题:整个事务的执行,要等待整个两个阶段的工作都完成,整个事务才会执行结束,这是一个同步阻塞的过程,所以事务的执行效率不会很高
-
一致性问题:数据不一致的风险仍然存在
柔性事务
柔性事务与刚性事务不同,不追求数据的强一致性,而只追求数据的最终一致性。常见的柔性事务有TCC,以及消息型事务等。
TCC事务
TCC 是另一种常见的分布式事务机制,它是“Try-Confirm-Cancel”三个单词的缩写。TCC事务分为以下三个步骤:
- Try:尝试执行,完成所有业务可执行性的检查,并且预留好全部需用到的业务资源。
- Confirm:确认执行,不进行任何业务检查,直接使用 Try 阶段准备的资源来完成业务处理
- Cancel:取消执行,释放 Try 阶段预留的业务资源
TCC事务和2PC的形式有些类似,也是将整个事务的执行分成了两个阶段,其中
- Try阶段类比于2PC的准备阶段
- Confirm/Cancel类比于2PC的提交阶段,confirm类比于commit指令,cancel类比于rollback指令
在使用TCC事务之前我们还得先完成代码的改造,让订单服务和秒杀服务分别实现,针对Try,Confirm和Cancel三个阶段的三个方法:

- 对于秒杀服务而言,try方法扣减可售卖库存,增加冻结库存,confirm方法扣减冻结库存,增加锁定库存,try方法和confirm方法合起来,完成整个库存扣减功能,而cancel方法的功能则是”抵消”第一步try方法对数据库的修改,扣减冻结库存,并加回可售卖库存。
- 对于订单服务也是类似,在try方法中完成保存订单信息,但订单状态保存为“预下单”,在confirm方法中将订单状态改为初始化状态。cancel方法则用来”抵消”第一步try方法对数据库的修改,删除try方法中插入的订单
整个执行过程分为如下两个阶段:
- 第一阶段,通过事务管理器调用秒杀服务和订单服务的try方法,事务管理器会负责收集两个服务try方法执行的结果,当收集到两服务的try方法的执行结果,自动进入第二阶段
- 第二个阶段,事务管理器会判断两服务的try方法执行成功与否的结果,如果两服务的try方法都执行成功,则事务管理器自动调用两服务的confirm方法完成整个事务。否则,如果两服务中至少有一个服务的try方法执行不成功,则调用两服务的cancel方法,实现事务的回滚,“抵消”两服务中的try方法对数据库的修改

虽然都是两阶段,但是TCC事务和2PC还是有明显的区别:
- TCC事务的实现不依赖于数据库事务,TCC是业务层面的两阶段,而2PC是数据库内部事务提交的两阶段
- 和2PC相比,使用TCC几乎不会出现资源锁定的问题,因为在数据库层面,try,confirm,cancel是三个完全独立的事务
- 因为TCC不依赖于数据库,所以其实现更加灵活
但是TCC事务也有它明显的缺点,其实现具有很强的代码侵入性,要是用TCC事务,首先就得改造代码,实现try,confirm,cancel,这也意味这TCC事务通常与业务逻辑强耦合,不便于代码维护
消息型事务

消息型事务不止一种实现方式,但是其中最简单的就是基于RocketMQ的事务消息实现的分布式事务。
要理解基于RocketMQ实现的分布式事务,就必须首先理解RocketMQ的事务消息,其特征如下:
- MQ发送方和MQ订阅方分别指消息生产者和消息消费者,它们在不同的服务中运行
- 对于消息生产者而言,一旦它发送消息成功,紧接着就会执行一个本地事务,该事物的结果直接决定消息是否能被消息消费者消费到
- 对于消费者而言,只有当消息生产者执行的本地事务成功,消费者才能消费到消息,否则永远也消费不到消息。
- 同时,如果本地事务执行的结果没有被成功发送给RocketMQ,RocketMQ还会主动向消息生产者查询本地事务的执行结果,从而决定消费者是否能消费到消息
所以,消息发送者所执行的本地事务的结果决定了消费者是否能消费到消息,这样的消息我们称之为事务消息。对于事务消息的发送和普通消息的发送有较大的差别,但是事务消息的消费和普通消息没有区别,所以我们重点看下事务消息的发送代码:
public static void main(String[] args) throws MQClientException {
// 1. 创建专门用于发送事务消息的producer TransactionMQProducer transactionProducer = new TransactionMQProducer("test_transaction_producer56");
//2. 设置nameserver地址 transactionProducer.setNamesrvAddr("192.168.153.149:9876");
//3. 设置TransactionLisener transactionProducer.setTransactionListener(new TransactionListener() { /* 在该方法中执行本地事务 参数: msg: 发送的事务消息 arg: 发送事务消息所需的参数(不需要就传递null) 返回值: 表示本地事务的执行结果 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... 本地事务 return null; }
/* 本地事务执行结果的查询 参数: msg: 之前发送的事务消息 返回值: 查询到的本地事务的执行结果 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // ... 查询本地事务的执行结果 return null; } });
// 4. 启动producer对象 transactionProducer.start();
// 5. 准备待发送的消息 Message message = ...;
// 6. 发送事务消息,第二个参数就是传递给本地事务的参数 TransactionSendResult transactionSendResult = transactionProducer.sendMessageInTransaction(message, null); System.out.println(transactionSendResult);
}
基于RocketMQ的事务消息,我们就可以实现秒杀服务和订单服务的分布式事务了,该事物的执行会有三种情况:
- 秒杀服务(我们称为上游服务)本地事务执行失败,订单服务(我们称为下游服务)消费不到消息,相当于什么也没做
- 上游服务本地事务执行成功,下游服务消费到消息且本地事务也执行成功,相当于都成功
- 上游服务本地事务执行成功,下游服务消费到消息但是本地事务执行失败,此时会出现数据的不一致
针对第3种情况,我们也无需担心,因为:
- RocketMQ会有16次重试的机会,在重试过程中,只要下游服务消费消息成功,那么又会变成第2种情况。
- 即使16次重试都失败,也没有关系,RocketMQ会把这种消息存储到一种特殊的死信队列中。然后我们可以通过人工干预的方式,获取死信队列中的消息,在解决订单服务的问题后,将死信队列中的消息重新发送到RocketMQ让下游服务重新消费。
分布式事务的实现
在下单时,除了检查是否重复下单,我们只需要检查重复下单,然后给订单服务发送事务消息,至于库存扣减,需要再TransactionManager中实现。
public void submitOrderInTransaction(OrderInfoParam orderInfo) {
// 一次只对一个秒杀商品下单 Long skuId = orderInfo.getOrderDetailList().get(0).getSkuId();
RSet<Long> set = redissonClient.getSet(RedisConst.PROMO_USER_ORDERED_FLAG + orderInfo.getUserId()); // 向用户抢购的商品id集合中,添加当前sku商品id boolean ret = set.tryAdd(skuId); if (!ret) { // 如果用户已经下过单,那么直接返回 throw new BusinessException(SeckillCodeEnum.SECKILL_DUPLICATE_TRADE); }
Map<String, Number> paramMap = new HashMap<>(); paramMap.put("skuId", orderInfo.getOrderDetailList().get(0).getSkuId()); paramMap.put("count", orderInfo.getOrderDetailList().get(0).getSkuNum()); MqResultEnum mqResultEnum = producer.sendTransactionMessage(MqTopicConst.PROMO_ORDER_TOPIC, orderInfo, paramMap); if (mqResultEnum.SEND_FAIL.equals(mqResultEnum)) { // 消息未发送成功,或者本地事务执行失败, 需要让用户可以重新下单 redissonClient.getSet(RedisConst.PROMO_USER_ORDERED_FLAG + orderInfo.getUserId()).remove(skuId); throw new BusinessException(SeckillCodeEnum.SECKILL_ORDER_TRY_AGAIN); }
if (mqResultEnum.LOCAL_TRANSACTION_FAIL.equals(mqResultEnum)) { // 消息发送成功,本地事务执行失败 throw new BusinessException(SeckillCodeEnum.SECKILL_FINISH); } // 本地事务执行成功 }在TransactionListener的实现如下:
transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { Map<String, Number> paramsMap = (Map<String, Number>) arg;
Number skuId = paramsMap.get("skuId"); Number count = paramsMap.get("count"); // 扣减库存 RBucket<String> bucket = redissonClient .getBucket(RedisConst.PROMO_TRANSACTION_PREFIX + msg.getTransactionId()); try { Integer stock = seckillGoodsMapper.decreaseStock(skuId, 1);
if (stock <= 0) { // 库存售罄 if (stock == 0) { // 设置库存售罄标志位 LocalCacheHelper.put(skuId.toString(), "0"); }
//保存本地事务结果 bucket.set("fail"); return LocalTransactionState.ROLLBACK_MESSAGE; }
bucket.set("success"); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { e.printStackTrace(); bucket.set("exception"); return LocalTransactionState.ROLLBACK_MESSAGE; }
}
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 获取本地事务结果 RBucket<String> bucket = redissonClient .getBucket(RedisConst.PROMO_TRANSACTION_PREFIX + msg.getTransactionId()); if (bucket == null) { return LocalTransactionState.UNKNOW; } if ("success".equals(bucket.get())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } });至于订单服务,只需要定义一个消费者来消费秒杀服务发送的事务消息即可:
@Slf4j@Componentpublic class PromoOrderConsumer {
@Value("${rocketmq.namesrv.addr}") String namesrvAddr;
@Value("${rocketmq.consumer.group}") String consumerGroup;
DefaultMQPushConsumer mqComsumer;
@Autowired OrderService orderService;
@PostConstruct public void init() throws MQClientException { // 初始化 mqComsumer = new DefaultMQPushConsumer(consumerGroup + "-promorder");
// 设置注册中心 mqComsumer.setNamesrvAddr(namesrvAddr);
// 订阅主题 mqComsumer.subscribe(MqTopicConst.PROMO_ORDER_TOPIC, "*");
// 设置消息监听器 mqComsumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); OrderInfoParam orderInfoParam = JSON.parseObject(new String(messageExt.getBody()), OrderInfoParam.class); log.info("收到秒杀下单消息,topic:{}, messageId:{}", messageExt.getTopic(), messageExt.getMsgId()); try { // 保存秒杀订单 Long orderId = orderService.saveSeckillOrder(orderInfoParam); log.info("秒杀下单 消息消费成功!orderId:{}" , orderId); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }); // 启动 mqComsumer.start(); log.info("订单超时取消 消息消费者启动成功! "); }}秒杀下单的结果
在使用了基于RocketMQ事务消息实现的分布式事务之后,当秒杀下单接口返回时,有可能订单服务正在接收消息,或者正在执行秒杀下单功能,此时秒杀下单还没有真正完成。所以在秒杀下单之后,我们会跳转到一个排队等待页面。

当订单服务真正下单成功的时候,就会向Redis中放入对应的标志位
@Transactional @Override public Long saveSeckillOrder(OrderInfoParam orderInfoParam) {
//... 保存订单
// 向redis中添加标记 Long skuId = orderInfoParam.getOrderDetailList().get(0).getSkuId(); String userId = orderInfoParam.getUserId().toString(); RMap<String, Long> map = redissonClient.getMap(RedisConst.PROMO_SECKILL_ORDERS); // 放入下单成功的标志位 map.put(userId + ":" + skuId, orderInfo.getId());
return orderInfo.getId(); }同时,前端在排队等待页面中会有一个定时任务,向秒杀服务发送查询秒杀下单结果的请求,直到获取到本次秒杀下单的结果,然后会显示如下页面:

所以,在秒杀服务中我们还需要实现一个接口为前端返回秒杀下单的结果
@GetMapping(value = "/seckill/auth/checkOrder/{skuId}") public Result checkOrder(@PathVariable("skuId") Long skuId, HttpServletRequest request) { //当前登录用户 String userId = AuthContext.getUserId(request); boolean orderSuccess = promoService.checkOrder(skuId, userId); if (orderSuccess) { return Result.build(null, SeckillCodeEnum.SECKILL_ORDER_SUCCESS); }
return Result.build(null, SeckillCodeEnum.SECKILL_RUN); }@Override public boolean checkOrder(Long skuId, String userId) {
//2. 判断Redis中是否已经存在用户的订单 return redissonClient.getMap(RedisConst.PROMO_SECKILL_ORDERS).containsKey(userId + ":" + skuId);
}缓存预热-优化
很显然秒杀商品数据的访问量是比较大的。如果不考虑缓存,那么每次都需要直接访问数据库,势必会给数据库带来比较大的压力,所以在此考虑使用缓存预热的方案提高对秒杀数据的访问效率。
缓存预热就是系统启动前,提前将相关的缓存数据直接加载到缓存系统。避免在用户请求的时候,先查询数据库,然后再将数据放入缓存。
因为缓存预热是在每次秒杀活动开始之前进行的,而我们的秒杀活动是有固定时间的,因此我们可以使用定时任务来实现缓存预热
@Component@EnableSchedulingpublic class ScheduledTask {
@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨一点执行 public void importIntoRedisTask(){
// 1. 从数据库查询今天的秒杀商品数据 // where status = CHECKED_PASS AND stock_count > 0 AND DATE_FORMAT(start_time,'%Y-%m-%d') = ?
// 2. 初始化 for(...) { // 2.1 将秒杀商品数据放入Redis(hash),其中每个filed对应skuId,value对应SseckillGoods对象 // 2.2 在redis中初始化秒杀商品库存(hash),其中每个filed对应skuId,value对应库存数量 // 2.3 初始化库存状态位 }
}}库存的扣减-优化
在Redis中,为了使库存的扣减成为一个原子操作,所以我们提供一个工具类,在该工具类中使用lua脚本来实现库存的扣减
@Component@Slf4jpublic class RedisStockOper {
@Autowired RedissonClient redissonClient;
private final static String STOCK_LUA_NAME = "Redis扣减库存脚本"; /* -- 调用Redis的get指令,查询活动库存,其中KEYS[1]为传入的参数1,即库存key local c_s = redis.call('get', KEYS[1]) -- 判断活动库存是否充足,其中ARGV[1]为传入当前抢购数量 if not c_s or tonumber(c_s) < tonumber(ARGV[1]) then return -1 end -- 如果活动库存充足,则进行扣减操作。其中ARGV[1]为传入当前抢购数量 redis.call('decrby',KEYS[1], ARGV[1]) * */ private final static String STOCK_LUA = "local c_s = redis.call('hget', KEYS[1], ARGV[1]) \n" + "if not c_s or tonumber(c_s) < tonumber(ARGV[2]) then \n" + "return -1 end \n" + "return redis.call('hincrby',KEYS[1], ARGV[1], -tonumber(ARGV[2]))";
private String sha1;
@PostConstruct public void loadScript(){ // 缓存脚本 sha1 = redissonClient.getScript().scriptLoad(STOCK_LUA); log.info("load script {}", sha1); }
public Long decrRedisStock(Long skuId, Integer count) { // 执行脚本,扣减商品skuId对应的库存,扣减的值为count return redissonClient.getScript(StringCodec.INSTANCE).evalSha(RScript.Mode.READ_WRITE, sha1, RScript.ReturnType.INTEGER, List.of(RedisConst.PROMO_SECKILL_GOODS_STOCK), // 脚本不涉及键,传递空列表 skuId.toString(), count.toString()); }
}因此在TransactionManager中,扣减库存的代码也会有所变化:
transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { Map<String, Number> paramsMap = (Map<String, Number>) arg;
Number skuId = paramsMap.get("skuId"); Number count = paramsMap.get("count");
RBucket<String> bucket = redissonClient .getBucket(RedisConst.PROMO_TRANSACTION_PREFIX + msg.getTransactionId());
try { // 从Redis中扣减库存 Long remainStock = redisStockOper.decrRedisStock(skuId, count.intValue());
if (remainStock >= 0) {
if (remainStock == 0) { LocalCacheHelper.put(skuId.toString(), PromoGoodsStock.STOCK_NOT_ENOUGH.getNum()); }
// 往Redis中添加标记,存入success bucket.set("success");
// 库存扣减成功 // 返回本地事务执行成功 return LocalTransactionState.COMMIT_MESSAGE; }else {
// 库存扣减失败 // 说明没库存了,设置本地内存中的库存状态位标记 为 “0” // LocalCacheHelper.put(skuId.toString(), PromoGoodsStock.STOCK_NOT_ENOUGH.getNum());
// 往Redis中添加标记,存入success bucket.set("fail");
// 返回本地事务执行失败 return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { e.printStackTrace(); bucket.set("exception"); return LocalTransactionState.ROLLBACK_MESSAGE; } } }定时任务-清除缓存
秒杀过程中我们写入了大量redis缓存,我们可以在秒杀结束或每天固定时间清楚缓存,释放缓存空间
当前项目中的秒杀活动,我们默认为每天早上8:00开始,晚上8:00结束,所以我们可以在每天晚上十点清空Redis缓存
需要清空哪些缓存呢?
清空商品缓存库存
清空缓存预热商品列表
清空用户下单商品集合标识
清空用户下单成功缓存
清空本地秒杀商品库存标志位
最后,还需要修改秒杀表对应的商品状态为已结束状态,修改数据库中的秒杀商品库存为秒杀结束之后的库存
@Override public void clearRedisCache() {
// 更新数据库中的库存 updateStock();
//修改数据库当天审核通过的的秒杀商品状态为秒杀结束 updateStatus();
// 删除redis中缓存预热的相关数据 redissonClient.getKeys().deleteByPattern("promo:*");
// 清空本地缓存 LocalCacheHelper.removeAll();
}文章分享
如果这篇文章对你有帮助,欢迎分享给更多人!