sync.Cond实现原理
- 数据结构:
sync.Cond
结构体包含一个指向 sync.Mutex
的指针 L
,以及一个等待队列。
- 其定义如下:
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
- 等待机制:
- 当调用
Cond.Wait
方法时,当前 goroutine 会被暂停并加入到等待队列中。
- 在加入等待队列之前,
Wait
方法会自动解锁关联的 sync.Mutex
(因为等待期间不需要持有锁,否则会造成死锁)。
- 当该 goroutine 被唤醒后,
Wait
方法会重新获取该 sync.Mutex
锁,以确保线程安全。
- 通知机制:
Cond.Signal
方法会唤醒等待队列中的一个 goroutine。
Cond.Broadcast
方法会唤醒等待队列中的所有 goroutine。
与sync.Mutex配合使用
- 必要性:
sync.Cond
本身并不包含锁逻辑,它需要依赖 sync.Mutex
来保护共享数据。
- 当对共享数据进行检查(判断是否满足特定条件)以及修改时,需要持有
sync.Mutex
锁,以防止竞态条件。
- 使用方式:
- 首先创建一个
sync.Mutex
和基于该 Mutex
的 sync.Cond
。例如:
var mu sync.Mutex
cond := sync.NewCond(&mu)
- 在调用
Cond.Wait
方法前,必须先获取 sync.Mutex
锁。同样,在调用 Cond.Signal
或 Cond.Broadcast
方法前,也需要获取 sync.Mutex
锁,以保证通知操作的原子性和对共享数据修改的线程安全性。
典型应用场景及并发控制示例
- 场景:生产者 - 消费者模型。
- 在这个模型中,生产者向一个共享缓冲区写入数据,消费者从该缓冲区读取数据。当缓冲区满时,生产者需要等待;当缓冲区空时,消费者需要等待。
- 代码示例:
package main
import (
"fmt"
"sync"
"time"
)
const bufferSize = 5
var (
buffer = make([]int, 0, bufferSize)
mu sync.Mutex
cond = sync.NewCond(&mu)
wg sync.WaitGroup
)
func producer(id int) {
defer wg.Done()
for i := 0; i < 10; i++ {
mu.Lock()
for len(buffer) == bufferSize {
cond.Wait()
}
buffer = append(buffer, i)
fmt.Printf("Producer %d produced %d, buffer: %v\n", id, i, buffer)
cond.Signal()
mu.Unlock()
time.Sleep(time.Millisecond * 100)
}
}
func consumer(id int) {
defer wg.Done()
for {
mu.Lock()
for len(buffer) == 0 {
cond.Wait()
}
item := buffer[0]
buffer = buffer[1:]
fmt.Printf("Consumer %d consumed %d, buffer: %v\n", id, item, buffer)
cond.Signal()
mu.Unlock()
time.Sleep(time.Millisecond * 200)
if len(buffer) == 0 && item == 9 {
break
}
}
}
func main() {
wg.Add(2)
go producer(1)
go consumer(1)
wg.Wait()
}
- 并发控制解析:
- 在生产者函数中,当缓冲区满时(
len(buffer) == bufferSize
),生产者调用 cond.Wait()
进入等待状态,并释放 mu
锁。当消费者从缓冲区取出数据后,会调用 cond.Signal()
唤醒一个等待的生产者。
- 在消费者函数中,当缓冲区空时(
len(buffer) == 0
),消费者调用 cond.Wait()
进入等待状态,并释放 mu
锁。当生产者向缓冲区写入数据后,会调用 cond.Signal()
唤醒一个等待的消费者。通过这种方式,实现了生产者和消费者之间的同步和并发控制,避免了缓冲区的溢出和空读问题。