- 使用信号量或计数器:
- 可以使用一个
sync.WaitGroup
来记录正在使用资源的协程数量。当一个协程开始使用池中的资源时,调用 WaitGroup.Add(1)
,使用完毕后调用 WaitGroup.Done()
。
- 在销毁Go池时,调用
WaitGroup.Wait()
,等待所有正在使用资源的协程完成操作。例如:
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func worker(pool *sync.Pool) {
wg.Add(1)
defer wg.Done()
resource := pool.Get()
defer pool.Put(resource)
// 使用资源
fmt.Println("Using resource")
time.Sleep(2 * time.Second)
}
func main() {
pool := &sync.Pool{
New: func() interface{} {
return "new resource"
},
}
for i := 0; i < 3; i++ {
go worker(pool)
}
time.Sleep(1 * time.Second)
fmt.Println("Starting to destroy the pool")
wg.Wait()
fmt.Println("Pool destroyed gracefully")
}
- 设置标志位:
- 在Go池结构体中设置一个
isDestroying
的布尔标志位。当开始销毁Go池时,将该标志位置为 true
。
- 在获取和释放资源的方法中,检查这个标志位。如果
isDestroying
为 true
,获取资源方法可以返回 nil
或者阻塞等待直到所有资源被释放,释放资源方法可以不再将资源放回池中。例如:
package main
import (
"fmt"
"sync"
"time"
)
type ResourcePool struct {
pool sync.Pool
isDestroying bool
}
func (rp *ResourcePool) Get() interface{} {
if rp.isDestroying {
return nil
}
return rp.pool.Get()
}
func (rp *ResourcePool) Put(resource interface{}) {
if!rp.isDestroying {
rp.pool.Put(resource)
}
}
func (rp *ResourcePool) Destroy() {
rp.isDestroying = true
// 等待所有资源被释放(可结合WaitGroup等方式)
time.Sleep(2 * time.Second)
fmt.Println("Pool destroyed gracefully")
}
func worker(rp *ResourcePool) {
resource := rp.Get()
if resource != nil {
defer rp.Put(resource)
fmt.Println("Using resource")
time.Sleep(2 * time.Second)
}
}
func main() {
rp := &ResourcePool{
pool: sync.Pool{
New: func() interface{} {
return "new resource"
},
},
}
for i := 0; i < 3; i++ {
go worker(rp)
}
time.Sleep(1 * time.Second)
fmt.Println("Starting to destroy the pool")
rp.Destroy()
}
- 使用通道:
- 可以创建一个通道用于通知正在使用资源的协程进行清理操作。当开始销毁Go池时,向这个通道发送消息。
- 协程在使用资源的过程中,定期检查这个通道是否有消息。如果收到消息,则进行资源清理并退出。例如:
package main
import (
"fmt"
"sync"
"time"
)
type ResourcePool struct {
pool sync.Pool
destroyCh chan struct{}
}
func (rp *ResourcePool) Get() interface{} {
return rp.pool.Get()
}
func (rp *ResourcePool) Put(resource interface{}) {
rp.pool.Put(resource)
}
func (rp *ResourcePool) Destroy() {
close(rp.destroyCh)
// 等待所有资源被释放(可结合WaitGroup等方式)
time.Sleep(2 * time.Second)
fmt.Println("Pool destroyed gracefully")
}
func worker(rp *ResourcePool) {
resource := rp.Get()
defer rp.Put(resource)
fmt.Println("Using resource")
select {
case <-rp.destroyCh:
fmt.Println("Received destroy signal, cleaning up")
// 进行清理操作
return
case <-time.After(2 * time.Second):
fmt.Println("Finished using resource")
}
}
func main() {
rp := &ResourcePool{
pool: sync.Pool{
New: func() interface{} {
return "new resource"
},
},
destroyCh: make(chan struct{}),
}
for i := 0; i < 3; i++ {
go worker(rp)
}
time.Sleep(1 * time.Second)
fmt.Println("Starting to destroy the pool")
rp.Destroy()
}