并发逻辑封装
- 使用Go语言的
sync
包:
sync.Mutex
用于在微服务内部保护共享资源,防止并发访问冲突。例如,在一个微服务中有一个全局计数器:
package main
import (
"fmt"
"sync"
)
var counter int
var mu sync.Mutex
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final counter:", counter)
}
sync.WaitGroup
用于等待一组并发任务完成。在上述代码中,WaitGroup
确保所有increment
函数执行完毕后才打印最终计数器的值。
sync.Cond
可以用于在多个协程之间进行条件同步。比如,在一个微服务中,当某个资源达到一定条件时,通知其他协程:
package main
import (
"fmt"
"sync"
"time"
)
var mu sync.Mutex
var cond *sync.Cond
var resource int
func waiter(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
for resource < 10 {
cond.Wait()
}
fmt.Println("Resource is ready:", resource)
mu.Unlock()
}
func updater(wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(2 * time.Second)
mu.Lock()
resource = 10
fmt.Println("Updating resource...")
cond.Broadcast()
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
cond = sync.NewCond(&mu)
wg.Add(2)
go waiter(&wg)
go updater(&wg)
wg.Wait()
}
分布式锁处理
- 使用Redis实现分布式锁:
- 利用Redis的原子操作
SETNX
(SET if Not eXists)来实现分布式锁。以下是一个简单的示例:
package main
import (
"fmt"
"github.com/go-redis/redis/v8"
"context"
"time"
)
var rdb *redis.Client
var ctx = context.Background()
func acquireLock(lockKey string, lockValue string, expiration time.Duration) bool {
set, err := rdb.SetNX(ctx, lockKey, lockValue, expiration).Result()
if err != nil {
fmt.Println("Error acquiring lock:", err)
return false
}
return set
}
func releaseLock(lockKey string, lockValue string) {
script := `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end`
_, err := rdb.Eval(ctx, script, []string{lockKey}, lockValue).Int64()
if err != nil {
fmt.Println("Error releasing lock:", err)
}
}
func main() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
lockKey := "my_distributed_lock"
lockValue := "unique_value"
expiration := 10 * time.Second
if acquireLock(lockKey, lockValue, expiration) {
fmt.Println("Lock acquired")
// 执行业务逻辑
time.Sleep(5 * time.Second)
releaseLock(lockKey, lockValue)
fmt.Println("Lock released")
} else {
fmt.Println("Failed to acquire lock")
}
}
- 使用etcd实现分布式锁:
- etcd是一个分布式键值存储系统,也可用于实现分布式锁。可以利用etcd的事务功能来实现锁的获取和释放。
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func acquireEtcdLock(client *clientv3.Client, key string, leaseTTL int64) (bool, *clientv3.LeaseGrantResponse, error) {
lease := clientv3.NewLease(client)
grant, err := lease.Grant(context.TODO(), leaseTTL)
if err != nil {
return false, nil, err
}
txn := client.Txn(context.TODO())
resp, err := txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
Then(clientv3.OpPut(key, "", clientv3.WithLease(grant.ID))).
Commit()
if err != nil {
return false, nil, err
}
return resp.Succeeded, grant, nil
}
func releaseEtcdLock(client *clientv3.Client, key string, lease *clientv3.LeaseGrantResponse) {
clientv3.NewLease(client).Revoke(context.TODO(), lease.ID)
client.Delete(context.TODO(), key)
}
func main() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Println("Failed to connect to etcd:", err)
return
}
defer client.Close()
key := "/locks/my_lock"
leaseTTL := int64(10)
acquired, lease, err := acquireEtcdLock(client, key, leaseTTL)
if err != nil {
fmt.Println("Error acquiring lock:", err)
return
}
if acquired {
fmt.Println("Lock acquired")
// 执行业务逻辑
time.Sleep(5 * time.Second)
releaseEtcdLock(client, key, lease)
fmt.Println("Lock released")
} else {
fmt.Println("Failed to acquire lock")
}
}
数据一致性处理
- 最终一致性:
- 对于一些允许短暂不一致的场景,可以采用最终一致性。例如,在一个电商系统中,商品库存的更新可以采用异步方式。
- 微服务A更新库存时,先记录更新日志到消息队列(如Kafka),微服务B从消息队列消费日志并异步更新库存。
- 在这种情况下,可能会存在短暂的库存不一致,但随着时间推移,最终会达到一致。
- 强一致性:
- 使用分布式事务框架,如Seata。Seata提供了AT、TCC等模式来保证分布式事务的强一致性。
- 以电商下单为例,下单微服务、库存微服务、支付微服务之间可以通过Seata的AT模式来保证订单、库存和支付的一致性。
跨服务并发任务协调
- 使用消息队列:
- 例如使用Kafka。微服务A将任务相关的消息发送到Kafka主题,多个微服务B、C、D订阅该主题,各自消费消息并处理任务。
- 可以通过消息分区来实现任务的负载均衡。例如,按照用户ID对消息进行分区,同一用户的相关任务会被发送到同一个分区,确保同一用户的任务按顺序处理。
- 使用gRPC结合
context.Context
:
- 当一个微服务调用另一个微服务时,可以通过
context.Context
传递上下文信息,包括超时等控制。
- 例如,微服务A调用微服务B处理一个任务,A可以设置一个上下文超时时间:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 假设这是微服务B的接口定义
type TaskService interface {
ProcessTask(ctx context.Context, task *TaskRequest) (*TaskResponse, error)
}
// 假设这是微服务B的客户端代码
func callTaskService() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
fmt.Println("Failed to connect:", err)
return
}
defer conn.Close()
client := NewTaskServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
task := &TaskRequest{Data: "Some task data"}
resp, err := client.ProcessTask(ctx, task)
if err != nil {
fmt.Println("Error calling task service:", err)
return
}
fmt.Println("Task processed:", resp.Result)
}
优化策略
- 缓存优化:
- 在微服务中使用本地缓存(如
go-cache
)或分布式缓存(如Redis)。例如,对于一些不经常变化的数据,如配置信息,可以缓存起来,减少数据库查询次数。
- 异步处理优化:
- 尽量将一些非关键路径的任务异步化。比如,在用户注册微服务中,发送注册成功邮件可以异步处理,提高主业务流程的响应速度。
- 资源池优化:
- 对于数据库连接、网络连接等资源,使用资源池来复用资源。例如,使用
database/sql
包中的连接池来管理数据库连接,减少连接创建和销毁的开销。