设计思路
- 数据传输层:使用Go语言的标准库
net
来处理网络通信。可以使用TCP
协议进行数据传输,通过net.Dial
和net.Listen
函数来建立连接和监听端口。
- Channel 设计:
- 发送 Channel:每个节点维护一个发送 Channel,用于将待发送的数据放入该 Channel 中。然后由一个独立的 goroutine 负责从这个 Channel 中取出数据并通过网络发送出去。
- 接收 Channel:每个节点有一个接收 Channel,用于接收从网络中读取到的数据。另一个独立的 goroutine 负责监听网络连接,一旦有数据到达,就将数据放入接收 Channel 中。
- 节点故障处理:
- 在网络连接建立时,采用重试机制。如果连接失败,等待一段时间后重新尝试连接。
- 对于发送数据,在发送失败时,将数据重新放入发送 Channel 中,等待再次发送。
- 接收端可以通过心跳机制检测节点是否存活,如果某个节点长时间没有心跳,则认为该节点故障,停止从该节点接收数据。
- 数据处理:
- 从接收 Channel 中取出的数据可以放入另一个处理 Channel 中,由专门的 goroutine 从处理 Channel 中取出数据进行处理。
关键代码片段
package main
import (
"fmt"
"net"
"time"
)
// 定义数据结构
type Data struct {
Content string
}
// 发送数据的函数
func sendData(sendCh <-chan Data, targetAddr string) {
for data := range sendCh {
conn, err := net.Dial("tcp", targetAddr)
if err != nil {
fmt.Printf("连接失败: %v,重试...\n", err)
time.Sleep(5 * time.Second)
continue
}
defer conn.Close()
_, err = fmt.Fprintf(conn, "%s\n", data.Content)
if err != nil {
fmt.Printf("发送数据失败: %v,重新放入发送队列...\n", err)
sendCh <- data
}
}
}
// 接收数据的函数
func receiveData(receiveCh chan<- Data, listenAddr string) {
ln, err := net.Listen("tcp", listenAddr)
if err != nil {
fmt.Printf("监听失败: %v\n", err)
return
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
fmt.Printf("接受连接失败: %v\n", err)
continue
}
go func(c net.Conn) {
defer c.Close()
var data Data
_, err := fmt.Fscanf(c, "%s\n", &data.Content)
if err != nil {
fmt.Printf("读取数据失败: %v\n", err)
return
}
receiveCh <- data
}(conn)
}
}
func main() {
sendCh := make(chan Data)
receiveCh := make(chan Data)
// 启动发送和接收 goroutine
go sendData(sendCh, "127.0.0.1:8080")
go receiveData(receiveCh, "127.0.0.1:8081")
// 模拟产生数据并发送
go func() {
for i := 0; i < 10; i++ {
sendCh <- Data{Content: fmt.Sprintf("数据 %d", i)}
time.Sleep(1 * time.Second)
}
close(sendCh)
}()
// 模拟处理接收的数据
go func() {
for data := range receiveCh {
fmt.Printf("接收到数据: %s\n", data.Content)
}
}()
select {}
}