Java 端配置与消息发送
- 引入依赖:在
pom.xml
中添加 RocketMQ 客户端依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>[具体版本号]</version>
</dependency>
- 生产者配置:创建生产者实例并配置名称服务器地址等。
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr:9876");
producer.start();
- 消息格式转换与发送:将消息对象转换为字节数组发送。例如,若消息为 JSON 格式,先序列化为 JSON 字符串再转字节数组。
String messageContent = "{\"key\":\"value\"}";
Message msg = new Message("topic", "tag", messageContent.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
Python 端配置与消息接收
- 安装依赖:使用
pip install rocketmq-client-python
安装 RocketMQ Python 客户端。
- 消费者配置:创建消费者实例并配置名称服务器地址、消费组等。
from rocketmq.client import PushConsumer
consumer = PushConsumer("consumerGroup")
consumer.set_namesrv_addr("namesrvAddr:9876")
consumer.subscribe("topic", "*")
- 消息格式转换与接收:接收消息后,根据发送端的格式进行转换。若发送端为 JSON 格式,接收到字节数组后转字符串再解析为 JSON 对象。
def callback(msg):
message_str = msg.body.decode('utf-8')
import json
json_obj = json.loads(message_str)
print("Received message:", json_obj)
consumer.register_message_listener(callback)
consumer.start()
关键要点
- 名称服务器配置:确保 Java 和 Python 端配置的名称服务器地址一致,以保证能正确连接到 RocketMQ 集群。
- 消息格式约定:在跨语言集成时,必须预先约定好消息格式,如 JSON、Protobuf 等,并在两端按照约定进行转换,避免消息解析错误。
- 集群模式:若采用集群模式,需确保 Java 和 Python 端的生产者组与消费者组名称配置正确,以实现负载均衡和消息的有序消费等功能。