设计架构
- 任务分发器(Dispatcher):负责接收外部任务,并将任务分发给各个工作节点(Worker Node)。它会维护一个任务队列,并通过通道将任务发送给空闲的工作节点。
- 工作节点(Worker Node):从任务分发器接收任务,执行闭包封装的业务逻辑。工作节点在执行任务过程中,会考虑网络延迟等问题,例如设置合理的超时时间。执行完成后,通过通道将结果返回给结果收集器。
- 结果收集器(Result Collector):收集各个工作节点返回的任务结果。如果某个工作节点在规定时间内没有返回结果,视为节点故障,结果收集器会记录故障信息,并可以选择重新分配任务给其他节点。
各部分功能
- 任务分发器功能:
- 接收外部任务并将其加入任务队列。
- 监听工作节点的空闲通道,当有工作节点空闲时,从任务队列取出任务发送给该工作节点。
- 工作节点功能:
- 监听任务分发器的任务通道,接收任务。
- 执行闭包封装的业务逻辑,考虑网络延迟等问题,设置合理超时机制。
- 将任务执行结果通过结果通道发送给结果收集器。
- 结果收集器功能:
- 监听工作节点的结果通道,收集任务执行结果。
- 记录节点故障信息,对于故障节点对应的任务,可以选择重新分配给其他工作节点。
核心代码实现(以Go语言为例)
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Task 定义任务结构
type Task struct {
ID int
Data interface{}
}
// Result 定义结果结构
type Result struct {
TaskID int
Data interface{}
Err error
}
func main() {
// 任务通道
taskCh := make(chan Task)
// 空闲工作节点通道
idleWorkerCh := make(chan struct{})
// 结果通道
resultCh := make(chan Result)
// 故障信息通道
failureCh := make(chan string)
// 启动任务分发器
go dispatcher(taskCh, idleWorkerCh, resultCh, failureCh)
// 启动3个工作节点
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(taskCh, resultCh, failureCh, &wg)
}
// 启动结果收集器
go resultCollector(resultCh, failureCh)
// 模拟添加任务
for i := 0; i < 10; i++ {
task := Task{ID: i, Data: fmt.Sprintf("Task data %d", i)}
taskCh <- task
}
close(taskCh)
wg.Wait()
close(idleWorkerCh)
close(resultCh)
close(failureCh)
}
// 任务分发器
func dispatcher(taskCh chan Task, idleWorkerCh chan struct{}, resultCh chan Result, failureCh chan string) {
taskQueue := []Task{}
for {
select {
case task, ok := <-taskCh:
if!ok {
// 任务通道关闭,处理完剩余任务
for _, t := range taskQueue {
<-idleWorkerCh
taskCh <- t
}
return
}
taskQueue = append(taskQueue, task)
case <-idleWorkerCh:
if len(taskQueue) > 0 {
task := taskQueue[0]
taskQueue = taskQueue[1:]
taskCh <- task
}
}
}
}
// 工作节点
func worker(taskCh chan Task, resultCh chan Result, failureCh chan string, wg *sync.WaitGroup) {
defer wg.Done()
for task := range taskCh {
result, err := executeTask(task)
if err != nil {
failureCh <- fmt.Sprintf("Node failed to process task %d: %v", task.ID, err)
} else {
resultCh <- Result{TaskID: task.ID, Data: result}
}
idleWorkerCh <- struct{}{}
}
}
// 执行任务的闭包
func executeTask(task Task) (interface{}, error) {
// 模拟网络延迟
time.Sleep(500 * time.Millisecond)
// 模拟业务逻辑
return fmt.Sprintf("Processed: %v", task.Data), nil
}
// 结果收集器
func resultCollector(resultCh chan Result, failureCh chan string) {
for {
select {
case result, ok := <-resultCh:
if!ok {
return
}
fmt.Printf("Received result for task %d: %v\n", result.TaskID, result.Data)
case failure, ok := <-failureCh:
if!ok {
return
}
fmt.Println(failure)
}
}
}
性能、可扩展性和容错性优势分析
- 性能优势:
- 通过通道进行数据传输和同步,避免了共享内存带来的锁竞争问题,提高了并发性能。
- 工作节点的任务执行和任务分发器的任务分配相互独立,并行处理任务,提高了整体系统的吞吐量。
- 可扩展性优势:
- 新增工作节点只需要启动新的worker goroutine,并将其连接到任务通道和结果通道即可,系统能够轻松应对更多任务负载。
- 任务分发器和结果收集器的逻辑简单且独立,易于扩展新的功能,例如任务优先级处理等。
- 容错性优势:
- 结果收集器通过监听故障信息通道,能够及时发现节点故障,并可以选择重新分配任务,保证任务最终能够得到处理。
- 工作节点在执行任务时设置了超时机制,防止因网络延迟等问题导致任务长时间阻塞,提高了系统的稳定性。