package main
import (
"fmt"
)
// Task 定义任务函数类型
type Task func() (interface{}, error)
// TaskScheduler 任务调度器结构体
type TaskScheduler struct {
maxWorkers int
taskQueue chan Task
results chan TaskResult
}
// TaskResult 任务执行结果结构体
type TaskResult struct {
taskID int
result interface{}
err error
}
// NewTaskScheduler 创建新的任务调度器
func NewTaskScheduler(maxWorkers int) *TaskScheduler {
return &TaskScheduler{
maxWorkers: maxWorkers,
taskQueue: make(chan Task),
results: make(chan TaskResult),
}
}
// Start 启动任务调度器
func (s *TaskScheduler) Start() {
for i := 0; i < s.maxWorkers; i++ {
go func(workerID int) {
for task := range s.taskQueue {
result, err := task()
s.results <- TaskResult{
taskID: workerID,
result: result,
err: err,
}
}
}(i)
}
}
// AddTask 添加任务到任务队列
func (s *TaskScheduler) AddTask(task Task) {
s.taskQueue <- task
}
// Stop 停止任务调度器
func (s *TaskScheduler) Stop() {
close(s.taskQueue)
close(s.results)
}
// GetResults 获取任务执行结果
func (s *TaskScheduler) GetResults() <-chan TaskResult {
return s.results
}
关键逻辑解释
- 任务定义:
- 定义
Task
类型为一个函数,该函数返回一个 interface{}
和一个 error
,用于表示任务的执行结果和可能出现的错误。
- 任务调度器结构体:
TaskScheduler
结构体包含三个字段:maxWorkers
表示最大并发数,taskQueue
是任务队列,results
是存储任务执行结果的通道。
- 创建任务调度器:
NewTaskScheduler
函数用于创建一个新的 TaskScheduler
实例,初始化任务队列和结果通道。
- 启动任务调度器:
Start
方法启动多个 goroutine
,每个 goroutine
从任务队列 taskQueue
中取出任务并执行,执行结果通过 results
通道发送出去。
- 添加任务:
AddTask
方法将任务添加到任务队列 taskQueue
中。
- 停止任务调度器:
Stop
方法关闭任务队列和结果通道,停止所有工作的 goroutine
。
- 获取结果:
GetResults
方法返回结果通道,调用者可以通过该通道获取任务执行的结果。这样通过通道来传递任务和结果,避免了资源竞争。