|
- using System;
- using System.Collections.Generic;
- using System.Text;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using Library;
- using System.Threading;
- namespace MySystem
- {
- public class RabbitMQClient
- {
- public readonly static RabbitMQClient Instance = new RabbitMQClient();
- string UserName,Password,HostName;
- private RabbitMQClient()
- {
- UserName = ConfigurationManager.AppSettings["MqUserName"].ToString();
- Password = ConfigurationManager.AppSettings["MqPassword"].ToString();
- HostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
- }
- #region 单对单发送
- public void SendMsg(string content, string QueueName = "")
- {
- RedisDbconn.Instance.AddList(QueueName, content);
- //创建连接对象工厂
- // var factory = new ConnectionFactory()
- // {
- // UserName = UserName,
- // Password = Password,
- // AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接
- // TopologyRecoveryEnabled = true //连接恢复后,连接的交换机,队列等是否一同恢复
- // };
- // List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
- // string[] HostNames = HostName.Split(',');
- // foreach (string subHostName in HostNames)
- // {
- // string[] subHostNameData = subHostName.Split(':');
- // p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
- // }
- // var conn = factory.CreateConnection(p);
- // var channel = conn.CreateModel();
- // channel.QueueDeclare(QueueName, true, false, false);
- // channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
- // channel.Dispose();
- // conn.Dispose();
- }
- #endregion
- #region 单对单发送
- public void ListenSendMsg()
- {
- Thread th = new Thread(ListenSendMsgDo);
- th.IsBackground = true;
- th.Start();
- }
- Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
- public void ListenSendMsgDo()
- {
- while (true)
- {
- //创建连接对象工厂
- var factory = new ConnectionFactory()
- {
- UserName = UserName,
- Password = Password,
- AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接
- TopologyRecoveryEnabled = true //连接恢复后,连接的交换机,队列等是否一同恢复
- };
- List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
- string[] HostNames = HostName.Split(',');
- foreach (string subHostName in HostNames)
- {
- string[] subHostNameData = subHostName.Split(':');
- p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
- }
- var conn = factory.CreateConnection(p);
- bool op = true;
- while (op)
- {
- string data = RedisDbconn.Instance.RPop<string>("MainServerMq");
- if (!string.IsNullOrEmpty(data))
- {
- try
- {
- string[] dataList = data.Split("#cut#");
- string QueueName = dataList[0];
- if (!channels.ContainsKey(QueueName))
- {
- var channelCreate = conn.CreateModel();
- channels.Add(QueueName, channelCreate);
- }
- var channel = channels[QueueName];
- channel.QueueDeclare(QueueName, true, false, false);
- channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(dataList[1]));
- }
- catch (Exception ex)
- {
- op = false;
- function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "MQ消息队列单对单发送监听异常");
- }
- }
- }
- // channel.Dispose();
- conn.Dispose();
- function.WriteLog(DateTime.Now.ToString(), "MQ测试");
- }
- }
- #endregion
- #region 单对单接收
- public static IConnection _connection;
- public void CreateConn()
- {
- var factory = new ConnectionFactory()
- {
- UserName = UserName,
- Password = Password,
- AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接
- TopologyRecoveryEnabled = true, //连接恢复后,连接的交换机,队列等是否一同恢复
- RequestedHeartbeat = TimeSpan.FromMinutes(1),
- };
- List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
- string[] HostNames = HostName.Split(',');
- foreach (string subHostName in HostNames)
- {
- string[] subHostNameData = subHostName.Split(':');
- p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
- }
- _connection = factory.CreateConnection(p);
- }
- public void StartReceive(string QueueName)
- {
- if (_connection == null)
- {
- CreateConn();
- }
- else if (!_connection.IsOpen)
- {
- CreateConn();
- }
- var channel = _connection.CreateModel();
- channel.QueueDeclare(QueueName, true, false, false);
- EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
- consumer.Received += (a, e) =>
- {
- string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
- if (QueueName == "TimerStat")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- StatTimerService.Instance.Start(job);
- }
- else if (QueueName == "PosTradeStat")
- {
- // JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- // StatService.Instance.Start(job);
- }
- else if (QueueName == "SycnSpServer")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- SycnSpService.Instance.Start(job);
- }
- else if (QueueName == "CheckWeChatSign")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- CheckWeChatSignService.Instance.Start(job);
- }
- else if (QueueName == "CheckAlipaySign")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- CheckAlipaySignService.Instance.Start(job);
- }
- else if (QueueName == "WeChatPayBack")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- WeChatPayBackService.Instance.Start(job);
- }
- else if (QueueName == "AlipayPayBack")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- AlipayPayBackService.Instance.Start(job);
- }
- else if (QueueName == "AlipayPayBack2")
- {
- // JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- // AlipayPayBack2Service.Instance.Start(job);
- }
- else if (QueueName == "ConsumerOrdersDiviList")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- ReceiveProfitService.Instance.Start(job);
- }
- else if (QueueName == "ConsumerOrdersStat")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- ConsumerOrdersStatService.Instance.Start(job);
- }
- else if (QueueName == "ConsumerOrdersReturnDo")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- ConsumerOrdersReturnDoService.Instance.Start(job);
- }
- else if (QueueName == "ConsumerOrdersReturnStat")
- {
- JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- ConsumerOrdersReturnStatService.Instance.Start(job);
- }
- else if (QueueName == "MerchantConfirmList")
- {
- MerchantConfirmService.Instance.Start(MsgContent);
- }
- else if (QueueName == "DeleteMySqlData")
- {
- DeleteMySqlDataService.Instance.Start(MsgContent);
- }
- // else if (QueueName == "PublicMainServer")
- // {
- // JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- // ReceiveTaskService.Instance.Start(job);
- // }
- // else if (QueueName == "SycnTableData")
- // {
- // JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- // if (job.BrandInfo.DataType == 1)
- // {
- // //同步激活
- // SycnActiveRewardService.Instance.Start(job);
- // }
- // else if (job.BrandInfo.DataType == 2)
- // {
- // //同步交易
- // SycnTradeRecordService.Instance.Start(job);
- // }
- // else if (job.BrandInfo.DataType == 8)
- // {
- // //同步商户
- // SycnMerchantInfoService.Instance.Start(job);
- // }
- // }
- // else if (QueueName == "ProfitForEverMonth")
- // {
- // JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- // ProfitService.Instance.Start(job);
- // }
- // else if (QueueName == "FluxPrize")
- // {
- // JobMqMsg job = Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(MsgContent);
- // FluxService.Instance.Start(job);
- // }
- channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
- };
- channel.BasicConsume(QueueName, false, consumer);
- }
- #endregion
- #region 单对多发送
- public void SendMsgToExchange(string content, string Exchange = "")
- {
- //创建连接对象工厂
- var factory = new ConnectionFactory()
- {
- UserName = UserName,
- Password = Password,
- AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接
- TopologyRecoveryEnabled = true //连接恢复后,连接的交换机,队列等是否一同恢复
- };
- List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
- string[] HostNames = HostName.Split(',');
- foreach (string subHostName in HostNames)
- {
- string[] subHostNameData = subHostName.Split(':');
- p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
- }
- var conn = factory.CreateConnection(p);
- var channel = conn.CreateModel();
- channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
- channel.BasicPublish(Exchange, "", null, Encoding.Default.GetBytes(content));
- channel.Dispose();
- conn.Dispose();
- }
- #endregion
- #region 单对多接收
- public void StartReceiveFromExchange(string QueueName = "", string Exchange = "")
- {
- var factory = new ConnectionFactory()
- {
- UserName = UserName,
- Password = Password,
- AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接
- TopologyRecoveryEnabled = true //连接恢复后,连接的交换机,队列等是否一同恢复
- };
- List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
- string[] HostNames = HostName.Split(',');
- foreach (string subHostName in HostNames)
- {
- string[] subHostNameData = subHostName.Split(':');
- p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
- }
- var conn = factory.CreateConnection(p);
- var channel = conn.CreateModel();
- //定义队列
- channel.QueueDeclare(QueueName, true, false, false);
- //定义交换机
- channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
- //绑定队列到交换机
- channel.QueueBind(QueueName, Exchange, "");
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (a, e) =>
- {
- Library.function.WriteLog(Encoding.Default.GetString(e.Body.ToArray()), "接收到的MQ消息");
- channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
- };
- channel.BasicConsume(QueueName, false, consumer);
- }
- #endregion
- }
- }
|