MST

星途 面试题库

面试题:Go的Goroutine与Web服务的分布式并发处理

在一个分布式Web服务架构中,多个节点都使用Go的Goroutine处理并发请求。现在需要实现一个跨节点的分布式任务调度系统,确保任务在不同节点的Goroutine之间合理分配并高效执行。请详细描述你的设计思路,包括如何处理节点间的通信、任务分配策略、容错机制等,并给出核心功能的伪代码。
42.4万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

节点间通信

  1. 使用消息队列:例如RabbitMQ、Kafka等。各个节点连接到同一个消息队列,任务被发送到队列中,节点从队列中获取任务。这种方式解耦了任务的生产和消费,提高系统的扩展性和稳定性。
  2. 心跳机制:每个节点定期向中心协调者(或通过广播)发送心跳消息,告知自己的存活状态。可以使用简单的HTTP请求或者基于UDP的轻量级协议。

任务分配策略

  1. 轮询策略:按照节点注册的顺序,依次将任务分配给各个节点。这种策略简单易实现,但可能导致节点负载不均衡。
  2. 基于负载的分配:每个节点实时向中心协调者汇报自己的当前负载(如CPU使用率、内存使用率、当前处理的任务数等)。协调者根据这些信息,将任务分配给负载最轻的节点。

容错机制

  1. 任务重试:如果某个节点在处理任务时失败,任务可以被重新放回消息队列,等待其他节点重新处理。可以设置重试次数和重试间隔,避免无限重试。
  2. 节点故障检测与替换:通过心跳机制,如果中心协调者在一定时间内没有收到某个节点的心跳消息,则判定该节点故障。将该节点从可用节点列表中移除,并重新分配该节点上未完成的任务到其他可用节点。

核心功能伪代码

任务生产者

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }

    task := "example task"
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(task),
        },
    )
    if err != nil {
        panic(err)
    }
    fmt.Println("Task sent to queue")
}

任务消费者(节点端)

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }

    go func() {
        for d := range msgs {
            fmt.Printf("Received a task: %s\n", d.Body)
            // 模拟任务处理
            err := processTask(string(d.Body))
            if err != nil {
                // 处理失败,放回队列
                err = ch.Nack(d.DeliveryTag, false, true)
                if err != nil {
                    fmt.Printf("Failed to nack task: %v", err)
                }
            } else {
                // 处理成功,确认任务
                err = ch.Ack(d.DeliveryTag, false)
                if err != nil {
                    fmt.Printf("Failed to ack task: %v", err)
                }
            }
        }
    }()

    fmt.Println("Waiting for tasks...")
    select {}
}

func processTask(task string) error {
    // 实际任务处理逻辑
    fmt.Printf("Processing task: %s\n", task)
    return nil
}

中心协调者(简单示例,基于心跳检测)

package main

import (
    "fmt"
    "time"
)

type Node struct {
    ID        string
    LastHeart time.Time
}

var nodes = make(map[string]*Node)

func monitorNodes() {
    for {
        for id, node := range nodes {
            if time.Since(node.LastHeart) > 10*time.Second {
                // 节点故障处理
                delete(nodes, id)
                fmt.Printf("Node %s is down, removing from the list\n", id)
            }
        }
        time.Sleep(5 * time.Second)
    }
}

func main() {
    go monitorNodes()

    // 模拟节点注册与心跳
    go func() {
        nodeID := "node1"
        nodes[nodeID] = &Node{ID: nodeID, LastHeart: time.Now()}
        for {
            nodes[nodeID].LastHeart = time.Now()
            fmt.Printf("Node %s sent heartbeat\n", nodeID)
            time.Sleep(3 * time.Second)
        }
    }()

    select {}
}