设计思路
- 任务接口定义:定义
Task
接口,包含Execute
方法,不同类型的任务实现该接口。
- 任务调度器:使用Go语言的goroutine实现并发执行任务,利用通道(channel)来管理任务的提交、结果收集以及错误处理。
- 任务提交:将不同类型的任务发送到任务调度器的任务通道。
- 任务执行:调度器启动多个goroutine来从任务通道中取出任务并执行。
- 结果收集:每个任务执行完毕后,将结果发送到结果通道。
- 汇总处理:等待所有任务完成,从结果通道中收集所有结果并进行汇总处理。
关键代码实现
package main
import (
"fmt"
)
// Task 接口定义
type Task interface {
Execute() (interface{}, error)
}
// TaskScheduler 任务调度器结构体
type TaskScheduler struct {
taskChan chan Task
resultChan chan interface{}
errorChan chan error
}
// NewTaskScheduler 创建新的任务调度器
func NewTaskScheduler(numWorkers int) *TaskScheduler {
ts := &TaskScheduler{
taskChan: make(chan Task),
resultChan: make(chan interface{}),
errorChan: make(chan error),
}
// 启动worker goroutine
for i := 0; i < numWorkers; i++ {
go func() {
for task := range ts.taskChan {
result, err := task.Execute()
if err != nil {
ts.errorChan <- err
} else {
ts.resultChan <- result
}
}
}()
}
return ts
}
// SubmitTask 提交任务
func (ts *TaskScheduler) SubmitTask(task Task) {
ts.taskChan <- task
}
// WaitAndCollect 等待所有任务完成并收集结果
func (ts *TaskScheduler) WaitAndCollect(numTasks int) ([]interface{}, error) {
var results []interface{}
for i := 0; i < numTasks; i++ {
select {
case result := <-ts.resultChan:
results = append(results, result)
case err := <-ts.errorChan:
return nil, err
}
}
close(ts.taskChan)
close(ts.resultChan)
close(ts.errorChan)
return results, nil
}
// 示例任务类型1
type ExampleTask1 struct{}
func (et1 ExampleTask1) Execute() (interface{}, error) {
// 具体任务逻辑
return "ExampleTask1 result", nil
}
// 示例任务类型2
type ExampleTask2 struct{}
func (et2 ExampleTask2) Execute() (interface{}, error) {
// 具体任务逻辑
return 42, nil
}
错误处理
- 任务执行错误:在
Execute
方法中,如果任务执行失败,返回错误信息。调度器的worker goroutine会将错误发送到errorChan
。
- 调度器错误处理:在
WaitAndCollect
方法中,通过select
语句从errorChan
接收错误。如果接收到错误,立即返回错误,停止收集结果。
- 资源清理:在
WaitAndCollect
方法结束时,关闭所有相关的通道,确保资源正确释放,避免内存泄漏。
使用示例
func main() {
scheduler := NewTaskScheduler(3)
scheduler.SubmitTask(ExampleTask1{})
scheduler.SubmitTask(ExampleTask2{})
results, err := scheduler.WaitAndCollect(2)
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Results:", results)
}
}