MST

星途 面试题库

面试题:Go Channel数据发送与接收的复杂场景处理

假设有一个分布式系统,多个节点通过网络连接,每个节点都有一个本地的Go程序,需要通过Channel在不同节点间安全且高效地发送和接收数据。网络可能存在延迟、丢包等问题。请设计一个完整的解决方案,包括如何处理连接管理、数据同步、错误恢复等问题,并用Go代码实现关键部分逻辑。
44.5万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

1. 设计思路

  1. 连接管理:使用 net 包建立TCP连接,每个节点作为一个服务器监听特定端口,并可以作为客户端连接其他节点。
  2. 数据同步:通过自定义协议,在数据前添加长度字段,确保数据完整性。使用 sync.Mutex 保护共享资源,如连接池。
  3. 错误恢复:设置连接超时,对于丢包和网络延迟,使用重试机制。如果节点断开连接,尝试重新连接。

2. Go代码实现

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "net"
    "sync"
    "time"
)

const (
    bufferSize = 1024
    maxRetries = 3
    timeout    = 5 * time.Second
)

// ConnectionManager 管理连接
type ConnectionManager struct {
    nodeID   string
    address  string
    connPool map[string]net.Conn
    mutex    sync.Mutex
}

// NewConnectionManager 创建新的连接管理器
func NewConnectionManager(nodeID, address string) *ConnectionManager {
    return &ConnectionManager{
        nodeID:   nodeID,
        address:  address,
        connPool: make(map[string]net.Conn),
    }
}

// Connect 连接到其他节点
func (cm *ConnectionManager) Connect(targetID, targetAddress string) (net.Conn, error) {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()

    if conn, ok := cm.connPool[targetID]; ok {
        return conn, nil
    }

    var err error
    for i := 0; i < maxRetries; i++ {
        conn, err = net.DialTimeout("tcp", targetAddress, timeout)
        if err == nil {
            cm.connPool[targetID] = conn
            return conn, nil
        }
        time.Sleep(time.Second)
    }
    return nil, err
}

// SendData 通过连接发送数据
func SendData(conn net.Conn, data []byte) error {
    length := uint32(len(data))
    var buf bytes.Buffer
    err := binary.Write(&buf, binary.BigEndian, length)
    if err != nil {
        return err
    }
    buf.Write(data)

    _, err = conn.Write(buf.Bytes())
    return err
}

// ReceiveData 从连接接收数据
func ReceiveData(conn net.Conn) ([]byte, error) {
    lengthBuf := make([]byte, 4)
    _, err := conn.Read(lengthBuf)
    if err != nil {
        return nil, err
    }
    length := binary.BigEndian.Uint32(lengthBuf)

    data := make([]byte, length)
    _, err = conn.Read(data)
    if err != nil {
        return nil, err
    }
    return data, nil
}

// DataSyncService 数据同步服务
func DataSyncService(cm *ConnectionManager, dataCh <-chan []byte, targetID, targetAddress string) {
    for data := range dataCh {
        conn, err := cm.Connect(targetID, targetAddress)
        if err != nil {
            fmt.Printf("Failed to connect to %s: %v\n", targetID, err)
            continue
        }
        err = SendData(conn, data)
        if err != nil {
            fmt.Printf("Failed to send data to %s: %v\n", targetID, err)
        }
    }
}

func main() {
    // 示例使用
    cm := NewConnectionManager("node1", "127.0.0.1:8081")
    dataCh := make(chan []byte)

    go DataSyncService(cm, dataCh, "node2", "127.0.0.1:8082")

    // 模拟发送数据
    data := []byte("Hello, distributed system!")
    dataCh <- data

    time.Sleep(2 * time.Second)
    close(dataCh)
}

3. 代码说明

  1. ConnectionManager:管理节点之间的连接,提供连接池和连接方法。
  2. SendDataReceiveData:通过自定义协议(先发送数据长度,再发送数据)在连接上安全地发送和接收数据。
  3. DataSyncService:从 dataCh 接收数据,并通过连接管理器连接目标节点并发送数据。
  4. main 函数:示例演示如何使用连接管理器和数据同步服务。