1. 架构设计
- 生产者 - 消费者模式:
- 生产者:负责从多个网页抓取数据。在Go语言中,可以使用多个goroutine作为生产者,每个goroutine负责抓取一个网页的数据。例如:
func producer(urls <-chan string, data chan<- []byte) {
for url := range urls {
resp, err := http.Get(url)
if err != nil {
log.Println("Error fetching data from", url, err)
continue
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("Error reading response body from", url, err)
continue
}
data <- body
}
close(data)
}
- 消费者:负责对抓取到的数据进行处理,如清洗、分析、存储。同样可以使用多个goroutine作为消费者,每个消费者从数据通道中获取数据并处理。例如:
func consumer(data <-chan []byte, result chan<- interface{}) {
for d := range data {
// 清洗数据
cleanData := clean(d)
// 分析数据
analysisResult := analyze(cleanData)
// 存储数据
err := store(analysisResult)
if err != nil {
log.Println("Error storing data:", err)
}
result <- analysisResult
}
close(result)
}
- 责任链模式:对于数据处理的不同阶段(清洗、分析、存储),可以使用责任链模式来解耦这些操作。例如,定义一个处理接口和具体的处理器:
type Handler interface {
Handle(data []byte) (interface{}, error)
SetNext(Handler)
}
type Cleaner struct {
next Handler
}
func (c *Cleaner) Handle(data []byte) (interface{}, error) {
cleanData := clean(data)
if c.next != nil {
return c.next.Handle(cleanData)
}
return cleanData, nil
}
func (c *Cleaner) SetNext(next Handler) {
c.next = next
}
type Analyzer struct {
next Handler
}
func (a *Analyzer) Handle(data []byte) (interface{}, error) {
analysisResult := analyze(data)
if a.next != nil {
return a.next.Handle(analysisResult)
}
return analysisResult, nil
}
func (a *Analyzer) SetNext(next Handler) {
a.next = next
}
type Storer struct {
next Handler
}
func (s *Storer) Handle(data interface{}) (interface{}, error) {
err := store(data)
if err != nil {
return nil, err
}
if s.next != nil {
return s.next.Handle(data)
}
return data, nil
}
func (s *Storer) SetNext(next Handler) {
s.next = next
}
cleaner := &Cleaner{}
analyzer := &Analyzer{}
storer := &Storer{}
cleaner.SetNext(analyzer)
analyzer.SetNext(storer)
func consumer(data <-chan []byte, result chan<- interface{}) {
for d := range data {
res, err := cleaner.Handle(d)
if err != nil {
log.Println("Error handling data:", err)
}
result <- res
}
close(result)
}
2. 性能优化
- 减少内存占用:
- 及时释放资源:在读取网页数据时,及时关闭HTTP响应体,如
defer resp.Body.Close()
,避免内存泄漏。
- 复用缓冲区:在读取HTTP响应体时,可以复用字节切片缓冲区,而不是每次都创建新的。例如:
buf := make([]byte, 4096)
for {
n, err := resp.Body.Read(buf)
if err != nil && err != io.EOF {
log.Println("Error reading response body:", err)
break
}
if n == 0 {
break
}
// 处理buf[:n]的数据
}
- 提高并发效率:
- 合理设置goroutine数量:根据CPU核心数和网络带宽等资源,合理设置生产者和消费者的goroutine数量。可以使用
runtime.NumCPU()
获取CPU核心数,例如:
numProducers := runtime.NumCPU() * 2
numConsumers := runtime.NumCPU() * 4
- 使用sync.Pool:对于一些频繁创建和销毁的对象,如用于数据清洗的临时对象,可以使用
sync.Pool
来复用对象,减少内存分配和垃圾回收压力。例如:
var cleanBufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
- 优化数据通道:合理设置数据通道的缓冲区大小,避免数据通道成为性能瓶颈。如果数据通道过小,可能会导致goroutine阻塞;如果过大,可能会占用过多内存。例如:
dataChan := make(chan []byte, 100)