package main
import (
"context"
"fmt"
"sync"
"time"
)
// Worker 定义工作者
type Worker struct {
id int
ctx context.Context
cancel context.CancelFunc
taskChan chan func()
}
// WorkerPool 定义goroutine池
type WorkerPool struct {
workerNum int
taskChan chan func()
workers []*Worker
wg sync.WaitGroup
mu sync.Mutex
}
// NewWorkerPool 创建新的goroutine池
func NewWorkerPool(workerNum, taskQueueSize int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
wp := &WorkerPool{
workerNum: workerNum,
taskChan: make(chan func(), taskQueueSize),
workers: make([]*Worker, 0, workerNum),
cancel: cancel,
}
for i := 0; i < workerNum; i++ {
worker := &Worker{
id: i,
ctx: ctx,
taskChan: make(chan func(), 1),
}
wp.workers = append(wp.workers, worker)
wp.wg.Add(1)
go worker.start(&wp.wg)
}
return wp
}
// start 启动工作者
func (w *Worker) start(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case task, ok := <-w.taskChan:
if!ok {
return
}
task()
case <-w.ctx.Done():
return
}
}
}
// Submit 提交任务到任务队列
func (wp *WorkerPool) Submit(task func()) {
wp.taskChan <- task
}
// Shutdown 关闭goroutine池
func (wp *WorkerPool) Shutdown() {
wp.cancel()
close(wp.taskChan)
for _, worker := range wp.workers {
close(worker.taskChan)
}
wp.wg.Wait()
}
func main() {
// 创建一个包含5个工作者,任务队列大小为10的goroutine池
pool := NewWorkerPool(5, 10)
for i := 0; i < 20; i++ {
task := func(id int) func() {
return func() {
fmt.Printf("Worker is handling task %d\n", id)
time.Sleep(1 * time.Second)
}
}(i)
pool.Submit(task)
}
time.Sleep(3 * time.Second)
pool.Shutdown()
}
安全性保证
- 任务队列:使用
chan
作为任务队列,chan
本身是线程安全的,不同的goroutine
可以安全地向其发送和接收任务。
- 上下文控制:使用
context.Context
来管理goroutine
的生命周期。当调用Shutdown
方法时,通过cancel
函数取消上下文,所有工作者goroutine
会监听上下文的取消信号,然后安全退出。
- 互斥锁:在
WorkerPool
结构体中定义了一个sync.Mutex
,虽然在当前实现中没有直接使用,但如果后续需要对WorkerPool
的状态进行修改(如动态调整工作者数量等),可以使用该互斥锁来保证数据的一致性。
资源管理机制
- 工作者创建与销毁:在
NewWorkerPool
函数中,一次性创建指定数量的Worker
,并启动对应的goroutine
。在Shutdown
方法中,通过关闭worker.taskChan
和取消上下文来通知工作者goroutine
退出,并且使用sync.WaitGroup
等待所有工作者goroutine
完成任务后再退出。
- 任务队列管理:任务队列使用
chan
实现,当任务提交到队列中时,如果队列已满,提交操作会阻塞,直到有工作者从队列中取出任务,这样可以避免任务过多导致内存溢出。同时,当Shutdown
方法被调用时,会关闭taskChan
,确保没有新的任务可以提交。