MST

星途 面试题库

面试题:微服务架构中消息队列在解耦服务间依赖场景下的原理及实现

在微服务架构里,假设存在用户注册服务和通知服务。用户注册成功后需通知用户,若使用消息队列来解耦这两个服务,请阐述消息队列是如何工作的,以及在代码层面一般如何实现这种解耦(可简要说明使用的消息队列技术如RabbitMQ、Kafka的相关操作)。
11.8万 热度难度
后端开发微服务架构

知识考点

AI 面试

面试题答案

一键面试

消息队列工作原理

  1. 解耦流程

    • 当用户注册服务完成用户注册操作后,它会向消息队列发送一条消息,这条消息中包含用户注册成功的相关信息,例如用户ID、用户名等。
    • 通知服务则从消息队列中订阅相应主题(Topic)或队列(Queue)的消息。一旦消息队列中有新消息,通知服务会接收到该消息,并根据消息内容执行通知用户的操作,比如发送邮件、短信等。
    • 这样,用户注册服务和通知服务就通过消息队列实现了解耦,它们不需要直接调用对方的接口,降低了彼此之间的依赖关系。
  2. 优点

    • 异步处理:用户注册服务发送消息后可以立即返回,无需等待通知服务处理完成,提高了用户注册服务的响应速度。
    • 削峰填谷:如果在短时间内有大量用户注册,消息队列可以缓存这些消息,通知服务按照自己的处理能力从队列中依次获取消息进行处理,避免通知服务因瞬间高流量而崩溃。

代码层面实现(以RabbitMQ为例)

  1. 引入依赖:在用户注册服务和通知服务的项目中引入RabbitMQ的相关依赖。例如在Maven项目中添加如下依赖:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>
  1. 用户注册服务发送消息
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class UserRegistrationService {
    private static final String QUEUE_NAME = "user_registration_notification";

    public void registerUser(String userId, String userName) {
        // 假设这里完成用户注册逻辑

        // 发送消息到RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "User " + userName + " (ID: " + userId + ") has registered.";
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. 通知服务接收消息
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;

public class NotificationService {
    private static final String QUEUE_NAME = "user_registration_notification";

    public void startListening() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            channel.basicConsume(QUEUE_NAME, true, "myConsumerTag",
                    (consumerTag, delivery) -> {
                        String message = new String(delivery.getBody(), "UTF-8");
                        System.out.println(" [x] Received '" + message + "'");
                        // 执行通知逻辑,如发送邮件、短信等
                    },
                    consumerTag -> { });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

代码层面实现(以Kafka为例)

  1. 引入依赖:在用户注册服务和通知服务的项目中引入Kafka的相关依赖。例如在Maven项目中添加如下依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>
  1. 用户注册服务发送消息
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;

public class UserRegistrationService {
    private static final String TOPIC = "user_registration_notification";

    public void registerUser(String userId, String userName) {
        // 假设这里完成用户注册逻辑

        // 发送消息到Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            String message = "User " + userName + " (ID: " + userId + ") has registered.";
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
            producer.send(record);
            System.out.println(" [x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. 通知服务接收消息
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class NotificationService {
    private static final String TOPIC = "user_registration_notification";

    public void startListening() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "notification_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.println(" [x] Received '" + record.value() + "'");
                    // 执行通知逻辑,如发送邮件、短信等
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}