MST

星途 面试题库

面试题:网络编程之Python Socket专家难度题

在一个复杂的分布式系统中,使用Python的Socket进行网络通信。要求设计一种可靠的消息传输机制,保证消息不丢失、不重复,并且能够处理网络抖动、节点故障等异常情况。请详细描述整体设计方案,包括消息的封装格式、确认机制、重传策略以及如何与Socket编程相结合,并给出关键的代码设计框架。
40.5万 热度难度
后端开发网络编程

知识考点

AI 面试

面试题答案

一键面试

整体设计方案

  1. 消息封装格式
    • 为每个消息添加唯一标识符(如UUID),用于去重。
    • 包含消息类型字段,标识消息用途。
    • 消息体字段存放实际数据。
    • 可以使用JSON格式进行封装,例如:
{
    "message_id": "unique_uuid",
    "message_type": "data_update",
    "message_body": "actual data"
}
  1. 确认机制
    • 接收方接收到消息后,解析出message_id,向发送方发送确认消息(ACK),ACK消息同样封装message_id
    • 发送方维护一个已发送消息的字典,记录消息发送时间和是否收到ACK。
  2. 重传策略
    • 发送方设置一个重传定时器,当发送消息后,启动定时器。
    • 如果在定时器超时前未收到ACK,从已发送消息字典中取出该消息进行重传,并重置定时器。
    • 重传次数可设置上限,达到上限若仍未收到ACK,则进行相应错误处理(如记录日志、通知管理员等)。
  3. 与Socket编程相结合
    • 使用TCP协议的Socket,因为TCP本身提供可靠的字节流传输,能在一定程度上处理网络抖动。
    • 在发送端,将封装好的消息通过sendall方法发送。
    • 在接收端,使用recv方法接收数据,解析消息后发送ACK。

关键代码设计框架

发送端

import socket
import json
import uuid
import time


def send_message(sock, message_type, message_body):
    message = {
        "message_id": str(uuid.uuid4()),
        "message_type": message_type,
        "message_body": message_body
    }
    json_message = json.dumps(message).encode('utf-8')
    sent_messages = {}
    max_retries = 3
    retry_delay = 1
    while True:
        try:
            sock.sendall(json_message)
            sent_messages[message["message_id"]] = {"time": time.time(), "ack_received": False}
            while not sent_messages[message["message_id"]]["ack_received"]:
                if time.time() - sent_messages[message["message_id"]]["time"] > retry_delay:
                    if sent_messages[message["message_id"]].get("retries", 0) < max_retries:
                        sock.sendall(json_message)
                        sent_messages[message["message_id"]]["retries"] = sent_messages[message["message_id"]].get(
                            "retries", 0) + 1
                        sent_messages[message["message_id"]]["time"] = time.time()
                    else:
                        print(f"Max retries reached for message {message['message_id']}")
                        break
                time.sleep(0.1)
            break
        except socket.error as e:
            print(f"Socket error: {e}")


# 创建TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
sock.connect(server_address)

send_message(sock, "test_type", "test_body")

sock.close()

接收端

import socket
import json


def receive_message(sock):
    received_messages = set()
    while True:
        try:
            data = sock.recv(1024)
            if not data:
                break
            json_data = json.loads(data.decode('utf-8'))
            message_id = json_data["message_id"]
            if message_id not in received_messages:
                received_messages.add(message_id)
                print(f"Received message: {json_data}")
                ack_message = json.dumps({"message_id": message_id, "message_type": "ack"}).encode('utf-8')
                sock.sendall(ack_message)
        except socket.error as e:
            print(f"Socket error: {e}")
            break
        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")


# 创建TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
sock.bind(server_address)
sock.listen(1)

while True:
    connection, client_address = sock.accept()
    receive_message(connection)
    connection.close()