package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql" // 以MySQL为例,根据实际情况替换
)
func transfer(db *sql.DB, fromAccountID, toAccountID int, amount float64) error {
// 开启事务
tx, err := db.Begin()
if err != nil {
return err
}
// 查询账户A余额
var fromBalance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id =?", fromAccountID).Scan(&fromBalance)
if err != nil {
tx.Rollback()
return err
}
// 检查余额是否足够
if fromBalance < amount {
tx.Rollback()
return fmt.Errorf("insufficient balance")
}
// 更新账户A余额
_, err = tx.Exec("UPDATE accounts SET balance = balance -? WHERE id =?", amount, fromAccountID)
if err != nil {
tx.Rollback()
return err
}
// 更新账户B余额
_, err = tx.Exec("UPDATE accounts SET balance = balance +? WHERE id =?", amount, toAccountID)
if err != nil {
tx.Rollback()
return err
}
// 提交事务
err = tx.Commit()
if err != nil {
return err
}
return nil
}
应对高并发场景
- 数据库锁:在更新账户余额时,使用数据库的行级锁(例如在MySQL中使用
FOR UPDATE
语句),确保同一时间只有一个事务能修改账户余额。
// 查询账户A余额并锁定该行
err = tx.QueryRow("SELECT balance FROM accounts WHERE id =? FOR UPDATE", fromAccountID).Scan(&fromBalance)
- 乐观锁:在账户表中添加一个版本号字段,每次更新余额时检查版本号,只有版本号匹配才执行更新操作,并更新版本号。更新失败时,重新读取余额并尝试更新。
// 第一次读取账户A余额和版本号
var fromBalance, fromVersion float64
err = tx.QueryRow("SELECT balance, version FROM accounts WHERE id =?", fromAccountID).Scan(&fromBalance, &fromVersion)
if err != nil {
tx.Rollback()
return err
}
// 检查余额是否足够
if fromBalance < amount {
tx.Rollback()
return fmt.Errorf("insufficient balance")
}
// 更新账户A余额和版本号
res, err := tx.Exec("UPDATE accounts SET balance = balance -?, version = version + 1 WHERE id =? AND version =?", amount, fromAccountID, fromVersion)
if err != nil {
tx.Rollback()
return err
}
rowsAffected, err := res.RowsAffected()
if rowsAffected == 0 {
// 版本号不匹配,更新失败,需要重新尝试
tx.Rollback()
return fmt.Errorf("concurrency conflict, please retry")
}
// 类似操作更新账户B余额
- 队列:将转账请求放入队列(如RabbitMQ、Kafka等),由队列消费者按顺序处理转账请求,避免高并发直接操作数据库。
- 缓存:在应用层使用缓存(如Redis),在缓存中记录账户余额的临时状态,在事务操作前先在缓存中进行检查和预扣减,减少对数据库的直接压力。但要注意缓存和数据库之间的数据一致性问题。