1. sync.Pool
的底层实现
数据结构
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
New func() interface{}
}
- poolLocal:每个逻辑处理器(P)都有一个
poolLocal
实例,用来存储该P本地的对象缓存。
type poolLocalInternal struct {
private interface{}
shared poolChain
}
type poolLocal struct {
poolLocalInternal
Pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
- poolChain:是一个链表结构,用于存储共享的对象。
type poolChain struct {
head, tail *poolChainElem
}
type poolChainElem struct {
elem interface{}
next *poolChainElem
}
内存管理方式
- 对象的获取与归还:当调用
Get
方法时,首先尝试从当前P的private
字段获取对象,如果没有则从shared
链表获取。归还对象时,优先放入private
字段,如果private
字段已有对象,则放入shared
链表。
- 清理机制:
sync.Pool
有一个清理周期,在垃圾回收(GC)时,会将victim
中的对象全部释放,并将当前的local
数据移动到victim
。这样可以保证在长时间没有使用的情况下,对象占用的内存可以被回收。
多协程竞争问题的解决
- 基于P的本地缓存:每个P都有自己独立的
poolLocal
,使得大部分获取和归还操作可以在本地完成,减少了跨P的竞争。
- 原子操作:在从共享链表获取对象或向共享链表添加对象时,使用原子操作(如
sync/atomic
包中的函数)来保证并发安全。例如,在从共享链表中获取对象时,会使用原子操作来修改链表指针。
2. 基于sync.Pool
设计思想设计特定业务场景对象池
设计思路
- 明确业务场景需求:确定对象池中的对象类型、对象的复用逻辑以及对象创建的开销等。例如,对于数据库连接对象池,需要考虑连接的初始化、连接的最大数量、连接的健康检查等。
- 借鉴
P
的本地缓存思想:为每个协程或类似的执行单元分配本地缓存,减少竞争。可以使用与Go
语言逻辑处理器类似的本地存储概念。
- 设计合理的共享缓存:当本地缓存没有所需对象时,从共享缓存获取。共享缓存需要设计合理的数据结构(如链表、队列等)来管理对象,并且要保证并发安全。
- 考虑内存管理和清理机制:类似于
sync.Pool
在GC时的清理机制,设计一种机制来定期清理长时间未使用的对象,释放内存。
关键实现步骤
type CustomPool struct {
local map[interface{}]interface{} // 本地缓存,键可以是协程ID等标识
shared []interface{} // 共享缓存
maxShared int // 共享缓存最大容量
New func() interface{} // 对象创建函数
}
func NewCustomPool(newFunc func() interface{}, maxShared int) *CustomPool {
return &CustomPool{
local: make(map[interface{}]interface{}),
shared: make([]interface{}, 0),
maxShared: maxShared,
New: newFunc,
}
}
func (p *CustomPool) Get() interface{} {
// 从本地缓存获取
if v, ok := p.local[getCurrentID()]; ok {
delete(p.local, getCurrentID())
return v
}
// 从共享缓存获取
if len(p.shared) > 0 {
lastIndex := len(p.shared) - 1
v := p.shared[lastIndex]
p.shared = p.shared[:lastIndex]
return v
}
// 创建新对象
return p.New()
}
func (p *CustomPool) Put(v interface{}) {
id := getCurrentID()
// 尝试放入本地缓存
if _, ok := p.local[id];!ok {
p.local[id] = v
return
}
// 放入共享缓存
if len(p.shared) < p.maxShared {
p.shared = append(p.shared, v)
}
}
func getCurrentID() interface{} {
var buf [64]byte
n := runtime.Stack(buf[:], false)
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
id, _ := strconv.Atoi(idField)
return id
}
- 清理机制:可以使用定时器或结合业务逻辑中的某些时机,定期清理本地缓存和共享缓存中长时间未使用的对象。例如:
func (p *CustomPool) Cleanup() {
p.local = make(map[interface{}]interface{})
p.shared = make([]interface{}, 0)
}