设计思路
- 任务抽象:定义一个任务接口,所有不同类型的任务结构体实现该接口,接口包含
Execute
方法用于实际执行任务。
- 优先级控制:为任务结构体添加优先级字段,任务调度器根据优先级决定任务执行顺序。
- 并发执行:使用Go语言的goroutine来实现任务的并发执行,利用channel来协调任务的提交与执行。
- 资源合理分配:可以通过限制并发执行的任务数量,使用一个带缓冲的channel来控制同时执行的任务上限,当任务执行完成后,将其从执行队列中移除,以便新的任务可以进入执行队列。
关键代码实现(以Go语言为例)
package main
import (
"container/heap"
"fmt"
"sync"
)
// 定义任务接口
type Task interface {
Execute()
Priority() int
}
// 定义任务结构体
type MyTask struct {
priority int
// 其他任务相关数据
}
func (t MyTask) Execute() {
// 实际任务执行逻辑
fmt.Println("Executing task with priority:", t.priority)
}
func (t MyTask) Priority() int {
return t.priority
}
// 定义任务优先级队列
type PriorityQueue []Task
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// 优先级高的在前
return pq[i].Priority() > pq[j].Priority()
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *PriorityQueue) Push(x interface{}) {
*pq = append(*pq, x.(Task))
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n - 1]
*pq = old[0 : n - 1]
return item
}
// 任务调度器
type TaskScheduler struct {
maxConcurrency int
taskQueue PriorityQueue
wg sync.WaitGroup
semaphore chan struct{}
}
func NewTaskScheduler(maxConcurrency int) *TaskScheduler {
return &TaskScheduler{
maxConcurrency: maxConcurrency,
taskQueue: make(PriorityQueue, 0),
semaphore: make(chan struct{}, maxConcurrency),
}
}
func (ts *TaskScheduler) AddTask(task Task) {
heap.Init(&ts.taskQueue)
heap.Push(&ts.taskQueue, task)
}
func (ts *TaskScheduler) Start() {
for {
if ts.taskQueue.Len() == 0 {
break
}
task := heap.Pop(&ts.taskQueue).(Task)
ts.semaphore <- struct{}{}
ts.wg.Add(1)
go func(t Task) {
defer func() {
<-ts.semaphore
ts.wg.Done()
}()
t.Execute()
}(task)
}
ts.wg.Wait()
}
使用示例
func main() {
scheduler := NewTaskScheduler(3)
scheduler.AddTask(MyTask{priority: 3})
scheduler.AddTask(MyTask{priority: 1})
scheduler.AddTask(MyTask{priority: 2})
scheduler.Start()
}