程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

分布式事务:SpringCloud 项目中的一致性密码解锁

balukai 2025-08-02 17:36:07 文章精选 2 ℃

分布式事务:SpringCloud 项目中的一致性密码解锁

**

在如今的互联网时代,软件系统的架构正经历着一场深刻的变革。曾经,单体应用凭借其部署简单、开发便捷的特点占据主流。但随着业务的飞速发展,用户量激增、业务逻辑日趋复杂,单体应用的局限性逐渐凸显,难以应对高并发、高可用的需求。于是,分布式系统应运而生,SpringCloud 作为当下主流的分布式开发框架,更是被广泛应用。而在分布式系统中,有一个绕不开的核心难题 —— 分布式事务。

从单体到分布式,事务如何 “升级”

在单体应用时代,我们所熟知的事务是局限在单一数据库中的,遵循 ACID 原则(原子性、一致性、隔离性、持久性)。比如在一个简单的电商订单系统中,下单、扣减库存、支付这一系列操作都在同一个数据库里完成,一个事务就能轻松保证这些操作要么全部成功,要么全部失败,数据一致性问题相对容易解决。

然而,随着业务的扩张,单体应用被拆分成一个个独立的微服务,每个服务都拥有自己的数据库。就拿 SpringCloud 构建的电商平台来说,订单服务、库存服务、支付服务分别对应着各自的数据库。此时,当用户下单时,订单服务需要创建订单,库存服务要扣减相应商品的库存,支付服务则要处理支付流程。这三个操作分别在三个不同的数据库中进行,传统的单体事务已经无法保证它们的一致性,分布式事务便由此产生。

分布式事务指的是事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。它的重要性不言而喻,一旦处理不当,就可能出现订单创建成功但库存未扣减,导致超卖;或者支付成功但订单状态未更新,引发用户投诉等问题,严重影响系统的可靠性和用户体验。

分布式事务的 “成长烦恼”

数据一致性难题

在分布式系统中,数据分散在各个节点的数据库中,要保证多个节点数据的一致性绝非易事。比如在上述的电商下单场景中,假设订单服务成功创建了订单,库存服务也成功扣减了库存,但支付服务由于某种原因失败了。这时候,就出现了订单已创建、库存已扣减但支付未成功的情况,数据不一致问题凸显。我们需要让这三个操作要么全部成功,要么全部回滚,但由于它们处于不同的节点,协调起来十分困难。

性能与开销的博弈

分布式事务需要跨节点进行协调通信,这必然会带来额外的性能开销。每一次事务的执行,都需要在各个节点之间传递消息、进行确认,这会增加事务的响应时间。在高并发的场景下,这种性能损耗更为明显。例如,在秒杀活动中,大量的下单请求同时涌入,分布式事务的协调过程可能会让系统的处理能力大幅下降。如何在保证数据一致性的前提下,尽可能地优化性能,减少开销,是分布式事务面临的一大挑战。

网络不稳定的挑战

分布式系统依赖网络进行节点间的通信,但网络并非绝对可靠,延迟和故障时有发生。当网络出现延迟时,分布式事务的协调过程可能会被拉长,导致事务长时间处于不确定状态。而如果网络发生故障,节点之间的通信中断,事务可能会被阻塞,甚至出现部分节点执行了操作,部分节点未执行的情况。要应对这些问题,需要复杂的容错机制,这无疑增加了分布式事务实现的难度。

应对策略大盘点

两阶段提交(2PC):经典策略的 “攻守道”

两阶段提交是分布式事务中最经典的策略,它将事务的执行分为准备阶段和提交阶段。

在准备阶段,事务协调者会向所有参与事务的节点发送准备请求,每个节点收到请求后,会执行本地事务,但不会真正提交,而是记录日志,并向协调者返回是否可以提交的结果。如果所有节点都返回可以提交,那么就进入提交阶段,协调者向所有节点发送提交请求,节点执行提交操作,并返回成功信息;如果有任何一个节点返回不可提交,协调者则向所有节点发送回滚请求,节点执行回滚操作。

这种策略能够很好地保证事务的原子性和一致性,但也存在明显的缺点。一方面,整个过程需要协调者和参与者之间进行多次通信,性能开销较大;另一方面,协调者是整个过程的核心,如果协调者出现单点故障,整个事务可能会陷入停滞。

三阶段提交(3PC):改进之路的探索

三阶段提交是在两阶段提交的基础上进行的改进,它增加了一个预提交阶段。

在准备阶段,协调者向参与者发送准备请求,参与者执行本地事务并返回是否准备好;预提交阶段,协调者在收到所有参与者准备好的消息后,向参与者发送预提交请求,参与者收到后会进行一些必要的检查,并返回确认信息;最后是提交阶段,协调者根据预提交阶段的结果,决定发送提交或回滚请求。

三阶段提交通过增加预提交阶段,减少了事务的阻塞时间,降低了单点故障带来的影响。当协调者出现故障时,参与者可以在等待超时后根据自身的状态进行决策。不过,它仍然存在性能开销大的问题,在一些对性能要求极高的场景下,并非最优选择。

TCC(Try-Confirm-Cancel):补偿机制的巧妙运用

TCC 模式采用了补偿机制,将分布式事务分为 Try、Confirm、Cancel 三个阶段。

Try 阶段,主要是对业务资源进行检查和预留。例如在电商下单中,Try 阶段会检查库存是否充足,并对相应的库存进行锁定。Confirm 阶段,当 Try 阶段成功后,会确认执行业务操作,比如将锁定的库存真正扣减。Cancel 阶段,如果 Try 阶段或其他环节出现问题,则会取消已执行的操作,释放预留的资源,比如解锁锁定的库存。

TCC 模式不需要像 2PC 和 3PC 那样进行复杂的跨节点协调,性能较好,也不存在单点故障问题。但它的开发难度较大,需要为每个业务操作都实现对应的 Try、Confirm 和 Cancel 接口,对开发人员的要求较高。

下面以电商下单场景为例,给出 TCC 模式的部分实现代码:

1. 定义 TCC 接口

public interface OrderTccService {

// Try阶段:创建待支付订单

@TwoPhaseBusinessAction(name = "orderTcc", commitMethod = "confirm", rollbackMethod = "cancel")

boolean tryCreateOrder(@BusinessActionContextParameter(paramName = "order") Order order);

// Confirm阶段:确认订单

boolean confirm(BusinessActionContext context);

// Cancel阶段:取消订单

boolean cancel(BusinessActionContext context);

}

public interface InventoryTccService {

// Try阶段:锁定库存

@TwoPhaseBusinessAction(name = "inventoryTcc", commitMethod = "confirm", rollbackMethod = "cancel")

boolean tryLockInventory(@BusinessActionContextParameter(paramName = "productId") String productId,

@BusinessActionContextParameter(paramName = "quantity") int quantity);

// Confirm阶段:扣减库存

boolean confirm(BusinessActionContext context);

// Cancel阶段:解锁库存

boolean cancel(BusinessActionContext context);

}

public interface PaymentTccService {

// Try阶段:冻结资金

@TwoPhaseBusinessAction(name = "paymentTcc", commitMethod = "confirm", rollbackMethod = "cancel")

boolean tryFreezeFunds(@BusinessActionContextParameter(paramName = "userId") String userId,

@BusinessActionContextParameter(paramName = "amount") BigDecimal amount);

// Confirm阶段:划扣资金

boolean confirm(BusinessActionContext context);

// Cancel阶段:解冻资金

boolean cancel(BusinessActionContext context);

}

2. 实现 TCC 接口

@Service

public class OrderTccServiceImpl implements OrderTccService {

@Autowired

private OrderMapper orderMapper;

@Override

public boolean tryCreateOrder(Order order) {

// 检查用户信息、商品信息等是否合法

// 创建待支付订单

order.setStatus("PENDING_PAYMENT");

return orderMapper.insert(order) > 0;

}

@Override

public boolean confirm(BusinessActionContext context) {

Order order = JSON.parseObject(context.getActionContext("order").toString(), Order.class);

// 将订单状态更新为“已确认”

order.setStatus("CONFIRMED");

return orderMapper.updateStatus(order) > 0;

}

@Override

public boolean cancel(BusinessActionContext context) {

Order order = JSON.parseObject(context.getActionContext("order").toString(), Order.class);

// 删除待支付订单

return orderMapper.deleteById(order.getId()) > 0;

}

}

@Service

public class InventoryTccServiceImpl implements InventoryTccService {

@Autowired

private InventoryMapper inventoryMapper;

@Override

public boolean tryLockInventory(String productId, int quantity) {

// 检查库存是否充足

Inventory inventory = inventoryMapper.selectByProductId(productId);

if (inventory.getStock() < quantity) {

return false;

}

// 锁定库存

return inventoryMapper.lockInventory(productId, quantity) > 0;

}

@Override

public boolean confirm(BusinessActionContext context) {

String productId = context.getActionContext("productId").toString();

int quantity = Integer.parseInt(context.getActionContext("quantity").toString());

// 扣减库存

return inventoryMapper.deductInventory(productId, quantity) > 0;

}

@Override

public boolean cancel(BusinessActionContext context) {

String productId = context.getActionContext("productId").toString();

int quantity = Integer.parseInt(context.getActionContext("quantity").toString());

// 解锁库存

return inventoryMapper.unlockInventory(productId, quantity) > 0;

}

}

@Service

public class PaymentTccServiceImpl implements PaymentTccService {

@Autowired

private AccountMapper accountMapper;

@Override

public boolean tryFreezeFunds(String userId, BigDecimal amount) {

// 检查用户账户余额是否足够

Account account = accountMapper.selectByUserId(userId);

if (account.getBalance().compareTo(amount) < 0) {

return false;

}

// 冻结资金

return accountMapper.freezeFunds(userId, amount) > 0;

}

@Override

public boolean confirm(BusinessActionContext context) {

String userId = context.getActionContext("userId").toString();

BigDecimal amount = new BigDecimal(context.getActionContext("amount").toString());

// 划扣资金

return accountMapper.deductFunds(userId, amount) > 0;

}

@Override

public boolean cancel(BusinessActionContext context) {

String userId = context.getActionContext("userId").toString();

BigDecimal amount = new BigDecimal(context.getActionContext("amount").toString());

// 解冻资金

return accountMapper.unfreezeFunds(userId, amount) > 0;

}

}

3. 发起 TCC 事务

@Service

public class OrderService {

@Autowired

private OrderTccService orderTccService;

@Autowired

private InventoryTccService inventoryTccService;

@Autowired

private PaymentTccService paymentTccService;

@GlobalTransactional

public boolean createOrder(Order order) {

// 调用订单服务的Try方法

boolean orderResult = orderTccService.tryCreateOrder(order);

if (!orderResult) {

throw new RuntimeException("创建订单Try阶段失败");

}

// 调用库存服务的Try方法

boolean inventoryResult = inventoryTccService.tryLockInventory(order.getProductId(), order.getQuantity());

if (!inventoryResult) {

throw new RuntimeException("锁定库存Try阶段失败");

}

// 调用支付服务的Try方法

boolean paymentResult = paymentTccService.tryFreezeFunds(order.getUserId(), order.getAmount());

if (!paymentResult) {

throw new RuntimeException("冻结资金Try阶段失败");

}

return true;

}

}

基于消息队列的最终一致性:异步协调的智慧

基于消息队列实现分布式事务,主要依靠异步协调的方式,分为事务发起和补偿两个阶段。

事务发起阶段,事务发起方执行本地事务,并向消息队列发送一条消息,通知其他参与者执行相应的操作。如果本地事务执行成功,消息会被发送出去;如果失败,则不会发送。其他参与者监听消息队列,收到消息后执行本地事务,并向消息队列发送执行结果消息。

补偿阶段,如果事务发起方没有收到某个参与者的成功执行消息,或者收到失败消息,会通过一定的机制触发补偿操作,让该参与者回滚之前的操作。

这种方式性能好,因为采用了异步通信,不会阻塞事务发起方的执行,也不存在单点故障问题。但它的实现比较复杂,需要保证消息的可靠性和顺序性,否则可能会出现消息丢失、重复消费等问题,影响事务的一致性。

以下是基于消息队列的最终一致性方案在用户注册送积分场景中的部分实现代码(以 RabbitMQ 为例):

1. 消息生产者(用户服务)

@Service

public class UserService {

@Autowired

private UserMapper userMapper;

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private MessageRepository messageRepository;

@Transactional

public void register(User user) {

// 执行本地事务:保存用户信息

userMapper.insert(user);

// 创建消息

Message message = new Message();

message.setContent(JSON.toJSONString(user));

message.setStatus(MessageStatus.SENDING);

message.setCreateTime(new Date());

messageRepository.save(message);

// 发送消息到消息队列

try {

rabbitTemplate.convertAndSend("user.register.exchange", "user.register.key", message);

// 消息发送成功,更新消息状态

message.setStatus(MessageStatus.SENT);

messageRepository.save(message);

} catch (Exception e) {

// 消息发送失败,后续可通过定时任务重新发送

log.error("发送用户注册消息失败", e);

}

}

// 定时任务:处理未成功发送的消息

@Scheduled(cron = "0 0/1 * * * ?")

public void resendFailedMessages() {

List<Message> failedMessages = messageRepository.findByStatus(MessageStatus.SENDING);

for (Message message : failedMessages) {

try {

rabbitTemplate.convertAndSend("user.register.exchange", "user.register.key", message);

message.setStatus(MessageStatus.SENT);

messageRepository.save(message);

} catch (Exception e) {

log.error("重新发送消息失败,消息ID:{}", message.getId(), e);

}

}

}

}

2. 消息消费者(积分服务)

@Service

public class IntegralService {

@Autowired

private IntegralMapper integralMapper;

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private ConsumeRecordRepository consumeRecordRepository;

@RabbitListener(queues = "user.register.queue")

public void handleUserRegister(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {

try {

// 检查消息是否已消费

String messageId = message.getId();

ConsumeRecord record = consumeRecordRepository.findByMessageId(messageId);

if (record != null) {

// 消息已消费,确认消息

channel.basicAck(deliveryTag, false);

return;

}

// 解析消息内容

User user = JSON.parseObject(message.getContent(), User.class);

// 执行本地事务:添加积分

Integral integral = new Integral();

integral.setUserId(user.getId());

integral.setAmount(100); // 注册送100积分

integral.setCreateTime(new Date());

integralMapper.insert(integral);

// 记录消息消费记录

ConsumeRecord consumeRecord = new ConsumeRecord();

consumeRecord.setMessageId(messageId);

consumeRecord.setConsumeTime(new Date());

consumeRecordRepository.save(consumeRecord);

// 发送积分赠送成功消息

rabbitTemplate.convertAndSend("integral.success.exchange", "integral.success.key", messageId);

// 确认消息

channel.basicAck(deliveryTag, false);

} catch (Exception e) {

log.error("处理用户注册消息失败", e);

// 消息处理失败,拒绝消息并重新入队

channel.basicNack(deliveryTag, false, true);

}

}

}

3. 补偿机制(用户服务)

@Service

public class CompensationService {

@Autowired

private MessageRepository messageRepository;

@Autowired

private IntegralFeignClient integralFeignClient;

@Autowired

private SuccessMessageRepository successMessageRepository;

// 定时任务:检查未收到积分赠送成功消息的注册消息

@Scheduled(cron = "0 0/5 * * * ?")

public void checkUnsuccessfulIntegralMessages() {

Date fiveMinutesAgo = new Date(System.currentTimeMillis() - 5 * 60 * 1000);

// 查询5分钟前发送且未收到成功消息的注册消息

List<Message> messages = messageRepository.findByStatusAndCreateTimeBefore(MessageStatus.SENT, fiveMinutesAgo);

for (Message message : messages) {

String messageId = message.getId();

// 检查是否收到积分赠送成功消息

SuccessMessage successMessage = successMessageRepository.findByMessageId(messageId);

if (successMessage == null) {

// 未收到,触发补偿机制:重新调用积分服务赠送积分

User user = JSON.parseObject(message.getContent(), User.class);

try {

integralFeignClient.addIntegral(user.getId(), 100);

// 补偿成功,记录成功消息

SuccessMessage newSuccessMessage = new SuccessMessage();

newSuccessMessage.setMessageId(messageId);

newSuccessMessage.setReceiveTime(new Date());

successMessageRepository.save(newSuccessMessage);

} catch (Exception e) {

log.error("补偿赠送积分失败,用户ID:{}", user.getId(), e);

// 可根据实际情况进行重试或人工干预

}

}

}

}

// 监听积分赠送成功消息

@RabbitListener(queues = "integral.success.queue")

public void handleIntegralSuccess(String messageId) {

SuccessMessage successMessage = new SuccessMessage();

successMessage.setMessageId(messageId);

successMessage.setReceiveTime(new Date());

successMessageRepository.save(successMessage);

}

}

4. Feign 客户端(调用积分服务)

@FeignClient(name = "integral-service")

public interface IntegralFeignClient {

@PostMapping("/integral/add")

Result addIntegral(@RequestParam("userId") String userId, @RequestParam("amount") int amount);

}

实战案例分析

电商系统:订单处理的一致性保障

在 SpringCloud 构建的电商系统中,订单处理是一个典型的分布式事务场景。当用户下单时,订单服务需要创建订单,库存服务要扣减库存,支付服务要完成支付。

假设采用 TCC 模式来解决这个问题。在 Try 阶段,订单服务会检查用户信息、商品信息等是否合法,并创建一个待支付的订单;库存服务检查商品库存是否充足,锁定相应数量的库存;支付服务检查用户账户余额是否足够,冻结相应的资金。如果这三个服务的 Try 阶段都成功,就进入 Confirm 阶段。订单服务将订单状态更新为 “已确认”,库存服务将锁定的库存真正扣减,支付服务将冻结的资金划扣到商家账户。如果任何一个服务的 Try 阶段失败,则进入 Cancel 阶段。订单服务删除待支付订单,库存服务解锁锁定的库存,支付服务解冻冻结的资金。通过 TCC 模式的补偿机制,保证了订单处理过程中各个服务之间的数据一致性。

银行转账系统:资金安全的坚实守护

银行转账系统中,用户从 A 账户向 B 账户转账,这涉及到 A 账户所在银行节点和 B 账户所在银行节点的操作,必须保证两个账户的数据一致性,即 A 账户扣减金额和 B 账户增加金额要么同时成功,要么同时失败。

在这种场景下,两阶段提交(2PC)得到了广泛应用。协调者会先向 A 账户所在节点和 B 账户所在节点发送准备请求,A 节点执行扣减操作(不提交),B 节点执行增加操作(不提交),并分别返回是否可以提交的结果。如果都可以提交,协调者发送提交请求,两个节点完成提交;如果有一个节点不可提交,则发送回滚请求,两个节点进行回滚。通过 2PC,确保了转账过程中资金数据的一致性,保障了用户的资金安全。

SpringCloud 项目中的实际案例:用户注册送积分

在一个基于 SpringCloud 的会员系统中,有这样一个业务场景:用户注册成功后,系统需要自动为用户赠送相应的积分。这里涉及到用户服务(负责用户注册)和积分服务(负责积分赠送),它们分别有自己的数据库,属于分布式事务。

我们可以采用基于消息队列的最终一致性方案来实现。当用户在用户服务中注册成功后,用户服务执行本地事务(将用户信息存入数据库),然后向消息队列发送一条 “用户注册成功” 的消息。积分服务监听消息队列,收到消息后,执行本地事务(为该用户添加积分),并向消息队列发送 “积分赠送成功” 的消息。

如果用户服务执行本地事务失败,就不会发送消息,积分服务也不会进行积分赠送操作。如果积分服务执行本地事务失败,没有发送 “积分赠送成功” 的消息,用户服务可以通过定时任务监听消息队列,发现超时未收到成功消息后,触发补偿机制,比如调用积分服务的接口重新赠送积分,或者通过人工干预的方式解决,最终保证用户注册和积分赠送的一致性。

未来展望

分布式事务作为分布式系统中的核心难题,其重要性随着分布式技术的发展日益凸显。当前,虽然已经有多种应对策略,但每种策略都有其适用场景和局限性,在实际应用中需要根据业务需求进行选择和权衡。

未来,随着云计算、大数据等技术的不断发展,分布式事务技术也将朝着更高效、更可靠、更易用的方向发展。可能会出现更多融合多种策略优点的混合模式,在保证数据一致性的同时,进一步提升性能;也可能会有更智能化的事务协调机制,能够根据系统的实时状态动态调整策略,应对各种复杂的场景。

对于开发者来说,深入理解分布式事务的原理和各种应对策略,结合实际业务场景选择合适的方案,是构建可靠分布式系统的关键。而随着技术的不断进步,相信分布式事务这个难题将会得到更好的解决,为分布式系统的发展提供更坚实的保障。

最近发表
标签列表