MST
星途 面试题库

面试题:C#中RabbitMQ集成时消息的可靠性投递机制

在C#使用RabbitMQ作为消息队列时,为确保消息可靠投递到队列,阐述生产者端和消费者端需要采取哪些措施,例如确认机制、事务机制等,并给出具体的C#代码示例及说明。
26.2万 热度难度
编程语言C#

知识考点

AI 面试

面试题答案

一键面试

生产者端措施及代码示例

  1. 确认机制(Publisher Confirms)
    • 原理:生产者将信道设置为确认模式,一旦信道进入确认模式,所有在该信道上发布的消息都会被指派一个唯一的ID(从1开始)。当消息被投递到所有匹配的队列后,RabbitMQ会发送一个确认(Basic.Ack)给生产者,包含消息的唯一ID。如果RabbitMQ因为某些原因不能处理消息,会发送一个拒绝确认(Basic.Nack)。
    • 代码示例
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Producer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // 开启确认模式
            channel.ConfirmSelect();

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello",
                                 autoAck: true,
                                 consumer: consumer);

            var message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);

            // 等待确认
            channel.WaitForConfirmsOrDie();
            Console.WriteLine(" [x] Sent {0}", message);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
  • 说明:首先创建连接和信道,声明队列。通过channel.ConfirmSelect()开启确认模式。发布消息后,使用channel.WaitForConfirmsOrDie()等待RabbitMQ的确认,如果消息成功投递则继续执行,否则会抛出异常。
  1. 事务机制
    • 原理:生产者通过将信道设置为事务模式(channel.TxSelect()),然后在发布消息前开启事务(channel.TxBegin()),发布消息后提交事务(channel.TxCommit())。如果在发布消息过程中出现异常,可以回滚事务(channel.TxRollback()),确保消息不会被错误地投递。
    • 代码示例
using RabbitMQ.Client;
using System;
using System.Text;

class ProducerTx
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // 开启事务模式
            channel.TxSelect();

            try
            {
                var message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "",
                                     routingKey: "hello",
                                     basicProperties: null,
                                     body: body);

                // 提交事务
                channel.TxCommit();
                Console.WriteLine(" [x] Sent {0}", message);
            }
            catch (Exception ex)
            {
                // 回滚事务
                channel.TxRollback();
                Console.WriteLine(" [x] Transaction rolled back due to exception: {0}", ex.Message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
  • 说明:开启事务模式后,在try块中发布消息并提交事务,如果出现异常在catch块中回滚事务,保证消息投递的可靠性。不过事务机制会降低性能,因为它是同步阻塞的,相比确认机制,确认机制是异步的,性能更好。

消费者端措施及代码示例

  1. 手动确认(Manual Ack)
    • 原理:消费者从队列中获取消息后,不会自动确认消息已被处理,而是需要显式地调用Basic.Ack方法来告诉RabbitMQ消息已被成功处理。如果消费者在处理消息过程中崩溃,RabbitMQ会将该消息重新放入队列,以便其他消费者可以处理。
    • 代码示例
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Consumer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                // 手动确认消息
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "hello",
                                 autoAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
  • 说明:创建消费者时,通过设置autoAck: false关闭自动确认。在接收到消息并处理后,调用channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false)手动确认消息,deliveryTag是RabbitMQ为每个消息分配的唯一标识,multiple设置为false表示只确认当前消息,如果设置为true则确认所有小于等于该deliveryTag的消息。
  1. 设置QoS(Quality of Service)
    • 原理:通过设置Basic.Qos,可以限制消费者在同一时间从队列中获取的消息数量。这样可以避免消费者处理能力不足时,积压过多消息在消费者端,保证消息处理的稳定性。
    • 代码示例
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class ConsumerQos
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // 设置QoS,一次最多获取1条消息
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                // 手动确认消息
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "hello",
                                 autoAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
  • 说明channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false)设置prefetchCount为1,表示消费者在同一时间最多从队列中获取1条消息进行处理,prefetchSize设置为0表示不限制消息大小,global设置为false表示该设置仅应用于当前消费者。