MST

星途 面试题库

面试题:C语言实现Linux消息队列的同步与异步机制

在Linux中使用C语言开发消息队列应用,假设需要实现一个生产者 - 消费者模型,如何利用消息队列的特性结合C语言来实现同步机制以确保数据的正确处理,同时,如果要实现异步消息处理,又该如何设计和实现,阐述具体的思路和关键代码片段。
41.0万 热度难度
编程语言C

知识考点

AI 面试

面试题答案

一键面试

利用消息队列特性实现生产者 - 消费者模型同步机制

  1. 思路
    • 生产者向消息队列中发送消息,消费者从消息队列中接收消息。消息队列本身就具有一定的同步性,因为多个进程对其操作时,系统会保证操作的原子性。
    • 生产者在发送消息时,要确保消息队列有足够的空间。消费者在接收消息时,要确保队列中有消息。
  2. 关键代码片段
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

#define MSG_SIZE 1024
struct msgbuf {
    long mtype;
    char mtext[MSG_SIZE];
};

// 生产者
void producer(int msqid) {
    struct msgbuf buf;
    buf.mtype = 1;
    strcpy(buf.mtext, "Hello, Consumer!");
    if (msgsnd(msqid, &buf, strlen(buf.mtext) + 1, 0) == -1) {
        perror("msgsnd");
        exit(1);
    }
}

// 消费者
void consumer(int msqid) {
    struct msgbuf buf;
    if (msgrcv(msqid, &buf, MSG_SIZE, 1, 0) == -1) {
        perror("msgrcv");
        exit(1);
    }
    printf("Received: %s\n", buf.mtext);
}

在主函数中:

int main() {
    key_t key;
    int msqid;
    if ((key = ftok(".", 'a')) == -1) {
        perror("ftok");
        exit(1);
    }
    if ((msqid = msgget(key, IPC_CREAT | 0666)) == -1) {
        perror("msgget");
        exit(1);
    }
    // 创建子进程模拟生产者和消费者
    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        exit(1);
    } else if (pid == 0) {
        consumer(msqid);
    } else {
        producer(msqid);
        wait(NULL);
        // 删除消息队列
        if (msgctl(msqid, IPC_RMID, NULL) == -1) {
            perror("msgctl");
            exit(1);
        }
    }
    return 0;
}

实现异步消息处理

  1. 思路
    • 使用多线程或多进程来实现异步。以多线程为例,主线程可以作为消息的接收者,将接收到的消息放入一个任务队列中。然后启动多个工作线程,这些工作线程从任务队列中取出消息并进行处理。
    • 为了保护任务队列的访问,需要使用互斥锁(mutex)。同时,为了通知工作线程有新消息到来,可以使用条件变量(condition variable)。
  2. 关键代码片段
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

#define MSG_SIZE 1024
struct msgbuf {
    long mtype;
    char mtext[MSG_SIZE];
};

struct task {
    struct msgbuf data;
    struct task *next;
};

struct task *task_queue = NULL;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// 工作线程处理函数
void* worker(void* arg) {
    while (1) {
        struct task *cur_task;
        pthread_mutex_lock(&mutex);
        while (task_queue == NULL) {
            pthread_cond_wait(&cond, &mutex);
        }
        cur_task = task_queue;
        task_queue = task_queue->next;
        pthread_mutex_unlock(&mutex);
        printf("Worker received: %s\n", cur_task->data.mtext);
        free(cur_task);
    }
    return NULL;
}

// 主线程接收消息
void main_thread(int msqid) {
    struct msgbuf buf;
    while (1) {
        if (msgrcv(msqid, &buf, MSG_SIZE, 1, 0) == -1) {
            perror("msgrcv");
            break;
        }
        struct task *new_task = (struct task*)malloc(sizeof(struct task));
        new_task->data = buf;
        new_task->next = NULL;
        pthread_mutex_lock(&mutex);
        if (task_queue == NULL) {
            task_queue = new_task;
        } else {
            struct task *cur = task_queue;
            while (cur->next != NULL) {
                cur = cur->next;
            }
            cur->next = new_task;
        }
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
}

在主函数中:

int main() {
    key_t key;
    int msqid;
    if ((key = ftok(".", 'a')) == -1) {
        perror("ftok");
        exit(1);
    }
    if ((msqid = msgget(key, IPC_CREAT | 0666)) == -1) {
        perror("msgget");
        exit(1);
    }
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    main_thread(msqid);
    pthread_cancel(tid);
    pthread_join(tid, NULL);
    // 删除消息队列
    if (msgctl(msqid, IPC_RMID, NULL) == -1) {
        perror("msgctl");
        exit(1);
    }
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    return 0;
}