package main
import (
"fmt"
"sync"
)
// Task 定义任务类型
type Task func()
// TaskScheduler 任务调度器
type TaskScheduler struct {
tasks chan Task
wg sync.WaitGroup
mutex sync.Mutex
cond *sync.Cond
running bool
}
// NewTaskScheduler 创建新的任务调度器
func NewTaskScheduler(numWorkers int) *TaskScheduler {
ts := &TaskScheduler{
tasks: make(chan Task),
running: true,
}
ts.cond = sync.NewCond(&ts.mutex)
for i := 0; i < numWorkers; i++ {
ts.wg.Add(1)
go func() {
defer ts.wg.Done()
for {
task, ok := <-ts.tasks
if!ok {
return
}
task()
ts.mutex.Lock()
for!ts.running && len(ts.tasks) == 0 {
ts.cond.Wait()
}
if!ts.running {
break
}
ts.mutex.Unlock()
}
}()
}
return ts
}
// AddTask 添加任务到任务队列
func (ts *TaskScheduler) AddTask(task Task) {
ts.mutex.Lock()
ts.tasks <- task
ts.cond.Broadcast()
ts.mutex.Unlock()
}
// Stop 停止任务调度器
func (ts *TaskScheduler) Stop() {
ts.mutex.Lock()
ts.running = false
close(ts.tasks)
ts.cond.Broadcast()
ts.mutex.Unlock()
ts.wg.Wait()
}
func main() {
scheduler := NewTaskScheduler(3)
// 添加任务
scheduler.AddTask(func() {
fmt.Println("Task 1 is running")
})
scheduler.AddTask(func() {
fmt.Println("Task 2 is running")
})
// 停止调度器
scheduler.Stop()
}