using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Infrastructure.Model;
using Base;
using Common;
using Util;

namespace MySystem
{
    /// <summary>
    /// Process-wide RabbitMQ client. Connection settings are read once from the
    /// "MqSettings" configuration section when the singleton is constructed.
    /// </summary>
    public class RabbitMQClient
    {
        /// <summary>The single shared instance of the client.</summary>
        public static readonly RabbitMQClient Instance = new RabbitMQClient();

        private readonly string UserName;
        private readonly string Password;
        private readonly string HostName;
        private readonly string VirtualHostName;

        /// <summary>
        /// Binds the "MqSettings" configuration section and caches the credentials,
        /// host list and virtual host used by <see cref="CreateConn"/>.
        /// </summary>
        private RabbitMQClient()
        {
            MqSettings mqSettings = new();
            AppSettings.Bind("MqSettings", mqSettings);

            UserName = mqSettings.UserName;
            Password = mqSettings.Password;
            HostName = mqSettings.HostName;
            VirtualHostName = mqSettings.VirtualHostName;
        }

        #region One-to-one receive

        /// <summary>
        /// Connection shared by every consumer started through this class.
        /// Remains null until <see cref="CreateConn"/> has been called.
        /// </summary>
        public static IConnection _connection;

        /// <summary>
        /// Opens a new connection to the configured broker endpoint(s) and stores it
        /// in <see cref="_connection"/>, replacing any previous reference.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The configured host list is missing or contains no usable entry.
        /// </exception>
        /// <exception cref="FormatException">A configured port is not a valid integer.</exception>
        public void CreateConn()
        {
            var factory = new ConnectionFactory
            {
                UserName = UserName,
                Password = Password,
                VirtualHost = VirtualHostName,
                // Reconnect automatically if the connection drops.
                AutomaticRecoveryEnabled = true,
                // After a reconnect, also restore exchanges, queues, bindings and consumers.
                TopologyRecoveryEnabled = true,
                RequestedHeartbeat = TimeSpan.FromMinutes(1),
            };

            // NOTE(review): a previous connection held in _connection is not closed here;
            // confirm whether callers expect it to stay alive for auto-recovery.
            _connection = factory.CreateConnection(ParseEndpoints(HostName));
        }

        /// <summary>
        /// Starts a consumer on <paramref name="QueueName"/> that manually acknowledges
        /// every delivery, creating the shared connection first when it is missing or closed.
        /// </summary>
        /// <param name="QueueName">Name of the queue to consume from.</param>
        /// <param name="Exchange">
        /// Exchange to bind the queue to. When empty (the default) no binding is issued,
        /// because the broker does not allow bind operations on the default exchange.
        /// </param>
        /// <param name="RoutingKey">Routing key used for the binding.</param>
        /// <exception cref="ArgumentException"><paramref name="QueueName"/> is null or empty.</exception>
        public void StartReceive(string QueueName, string Exchange = "", string RoutingKey = "")
        {
            if (string.IsNullOrEmpty(QueueName))
            {
                throw new ArgumentException("Queue name must not be null or empty.", nameof(QueueName));
            }

            if (_connection == null || !_connection.IsOpen)
            {
                CreateConn();
            }

            var channel = _connection.CreateModel();

            // Binding to the default ("") exchange is rejected by RabbitMQ and would close
            // the channel, so only bind when a named exchange was supplied.
            if (!string.IsNullOrEmpty(Exchange))
            {
                channel.QueueBind(QueueName, Exchange, RoutingKey);
            }

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                // Message processing is currently disabled. The original handler decoded
                // e.Body with Encoding.Default, logged it through Function.WriteLog and
                // forwarded the text to PrizeDo.sendPrize.

                // Once acknowledged, RabbitMQ removes this message from the queue.
                channel.BasicAck(e.DeliveryTag, false);
            };

            // autoAck is false: deliveries are acknowledged explicitly in the handler above.
            channel.BasicConsume(QueueName, false, consumer);
        }

        /// <summary>
        /// Converts a comma-separated "host:port" list into broker endpoints.
        /// Blank entries are skipped; an entry without a port uses the client's default port.
        /// </summary>
        private static List<AmqpTcpEndpoint> ParseEndpoints(string hostNames)
        {
            if (string.IsNullOrWhiteSpace(hostNames))
            {
                throw new InvalidOperationException("MqSettings.HostName is not configured.");
            }

            var endpoints = new List<AmqpTcpEndpoint>();
            foreach (string rawEntry in hostNames.Split(','))
            {
                string entry = rawEntry.Trim();
                if (entry.Length == 0)
                {
                    continue;
                }

                string[] parts = entry.Split(':');
                string host = parts[0].Trim();

                if (parts.Length > 1)
                {
                    int port = int.Parse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture);
                    endpoints.Add(new AmqpTcpEndpoint(host, port));
                }
                else
                {
                    endpoints.Add(new AmqpTcpEndpoint(host));
                }
            }

            if (endpoints.Count == 0)
            {
                throw new InvalidOperationException("MqSettings.HostName does not contain any broker endpoint.");
            }

            return endpoints;
        }

        #endregion
    }
}