设计架构
- 主线程:负责初始化网络环境、创建其他线程,并且监控整个系统的状态,如检测网络中断并发起重连操作。
- 接收线程:专门负责从网络套接字接收数据。将接收到的数据放入一个线程安全的队列中。
- 发送线程:从另一个线程安全队列获取待发送的数据,并通过网络套接字发送出去。
- 数据处理线程:从接收队列取出数据进行处理,处理完后将需要发送的响应数据放入发送队列。
关键算法
- 线程安全队列:使用互斥锁(
std::mutex
)和条件变量(std::condition_variable
)实现线程安全的队列。当队列空时,接收线程等待条件变量通知;当队列满时,发送线程等待条件变量通知。例如:
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
public:
void push(T item) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(std::move(item));
lock.unlock();
cond_.notify_one();
}
bool pop(T& item) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return!queue_.empty(); });
if (queue_.empty()) return false;
item = std::move(queue_.front());
queue_.pop();
return true;
}
};
- 网络重连机制:在主线程中定期检测网络连接状态(如通过ping操作或者尝试发送心跳包)。当检测到网络中断时,关闭现有套接字,等待一段时间后尝试重新连接。可以使用指数退避算法来控制重连间隔,避免短时间内频繁重连耗尽资源。例如:
int reconnect_delay = 1; // 初始重连延迟1秒
while (!is_connected()) {
try {
// 尝试重连
connect_socket();
is_connected = true;
} catch(...) {
std::this_thread::sleep_for(std::chrono::seconds(reconnect_delay));
reconnect_delay = std::min(32, reconnect_delay * 2); // 指数退避
}
}
- 数据一致性保证:在发送端,为每个发送的数据块添加序列号。接收端维护一个已接收序列号的记录,丢弃重复的数据包。对于乱序到达的数据包,缓存起来,等待缺失的数据包到达后再按序处理。在网络中断恢复后,发送端重新发送未确认的数据块,接收端根据序列号判断哪些数据已接收,哪些需要重新接收,从而保证数据一致性。