生产者端措施及代码示例
- 确认机制(Publisher Confirms):
- 原理:生产者将信道设置为确认模式,一旦信道进入确认模式,所有在该信道上发布的消息都会被指派一个唯一的ID(从1开始)。当消息被投递到所有匹配的队列后,RabbitMQ会发送一个确认(Basic.Ack)给生产者,包含消息的唯一ID。如果RabbitMQ因为某些原因不能处理消息,会发送一个拒绝确认(Basic.Nack)。
- 代码示例:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class Producer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
// 开启确认模式
channel.ConfirmSelect();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
var message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
// 等待确认
channel.WaitForConfirmsOrDie();
Console.WriteLine(" [x] Sent {0}", message);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
- 说明:首先创建连接和信道,声明队列。通过
channel.ConfirmSelect()
开启确认模式。发布消息后,使用channel.WaitForConfirmsOrDie()
等待RabbitMQ的确认,如果消息成功投递则继续执行,否则会抛出异常。
- 事务机制:
- 原理:生产者通过将信道设置为事务模式(
channel.TxSelect()
),然后在发布消息前开启事务(channel.TxBegin()
),发布消息后提交事务(channel.TxCommit()
)。如果在发布消息过程中出现异常,可以回滚事务(channel.TxRollback()
),确保消息不会被错误地投递。
- 代码示例:
using RabbitMQ.Client;
using System;
using System.Text;
class ProducerTx
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
// 开启事务模式
channel.TxSelect();
try
{
var message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
// 提交事务
channel.TxCommit();
Console.WriteLine(" [x] Sent {0}", message);
}
catch (Exception ex)
{
// 回滚事务
channel.TxRollback();
Console.WriteLine(" [x] Transaction rolled back due to exception: {0}", ex.Message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
- 说明:开启事务模式后,在
try
块中发布消息并提交事务,如果出现异常在catch
块中回滚事务,保证消息投递的可靠性。不过事务机制会降低性能,因为它是同步阻塞的,相比确认机制,确认机制是异步的,性能更好。
消费者端措施及代码示例
- 手动确认(Manual Ack):
- 原理:消费者从队列中获取消息后,不会自动确认消息已被处理,而是需要显式地调用
Basic.Ack
方法来告诉RabbitMQ消息已被成功处理。如果消费者在处理消息过程中崩溃,RabbitMQ会将该消息重新放入队列,以便其他消费者可以处理。
- 代码示例:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class Consumer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 手动确认消息
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
- 说明:创建消费者时,通过设置
autoAck: false
关闭自动确认。在接收到消息并处理后,调用channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false)
手动确认消息,deliveryTag
是RabbitMQ为每个消息分配的唯一标识,multiple
设置为false
表示只确认当前消息,如果设置为true
则确认所有小于等于该deliveryTag
的消息。
- 设置QoS(Quality of Service):
- 原理:通过设置
Basic.Qos
,可以限制消费者在同一时间从队列中获取的消息数量。这样可以避免消费者处理能力不足时,积压过多消息在消费者端,保证消息处理的稳定性。
- 代码示例:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class ConsumerQos
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
// 设置QoS,一次最多获取1条消息
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 手动确认消息
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
- 说明:
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false)
设置prefetchCount
为1,表示消费者在同一时间最多从队列中获取1条消息进行处理,prefetchSize
设置为0表示不限制消息大小,global
设置为false
表示该设置仅应用于当前消费者。