面试题答案
一键面试利用消息队列特性实现生产者 - 消费者模型同步机制
- 思路:
- 生产者向消息队列中发送消息,消费者从消息队列中接收消息。消息队列本身就具有一定的同步性,因为多个进程对其操作时,系统会保证操作的原子性。
- 生产者在发送消息时,要确保消息队列有足够的空间。消费者在接收消息时,要确保队列中有消息。
- 关键代码片段:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#define MSG_SIZE 1024
struct msgbuf {
long mtype;
char mtext[MSG_SIZE];
};
// 生产者
void producer(int msqid) {
struct msgbuf buf;
buf.mtype = 1;
strcpy(buf.mtext, "Hello, Consumer!");
if (msgsnd(msqid, &buf, strlen(buf.mtext) + 1, 0) == -1) {
perror("msgsnd");
exit(1);
}
}
// 消费者
void consumer(int msqid) {
struct msgbuf buf;
if (msgrcv(msqid, &buf, MSG_SIZE, 1, 0) == -1) {
perror("msgrcv");
exit(1);
}
printf("Received: %s\n", buf.mtext);
}
在主函数中:
int main() {
key_t key;
int msqid;
if ((key = ftok(".", 'a')) == -1) {
perror("ftok");
exit(1);
}
if ((msqid = msgget(key, IPC_CREAT | 0666)) == -1) {
perror("msgget");
exit(1);
}
// 创建子进程模拟生产者和消费者
pid_t pid = fork();
if (pid == -1) {
perror("fork");
exit(1);
} else if (pid == 0) {
consumer(msqid);
} else {
producer(msqid);
wait(NULL);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl");
exit(1);
}
}
return 0;
}
实现异步消息处理
- 思路:
- 使用多线程或多进程来实现异步。以多线程为例,主线程可以作为消息的接收者,将接收到的消息放入一个任务队列中。然后启动多个工作线程,这些工作线程从任务队列中取出消息并进行处理。
- 为了保护任务队列的访问,需要使用互斥锁(mutex)。同时,为了通知工作线程有新消息到来,可以使用条件变量(condition variable)。
- 关键代码片段:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#define MSG_SIZE 1024
struct msgbuf {
long mtype;
char mtext[MSG_SIZE];
};
struct task {
struct msgbuf data;
struct task *next;
};
struct task *task_queue = NULL;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 工作线程处理函数
void* worker(void* arg) {
while (1) {
struct task *cur_task;
pthread_mutex_lock(&mutex);
while (task_queue == NULL) {
pthread_cond_wait(&cond, &mutex);
}
cur_task = task_queue;
task_queue = task_queue->next;
pthread_mutex_unlock(&mutex);
printf("Worker received: %s\n", cur_task->data.mtext);
free(cur_task);
}
return NULL;
}
// 主线程接收消息
void main_thread(int msqid) {
struct msgbuf buf;
while (1) {
if (msgrcv(msqid, &buf, MSG_SIZE, 1, 0) == -1) {
perror("msgrcv");
break;
}
struct task *new_task = (struct task*)malloc(sizeof(struct task));
new_task->data = buf;
new_task->next = NULL;
pthread_mutex_lock(&mutex);
if (task_queue == NULL) {
task_queue = new_task;
} else {
struct task *cur = task_queue;
while (cur->next != NULL) {
cur = cur->next;
}
cur->next = new_task;
}
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
在主函数中:
int main() {
key_t key;
int msqid;
if ((key = ftok(".", 'a')) == -1) {
perror("ftok");
exit(1);
}
if ((msqid = msgget(key, IPC_CREAT | 0666)) == -1) {
perror("msgget");
exit(1);
}
pthread_t tid;
pthread_create(&tid, NULL, worker, NULL);
main_thread(msqid);
pthread_cancel(tid);
pthread_join(tid, NULL);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl");
exit(1);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
return 0;
}