架构设计
- 事件驱动模型:使用
epoll
来管理多个套接字的事件,epoll
具有高效的事件通知机制,适用于高并发场景。
- 超时管理:为每个非阻塞I/O操作维护一个独立的超时时间。可以使用一个结构体数组或链表来存储每个操作的相关信息,包括套接字描述符、操作类型(读或写)、超时时间等。
- 定时器:使用
timerfd
来实现定时器功能,定期检查是否有操作超时。
代码实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <time.h>
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024
// 定义操作信息结构体
typedef struct {
int fd;
int type; // 0: 读, 1: 写
int timeout;
struct timespec start_time;
} IoOperation;
// 错误处理函数
void handle_error(const char *msg) {
perror(msg);
exit(EXIT_FAILURE);
}
int main() {
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
handle_error("epoll_create1");
}
int timer_fd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timer_fd == -1) {
handle_error("timerfd_create");
}
struct itimerspec new_value;
new_value.it_interval.tv_sec = 1; // 每秒检查一次超时
new_value.it_interval.tv_nsec = 0;
new_value.it_value.tv_sec = 1;
new_value.it_value.tv_nsec = 0;
if (timerfd_settime(timer_fd, 0, &new_value, NULL) == -1) {
handle_error("timerfd_settime");
}
struct epoll_event events[MAX_EVENTS];
// 假设已有多个套接字,将它们添加到epoll中
int sock1 = socket(AF_INET, SOCK_STREAM, 0);
if (sock1 == -1) {
handle_error("socket");
}
// 设置为非阻塞
int flags = fcntl(sock1, F_GETFL, 0);
if (fcntl(sock1, F_SETFL, flags | O_NONBLOCK) == -1) {
handle_error("fcntl");
}
struct epoll_event ev;
ev.data.fd = sock1;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock1, &ev) == -1) {
handle_error("epoll_ctl");
}
// 假设有另一个套接字
int sock2 = socket(AF_INET, SOCK_STREAM, 0);
if (sock2 == -1) {
handle_error("socket");
}
flags = fcntl(sock2, F_GETFL, 0);
if (fcntl(sock2, F_SETFL, flags | O_NONBLOCK) == -1) {
handle_error("fcntl");
}
ev.data.fd = sock2;
ev.events = EPOLLOUT | EPOLLET;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock2, &ev) == -1) {
handle_error("epoll_ctl");
}
// 假设这里初始化操作信息数组
IoOperation operations[2];
operations[0].fd = sock1;
operations[0].type = 0; // 读
operations[0].timeout = 5; // 5秒超时
clock_gettime(CLOCK_MONOTONIC, &operations[0].start_time);
operations[1].fd = sock2;
operations[1].type = 1; // 写
operations[1].timeout = 3; // 3秒超时
clock_gettime(CLOCK_MONOTONIC, &operations[1].start_time);
while (1) {
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
handle_error("epoll_wait");
}
for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == timer_fd) {
uint64_t exp;
ssize_t s = read(timer_fd, &exp, sizeof(uint64_t));
if (s != sizeof(uint64_t)) {
handle_error("read timerfd");
}
// 检查操作是否超时
for (int j = 0; j < 2; ++j) {
struct timespec current_time;
clock_gettime(CLOCK_MONOTONIC, ¤t_time);
long elapsed_sec = current_time.tv_sec - operations[j].start_time.tv_sec;
long elapsed_nsec = current_time.tv_nsec - operations[j].start_time.tv_nsec;
if (elapsed_sec > operations[j].timeout || (elapsed_sec == operations[j].timeout && elapsed_nsec > 0)) {
// 超时处理
printf("Operation on fd %d timed out\n", operations[j].fd);
// 例如关闭套接字等操作
close(operations[j].fd);
}
}
} else {
int fd = events[i].data.fd;
char buffer[BUFFER_SIZE];
ssize_t read_bytes;
if (events[i].events & EPOLLIN) {
read_bytes = recv(fd, buffer, sizeof(buffer), 0);
if (read_bytes == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("recv");
close(fd);
}
} else if (read_bytes == 0) {
// 对端关闭连接
printf("Connection closed by peer on fd %d\n", fd);
close(fd);
} else {
buffer[read_bytes] = '\0';
printf("Received: %s on fd %d\n", buffer, fd);
}
} else if (events[i].events & EPOLLOUT) {
const char *msg = "Hello, server!";
ssize_t write_bytes = send(fd, msg, strlen(msg), 0);
if (write_bytes == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("send");
close(fd);
}
} else {
printf("Sent: %s on fd %d\n", msg, fd);
}
}
}
}
}
close(epoll_fd);
close(timer_fd);
return 0;
}
说明
- 架构部分:
- 使用
epoll
管理套接字事件,采用边缘触发(EPOLLET
)模式,提高事件处理效率。
timerfd
实现定时器功能,每秒检查一次是否有操作超时。
- 用
IoOperation
结构体数组来存储每个I/O操作的相关信息,包括套接字描述符、操作类型和超时时间,并记录操作开始时间用于超时判断。
- 代码部分:
- 初始化
epoll
实例和timerfd
。
- 创建并设置两个非阻塞套接字,添加到
epoll
实例中。
- 在
epoll_wait
循环中,当timerfd
有事件时,检查各个操作是否超时。
- 当套接字有可读(
EPOLLIN
)或可写(EPOLLOUT
)事件时,进行相应的读写操作,并处理可能的错误。
- 注意在实际应用中,需要根据具体需求进一步完善错误处理、资源释放等操作。