整体设计方案
- 消息封装格式:
- 为每个消息添加唯一标识符(如UUID),用于去重。
- 包含消息类型字段,标识消息用途。
- 消息体字段存放实际数据。
- 可以使用JSON格式进行封装,例如:
{
"message_id": "unique_uuid",
"message_type": "data_update",
"message_body": "actual data"
}
- 确认机制:
- 接收方接收到消息后,解析出
message_id
,向发送方发送确认消息(ACK),ACK消息同样封装message_id
。
- 发送方维护一个已发送消息的字典,记录消息发送时间和是否收到ACK。
- 重传策略:
- 发送方设置一个重传定时器,当发送消息后,启动定时器。
- 如果在定时器超时前未收到ACK,从已发送消息字典中取出该消息进行重传,并重置定时器。
- 重传次数可设置上限,达到上限若仍未收到ACK,则进行相应错误处理(如记录日志、通知管理员等)。
- 与Socket编程相结合:
- 使用TCP协议的Socket,因为TCP本身提供可靠的字节流传输,能在一定程度上处理网络抖动。
- 在发送端,将封装好的消息通过
sendall
方法发送。
- 在接收端,使用
recv
方法接收数据,解析消息后发送ACK。
关键代码设计框架
发送端
import socket
import json
import uuid
import time
def send_message(sock, message_type, message_body):
message = {
"message_id": str(uuid.uuid4()),
"message_type": message_type,
"message_body": message_body
}
json_message = json.dumps(message).encode('utf-8')
sent_messages = {}
max_retries = 3
retry_delay = 1
while True:
try:
sock.sendall(json_message)
sent_messages[message["message_id"]] = {"time": time.time(), "ack_received": False}
while not sent_messages[message["message_id"]]["ack_received"]:
if time.time() - sent_messages[message["message_id"]]["time"] > retry_delay:
if sent_messages[message["message_id"]].get("retries", 0) < max_retries:
sock.sendall(json_message)
sent_messages[message["message_id"]]["retries"] = sent_messages[message["message_id"]].get(
"retries", 0) + 1
sent_messages[message["message_id"]]["time"] = time.time()
else:
print(f"Max retries reached for message {message['message_id']}")
break
time.sleep(0.1)
break
except socket.error as e:
print(f"Socket error: {e}")
# 创建TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
sock.connect(server_address)
send_message(sock, "test_type", "test_body")
sock.close()
接收端
import socket
import json
def receive_message(sock):
received_messages = set()
while True:
try:
data = sock.recv(1024)
if not data:
break
json_data = json.loads(data.decode('utf-8'))
message_id = json_data["message_id"]
if message_id not in received_messages:
received_messages.add(message_id)
print(f"Received message: {json_data}")
ack_message = json.dumps({"message_id": message_id, "message_type": "ack"}).encode('utf-8')
sock.sendall(ack_message)
except socket.error as e:
print(f"Socket error: {e}")
break
except json.JSONDecodeError as e:
print(f"JSON decode error: {e}")
# 创建TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 10000)
sock.bind(server_address)
sock.listen(1)
while True:
connection, client_address = sock.accept()
receive_message(connection)
connection.close()