实现思路
- 共享资源:使用一个数组作为共享缓冲区,生产者向其中写入数据,消费者从其中读取数据。
- 条件变量:用于通知消费者有新数据可用,或者通知生产者缓冲区有空间可写。
- 互斥锁:保护共享资源,确保在同一时间只有一个线程访问缓冲区。
- 超时等待机制:使用
pthread_cond_timedwait
函数实现条件变量的超时等待,避免无限期等待。
关键代码片段
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#define BUFFER_SIZE 5
int buffer[BUFFER_SIZE];
int in = 0;
int out = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_producer;
pthread_cond_t cond_consumer;
// 生产者线程函数
void* producer(void* arg) {
int item = 0;
while (1) {
pthread_mutex_lock(&mutex);
while ((in + 1) % BUFFER_SIZE == out) {
// 缓冲区满,等待
pthread_cond_wait(&cond_producer, &mutex);
}
buffer[in] = item++;
printf("Produced: %d\n", buffer[in]);
in = (in + 1) % BUFFER_SIZE;
pthread_cond_signal(&cond_consumer);
pthread_mutex_unlock(&mutex);
sleep(1);
}
return NULL;
}
// 消费者线程函数
void* consumer(void* arg) {
while (1) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 2; // 设置超时时间为2秒
pthread_mutex_lock(&mutex);
int ret = pthread_cond_timedwait(&cond_consumer, &mutex, &ts);
if (ret == ETIMEDOUT) {
printf("Consumer timed out\n");
pthread_mutex_unlock(&mutex);
continue;
}
while (in == out) {
// 缓冲区空,等待
pthread_cond_wait(&cond_consumer, &mutex);
}
int item = buffer[out];
printf("Consumed: %d\n", item);
out = (out + 1) % BUFFER_SIZE;
pthread_cond_signal(&cond_producer);
pthread_mutex_unlock(&mutex);
sleep(1);
}
return NULL;
}
主函数
int main() {
pthread_t tid_producer, tid_consumer;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond_producer, NULL);
pthread_cond_init(&cond_consumer, NULL);
pthread_create(&tid_producer, NULL, producer, NULL);
pthread_create(&tid_consumer, NULL, consumer, NULL);
pthread_join(tid_producer, NULL);
pthread_join(tid_consumer, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond_producer);
pthread_cond_destroy(&cond_consumer);
return 0;
}