package main
import (
"fmt"
"sync"
)
// Task 定义任务类型
type Task struct {
ID int
// 可以根据实际需求添加更多任务相关的字段
}
// Worker 定义工作者
type Worker struct {
ID int
taskChan chan Task
wg *sync.WaitGroup
}
// Work 工作者执行任务的方法
func (w *Worker) Work() {
defer w.wg.Done()
for task := range w.taskChan {
fmt.Printf("Worker %d is processing task %d\n", w.ID, task.ID)
// 这里可以添加实际任务处理逻辑
}
}
// WorkerPool 定义工作池
type WorkerPool struct {
workerNum int
taskQueue chan Task
workers []*Worker
wg sync.WaitGroup
}
// NewWorkerPool 创建新的工作池
func NewWorkerPool(workerNum, taskQueueCap int) *WorkerPool {
pool := &WorkerPool{
workerNum: workerNum,
taskQueue: make(chan Task, taskQueueCap),
workers: make([]*Worker, workerNum),
}
for i := 0; i < workerNum; i++ {
pool.workers[i] = &Worker{
ID: i,
taskChan: pool.taskQueue,
wg: &pool.wg,
}
pool.wg.Add(1)
go pool.workers[i].Work()
}
return pool
}
// AddTask 添加任务到任务队列
func (p *WorkerPool) AddTask(task Task) {
p.taskQueue <- task
}
// Stop 停止工作池
func (p *WorkerPool) Stop() {
close(p.taskQueue)
p.wg.Wait()
}
任务调度初步优化
- 任务队列长度限制:通过设置任务队列的容量,避免任务队列无限制增长。例如在
NewWorkerPool
函数中make(chan Task, taskQueueCap)
设置了任务队列的容量taskQueueCap
。当任务队列满时,可以采取不同策略,如丢弃新任务、阻塞等待队列有空闲位置、采用优先级队列等。
- 动态调整工作者数量:根据任务队列的长度和系统资源情况,动态增加或减少工作者数量。可以定期检查任务队列长度,如果队列长度持续高于某个阈值,增加新的工作者;如果队列长度持续低于某个阈值,减少工作者数量以节省资源。
- 任务优先级调度:改变
Task
结构体,增加优先级字段,然后使用优先级队列(如heap
包实现的堆结构)作为任务队列,这样高优先级任务可以优先被处理。
- 任务批量处理:在工作者中,可以每次从任务队列中获取一批任务(如果队列中有足够任务),而不是单个任务,减少工作者与任务队列之间的交互次数,提高效率。