近期项目里有需要用到RabbitMq实现一些业务,学习整理之后在此记录一下,如有问题或者不对的地方,欢迎留言指正。
 public class RabbitMQProvider
    {
        private readonly string _ipAddress;
        private readonly int? _port;
        private readonly string _username;
        private readonly string _password;
        public RabbitMQProvider()
        {
            _ipAddress = ConfigurationHelper.GetKey("RabbitMQIPAddress") ?? throw new ArgumentException("IP地址未配置!");
            _username = ConfigurationHelper.GetKey("RabbitMQUserName") ?? throw new ArgumentException("用户名不能为空");
            _password = ConfigurationHelper.GetKey("RabbitMQPassword") ?? throw new ArgumentException("密码不能为空");
            var timeApan = new TimeSpan(0, 5, 0);
            if (ConnectionFactory == null)
            {
                ConnectionFactory = new ConnectionFactory//创建连接工厂对象
                {
                    HostName = _ipAddress,//IP地址
                    UserName = _username,//用户账号
                    Password = _password,//用户密码
                    //启用自动连接恢复
                    AutomaticRecoveryEnabled = true,
                    //VirtualHost = "/mqtest",//RabbitMQ中要请求的VirtualHost名称
                    ContinuationTimeout = timeApan,
                    HandshakeContinuationTimeout = timeApan,
                    RequestedConnectionTimeout = timeApan,
                    SocketReadTimeout = timeApan,
                    SocketWriteTimeout = timeApan,
                    //启用异步消费
                    DispatchConsumersAsync = true,
                    //RequestedChannelMax = 5000
                };
            }
        }
        public ConnectionFactory ConnectionFactory { get; }
        private static IConnection connection;
        /// 
        /// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)
        ///  
        ///  
 public class RabbitMQPublisher : IPublisher
    {
        static int x_message_ttl;
        static RabbitMQPublisher()
        {
            int.TryParse(ConfigurationHelper.GetKey("RabbitMQ_x-message-ttl"), out x_message_ttl);
            x_message_ttl = x_message_ttl * 60 * 1000;
        }
        #region
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;
        public RabbitMQPublisher(RabbitMQProvider provider)
        {
            try
            {
                _provider = provider;
                //if (_connection == null || !_connection.IsOpen)
                //{
                //    _connection = _provider.ConnectionFactory.CreateConnection();
                //}
                _connection = _provider.GetConnection();
                _channel = _provider.GetChannel();
            }
            catch (Exception ex)
            {
                //记录异常日志
                Util.LogError($"RabbitMQPublisher createConnection exception. Exception message:{ex.Message}");
            }
        }
        public IConnection Connection
        {
            get
            {
                
                if (_connection != null)
                    return _connection;
                return _connection = _provider.GetConnection(); ;
            }
            //get; set;
        }
        private IModel _channel;
        public IModel Channel
        {
            get
            {
                if (_channel != null)
                    return _channel;
                else
                {
                    //if (_connection == null || !_connection.IsOpen)
                    //{
                    //    _connection = _provider.GetConnection(); ;
                    //}
                    return _channel = _provider.GetChannel();
                }
            }
        }
        /// 
        /// 释放资源
        ///  
        public void Dispose()
        {
            if (Channel != null)
            {
                if (Channel.IsOpen)
                    Channel.Close();
                Channel.Abort();
                Channel.Dispose();
            }
            if (Connection != null)
            {
                if (Connection.IsOpen)
                    Connection.Close();
            }
        }
        #endregion
} 
////// 发布(生产)消息 /// /// 消息内容 /// 交换机名称 /// 队列名称 /// 交换机类型 /// 路由键 /// 是否持久化 /// 是否自动删除 /// 用于插件和代理特定功能,如消息TTL、队列长度限制等 /// 1.x-message-ttl 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。 /// 2.x-expires 队列在被自动删除(毫秒)之前可以使用多长时间。 /// 3.x-max-length 队列在开始从头部删除之前可以包含多少就绪消息。 /// 4.x-max-length-bytes 队列在开始从头部删除之前可以包含的就绪消息的总体大小。 /// 5.x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。 /// 6.x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。 /// 7.x-max-priority 队列支持的最大优先级数; 如果未设置,队列将不支持消息优先级。 /// 8.x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。 /// 9.x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。 private Task Publish(string message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionaryarguments = null) { if (x_message_ttl > 0) { arguments = new Dictionary (); arguments.Add("x-message-ttl", x_message_ttl); } //声明交换机 Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments); //声明队列 Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments); Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey); var msgByte = Encoding.UTF8.GetBytes(message); //设置消息持久化 var props = Channel.CreateBasicProperties(); props.Persistent = true; try { Channel.TxSelect(); Channel.BasicPublish ( exchange: exchangeName, routingKey: routingKey == null ? string.Empty : routingKey, mandatory: false, basicProperties: props, body: msgByte ); Channel.TxCommit(); } catch (Exception ex) { Channel.TxRollback(); //记录异常日志 Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}"); } return Task.FromResult(0); } 
////// 批量发布 /// /// /// /// /// /// /// /// /// ///private async Task PublishAsyncBatch(List message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary arguments = null) { //using (var conn = _provider.ConnectionFactory.CreateConnection()) //{ using (var channel = Connection.CreateModel()) { ///Console.WriteLine(1); if (x_message_ttl > 0) { arguments = new Dictionary (); arguments.Add("x-message-ttl", x_message_ttl); } //声明交换机 Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments); //声明队列 Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments); Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey); //设置消息持久化 var props = Channel.CreateBasicProperties(); props.Persistent = true; try { Channel.TxSelect(); var basicPublishBatch = Channel.CreateBasicPublishBatch(); byte[] msgByte; ReadOnlyMemory memory; foreach (var msg in message) { msgByte = Encoding.UTF8.GetBytes(msg); memory = new ReadOnlyMemory (msgByte); basicPublishBatch.Add ( exchange: exchangeName, routingKey: routingKey == null ? string.Empty : routingKey, mandatory: false, properties: props, body: memory ); } basicPublishBatch.Publish(); Channel.TxCommit(); await Task.Yield(); } catch (Exception ex) { Channel.TxRollback(); channel.Close(); channel.Dispose(); //conn.Close(); conn.Dispose(); //记录异常日志 Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}"); Console.WriteLine("消息订阅时错误:" + ex.Message); } } } 
注意:多线程消息发布时,应避免多个线程使用同一个IModel实例,必须保证Imodel被一个线程独享,如果必须要多个线程访问呢一个实例的话,则可以通过加锁来处理,详见:.NET/C# Client API Guide — RabbitMQ
IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
} 
1、获取连接、交换机和队列同上消息发布,不再赘述
private void QueueInitialization(string queueName, bool durable = true, bool autoDelete = false, IDictionaryarguments = null) { try { if (x_message_ttl > 0) { arguments = new Dictionary (); arguments.Add("x-message-ttl", x_message_ttl); } Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments); } catch (Exception ) { } } 
////// /// /// /// /// /// 每次消费的消息条数 ///private async Task SubscribeAsync(string queueName, Func callback, bool autoAck, ushort consumPerTimes = 1) { try { QueueInitialization(queueName); //声明为手动确认,每次只消费1条消息。 Channel.BasicQos(0, consumPerTimes, false); //定义消费者 //var consumer = new EventingBasicConsumer(Channel); var consumer = new AsyncEventingBasicConsumer(Channel); //接收事件 consumer.Received += async (eventSender, args) => { var message = args.Body.ToArray();//接收到的消息 var res = callback(Encoding.UTF8.GetString(message)); //返回消息确认 Channel.BasicAck(args.DeliveryTag, res); await Task.Yield(); }; //开启监听 -- gai2023-11-1 Channel.BasicConsume(queueName, autoAck, consumer); // await Task.Delay(1000); } catch (Exception e) { Console.WriteLine("消息订阅时错误:" + e.Message); } } 
internal class SubscribeWorker : BackgroundService
    {
        #region override
        public override Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                //一些数据初始化
                _logger.LogInformation($"Settings 初始化完成");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
            }
            return base.StartAsync(cancellationToken);
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //这里注意,不能写在while里,否则会一直进行重复订阅,会导致连接数一直增长
            await MainSubscribe();
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(2000);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, ex.Message);
                }
                
            }
        }
        /// 
        /// 服务停止
        ///  
        /// 
        ///