设计思路
- 控制并发数量:使用一个计数器记录当前正在运行的 goroutine 数量,当新的 goroutine 启动时计数器加一,结束时减一。
- 动态调整限制:提供方法来修改允许同时运行的 goroutine 最大数量。
- 并发安全:使用互斥锁保护对计数器和最大限制数量的操作,确保在并发环境下数据的一致性。
数据结构
- 计数器:用于记录当前运行的 goroutine 数量。
- 最大限制数量:表示允许同时运行的 goroutine 最大数目。
- 互斥锁:保护对上述两个变量的操作。
关键方法
- 启动新 goroutine:检查当前计数器是否小于最大限制,若小于则计数器加一,否则等待直到有空间。
- 结束 goroutine:计数器减一。
- 调整限制:修改最大限制数量,同时检查是否有等待的 goroutine 可以继续运行。
代码框架实现
package main
import (
"fmt"
"sync"
)
type CustomConcurrencyControl struct {
maxCount int
running int
mutex sync.Mutex
semaphore chan struct{}
}
func NewCustomConcurrencyControl(maxCount int) *CustomConcurrencyControl {
return &CustomConcurrencyControl{
maxCount: maxCount,
running: 0,
semaphore: make(chan struct{}, maxCount),
}
}
func (ccc *CustomConcurrencyControl) Start() {
ccc.mutex.Lock()
if ccc.running >= ccc.maxCount {
ccc.mutex.Unlock()
<-ccc.semaphore
}
ccc.running++
ccc.mutex.Unlock()
}
func (ccc *CustomConcurrencyControl) Stop() {
ccc.mutex.Lock()
ccc.running--
if ccc.running < ccc.maxCount {
ccc.semaphore <- struct{}{}
}
ccc.mutex.Unlock()
}
func (ccc *CustomConcurrencyControl) AdjustLimit(newLimit int) {
ccc.mutex.Lock()
ccc.maxCount = newLimit
for ccc.running >= ccc.maxCount {
<-ccc.semaphore
}
for i := ccc.running; i < ccc.maxCount; i++ {
ccc.semaphore <- struct{}{}
}
ccc.mutex.Unlock()
}