123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- using System;
- using System.Collections.Generic;
- using System.Text;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using System.Threading;
- using Infrastructure.Model;
- using Base;
- using Common;
- using Util;
- using Infrastructure;
- using Services;
namespace MySystem
{
    /// <summary>
    /// Singleton RabbitMQ consumer client. Reads broker settings from the
    /// "MqSettings" configuration section, opens one shared connection and
    /// attaches consumers that hand every received message to <c>PrizeDo</c>.
    /// </summary>
    public class RabbitMQClient
    {
        /// <summary>The single shared instance of the client.</summary>
        public static readonly RabbitMQClient Instance = new RabbitMQClient();

        // Queue whose messages are handled by PrizeDo.addPrize instead of PrizeDo.sendPrize.
        private const string RetryQueueName = "QUEUE_KXS_RETRY_MACHINE_PRIZE_CONFIG_DIVISION";

        // Queues that get several parallel consumers in Start().
        private const string MachineQueueName = "QUEUE_KXS_MACHINE_PRIZE_CONFIG_DIVISION";
        private const string HaodaQueueName = "QUEUE_KXS_HAODA_PRIZE_CONFIG_DIVISION";

        // Number of consumers attached to each of the multi-consumer queues above.
        private const int MultiConsumerCount = 8;

        // Exchange and routing key every queue started by Start() is bound with.
        private const string PrizeExchange = "kxs_direct_ranch";
        private const string PrizeRoutingKey = "/";

        private readonly string _userName;
        private readonly string _password;
        private readonly string _hostName;
        private readonly string _virtualHostName;

        /// <summary>
        /// Binds the "MqSettings" configuration section and caches the broker
        /// credentials, host list and virtual host.
        /// </summary>
        private RabbitMQClient()
        {
            MqSettings mqSettings = new();
            AppSettings.Bind("MqSettings", mqSettings);
            _userName = mqSettings.UserName;
            _password = mqSettings.Password;
            _hostName = mqSettings.HostName;
            _virtualHostName = mqSettings.VirtualHostName;
        }

        #region One-to-one receive

        /// <summary>Connection shared by every consumer channel created by this client.</summary>
        public static IConnection _connection;

        /// <summary>
        /// Opens a new broker connection and stores it in <see cref="_connection"/>.
        /// The configured host name is a comma-separated list of <c>host[:port]</c>
        /// entries; all of them are offered to the client as candidate endpoints.
        /// </summary>
        /// <exception cref="InvalidOperationException">No host is configured.</exception>
        /// <exception cref="FormatException">A configured port is not a valid integer.</exception>
        public void CreateConn()
        {
            var factory = new ConnectionFactory()
            {
                UserName = _userName,
                Password = _password,
                AutomaticRecoveryEnabled = true, // reconnect automatically if the connection drops
                TopologyRecoveryEnabled = true, // also restore exchanges, queues, etc. after the connection recovers
                RequestedHeartbeat = TimeSpan.FromMinutes(1),
                VirtualHost = _virtualHostName,
            };

            _connection = factory.CreateConnection(ParseEndpoints(_hostName));
        }

        /// <summary>
        /// Declares the durable queue <paramref name="QueueName"/>, optionally binds it
        /// to <paramref name="Exchange"/>, and attaches one manually-acknowledging
        /// consumer with a prefetch of one message.
        /// </summary>
        /// <param name="QueueName">Queue to declare and consume from.</param>
        /// <param name="Exchange">
        /// Exchange to bind the queue to. When empty, no bind is issued: the broker
        /// rejects explicit bindings to the default exchange.
        /// </param>
        /// <param name="RoutingKey">Routing key used for the binding.</param>
        public void StartReceive(string QueueName, string Exchange = "", string RoutingKey = "")
        {
            if (_connection is null || !_connection.IsOpen)
            {
                CreateConn();
            }

            var channel = _connection.CreateModel();
            channel.QueueDeclare(QueueName, true, false, false, null);
            if (!string.IsNullOrEmpty(Exchange))
            {
                channel.QueueBind(QueueName, Exchange, RoutingKey);
            }

            // Deliver at most one unacknowledged message to this consumer at a time.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                // NOTE(review): assumes producers publish UTF-8 payloads. Encoding.Default
                // is already UTF-8 on .NET Core/5+, but not on .NET Framework - confirm.
                string message = Encoding.UTF8.GetString(e.Body.ToArray());
                try
                {
                    Function.WriteLog(DateTime.Now.ToString() + "-" + message, "接收mq数据队列");
                    if (QueueName == RetryQueueName)
                    {
                        PrizeDo.addPrize(QueueName, message);
                    }
                    else
                    {
                        PrizeDo.sendPrize(QueueName, message);
                    }

                    Function.WriteLog(DateTime.Now.ToString() + "-end", "接收mq数据队列");
                }
                catch (Exception ex)
                {
                    // Failures are logged only; the message is still acknowledged below.
                    Function.WriteLog(DateTime.Now + "\n" + ex.ToString() + "\n" + message, "发奖异常");
                }

                channel.BasicAck(e.DeliveryTag, false); // once acknowledged, RabbitMQ removes the message from the queue

                // Ask the broker for the current depth; the value returned by QueueDeclare
                // is a one-time snapshot and would never change.
                Function.WritePage("/count/", QueueName + ".txt", channel.MessageCount(QueueName).ToString());
            };
            channel.BasicConsume(QueueName, false, consumer);
        }

        /// <summary>
        /// Starts consumers for every prize-set record with <c>status == 1</c>, then
        /// one consumer for the retry queue. The machine and Haoda queues get
        /// <see cref="MultiConsumerCount"/> consumers each; every other queue gets one.
        /// </summary>
        public void Start()
        {
            var setService = App.GetService<IPriPrizeInSetService>();
            var activeSets = setService.GetList(m => m.status == 1);
            foreach (var set in activeSets)
            {
                string queueName = set.mqQueueName;
                int consumerCount = queueName is MachineQueueName or HaodaQueueName
                    ? MultiConsumerCount
                    : 1;

                for (int i = 0; i < consumerCount; i++)
                {
                    StartReceive(queueName, PrizeExchange, PrizeRoutingKey);
                }
            }

            StartReceive(RetryQueueName, PrizeExchange, PrizeRoutingKey);
        }

        /// <summary>
        /// Parses a comma-separated <c>host[:port]</c> list into broker endpoints.
        /// Surrounding whitespace and empty entries are ignored; an entry without a
        /// port uses the client's default port.
        /// </summary>
        private static List<AmqpTcpEndpoint> ParseEndpoints(string hostNames)
        {
            if (string.IsNullOrWhiteSpace(hostNames))
            {
                throw new InvalidOperationException("MqSettings.HostName is not configured.");
            }

            var endpoints = new List<AmqpTcpEndpoint>();
            foreach (string rawEntry in hostNames.Split(','))
            {
                string entry = rawEntry.Trim();
                if (entry.Length == 0)
                {
                    continue;
                }

                string[] parts = entry.Split(':');
                string host = parts[0].Trim();
                if (parts.Length < 2)
                {
                    endpoints.Add(new AmqpTcpEndpoint(host));
                    continue;
                }

                if (!int.TryParse(parts[1].Trim(), out int port))
                {
                    throw new FormatException("Invalid port in MqSettings.HostName entry '" + entry + "'.");
                }

                endpoints.Add(new AmqpTcpEndpoint(host, port));
            }

            if (endpoints.Count == 0)
            {
                throw new InvalidOperationException("MqSettings.HostName does not contain any host entry.");
            }

            return endpoints;
        }

        #endregion
    }
}
|