using System; using System.Collections.Generic; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Threading; using Infrastructure.Model; using Base; using Common; using Util; using Infrastructure; using Services; namespace MySystem { public class RabbitMQClient { public readonly static RabbitMQClient Instance = new RabbitMQClient(); string UserName,Password,HostName,VirtualHostName; private RabbitMQClient() { MqSettings mqSettings = new(); AppSettings.Bind("MqSettings", mqSettings); UserName = mqSettings.UserName; Password = mqSettings.Password; HostName = mqSettings.HostName; VirtualHostName = mqSettings.VirtualHostName; } #region 单对单接收 public static IConnection _connection; public void CreateConn() { var factory = new ConnectionFactory() { UserName = UserName, Password = Password, AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接 TopologyRecoveryEnabled = true, //连接恢复后,连接的交换机,队列等是否一同恢复 RequestedHeartbeat = TimeSpan.FromMinutes(1), VirtualHost = VirtualHostName, }; List p = new List(); string[] HostNames = HostName.Split(','); foreach (string subHostName in HostNames) { string[] subHostNameData = subHostName.Split(':'); p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1]))); } _connection = factory.CreateConnection(p); } public void StartReceive(string QueueName, string Exchange = "", string RoutingKey = "") { if (_connection == null) { CreateConn(); } else if (!_connection.IsOpen) { CreateConn(); } var channel = _connection.CreateModel(); var queue = channel.QueueDeclare(QueueName, true, false, false, null); channel.QueueBind(QueueName, Exchange, RoutingKey); channel.BasicQos(0, 1, false); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { string MsgContent = Encoding.Default.GetString(e.Body.ToArray()); try { Function.WriteLog(DateTime.Now.ToString() + "-" + MsgContent, "接收mq数据队列"); if(QueueName == "QUEUE_KXS_RETRY_MACHINE_PRIZE_CONFIG_DIVISION") { PrizeDo.addPrize(QueueName, MsgContent); } else { PrizeDo.sendPrize(QueueName, MsgContent); } Function.WriteLog(DateTime.Now.ToString() + "-end", "接收mq数据队列"); } catch(Exception ex) { Function.WriteLog(DateTime.Now + "\n" + ex.ToString() + "\n" + MsgContent, "发奖异常"); } channel.BasicAck(e.DeliveryTag, false); //收到回复后,RabbitMQ会直接在队列中删除这条消息 Function.WritePage("/count/", QueueName + ".txt", queue.MessageCount.ToString()); }; channel.BasicConsume(QueueName, false, consumer); } public void Start() { var setService = App.GetService(); var list = setService.GetList(m => m.status == 1); foreach(var sub in list) { if(sub.mqQueueName == "QUEUE_KXS_MACHINE_PRIZE_CONFIG_DIVISION" || sub.mqQueueName == "QUEUE_KXS_HAODA_PRIZE_CONFIG_DIVISION") { for (int i = 0; i < 8; i++) { StartReceive(sub.mqQueueName, "kxs_direct_ranch", "/"); } } else { StartReceive(sub.mqQueueName, "kxs_direct_ranch", "/"); } } StartReceive("QUEUE_KXS_RETRY_MACHINE_PRIZE_CONFIG_DIVISION", "kxs_direct_ranch", "/"); } #endregion } }