设计思路
- Context 传递:在任务队列消息中嵌入 Context 相关信息,例如请求 ID 等关键标识,下游服务根据这些信息重建 Context。
- 处理网络延迟:设置合理的 Context 超时时间,同时在下游服务中对消息处理进行限流,防止因网络延迟导致的大量请求积压。
- 服务故障:使用重试机制,当下游服务处理失败时,根据 Context 中的重试标识决定是否重试,并且结合熔断机制,在服务频繁故障时快速失败,避免资源浪费。
技术要点
- Context 序列化与反序列化:将 Context 中的关键信息(如请求 ID、截止时间等)提取出来进行序列化,在接收端反序列化重建 Context。
- 消息队列可靠性:选择可靠的消息队列,如 RabbitMQ、Kafka 等,确保消息不丢失。同时利用消息队列的持久化机制,在服务故障重启后能够继续处理未完成的任务。
- 重试与熔断:采用指数退避算法实现重试机制,避免短时间内大量重试加重系统负担。使用熔断器(如 hystrix-go)实现熔断机制,监控服务的健康状态。
关键代码框架
- 发送端(Go 服务往任务队列发送消息并携带 Context 信息)
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/streadway/amqp"
)
// ContextInfo 用于序列化 Context 关键信息
type ContextInfo struct {
RequestID string
Deadline time.Time
}
func sendMessage(ctx context.Context, message string, ch *amqp.Channel, queueName string) error {
var info ContextInfo
if deadline, ok := ctx.Deadline(); ok {
info.Deadline = deadline
}
info.RequestID = ctx.Value("requestID").(string)
infoBytes, err := json.Marshal(info)
if err!= nil {
return err
}
body := append(infoBytes, []byte(message)...)
err = ch.Publish(
"",
queueName,
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
})
if err!= nil {
return err
}
return nil
}
- 接收端(从任务队列接收消息并重建 Context 处理任务)
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/afex/hystrix-go/hystrix"
"github.com/streadway/amqp"
"time"
)
// ContextInfo 用于反序列化 Context 关键信息
type ContextInfo struct {
RequestID string
Deadline time.Time
}
func receiveMessage(ctx context.Context, ch *amqp.Channel, queueName string) {
messages, err := ch.Consume(
queueName,
"",
true,
false,
false,
false,
nil,
)
if err!= nil {
panic(err)
}
for d := range messages {
var info ContextInfo
err := json.Unmarshal(d.Body[:20], &info)
if err!= nil {
fmt.Println("Unmarshal ContextInfo error:", err)
continue
}
var newCtx context.Context
if!info.Deadline.IsZero() {
newCtx, _ = context.WithDeadline(context.Background(), info.Deadline)
} else {
newCtx = context.Background()
}
newCtx = context.WithValue(newCtx, "requestID", info.RequestID)
// 使用 Hystrix 进行熔断与重试
err = hystrix.Do("taskHandler", func() error {
// 实际任务处理逻辑
fmt.Printf("Processing message: %s with Context: %v\n", string(d.Body[20:]), newCtx)
return nil
}, func(err error) error {
// 重试逻辑
fmt.Println("Retry due to error:", err)
return nil
})
if err!= nil {
fmt.Println("Hystrix error:", err)
}
}
}
- 主函数示例(用于启动发送和接收端)
package main
import (
"context"
"fmt"
"github.com/streadway/amqp"
"time"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err!= nil {
panic(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err!= nil {
panic(err)
}
defer ch.Close()
queueName := "testQueue"
_, err = ch.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
if err!= nil {
panic(err)
}
ctx := context.WithValue(context.Background(), "requestID", "123456")
ctx, _ = context.WithTimeout(ctx, 5*time.Second)
go sendMessage(ctx, "Hello, Queue!", ch, queueName)
receiveMessage(ctx, ch, queueName)
}