MST
星途 面试题库

面试题:Go任务队列context跨服务传递及一致性保证

在一个分布式系统中,Go编写的服务通过任务队列进行交互,使用context管理上下文依赖。请设计一种方案,确保context在跨服务传递过程中的一致性,包括处理网络延迟、服务故障等异常情况,详细阐述设计思路、涉及的技术要点及关键代码框架。
49.1万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. Context 传递:在任务队列消息中嵌入 Context 相关信息,例如请求 ID 等关键标识,下游服务根据这些信息重建 Context。
  2. 处理网络延迟:设置合理的 Context 超时时间,同时在下游服务中对消息处理进行限流,防止因网络延迟导致的大量请求积压。
  3. 服务故障:使用重试机制,当下游服务处理失败时,根据 Context 中的重试标识决定是否重试,并且结合熔断机制,在服务频繁故障时快速失败,避免资源浪费。

技术要点

  1. Context 序列化与反序列化:将 Context 中的关键信息(如请求 ID、截止时间等)提取出来进行序列化,在接收端反序列化重建 Context。
  2. 消息队列可靠性:选择可靠的消息队列,如 RabbitMQ、Kafka 等,确保消息不丢失。同时利用消息队列的持久化机制,在服务故障重启后能够继续处理未完成的任务。
  3. 重试与熔断:采用指数退避算法实现重试机制,避免短时间内大量重试加重系统负担。使用熔断器(如 hystrix-go)实现熔断机制,监控服务的健康状态。

关键代码框架

  1. 发送端(Go 服务往任务队列发送消息并携带 Context 信息)
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/streadway/amqp"
)

// ContextInfo 用于序列化 Context 关键信息
type ContextInfo struct {
    RequestID string
    Deadline  time.Time
}

func sendMessage(ctx context.Context, message string, ch *amqp.Channel, queueName string) error {
    var info ContextInfo
    if deadline, ok := ctx.Deadline(); ok {
        info.Deadline = deadline
    }
    info.RequestID = ctx.Value("requestID").(string)

    infoBytes, err := json.Marshal(info)
    if err!= nil {
        return err
    }

    body := append(infoBytes, []byte(message)...)

    err = ch.Publish(
        "",
        queueName,
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         body,
        })
    if err!= nil {
        return err
    }
    return nil
}
  1. 接收端(从任务队列接收消息并重建 Context 处理任务)
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/afex/hystrix-go/hystrix"
    "github.com/streadway/amqp"
    "time"
)

// ContextInfo 用于反序列化 Context 关键信息
type ContextInfo struct {
    RequestID string
    Deadline  time.Time
}

func receiveMessage(ctx context.Context, ch *amqp.Channel, queueName string) {
    messages, err := ch.Consume(
        queueName,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err!= nil {
        panic(err)
    }

    for d := range messages {
        var info ContextInfo
        err := json.Unmarshal(d.Body[:20], &info)
        if err!= nil {
            fmt.Println("Unmarshal ContextInfo error:", err)
            continue
        }

        var newCtx context.Context
        if!info.Deadline.IsZero() {
            newCtx, _ = context.WithDeadline(context.Background(), info.Deadline)
        } else {
            newCtx = context.Background()
        }
        newCtx = context.WithValue(newCtx, "requestID", info.RequestID)

        // 使用 Hystrix 进行熔断与重试
        err = hystrix.Do("taskHandler", func() error {
            // 实际任务处理逻辑
            fmt.Printf("Processing message: %s with Context: %v\n", string(d.Body[20:]), newCtx)
            return nil
        }, func(err error) error {
            // 重试逻辑
            fmt.Println("Retry due to error:", err)
            return nil
        })
        if err!= nil {
            fmt.Println("Hystrix error:", err)
        }
    }
}
  1. 主函数示例(用于启动发送和接收端)
package main

import (
    "context"
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err!= nil {
        panic(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err!= nil {
        panic(err)
    }
    defer ch.Close()

    queueName := "testQueue"
    _, err = ch.QueueDeclare(
        queueName,
        true,
        false,
        false,
        false,
        nil,
    )
    if err!= nil {
        panic(err)
    }

    ctx := context.WithValue(context.Background(), "requestID", "123456")
    ctx, _ = context.WithTimeout(ctx, 5*time.Second)

    go sendMessage(ctx, "Hello, Queue!", ch, queueName)
    receiveMessage(ctx, ch, queueName)
}