分布式事务
分布式事务问题
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 服务 A │ ──── │ 服务 B │ ──── │ 服务 C │
└─────────┘ └─────────┘ └─────────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ MySQL │ │ MySQL │ │ MySQL │
└─────────┘ └─────────┘ └─────────┘
A: 库存服务 - 扣减库存
B: 订单服务 - 创建订单
C: 支付服务 - 扣减余额
如何保证 A、B、C 同时成功或同时失败?解决方案
| 方案 | 说明 | 适用场景 |
|---|---|---|
| 2PC/3PC | XA 协议,强一致性 | 对数据一致性要求高 |
| TCC | Try-Confirm-Cancel | 业务可拆分为三个阶段 |
| 可靠消息 | 本地消息表 + MQ | 异步场景,最终一致 |
| Saga | 编排式事务 | 长事务 |
Seata AT 模式
原理
┌──────────────────────────────────────────────────────────┐
│ TC (事务协调器) │
└──────────────────────────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 库存服务 │ │ 订单服务 │ │ 支付服务 │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ undo_log │ │ undo_log │ │ undo_log │
│ 表 │ │ 表 │ │ 表 │
└─────────────┘ └─────────────┘ └─────────────┘配置
yaml
seata:
tx-service-group: my_tx_group
registry:
type: nacos
nacos:
server-addr: localhost:8848
config:
type: nacos使用
java
@GlobalTransactional(rollbackFor = Exception.class)
public void createOrder(OrderDTO order) {
// 扣减库存
inventoryFeignClient.decrease(order.getProductId(), order.getQuantity());
// 创建订单
Order newOrder = new Order();
newOrder.setUserId(order.getUserId());
newOrder.setProductId(order.getProductId());
newOrder.setQuantity(order.getQuantity());
orderService.save(newOrder);
// 扣减余额
accountFeignClient.decrease(order.getUserId(), order.getAmount());
}TCC 模式
三个阶段
Try: 预留资源(锁定)
Confirm: 确认执行
Cancel: 回滚取消实现
java
@LocalTCC
public interface AccountTccService {
@TwoPhaseBusinessAction(
name = "decrease",
commitMethod = "confirm",
rollbackMethod = "cancel"
)
void decrease(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "amount") BigDecimal amount);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context);
}
@Service
public class AccountTccServiceImpl implements AccountTccService {
@Override
public void decrease(String userId, BigDecimal amount) {
// Try: 冻结金额
accountMapper.freezeAmount(userId, amount);
}
@Override
public boolean confirm(BusinessActionContext context) {
// Confirm: 扣减冻结金额
String userId = context.getActionContext("userId", String.class);
BigDecimal amount = context.getActionContext("amount", BigDecimal.class);
accountMapper.confirmDecrease(userId, amount);
return true;
}
@Override
public boolean cancel(BusinessActionContext context) {
// Cancel: 释放冻结金额
String userId = context.getActionContext("userId", String.class);
BigDecimal amount = context.getActionContext("amount", BigDecimal.class);
accountMapper.cancelFreeze(userId, amount);
return true;
}
}可靠消息最终一致性
本地消息表
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 本地事务 │ ─────── │ 消息表 │ ─────── │ MQ │
│ + 发送消息 │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘实现
java
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private MessageMapper messageMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional
public void createOrder(Order order) {
// 1. 本地事务:创建订单
orderMapper.insert(order);
// 2. 本地事务:发送消息记录
Message message = new Message();
message.setTopic("order-topic");
message.setTag("create");
message.setBody(JSON.toJSONString(order));
message.setStatus(0); // 0-待发送
messageMapper.insert(message);
}
// 定时任务:发送待处理消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<Message> pending = messageMapper.findPending();
for (Message msg : pending) {
try {
rocketMQTemplate.syncSend(msg.getTopic() + ":" + msg.getTag(),
msg.getBody());
messageMapper.updateStatus(msg.getId(), 1); // 1-已发送
} catch (Exception e) {
// 失败重试
}
}
}
}总结
- Seata AT:自动补偿,低侵入,适合大多数场景
- TCC:强一致性,性能好,需要业务实现三个阶段
- 可靠消息:最终一致性,适用异步场景
- Saga:适合长事务、业务流程编排
