1. 两阶段提交(2PC)
- 技术原理:引入一个协调者(Coordinator),第一阶段,协调者向所有参与者(如库存数据库、订单数据库对应的参与者)发送准备消息,参与者执行事务操作但不提交,回复协调者是否准备成功;第二阶段,若所有参与者准备成功,协调者发送提交消息,否则发送回滚消息,参与者根据协调者指令进行提交或回滚。
- 优点:简单直接,能保证强一致性,在大多数关系型数据库原生支持事务的基础上实现分布式事务。
- 缺点:单点问题,协调者故障会导致整个事务无法继续;同步阻塞,在两阶段过程中,参与者资源被锁定;性能问题,涉及多次网络交互。
- 代码示例(简化版伪代码,以Java JDBC为例):
// 协调者
public class Coordinator {
private List<Participant> participants;
public Coordinator(List<Participant> participants) {
this.participants = participants;
}
public void twoPhaseCommit() {
boolean allPrepared = true;
// 第一阶段
for (Participant participant : participants) {
if (!participant.prepare()) {
allPrepared = false;
break;
}
}
// 第二阶段
if (allPrepared) {
for (Participant participant : participants) {
participant.commit();
}
} else {
for (Participant participant : participants) {
participant.rollback();
}
}
}
}
// 参与者
public class Participant {
private Connection connection;
public Participant(Connection connection) {
this.connection = connection;
}
public boolean prepare() {
try {
// 执行数据库操作,如库存更新等
Statement statement = connection.createStatement();
statement.executeUpdate("UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 1");
connection.setAutoCommit(false);
return true;
} catch (SQLException e) {
return false;
}
}
public void commit() {
try {
connection.commit();
connection.setAutoCommit(true);
} catch (SQLException e) {
e.printStackTrace();
}
}
public void rollback() {
try {
connection.rollback();
connection.setAutoCommit(true);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
- 架构设计思路:在分布式电商系统中,协调者可以是独立的服务,管理各个数据库操作对应的参与者。每个参与者负责与自己对应的数据库进行交互,在协调者的控制下完成两阶段提交。
2. 三阶段提交(3PC)
- 技术原理:在2PC基础上增加预询问阶段。第一阶段,协调者向参与者发送预询问消息,检查参与者是否可以执行事务;第二阶段,协调者发送准备消息,参与者执行事务操作但不提交,回复准备结果;第三阶段,若所有参与者准备成功,协调者发送提交消息,否则发送回滚消息。
- 优点:相比2PC,减少了单点故障导致的阻塞问题,因为在预询问阶段可以检测部分故障情况;也减少了同步阻塞时间。
- 缺点:增加了网络交互次数,实现更复杂,性能开销相对较大。
- 代码示例(简化版伪代码):
// 协调者
public class Coordinator3PC {
private List<Participant> participants;
public Coordinator3PC(List<Participant> participants) {
this.participants = participants;
}
public void threePhaseCommit() {
boolean allReady = true;
// 预询问阶段
for (Participant participant : participants) {
if (!participant.preCheck()) {
allReady = false;
break;
}
}
if (allReady) {
boolean allPrepared = true;
// 准备阶段
for (Participant participant : participants) {
if (!participant.prepare()) {
allPrepared = false;
break;
}
}
// 提交阶段
if (allPrepared) {
for (Participant participant : participants) {
participant.commit();
}
} else {
for (Participant participant : participants) {
participant.rollback();
}
}
} else {
for (Participant participant : participants) {
participant.abort();
}
}
}
}
// 参与者
public class Participant {
private Connection connection;
public Participant(Connection connection) {
this.connection = connection;
}
public boolean preCheck() {
// 检查资源是否可用等
return true;
}
public boolean prepare() {
try {
// 执行数据库操作,如库存更新等
Statement statement = connection.createStatement();
statement.executeUpdate("UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 1");
connection.setAutoCommit(false);
return true;
} catch (SQLException e) {
return false;
}
}
public void commit() {
try {
connection.commit();
connection.setAutoCommit(true);
} catch (SQLException e) {
e.printStackTrace();
}
}
public void rollback() {
try {
connection.rollback();
connection.setAutoCommit(true);
} catch (SQLException e) {
e.printStackTrace();
}
}
public void abort() {
// 释放资源等操作
}
}
- 架构设计思路:类似2PC架构,协调者增加预询问逻辑,参与者增加预检查方法。在电商系统中,预询问可以检查库存是否足够等条件,减少后续无效操作。
3. 消息队列(MQ)最终一致性方案
- 技术原理:将订单下单操作拆分为多个本地事务,通过消息队列异步解耦。例如,先在订单数据库创建订单记录并发送库存更新消息到MQ,库存服务从MQ消费消息进行库存更新。通过补偿机制和定期对账来保证最终一致性。
- 优点:异步解耦,提高系统吞吐量;避免了长事务导致的资源锁定问题;实现相对简单,对系统侵入性小。
- 缺点:不能保证强一致性,存在短暂的数据不一致窗口;依赖MQ的可靠性,若MQ出现故障可能导致消息丢失等问题。
- 代码示例(以Kafka为例,简化版伪代码):
// 订单服务
public class OrderService {
private KafkaProducer<String, String> producer;
private Connection orderConnection;
public OrderService(KafkaProducer<String, String> producer, Connection orderConnection) {
this.producer = producer;
this.orderConnection = orderConnection;
}
public void createOrder(Order order) {
try {
// 创建订单本地事务
Statement statement = orderConnection.createStatement();
statement.executeUpdate("INSERT INTO orders (order_id, user_id, product_id) VALUES ('1', '1', '1')");
orderConnection.commit();
orderConnection.setAutoCommit(true);
// 发送库存更新消息
producer.send(new ProducerRecord<>("inventory_topic", "update_inventory:1:1"));
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// 库存服务
public class InventoryService {
private KafkaConsumer<String, String> consumer;
private Connection inventoryConnection;
public InventoryService(KafkaConsumer<String, String> consumer, Connection inventoryConnection) {
this.consumer = consumer;
this.inventoryConnection = inventoryConnection;
}
public void consumeInventoryUpdate() {
consumer.subscribe(Arrays.asList("inventory_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 解析消息并更新库存
String[] parts = record.value().split(":");
String action = parts[0];
if ("update_inventory".equals(action)) {
String productId = parts[1];
int quantity = Integer.parseInt(parts[2]);
Statement statement = inventoryConnection.createStatement();
statement.executeUpdate("UPDATE inventory SET quantity = quantity - " + quantity + " WHERE product_id = " + productId);
inventoryConnection.commit();
inventoryConnection.setAutoCommit(true);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
- 架构设计思路:在电商系统中,订单服务、库存服务等作为独立服务,通过MQ进行消息传递。订单服务创建订单后发送消息,库存服务消费消息更新库存。同时,设置定时任务进行数据对账,确保最终一致性。
4. TCC(Try - Confirm - Cancel)补偿事务
- 技术原理:将每个数据库操作分为Try、Confirm、Cancel三个阶段。Try阶段进行资源预留等操作,Confirm阶段进行正式提交,Cancel阶段在Try成功但Confirm失败时进行补偿操作。例如,库存服务在Try阶段检查库存并冻结库存,订单服务在Try阶段创建订单记录。若所有Try成功,执行Confirm,否则执行Cancel。
- 优点:不依赖数据库原生事务,适合跨服务、跨数据库类型场景;具有较高的灵活性,可根据业务定制补偿逻辑。
- 缺点:实现复杂,需要每个服务实现Try、Confirm、Cancel三个接口;对业务侵入性大,需要业务逻辑深度参与。
- 代码示例(简化版伪代码):
// 库存服务
public class InventoryService {
private Connection inventoryConnection;
public InventoryService(Connection inventoryConnection) {
this.inventoryConnection = inventoryConnection;
}
public boolean tryInventory(String productId, int quantity) {
try {
// 检查库存并冻结库存
Statement statement = inventoryConnection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT quantity FROM inventory WHERE product_id = " + productId);
if (resultSet.next()) {
int currentQuantity = resultSet.getInt("quantity");
if (currentQuantity >= quantity) {
statement.executeUpdate("UPDATE inventory SET frozen_quantity = frozen_quantity + " + quantity + " WHERE product_id = " + productId);
return true;
}
}
return false;
} catch (SQLException e) {
return false;
}
}
public void confirmInventory(String productId, int quantity) {
try {
// 正式扣减库存
Statement statement = inventoryConnection.createStatement();
statement.executeUpdate("UPDATE inventory SET quantity = quantity - " + quantity + ", frozen_quantity = frozen_quantity - " + quantity + " WHERE product_id = " + productId);
} catch (SQLException e) {
e.printStackTrace();
}
}
public void cancelInventory(String productId, int quantity) {
try {
// 解冻库存
Statement statement = inventoryConnection.createStatement();
statement.executeUpdate("UPDATE inventory SET frozen_quantity = frozen_quantity - " + quantity + " WHERE product_id = " + productId);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// 订单服务
public class OrderService {
private Connection orderConnection;
public OrderService(Connection orderConnection) {
this.orderConnection = orderConnection;
}
public boolean tryOrder(Order order) {
try {
// 创建订单记录
Statement statement = orderConnection.createStatement();
statement.executeUpdate("INSERT INTO orders (order_id, user_id, product_id, status) VALUES ('1', '1', '1', 'TRY')");
return true;
} catch (SQLException e) {
return false;
}
}
public void confirmOrder(String orderId) {
try {
// 更新订单状态为已确认
Statement statement = orderConnection.createStatement();
statement.executeUpdate("UPDATE orders SET status = 'CONFIRMED' WHERE order_id = " + orderId);
} catch (SQLException e) {
e.printStackTrace();
}
}
public void cancelOrder(String orderId) {
try {
// 删除订单记录
Statement statement = orderConnection.createStatement();
statement.executeUpdate("DELETE FROM orders WHERE order_id = " + orderId);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
- 架构设计思路:每个服务(如订单服务、库存服务)实现自己的TCC接口。在分布式电商系统中,通过一个事务管理器来协调各个服务的Try、Confirm、Cancel操作,确保整个下单事务的一致性。