1. 设计思路
- 连接管理:使用
net
包建立TCP连接,每个节点作为一个服务器监听特定端口,并可以作为客户端连接其他节点。
- 数据同步:通过自定义协议,在数据前添加长度字段,确保数据完整性。使用
sync.Mutex
保护共享资源,如连接池。
- 错误恢复:设置连接超时,对于丢包和网络延迟,使用重试机制。如果节点断开连接,尝试重新连接。
2. Go代码实现
package main
import (
"bytes"
"encoding/binary"
"fmt"
"net"
"sync"
"time"
)
const (
bufferSize = 1024
maxRetries = 3
timeout = 5 * time.Second
)
// ConnectionManager 管理连接
type ConnectionManager struct {
nodeID string
address string
connPool map[string]net.Conn
mutex sync.Mutex
}
// NewConnectionManager 创建新的连接管理器
func NewConnectionManager(nodeID, address string) *ConnectionManager {
return &ConnectionManager{
nodeID: nodeID,
address: address,
connPool: make(map[string]net.Conn),
}
}
// Connect 连接到其他节点
func (cm *ConnectionManager) Connect(targetID, targetAddress string) (net.Conn, error) {
cm.mutex.Lock()
defer cm.mutex.Unlock()
if conn, ok := cm.connPool[targetID]; ok {
return conn, nil
}
var err error
for i := 0; i < maxRetries; i++ {
conn, err = net.DialTimeout("tcp", targetAddress, timeout)
if err == nil {
cm.connPool[targetID] = conn
return conn, nil
}
time.Sleep(time.Second)
}
return nil, err
}
// SendData 通过连接发送数据
func SendData(conn net.Conn, data []byte) error {
length := uint32(len(data))
var buf bytes.Buffer
err := binary.Write(&buf, binary.BigEndian, length)
if err != nil {
return err
}
buf.Write(data)
_, err = conn.Write(buf.Bytes())
return err
}
// ReceiveData 从连接接收数据
func ReceiveData(conn net.Conn) ([]byte, error) {
lengthBuf := make([]byte, 4)
_, err := conn.Read(lengthBuf)
if err != nil {
return nil, err
}
length := binary.BigEndian.Uint32(lengthBuf)
data := make([]byte, length)
_, err = conn.Read(data)
if err != nil {
return nil, err
}
return data, nil
}
// DataSyncService 数据同步服务
func DataSyncService(cm *ConnectionManager, dataCh <-chan []byte, targetID, targetAddress string) {
for data := range dataCh {
conn, err := cm.Connect(targetID, targetAddress)
if err != nil {
fmt.Printf("Failed to connect to %s: %v\n", targetID, err)
continue
}
err = SendData(conn, data)
if err != nil {
fmt.Printf("Failed to send data to %s: %v\n", targetID, err)
}
}
}
func main() {
// 示例使用
cm := NewConnectionManager("node1", "127.0.0.1:8081")
dataCh := make(chan []byte)
go DataSyncService(cm, dataCh, "node2", "127.0.0.1:8082")
// 模拟发送数据
data := []byte("Hello, distributed system!")
dataCh <- data
time.Sleep(2 * time.Second)
close(dataCh)
}
3. 代码说明
- ConnectionManager:管理节点之间的连接,提供连接池和连接方法。
- SendData 和 ReceiveData:通过自定义协议(先发送数据长度,再发送数据)在连接上安全地发送和接收数据。
- DataSyncService:从
dataCh
接收数据,并通过连接管理器连接目标节点并发送数据。
- main 函数:示例演示如何使用连接管理器和数据同步服务。