面试题答案
一键面试消息队列工作原理
-
解耦流程
- 当用户注册服务完成用户注册操作后,它会向消息队列发送一条消息,这条消息中包含用户注册成功的相关信息,例如用户ID、用户名等。
- 通知服务则从消息队列中订阅相应主题(Topic)或队列(Queue)的消息。一旦消息队列中有新消息,通知服务会接收到该消息,并根据消息内容执行通知用户的操作,比如发送邮件、短信等。
- 这样,用户注册服务和通知服务就通过消息队列实现了解耦,它们不需要直接调用对方的接口,降低了彼此之间的依赖关系。
-
优点
- 异步处理:用户注册服务发送消息后可以立即返回,无需等待通知服务处理完成,提高了用户注册服务的响应速度。
- 削峰填谷:如果在短时间内有大量用户注册,消息队列可以缓存这些消息,通知服务按照自己的处理能力从队列中依次获取消息进行处理,避免通知服务因瞬间高流量而崩溃。
代码层面实现(以RabbitMQ为例)
- 引入依赖:在用户注册服务和通知服务的项目中引入RabbitMQ的相关依赖。例如在Maven项目中添加如下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
- 用户注册服务发送消息
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class UserRegistrationService {
private static final String QUEUE_NAME = "user_registration_notification";
public void registerUser(String userId, String userName) {
// 假设这里完成用户注册逻辑
// 发送消息到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "User " + userName + " (ID: " + userId + ") has registered.";
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 通知服务接收消息
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
public class NotificationService {
private static final String QUEUE_NAME = "user_registration_notification";
public void startListening() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicConsume(QUEUE_NAME, true, "myConsumerTag",
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 执行通知逻辑,如发送邮件、短信等
},
consumerTag -> { });
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码层面实现(以Kafka为例)
- 引入依赖:在用户注册服务和通知服务的项目中引入Kafka的相关依赖。例如在Maven项目中添加如下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
- 用户注册服务发送消息
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
public class UserRegistrationService {
private static final String TOPIC = "user_registration_notification";
public void registerUser(String userId, String userName) {
// 假设这里完成用户注册逻辑
// 发送消息到Kafka
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
String message = "User " + userName + " (ID: " + userId + ") has registered.";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
producer.send(record);
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 通知服务接收消息
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class NotificationService {
private static final String TOPIC = "user_registration_notification";
public void startListening() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "notification_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TOPIC));
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println(" [x] Received '" + record.value() + "'");
// 执行通知逻辑,如发送邮件、短信等
});
}
} catch (Exception e) {
e.printStackTrace();
}
}
}