近期项目里有需要用到RabbitMq实现一些业务,学习整理之后在此记录一下,如有问题或者不对的地方,欢迎留言指正。
public class RabbitMQProvider { private readonly string _ipAddress; private readonly int? _port; private readonly string _username; private readonly string _password; public RabbitMQProvider() { _ipAddress = ConfigurationHelper.GetKey("RabbitMQIPAddress") ?? throw new ArgumentException("IP地址未配置!"); _username = ConfigurationHelper.GetKey("RabbitMQUserName") ?? throw new ArgumentException("用户名不能为空"); _password = ConfigurationHelper.GetKey("RabbitMQPassword") ?? throw new ArgumentException("密码不能为空"); var timeApan = new TimeSpan(0, 5, 0); if (ConnectionFactory == null) { ConnectionFactory = new ConnectionFactory//创建连接工厂对象 { HostName = _ipAddress,//IP地址 UserName = _username,//用户账号 Password = _password,//用户密码 //启用自动连接恢复 AutomaticRecoveryEnabled = true, //VirtualHost = "/mqtest",//RabbitMQ中要请求的VirtualHost名称 ContinuationTimeout = timeApan, HandshakeContinuationTimeout = timeApan, RequestedConnectionTimeout = timeApan, SocketReadTimeout = timeApan, SocketWriteTimeout = timeApan, //启用异步消费 DispatchConsumersAsync = true, //RequestedChannelMax = 5000 }; } } public ConnectionFactory ConnectionFactory { get; } private static IConnection connection; ////// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接) /// ///public IConnection GetConnection() { if (connection == null || !connection.IsOpen) { //通过工厂创建连接对象 connection = ConnectionFactory.CreateConnection(); } return connection; } int times = 0; private static IModel Channel; public IModel GetChannel() { if (Channel != null) return Channel; else { //times++; // Console.WriteLine($"CreateModel{times}次"); return GetConnection().CreateModel(); } } }
public class RabbitMQPublisher : IPublisher { static int x_message_ttl; static RabbitMQPublisher() { int.TryParse(ConfigurationHelper.GetKey("RabbitMQ_x-message-ttl"), out x_message_ttl); x_message_ttl = x_message_ttl * 60 * 1000; } #region private readonly RabbitMQProvider _provider; private IConnection _connection; public RabbitMQPublisher(RabbitMQProvider provider) { try { _provider = provider; //if (_connection == null || !_connection.IsOpen) //{ // _connection = _provider.ConnectionFactory.CreateConnection(); //} _connection = _provider.GetConnection(); _channel = _provider.GetChannel(); } catch (Exception ex) { //记录异常日志 Util.LogError($"RabbitMQPublisher createConnection exception. Exception message:{ex.Message}"); } } public IConnection Connection { get { if (_connection != null) return _connection; return _connection = _provider.GetConnection(); ; } //get; set; } private IModel _channel; public IModel Channel { get { if (_channel != null) return _channel; else { //if (_connection == null || !_connection.IsOpen) //{ // _connection = _provider.GetConnection(); ; //} return _channel = _provider.GetChannel(); } } } ////// 释放资源 /// public void Dispose() { if (Channel != null) { if (Channel.IsOpen) Channel.Close(); Channel.Abort(); Channel.Dispose(); } if (Connection != null) { if (Connection.IsOpen) Connection.Close(); } } #endregion }
////// 发布(生产)消息 /// /// 消息内容 /// 交换机名称 /// 队列名称 /// 交换机类型 /// 路由键 /// 是否持久化 /// 是否自动删除 /// 用于插件和代理特定功能,如消息TTL、队列长度限制等 /// 1.x-message-ttl 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。 /// 2.x-expires 队列在被自动删除(毫秒)之前可以使用多长时间。 /// 3.x-max-length 队列在开始从头部删除之前可以包含多少就绪消息。 /// 4.x-max-length-bytes 队列在开始从头部删除之前可以包含的就绪消息的总体大小。 /// 5.x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。 /// 6.x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。 /// 7.x-max-priority 队列支持的最大优先级数; 如果未设置,队列将不支持消息优先级。 /// 8.x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。 /// 9.x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。 private Task Publish(string message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionaryarguments = null) { if (x_message_ttl > 0) { arguments = new Dictionary (); arguments.Add("x-message-ttl", x_message_ttl); } //声明交换机 Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments); //声明队列 Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments); Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey); var msgByte = Encoding.UTF8.GetBytes(message); //设置消息持久化 var props = Channel.CreateBasicProperties(); props.Persistent = true; try { Channel.TxSelect(); Channel.BasicPublish ( exchange: exchangeName, routingKey: routingKey == null ? string.Empty : routingKey, mandatory: false, basicProperties: props, body: msgByte ); Channel.TxCommit(); } catch (Exception ex) { Channel.TxRollback(); //记录异常日志 Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}"); } return Task.FromResult(0); }
////// 批量发布 /// /// /// /// /// /// /// /// /// ///private async Task PublishAsyncBatch(List message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary arguments = null) { //using (var conn = _provider.ConnectionFactory.CreateConnection()) //{ using (var channel = Connection.CreateModel()) { ///Console.WriteLine(1); if (x_message_ttl > 0) { arguments = new Dictionary (); arguments.Add("x-message-ttl", x_message_ttl); } //声明交换机 Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments); //声明队列 Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments); Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey); //设置消息持久化 var props = Channel.CreateBasicProperties(); props.Persistent = true; try { Channel.TxSelect(); var basicPublishBatch = Channel.CreateBasicPublishBatch(); byte[] msgByte; ReadOnlyMemory memory; foreach (var msg in message) { msgByte = Encoding.UTF8.GetBytes(msg); memory = new ReadOnlyMemory (msgByte); basicPublishBatch.Add ( exchange: exchangeName, routingKey: routingKey == null ? string.Empty : routingKey, mandatory: false, properties: props, body: memory ); } basicPublishBatch.Publish(); Channel.TxCommit(); await Task.Yield(); } catch (Exception ex) { Channel.TxRollback(); channel.Close(); channel.Dispose(); //conn.Close(); conn.Dispose(); //记录异常日志 Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}"); Console.WriteLine("消息订阅时错误:" + ex.Message); } } }
注意:多线程消息发布时,应避免多个线程使用同一个IModel实例,必须保证Imodel被一个线程独享,如果必须要多个线程访问呢一个实例的话,则可以通过加锁来处理,详见:.NET/C# Client API Guide — RabbitMQ
IModel ch = RetrieveSomeSharedIModelInstance(); lock (ch) { ch.BasicPublish(...); }
1、获取连接、交换机和队列同上消息发布,不再赘述
private void QueueInitialization(string queueName, bool durable = true, bool autoDelete = false, IDictionaryarguments = null) { try { if (x_message_ttl > 0) { arguments = new Dictionary (); arguments.Add("x-message-ttl", x_message_ttl); } Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments); } catch (Exception ) { } }
////// /// /// /// /// /// 每次消费的消息条数 ///private async Task SubscribeAsync(string queueName, Func callback, bool autoAck, ushort consumPerTimes = 1) { try { QueueInitialization(queueName); //声明为手动确认,每次只消费1条消息。 Channel.BasicQos(0, consumPerTimes, false); //定义消费者 //var consumer = new EventingBasicConsumer(Channel); var consumer = new AsyncEventingBasicConsumer(Channel); //接收事件 consumer.Received += async (eventSender, args) => { var message = args.Body.ToArray();//接收到的消息 var res = callback(Encoding.UTF8.GetString(message)); //返回消息确认 Channel.BasicAck(args.DeliveryTag, res); await Task.Yield(); }; //开启监听 -- gai2023-11-1 Channel.BasicConsume(queueName, autoAck, consumer); // await Task.Delay(1000); } catch (Exception e) { Console.WriteLine("消息订阅时错误:" + e.Message); } }
internal class SubscribeWorker : BackgroundService { #region override public override Task StartAsync(CancellationToken cancellationToken) { try { //一些数据初始化 _logger.LogInformation($"Settings 初始化完成"); } catch (Exception ex) { _logger.LogError(ex, ex.Message); } return base.StartAsync(cancellationToken); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { //这里注意,不能写在while里,否则会一直进行重复订阅,会导致连接数一直增长 await MainSubscribe(); while (!stoppingToken.IsCancellationRequested) { try { await Task.Delay(2000); } catch (Exception ex) { _logger.LogError(ex, ex.Message); } } } ////// 服务停止 /// /// ///public override Task StopAsync(CancellationToken cancellationToken) { Task.WaitAll(); subscriber.Dispose(); _logger.LogInformation("Worker stop at: {time}", DateTimeOffset.Now); return base.StopAsync(cancellationToken); } #endregion }