使用RabbitMQ处理简单并发任务的基本步骤
- 引入依赖:在
pom.xml
文件中添加RabbitMQ相关依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置RabbitMQ:在
application.properties
文件中配置RabbitMQ连接信息。
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- 消息队列的初始化:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue taskQueue() {
return new Queue("taskQueue");
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("taskExchange");
}
@Bean
public Binding binding(Queue taskQueue, TopicExchange exchange) {
return BindingBuilder.bind(taskQueue).to(exchange).with("task.#");
}
}
- 生产者发送消息:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend("taskExchange", "task.message", message);
}
}
- 消费者接收消息并处理:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TaskConsumer {
@RabbitListener(queues = "taskQueue")
public void receive(String message) {
System.out.println("Received message: " + message);
// 处理具体任务逻辑
}
}
说明
- 依赖引入:通过
spring-boot-starter-amqp
引入Spring Boot对RabbitMQ的支持。
- 配置:在
application.properties
中配置RabbitMQ服务器地址、端口、用户名和密码。
- 初始化:使用Spring AMQP的Java配置方式创建队列、交换机并绑定它们。
- 生产者:
TaskProducer
通过RabbitTemplate
将消息发送到指定的交换机和路由键。
- 消费者:
TaskConsumer
使用@RabbitListener
注解监听指定队列,接收到消息后执行具体处理逻辑。
使用ActiveMQ处理简单并发任务的基本步骤
- 引入依赖:在
pom.xml
文件中添加ActiveMQ相关依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
- 配置ActiveMQ:在
application.properties
文件中配置ActiveMQ连接信息。
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
- 消息队列的初始化:
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Queue;
@Configuration
public class ActiveMQConfig {
@Bean
public Queue taskQueue() {
return new ActiveMQQueue("taskQueue");
}
}
- 生产者发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class TaskProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void send(String message) {
jmsTemplate.send("taskQueue", session -> session.createTextMessage(message));
}
}
- 消费者接收消息并处理:
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TaskConsumer {
@JmsListener(destination = "taskQueue")
public void receive(String message) {
System.out.println("Received message: " + message);
// 处理具体任务逻辑
}
}
说明
- 依赖引入:通过
spring-boot-starter-activemq
引入Spring Boot对ActiveMQ的支持。
- 配置:在
application.properties
中配置ActiveMQ服务器地址、用户名和密码。
- 初始化:创建一个
ActiveMQQueue
实例作为任务队列。
- 生产者:
TaskProducer
通过JmsTemplate
将消息发送到指定队列。
- 消费者:
TaskConsumer
使用@JmsListener
注解监听指定队列,接收到消息后执行具体处理逻辑。