- 使用
sync.Cond
优化等待机制
- 原理:
sync.Cond
基于 sync.Mutex
实现,它可以让一组 goroutine
在满足特定条件时被唤醒。在信号量实现中,当信号量的值为0时,新的请求需要等待。通过 sync.Cond
,可以避免不必要的循环等待(如忙等),当信号量值变为大于0时,sync.Cond
可以唤醒等待的 goroutine
,从而高效地处理资源获取与释放。例如:
package main
import (
"fmt"
"sync"
)
type Semaphore struct {
value int
mu sync.Mutex
cond *sync.Cond
}
func NewSemaphore(initialValue int) *Semaphore {
s := &Semaphore{
value: initialValue,
}
s.cond = sync.NewCond(&s.mu)
return s
}
func (s *Semaphore) Acquire() {
s.mu.Lock()
for s.value <= 0 {
s.cond.Wait()
}
s.value--
s.mu.Unlock()
}
func (s *Semaphore) Release() {
s.mu.Lock()
s.value++
s.cond.Broadcast()
s.mu.Unlock()
}
- 使用无锁数据结构(如
atomic
包)
- 原理:在多
goroutine
环境下,传统的加锁操作(如 sync.Mutex
)会带来一定的性能开销。atomic
包提供了原子操作,这些操作可以在不使用锁的情况下保证数据的一致性。在信号量实现中,可以使用 atomic
包中的函数来原子地增减信号量的值,减少锁竞争,提高并发性能。例如:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type AtomicSemaphore struct {
value int64
}
func NewAtomicSemaphore(initialValue int64) *AtomicSemaphore {
return &AtomicSemaphore{
value: initialValue,
}
}
func (s *AtomicSemaphore) Acquire() {
for {
v := atomic.LoadInt64(&s.value)
if v <= 0 {
continue
}
if atomic.CompareAndSwapInt64(&s.value, v, v - 1) {
break
}
}
}
func (s *AtomicSemaphore) Release() {
atomic.AddInt64(&s.value, 1)
}
- 限制等待队列长度
- 原理:在信号量实现中,可以维护一个等待队列。如果等待队列过长,会消耗大量的内存并且可能导致调度性能下降。通过限制等待队列的长度,当队列满时,新的请求可以选择直接返回错误或者进行其他处理,避免无限等待和资源耗尽。例如,在使用
sync.Cond
实现信号量时,可以在 Acquire
方法中添加对等待队列长度的判断逻辑:
package main
import (
"fmt"
"sync"
)
type LimitedSemaphore struct {
value int
mu sync.Mutex
cond *sync.Cond
maxWaitQueue int
waitQueueLen int
}
func NewLimitedSemaphore(initialValue, maxWaitQueue int) *LimitedSemaphore {
s := &LimitedSemaphore{
value: initialValue,
maxWaitQueue: maxWaitQueue,
}
s.cond = sync.NewCond(&s.mu)
return s
}
func (s *LimitedSemaphore) Acquire() bool {
s.mu.Lock()
for s.value <= 0 {
if s.waitQueueLen >= s.maxWaitQueue {
s.mu.Unlock()
return false
}
s.waitQueueLen++
s.cond.Wait()
s.waitQueueLen--
}
s.value--
s.mu.Unlock()
return true
}
func (s *LimitedSemaphore) Release() {
s.mu.Lock()
s.value++
s.cond.Broadcast()
s.mu.Unlock()
}