设计思路
- 任务定义:使用空接口
interface{}
来表示不同类型的任务,这样可以将任何类型的函数作为任务放入任务队列。
- 任务分发:通过一个任务通道
taskChan
将任务分发给多个工作者(worker)。
- 结果收集:每个工作者完成任务后,将结果通过结果通道
resultChan
返回。
- 工作者管理:创建固定数量的工作者,它们从任务通道中取出任务并执行,以实现并发处理任务。
代码实现
package main
import (
"fmt"
)
// Task 定义任务类型,使用空接口可以接受任何类型的函数
type Task struct {
Function interface{}
Args []interface{}
}
// Result 定义结果类型,使用空接口可以接受任何类型的返回值
type Result struct {
TaskIndex int
Value interface{}
Err error
}
func worker(taskChan <-chan Task, resultChan chan<- Result, index int) {
for task := range taskChan {
// 使用反射来调用函数
result, err := callFunction(task.Function, task.Args)
res := Result{
TaskIndex: index,
Value: result,
Err: err,
}
resultChan <- res
}
}
func callFunction(f interface{}, args []interface{}) (interface{}, error) {
// 这里简单示例,实际应使用反射实现更通用的调用
// 例如,检查函数类型、参数数量和类型等
if len(args) == 0 {
return nil, fmt.Errorf("no arguments provided")
}
switch f := f.(type) {
case func(int) int:
if arg, ok := args[0].(int); ok {
return f(arg), nil
}
return nil, fmt.Errorf("argument type mismatch")
default:
return nil, fmt.Errorf("unsupported function type")
}
}
func main() {
const numWorkers = 3
taskChan := make(chan Task)
resultChan := make(chan Result)
// 启动工作者
for i := 0; i < numWorkers; i++ {
go worker(taskChan, resultChan, i)
}
// 添加任务
tasks := []Task{
{Function: func(a int) int { return a * 2 }, Args: []interface{}{2}},
{Function: func(a int) int { return a + 3 }, Args: []interface{}{5}},
}
for i, task := range tasks {
task.Args = append(task.Args, i)
taskChan <- task
}
close(taskChan)
// 收集结果
results := make([]Result, len(tasks))
for i := 0; i < len(tasks); i++ {
result := <-resultChan
results[result.TaskIndex] = result
}
close(resultChan)
// 输出结果
for _, res := range results {
if res.Err != nil {
fmt.Printf("Task %d failed: %v\n", res.TaskIndex, res.Err)
} else {
fmt.Printf("Task %d result: %v\n", res.TaskIndex, res.Value)
}
}
}
性能和扩展性考虑
- 性能:
- 并发处理:通过多工作者并发处理任务,充分利用多核CPU资源,提高任务处理速度。
- 任务队列:任务通道作为队列,避免了任务处理的阻塞,工作者可以不断从队列中取出任务执行。
- 扩展性:
- 增加工作者数量:可以轻松增加工作者数量来处理更多任务,提高系统的并发处理能力。
- 任务类型:由于使用空接口,系统可以处理各种类型的任务,具有很强的通用性。
- 错误处理:每个任务的执行结果包含错误信息,方便定位和处理任务执行过程中的问题,增强系统的稳定性。