package main
import (
"fmt"
"sync"
)
const queueSize = 10
type TaskQueue struct {
queue []interface{}
mutex sync.Mutex
full sync.Cond
empty sync.Cond
}
func NewTaskQueue() *TaskQueue {
tq := &TaskQueue{
queue: make([]interface{}, 0, queueSize),
}
tq.full.L = &tq.mutex
tq.empty.L = &tq.mutex
return tq
}
func (tq *TaskQueue) AddTask(task interface{}) {
tq.mutex.Lock()
defer tq.mutex.Unlock()
for len(tq.queue) == queueSize {
tq.full.Wait()
}
tq.queue = append(tq.queue, task)
tq.empty.Signal()
}
func (tq *TaskQueue) GetTask() interface{} {
tq.mutex.Lock()
defer tq.mutex.Unlock()
for len(tq.queue) == 0 {
tq.empty.Wait()
}
task := tq.queue[0]
tq.queue = tq.queue[1:]
tq.full.Signal()
return task
}
func main() {
var wg sync.WaitGroup
taskQueue := NewTaskQueue()
numProducers := 3
numConsumers := 2
for i := 0; i < numProducers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
task := fmt.Sprintf("Producer %d - Task %d", id, j)
taskQueue.AddTask(task)
fmt.Printf("Producer %d added task: %s\n", id, task)
}
}(i)
}
for i := 0; i < numConsumers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
task := taskQueue.GetTask()
fmt.Printf("Consumer %d processed task: %s\n", id, task)
}
}(i)
}
wg.Wait()
}
竞态条件分析及避免方法
- 竞态条件:在高并发情况下,可能出现多个goroutine同时访问和修改共享队列的情况,这会导致数据不一致。例如,一个生产者goroutine和一个消费者goroutine同时尝试修改队列,可能导致队列状态混乱。
- 避免方法:
- 互斥锁:通过
sync.Mutex
来保护共享队列,在访问队列前加锁,访问完成后解锁,确保同一时间只有一个goroutine可以操作队列。
- 条件变量:使用
sync.Cond
实现生产者在队列满时等待,消费者在队列空时等待。这样可以避免不必要的竞争,并且合理利用资源,在队列状态满足条件时唤醒等待的goroutine。