整体架构设计
- 数据接收模块:通过一个Goroutine从数据源接收股票价格数据,并将数据发送到一个共享的通道。
- 数据分发模块:多个Goroutine从共享通道读取数据,并根据股票代码将数据分发给对应的股票统计模块。
- 股票统计模块:每个股票对应一个Goroutine和一个时间窗口,用于计算每分钟的平均价格。
- 数据一致性模块:使用互斥锁(
sync.Mutex
)或原子操作来保证在高并发情况下的数据一致性。
关键代码示例
package main
import (
"fmt"
"sync"
"time"
)
// StockData 股票数据结构
type StockData struct {
Code string
Price float64
Timestamp time.Time
}
// StockStats 股票统计信息
type StockStats struct {
TotalPrice float64
Count int
LastUpdate time.Time
mu sync.Mutex
}
func main() {
dataCh := make(chan StockData)
stockStatsMap := make(map[string]*StockStats)
var wg sync.WaitGroup
// 数据接收模块
go func() {
// 模拟数据接收
for {
data := StockData{
Code: "AAPL",
Price: 150.0,
Timestamp: time.Now(),
}
dataCh <- data
time.Sleep(100 * time.Millisecond)
}
}()
// 数据分发模块
numDispatchers := 3
for i := 0; i < numDispatchers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
stockStats, ok := stockStatsMap[data.Code]
if!ok {
stockStats = &StockStats{
LastUpdate: time.Now(),
}
stockStatsMap[data.Code] = stockStats
}
stockStats.mu.Lock()
stockStats.TotalPrice += data.Price
stockStats.Count++
stockStats.LastUpdate = data.Timestamp
stockStats.mu.Unlock()
}
}()
}
// 计算每分钟平均价格模块
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
for code, stats := range stockStatsMap {
stats.mu.Lock()
if stats.Count > 0 {
avgPrice := stats.TotalPrice / float64(stats.Count)
fmt.Printf("Stock %s - Average Price in last minute: %.2f\n", code, avgPrice)
}
stats.TotalPrice = 0
stats.Count = 0
stats.LastUpdate = time.Now()
stats.mu.Unlock()
}
}
}
}()
wg.Wait()
}
性能优化和数据准确性保证
- 性能优化:
- 使用多个Goroutine进行数据分发,提高数据处理的并行度。
- 采用时间窗口机制,定期计算平均价格,避免频繁计算。
- 数据准确性保证:
- 使用互斥锁(
sync.Mutex
)来保护共享资源,确保在高并发情况下数据的一致性。
- 记录每次更新的时间戳,以保证计算平均价格的时间窗口准确。