|
@@ -0,0 +1,75 @@
|
|
|
+using System;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Text;
|
|
|
+using RabbitMQ.Client;
|
|
|
+using RabbitMQ.Client.Events;
|
|
|
+using System.Threading;
|
|
|
+using Infrastructure.Model;
|
|
|
+using Base;
|
|
|
+using Common;
|
|
|
+using Util;
|
|
|
+
|
|
|
+namespace MySystem
|
|
|
+{
|
|
|
+ public class RabbitMQClient
|
|
|
+ {
|
|
|
+ public readonly static RabbitMQClient Instance = new RabbitMQClient();
|
|
|
+ string UserName,Password,HostName,VirtualHostName;
|
|
|
+ private RabbitMQClient()
|
|
|
+ {
|
|
|
+ MqSettings mqSettings = new();
|
|
|
+ AppSettings.Bind("MqSettings", mqSettings);
|
|
|
+ UserName = mqSettings.UserName;
|
|
|
+ Password = mqSettings.Password;
|
|
|
+ HostName = mqSettings.HostName;
|
|
|
+ VirtualHostName = mqSettings.VirtualHostName;
|
|
|
+ }
|
|
|
+
|
|
|
+ #region 单对单接收
|
|
|
+ public static IConnection _connection;
|
|
|
+ public void CreateConn()
|
|
|
+ {
|
|
|
+ var factory = new ConnectionFactory()
|
|
|
+ {
|
|
|
+ UserName = UserName,
|
|
|
+ Password = Password,
|
|
|
+ AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接
|
|
|
+ TopologyRecoveryEnabled = true, //连接恢复后,连接的交换机,队列等是否一同恢复
|
|
|
+ RequestedHeartbeat = TimeSpan.FromMinutes(1),
|
|
|
+ VirtualHost = VirtualHostName,
|
|
|
+ };
|
|
|
+ List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
|
|
|
+ string[] HostNames = HostName.Split(',');
|
|
|
+ foreach (string subHostName in HostNames)
|
|
|
+ {
|
|
|
+ string[] subHostNameData = subHostName.Split(':');
|
|
|
+ p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
|
|
|
+ }
|
|
|
+ _connection = factory.CreateConnection(p);
|
|
|
+ }
|
|
|
+ public void StartReceive(string QueueName, string Exchange = "", string RoutingKey = "")
|
|
|
+ {
|
|
|
+ if (_connection == null)
|
|
|
+ {
|
|
|
+ CreateConn();
|
|
|
+ }
|
|
|
+ else if (!_connection.IsOpen)
|
|
|
+ {
|
|
|
+ CreateConn();
|
|
|
+ }
|
|
|
+ var channel = _connection.CreateModel();
|
|
|
+ channel.QueueBind(QueueName, Exchange, RoutingKey);
|
|
|
+ EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
|
|
|
+ consumer.Received += (a, e) =>
|
|
|
+ {
|
|
|
+ string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
|
|
|
+ Function.WriteLog(MsgContent, "接收mq数据队列");
|
|
|
+ PrizeDo.sendPrize(MsgContent);
|
|
|
+ channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
|
|
|
+ };
|
|
|
+ channel.BasicConsume(QueueName, false, consumer);
|
|
|
+ }
|
|
|
+ #endregion
|
|
|
+
|
|
|
+ }
|
|
|
+}
|