设计思路
- 参数处理:将不定数量的任务ID参数收集起来,按照每个节点最多处理50个任务ID的规则进行分组。
- 网络传输优化:
- 使用高效的网络协议,如gRPC,减少传输开销。
- 批量发送任务ID,减少网络请求次数。
- 资源分配:
- 维护一个节点资源池,记录每个节点的负载情况。
- 优先将任务分配给负载较低的节点。
- 任务超时处理:
- 使用Go语言的
context
包来控制任务的处理时间。
- 在任务启动时,设置10秒的超时时间,如果任务在10秒内未开始处理,则取消该任务。
代码实现
package main
import (
"context"
"fmt"
"sync"
"time"
)
// 模拟节点资源池
type Node struct {
id int
load int
maxLoad int
}
type NodePool struct {
nodes []*Node
mutex sync.Mutex
}
func NewNodePool(nodeCount int, maxLoad int) *NodePool {
pool := &NodePool{
nodes: make([]*Node, nodeCount),
mutex: sync.Mutex{},
}
for i := 0; i < nodeCount; i++ {
pool.nodes[i] = &Node{
id: i,
load: 0,
maxLoad: maxLoad,
}
}
return pool
}
func (np *NodePool) GetLeastLoadedNode() *Node {
np.mutex.Lock()
defer np.mutex.Unlock()
leastLoaded := np.nodes[0]
for _, node := range np.nodes {
if node.load < leastLoaded.load {
leastLoaded = node
}
}
return leastLoaded
}
func (np *NodePool) IncreaseLoad(node *Node, load int) {
np.mutex.Lock()
node.load += load
np.mutex.Unlock()
}
func (np *NodePool) DecreaseLoad(node *Node, load int) {
np.mutex.Lock()
node.load -= load
np.mutex.Unlock()
}
// 任务分发函数
func DistributeTasks(ctx context.Context, taskIDs ...int) {
nodePool := NewNodePool(10, 50) // 假设有10个节点,每个节点最大负载50
const batchSize = 50
var wg sync.WaitGroup
for i := 0; i < len(taskIDs); i += batchSize {
end := i + batchSize
if end > len(taskIDs) {
end = len(taskIDs)
}
batch := taskIDs[i:end]
node := nodePool.GetLeastLoadedNode()
nodePool.IncreaseLoad(node, len(batch))
wg.Add(1)
go func(batch []int, node *Node) {
defer wg.Done()
innerCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
select {
case <-time.After(10 * time.Second):
fmt.Printf("任务超时,取消任务: %v\n", batch)
case <-innerCtx.Done():
if innerCtx.Err() == context.DeadlineExceeded {
fmt.Printf("任务超时,取消任务: %v\n", batch)
} else {
fmt.Printf("节点 %d 处理任务: %v\n", node.id, batch)
}
}
nodePool.DecreaseLoad(node, len(batch))
}(batch, node)
}
wg.Wait()
}
使用示例
func main() {
ctx := context.Background()
taskIDs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
DistributeTasks(ctx, taskIDs...)
}