可能出现问题的场景
- 多个线程同时操作epoll实例:
- 当多个线程调用
epoll_ctl
函数向同一个epoll实例中添加、修改或删除文件描述符时,可能会导致数据竞争。因为epoll_ctl
的操作涉及到对内核数据结构的修改,如果多个线程同时进行这些操作,可能会使内核数据结构处于不一致的状态,导致程序崩溃或出现不可预测的行为。
- 例如,线程A正在向epoll实例中添加一个新的文件描述符,而线程B同时尝试修改该文件描述符的事件掩码,这两个操作同时进行可能会造成混乱。
- 共享的事件处理函数:
- 如果多个线程处理从epoll_wait获取的事件,并且共享事件处理函数,而这些函数中访问了共享资源(如全局变量、共享内存等),就可能出现线程安全问题。例如,一个事件处理函数用于更新全局计数器,多个线程同时处理事件调用该函数时,会导致计数器更新不一致。
解决方案及代码示例
方案一:互斥锁(Mutex)
- 原理:互斥锁用于保护共享资源,在同一时间只有一个线程能够获取锁,从而访问共享资源,其他线程必须等待锁的释放。
- 代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <unistd.h>
#define MAX_EVENTS 10
pthread_mutex_t epoll_mutex;
int epoll_fd;
void* thread_function(void* arg) {
// 模拟向epoll实例添加文件描述符
int fd = (int)(long)arg;
struct epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
pthread_mutex_lock(&epoll_mutex);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
perror("epoll_ctl add");
}
pthread_mutex_unlock(&epoll_mutex);
return NULL;
}
int main() {
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
return 1;
}
pthread_mutex_init(&epoll_mutex, NULL);
pthread_t threads[5];
for (int i = 0; i < 5; i++) {
int fd = i; // 模拟文件描述符
if (pthread_create(&threads[i], NULL, thread_function, (void*)(long)fd) != 0) {
perror("pthread_create");
return 1;
}
}
for (int i = 0; i < 5; i++) {
if (pthread_join(threads[i], NULL) != 0) {
perror("pthread_join");
return 1;
}
}
pthread_mutex_destroy(&epoll_mutex);
close(epoll_fd);
return 0;
}
方案二:读写锁(Read - Write Lock)
- 原理:读写锁允许多个线程同时进行读操作,但在写操作时,会独占锁,不允许其他线程进行读或写操作。这在对epoll实例的操作中,读操作(如
epoll_wait
)可以并发执行,而写操作(如epoll_ctl
)需要独占访问。
- 代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <unistd.h>
#define MAX_EVENTS 10
pthread_rwlock_t epoll_rwlock;
int epoll_fd;
void* read_thread_function(void* arg) {
struct epoll_event events[MAX_EVENTS];
int num_events;
pthread_rwlock_rdlock(&epoll_rwlock);
num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (num_events == -1) {
perror("epoll_wait");
}
pthread_rwlock_unlock(&epoll_rwlock);
// 处理事件
for (int i = 0; i < num_events; i++) {
// 模拟事件处理
printf("Thread %ld handling event on fd %d\n", pthread_self(), events[i].data.fd);
}
return NULL;
}
void* write_thread_function(void* arg) {
int fd = (int)(long)arg;
struct epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
pthread_rwlock_wrlock(&epoll_rwlock);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
perror("epoll_ctl add");
}
pthread_rwlock_unlock(&epoll_rwlock);
return NULL;
}
int main() {
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
return 1;
}
pthread_rwlock_init(&epoll_rwlock, NULL);
pthread_t read_threads[3];
for (int i = 0; i < 3; i++) {
if (pthread_create(&read_threads[i], NULL, read_thread_function, NULL) != 0) {
perror("pthread_create read");
return 1;
}
}
pthread_t write_threads[2];
for (int i = 0; i < 2; i++) {
int fd = i;
if (pthread_create(&write_threads[i], NULL, write_thread_function, (void*)(long)fd) != 0) {
perror("pthread_create write");
return 1;
}
}
for (int i = 0; i < 3; i++) {
if (pthread_join(read_threads[i], NULL) != 0) {
perror("pthread_join read");
return 1;
}
}
for (int i = 0; i < 2; i++) {
if (pthread_join(write_threads[i], NULL) != 0) {
perror("pthread_join write");
return 1;
}
}
pthread_rwlock_destroy(&epoll_rwlock);
close(epoll_fd);
return 0;
}