MST

星途 面试题库

面试题:Go不定参数边界控制在分布式系统中的优化

在一个分布式计算系统中,有一个任务分发函数,它接收不定数量的任务ID(整数类型)作为参数。由于系统资源限制,每个节点每次最多能处理50个任务ID。同时,为了保证任务处理的及时性,每个任务ID对应的任务需要在10秒内开始处理。假设你需要在Go语言中实现这个任务分发函数,并对传入的不定参数进行边界控制,确保系统高效稳定运行。请描述你的设计思路,包括如何优化网络传输、资源分配以及处理可能出现的任务超时情况,并用代码实现核心逻辑。
12.8万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 参数处理:将不定数量的任务ID参数收集起来,按照每个节点最多处理50个任务ID的规则进行分组。
  2. 网络传输优化
    • 使用高效的网络协议,如gRPC,减少传输开销。
    • 批量发送任务ID,减少网络请求次数。
  3. 资源分配
    • 维护一个节点资源池,记录每个节点的负载情况。
    • 优先将任务分配给负载较低的节点。
  4. 任务超时处理
    • 使用Go语言的context包来控制任务的处理时间。
    • 在任务启动时,设置10秒的超时时间,如果任务在10秒内未开始处理,则取消该任务。

代码实现

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 模拟节点资源池
type Node struct {
    id     int
    load   int
    maxLoad int
}

type NodePool struct {
    nodes []*Node
    mutex sync.Mutex
}

func NewNodePool(nodeCount int, maxLoad int) *NodePool {
    pool := &NodePool{
        nodes: make([]*Node, nodeCount),
        mutex: sync.Mutex{},
    }
    for i := 0; i < nodeCount; i++ {
        pool.nodes[i] = &Node{
            id:     i,
            load:   0,
            maxLoad: maxLoad,
        }
    }
    return pool
}

func (np *NodePool) GetLeastLoadedNode() *Node {
    np.mutex.Lock()
    defer np.mutex.Unlock()
    leastLoaded := np.nodes[0]
    for _, node := range np.nodes {
        if node.load < leastLoaded.load {
            leastLoaded = node
        }
    }
    return leastLoaded
}

func (np *NodePool) IncreaseLoad(node *Node, load int) {
    np.mutex.Lock()
    node.load += load
    np.mutex.Unlock()
}

func (np *NodePool) DecreaseLoad(node *Node, load int) {
    np.mutex.Lock()
    node.load -= load
    np.mutex.Unlock()
}

// 任务分发函数
func DistributeTasks(ctx context.Context, taskIDs ...int) {
    nodePool := NewNodePool(10, 50) // 假设有10个节点,每个节点最大负载50
    const batchSize = 50

    var wg sync.WaitGroup
    for i := 0; i < len(taskIDs); i += batchSize {
        end := i + batchSize
        if end > len(taskIDs) {
            end = len(taskIDs)
        }
        batch := taskIDs[i:end]

        node := nodePool.GetLeastLoadedNode()
        nodePool.IncreaseLoad(node, len(batch))

        wg.Add(1)
        go func(batch []int, node *Node) {
            defer wg.Done()
            innerCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
            defer cancel()

            select {
            case <-time.After(10 * time.Second):
                fmt.Printf("任务超时,取消任务: %v\n", batch)
            case <-innerCtx.Done():
                if innerCtx.Err() == context.DeadlineExceeded {
                    fmt.Printf("任务超时,取消任务: %v\n", batch)
                } else {
                    fmt.Printf("节点 %d 处理任务: %v\n", node.id, batch)
                }
            }

            nodePool.DecreaseLoad(node, len(batch))
        }(batch, node)
    }

    wg.Wait()
}

使用示例

func main() {
    ctx := context.Background()
    taskIDs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
    DistributeTasks(ctx, taskIDs...)
}