设计思路
- 任务定义:将每个 I/O 操作封装成一个函数,这些函数返回操作结果和可能的错误。
- 并发控制:使用 channel 来限制并发数。我们创建一个带有固定缓冲区大小的 channel,这个缓冲区大小决定了可以同时执行的最大任务数。
- Goroutine 调度:为每个任务启动一个 goroutine 执行,将任务结果发送到一个结果 channel 中。
- 错误处理:在每个任务执行过程中捕获错误,并将错误与结果一起发送到结果 channel。
- 等待所有任务完成:使用
sync.WaitGroup
来等待所有 goroutine 完成任务。
代码逻辑
package main
import (
"fmt"
"sync"
)
// 定义任务函数类型
type Task func() (interface{}, error)
// 调度器函数
func scheduler(tasks []Task, concurrency int) ([]interface{}, []error) {
var wg sync.WaitGroup
resultChan := make(chan interface{}, len(tasks))
errorChan := make(chan error, len(tasks))
semaphore := make(chan struct{}, concurrency)
// 启动每个任务
for _, task := range tasks {
semaphore <- struct{}{} // 获取信号量
wg.Add(1)
go func(t Task) {
defer func() {
<-semaphore // 释放信号量
wg.Done()
}()
result, err := t()
if err != nil {
errorChan <- err
} else {
resultChan <- result
}
}(task)
}
// 等待所有任务完成
go func() {
wg.Wait()
close(resultChan)
close(errorChan)
}()
var results []interface{}
var errors []error
// 收集结果和错误
for i := 0; i < len(tasks); i++ {
select {
case result := <-resultChan:
results = append(results, result)
case err := <-errorChan:
errors = append(errors, err)
}
}
return results, errors
}
使用示例
func main() {
// 模拟任务
task1 := func() (interface{}, error) {
// 模拟网络请求或文件读取
fmt.Println("Task 1 started")
return "Result of Task 1", nil
}
task2 := func() (interface{}, error) {
// 模拟网络请求或文件读取
fmt.Println("Task 2 started")
return "Result of Task 2", nil
}
task3 := func() (interface{}, error) {
// 模拟网络请求或文件读取
fmt.Println("Task 3 started")
return nil, fmt.Errorf("Task 3 failed")
}
tasks := []Task{task1, task2, task3}
results, errors := scheduler(tasks, 2)
fmt.Println("Results:", results)
fmt.Println("Errors:", errors)
}
代码解释
- Task 类型:定义了任务函数的类型,每个任务函数返回一个接口类型的结果和可能的错误。
- scheduler 函数:
tasks
是要执行的任务列表,concurrency
是最大并发数。
sync.WaitGroup
用于等待所有任务完成。
resultChan
和 errorChan
分别用于接收任务的结果和错误。
semaphore
是一个带缓冲区的 channel,用于控制并发数。
- 遍历任务列表,为每个任务启动一个 goroutine,获取信号量后执行任务,完成后释放信号量。
- 使用
select
语句从 resultChan
和 errorChan
中收集结果和错误。
- main 函数:创建一些模拟任务并调用
scheduler
函数,最后打印结果和错误。