缓冲区管理以避免数据丢失和提高效率
- 动态缓冲区分配:使用动态内存分配(如
malloc
和free
)来根据实际需求分配缓冲区大小。对于网络套接字读操作,初始时可以分配一个适中大小的缓冲区,当接收到的数据量超过缓冲区大小时,重新分配更大的缓冲区。
- 环形缓冲区(Circular Buffer):环形缓冲区适合处理连续的数据流。它允许在缓冲区满时覆盖旧数据,避免数据丢失。通过维护读指针和写指针,实现高效的数据读写操作。
处理不同I/O操作间缓冲区的竞争问题
- 互斥锁(Mutex):在访问共享缓冲区时,使用互斥锁来确保同一时间只有一个线程或进程能够访问缓冲区。在进入临界区(访问缓冲区)前加锁,离开临界区时解锁。
- 读写锁(Read - Write Lock):如果大部分操作是读操作,可以使用读写锁。允许多个线程同时进行读操作,但写操作时需要独占访问,以防止数据不一致。
相关数据结构
- 环形缓冲区数据结构:
typedef struct {
char *buffer;
size_t size;
size_t read_index;
size_t write_index;
} CircularBuffer;
- 初始化环形缓冲区:
CircularBuffer* createCircularBuffer(size_t size) {
CircularBuffer *cb = (CircularBuffer*)malloc(sizeof(CircularBuffer));
if (cb == NULL) {
return NULL;
}
cb->buffer = (char*)malloc(size);
if (cb->buffer == NULL) {
free(cb);
return NULL;
}
cb->size = size;
cb->read_index = 0;
cb->write_index = 0;
return cb;
}
- 向环形缓冲区写入数据:
ssize_t writeToCircularBuffer(CircularBuffer *cb, const char *data, size_t len) {
size_t space_available = cb->size - ((cb->write_index >= cb->read_index)? (cb->write_index - cb->read_index) : (cb->size - (cb->read_index - cb->write_index)));
if (len > space_available) {
len = space_available;
}
size_t part1 = cb->size - cb->write_index;
if (len <= part1) {
memcpy(cb->buffer + cb->write_index, data, len);
cb->write_index += len;
} else {
memcpy(cb->buffer + cb->write_index, data, part1);
memcpy(cb->buffer, data + part1, len - part1);
cb->write_index = len - part1;
}
return len;
}
- 从环形缓冲区读取数据:
ssize_t readFromCircularBuffer(CircularBuffer *cb, char *data, size_t len) {
size_t data_available = (cb->write_index >= cb->read_index)? (cb->write_index - cb->read_index) : (cb->size - (cb->read_index - cb->write_index));
if (len > data_available) {
len = data_available;
}
size_t part1 = cb->size - cb->read_index;
if (len <= part1) {
memcpy(data, cb->buffer + cb->read_index, len);
cb->read_index += len;
} else {
memcpy(data, cb->buffer + cb->read_index, part1);
memcpy(data + part1, cb->buffer, len - part1);
cb->read_index = len - part1;
}
return len;
}
处理竞争问题关键代码
- 使用互斥锁:
#include <pthread.h>
pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER;
// 写操作
void* write_to_buffer(void* arg) {
CircularBuffer *cb = (CircularBuffer*)arg;
pthread_mutex_lock(&buffer_mutex);
writeToCircularBuffer(cb, "some data", strlen("some data"));
pthread_mutex_unlock(&buffer_mutex);
return NULL;
}
// 读操作
void* read_from_buffer(void* arg) {
CircularBuffer *cb = (CircularBuffer*)arg;
char data[1024];
pthread_mutex_lock(&buffer_mutex);
readFromCircularBuffer(cb, data, 1024);
pthread_mutex_unlock(&buffer_mutex);
return NULL;
}
- 使用读写锁:
#include <pthread.h>
pthread_rwlock_t buffer_rwlock = PTHREAD_RWLOCK_INITIALIZER;
// 写操作
void* write_to_buffer(void* arg) {
CircularBuffer *cb = (CircularBuffer*)arg;
pthread_rwlock_wrlock(&buffer_rwlock);
writeToCircularBuffer(cb, "some data", strlen("some data"));
pthread_rwlock_unlock(&buffer_rwlock);
return NULL;
}
// 读操作
void* read_from_buffer(void* arg) {
CircularBuffer *cb = (CircularBuffer*)arg;
char data[1024];
pthread_rwlock_rdlock(&buffer_rwlock);
readFromCircularBuffer(cb, data, 1024);
pthread_rwlock_unlock(&buffer_rwlock);
return NULL;
}