RabbitMQClient.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using RabbitMQ.Client;
  5. using RabbitMQ.Client.Events;
  6. using System.Threading;
  7. using Infrastructure.Model;
  8. using Base;
  9. using Common;
  10. using Util;
  11. using Infrastructure;
  12. using Services;
  13. namespace MySystem
  14. {
  /// <summary>
  /// Process-wide RabbitMQ consumer host. Reads broker settings from the
  /// "MqSettings" configuration section, owns a single shared connection and
  /// registers acknowledging consumers that hand every message to <c>PrizeDo</c>.
  /// </summary>
  public class RabbitMQClient
  {
      // Queue whose messages are routed to PrizeDo.addPrize instead of PrizeDo.sendPrize.
      private const string RetryQueueName = "QUEUE_KXS_RETRY_MACHINE_PRIZE_CONFIG_DIVISION";

      // Exchange and routing key used for every queue started by Start().
      private const string DirectExchange = "kxs_direct_ranch";
      private const string DirectRoutingKey = "/";

      // Number of consumers (one channel each) opened for the queues singled out in Start().
      private const int ParallelConsumerCount = 8;

      /// <summary>The single shared instance of this client.</summary>
      public readonly static RabbitMQClient Instance = new RabbitMQClient();

      string UserName, Password, HostName, VirtualHostName;

      /// <summary>
      /// Loads the broker credentials, host list and virtual host from the
      /// "MqSettings" configuration section.
      /// </summary>
      private RabbitMQClient()
      {
          MqSettings mqSettings = new();
          AppSettings.Bind("MqSettings", mqSettings);
          UserName = mqSettings.UserName;
          Password = mqSettings.Password;
          HostName = mqSettings.HostName;
          VirtualHostName = mqSettings.VirtualHostName;
      }

      #region One-to-one receive

      /// <summary>Connection shared by every channel created through this class.</summary>
      public static IConnection _connection;

      /// <summary>
      /// Opens a new connection to the configured broker endpoint(s) and stores it
      /// in <see cref="_connection"/>, disposing the connection it replaces.
      /// </summary>
      /// <remarks>
      /// The configured host name is a comma-separated list of <c>host:port</c>
      /// entries. An entry without a port uses the client's default port.
      /// </remarks>
      /// <exception cref="FormatException">An entry carries a port that is not an integer.</exception>
      /// <exception cref="InvalidOperationException">No endpoint is configured.</exception>
      public void CreateConn()
      {
          var factory = new ConnectionFactory()
          {
              UserName = UserName,
              Password = Password,
              AutomaticRecoveryEnabled = true, // reconnect automatically if the connection drops
              TopologyRecoveryEnabled = true,  // after recovery, also restore exchanges, queues, etc.
              RequestedHeartbeat = TimeSpan.FromMinutes(1),
              VirtualHost = VirtualHostName,
          };

          var endpoints = new List<AmqpTcpEndpoint>();
          string[] entries = (HostName ?? string.Empty).Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
          foreach (string entry in entries)
          {
              string[] parts = entry.Trim().Split(':');
              string host = parts[0].Trim();
              if (host.Length == 0)
              {
                  continue;
              }

              string portText = parts.Length > 1 ? parts[1].Trim() : string.Empty;
              if (portText.Length == 0)
              {
                  // No port given: let the client pick its default port instead of
                  // failing with an IndexOutOfRangeException.
                  endpoints.Add(new AmqpTcpEndpoint(host));
                  continue;
              }

              if (!int.TryParse(portText, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out int port))
              {
                  throw new FormatException("Invalid RabbitMQ port in MqSettings.HostName entry '" + entry + "'.");
              }

              endpoints.Add(new AmqpTcpEndpoint(host, port));
          }

          if (endpoints.Count == 0)
          {
              throw new InvalidOperationException("MqSettings.HostName does not contain any RabbitMQ endpoint.");
          }

          IConnection previous = _connection;
          _connection = factory.CreateConnection(endpoints);

          if (previous != null)
          {
              // The replaced connection has automatic/topology recovery enabled; if it is
              // merely abandoned it may come back and re-register its consumers next to
              // the ones created on the new connection.
              try
              {
                  previous.Dispose();
              }
              catch (Exception ex)
              {
                  Function.WriteLog(DateTime.Now + "\n" + ex.ToString(), "发奖异常");
              }
          }
      }

      /// <summary>
      /// Declares a durable queue, optionally binds it to an exchange, and starts a
      /// manually-acknowledging consumer on a new channel with a prefetch of one.
      /// </summary>
      /// <param name="QueueName">Queue to declare and consume from.</param>
      /// <param name="Exchange">
      /// Exchange to bind the queue to. When empty, no binding is made: the default
      /// exchange does not accept explicit bindings.
      /// </param>
      /// <param name="RoutingKey">Routing key used for the binding.</param>
      /// <remarks>
      /// Each message is logged, passed to <c>PrizeDo.addPrize</c> (retry queue) or
      /// <c>PrizeDo.sendPrize</c> (any other queue), and then acknowledged. Handler
      /// exceptions are logged and the message is still acknowledged, so a failing
      /// message is not redelivered.
      /// </remarks>
      public void StartReceive(string QueueName, string Exchange = "", string RoutingKey = "")
      {
          if (_connection == null || !_connection.IsOpen)
          {
              CreateConn();
          }

          var channel = _connection.CreateModel();
          channel.QueueDeclare(QueueName, true, false, false, null);
          if (!string.IsNullOrEmpty(Exchange))
          {
              channel.QueueBind(QueueName, Exchange, RoutingKey);
          }

          // Prefetch of 1: the broker delivers the next message only after the current one is acked.
          channel.BasicQos(0, 1, false);

          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          consumer.Received += (a, e) =>
          {
              // Decode explicitly as UTF-8; Encoding.Default is platform-dependent on .NET Framework.
              string MsgContent = Encoding.UTF8.GetString(e.Body.ToArray());
              try
              {
                  Function.WriteLog(DateTime.Now.ToString() + "-" + MsgContent, "接收mq数据队列");
                  if (QueueName == RetryQueueName)
                  {
                      PrizeDo.addPrize(QueueName, MsgContent);
                  }
                  else
                  {
                      PrizeDo.sendPrize(QueueName, MsgContent);
                  }
                  Function.WriteLog(DateTime.Now.ToString() + "-end", "接收mq数据队列");
              }
              catch (Exception ex)
              {
                  Function.WriteLog(DateTime.Now + "\n" + ex.ToString() + "\n" + MsgContent, "发奖异常");
              }

              // Once acknowledged, RabbitMQ removes this message from the queue.
              channel.BasicAck(e.DeliveryTag, false);

              try
              {
                  // Ask the broker for the current depth; the count returned by QueueDeclare
                  // is a one-time snapshot taken when the consumer was set up.
                  Function.WritePage("/count/", QueueName + ".txt", channel.MessageCount(QueueName).ToString());
              }
              catch (Exception ex)
              {
                  // The count page is monitoring only; do not let it break message handling.
                  Function.WriteLog(DateTime.Now + "\n" + ex.ToString(), "发奖异常");
              }
          };
          channel.BasicConsume(QueueName, false, consumer);
      }

      /// <summary>
      /// Starts consumers for every prize-in-set record with <c>status == 1</c>, plus
      /// one consumer for the retry queue. Two specific queues get
      /// <see cref="ParallelConsumerCount"/> consumers each; all others get one.
      /// </summary>
      public void Start()
      {
          var setService = App.GetService<IPriPrizeInSetService>();
          var list = setService.GetList(m => m.status == 1);
          foreach (var sub in list)
          {
              bool needsParallelConsumers =
                  sub.mqQueueName == "QUEUE_KXS_MACHINE_PRIZE_CONFIG_DIVISION" ||
                  sub.mqQueueName == "QUEUE_KXS_HAODA_PRIZE_CONFIG_DIVISION";
              int consumerCount = needsParallelConsumers ? ParallelConsumerCount : 1;

              for (int i = 0; i < consumerCount; i++)
              {
                  StartReceive(sub.mqQueueName, DirectExchange, DirectRoutingKey);
              }
          }

          StartReceive(RetryQueueName, DirectExchange, DirectRoutingKey);
      }

      #endregion
  }
  112. }