RabbitMQClientV2.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. using Newtonsoft.Json;
  2. using RabbitMQ.Client;
  3. using RabbitMQ.Client.Events;
  4. using System;
  5. using System.Text;
  6. namespace MySystem
  7. {
  8. class ConfigModel
  9. {
  10. }
  11. public enum ExchangeTypeEnum
  12. {
  13. /// <summary>
  14. /// 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
  15. /// 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
  16. /// </summary>
  17. fanout = 1,
  18. /// <summary>
  19. /// 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
  20. /// 。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,
  21. /// 则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
  22. /// </summary>
  23. direct = 2,
  24. /// <summary>
  25. /// 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
  26. /// 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
  27. /// 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
  28. /// </summary>
  29. topic = 3,
  30. header = 4
  31. }
  32. /// <summary>
  33. /// 数据被执行后的处理方式
  34. /// </summary>
  35. public enum ProcessingResultsEnum
  36. {
  37. /// <summary>
  38. /// 处理成功
  39. /// </summary>
  40. Accept,
  41. /// <summary>
  42. /// 可以重试的错误
  43. /// </summary>
  44. Retry,
  45. /// <summary>
  46. /// 无需重试的错误
  47. /// </summary>
  48. Reject,
  49. }
  50. /// <summary>
  51. /// 消息队列的配置信息
  52. /// </summary>
  53. public class RabbitMqConfigModel
  54. {
  55. #region host
  56. /// <summary>
  57. /// 服务器IP地址
  58. /// </summary>
  59. public string IP { get; set; }
  60. /// <summary>
  61. /// 服务器端口,默认是 5672
  62. /// </summary>
  63. public int Port { get; set; }
  64. /// <summary>
  65. /// 登录用户名
  66. /// </summary>
  67. public string UserName { get; set; }
  68. /// <summary>
  69. /// 登录密码
  70. /// </summary>
  71. public string Password { get; set; }
  72. /// <summary>
  73. /// 虚拟主机名称
  74. /// </summary>
  75. public string VirtualHost { get; set; }
  76. #endregion
  77. #region Queue
  78. /// <summary>
  79. /// 队列名称
  80. /// </summary>
  81. public string QueueName { get; set; }
  82. /// <summary>
  83. /// 是否持久化该队列
  84. /// </summary>
  85. public bool DurableQueue { get; set; }
  86. #endregion
  87. #region exchange
  88. /// <summary>
  89. /// 路由名称
  90. /// </summary>
  91. public string ExchangeName { get; set; }
  92. /// <summary>
  93. /// 路由的类型枚举
  94. /// </summary>
  95. public ExchangeTypeEnum ExchangeType { get; set; }
  96. /// <summary>
  97. /// 路由的关键字
  98. /// </summary>
  99. public string RoutingKey { get; set; }
  100. #endregion
  101. #region message
  102. /// <summary>
  103. /// 是否持久化队列中的消息
  104. /// </summary>
  105. public bool DurableMessage { get; set; }
  106. #endregion
  107. }
  108. /// <summary>
  109. /// 基类
  110. /// </summary>
  111. public class BaseService
  112. {
  113. public static IConnection _connection;
  114. /// <summary>
  115. /// 服务器配置
  116. /// </summary>
  117. public RabbitMqConfigModel RabbitConfig { get; set; }
  118. #region 构造函数
  119. /// <summary>
  120. /// 构造函数
  121. /// </summary>
  122. /// <param name="config"></param>
  123. public BaseService(RabbitMqConfigModel config)
  124. {
  125. try
  126. {
  127. RabbitConfig = config;
  128. CreateConn();
  129. }
  130. catch (Exception)
  131. {
  132. throw;
  133. }
  134. }
  135. #endregion
  136. #region 方法
  137. #region 初始化
  138. /// <summary>
  139. /// 创建连接
  140. /// </summary>
  141. public void CreateConn()
  142. {
  143. ConnectionFactory cf = new ConnectionFactory();
  144. cf.Port = RabbitConfig.Port; //服务器的端口
  145. cf.Endpoint = new AmqpTcpEndpoint(new Uri("amqp://" + RabbitConfig.IP + "/")); //服务器ip
  146. cf.UserName = RabbitConfig.UserName; //登录账户
  147. cf.Password = RabbitConfig.Password; //登录账户
  148. cf.VirtualHost = RabbitConfig.VirtualHost; //虚拟主机
  149. cf.RequestedHeartbeat = TimeSpan.Parse("60"); //虚拟主机
  150. _connection = cf.CreateConnection();
  151. }
  152. #endregion
  153. #region 发送消息
  154. /// <summary>
  155. /// 发送消息,泛型
  156. /// </summary>
  157. /// <typeparam name="T"></typeparam>
  158. /// <param name="message"></param>
  159. /// <returns></returns>
  160. public bool Send<T>(T messageInfo, ref string errMsg)
  161. {
  162. if (messageInfo == null)
  163. {
  164. errMsg = "消息对象不能为空";
  165. return false;
  166. }
  167. string value = JsonConvert.SerializeObject(messageInfo);
  168. return Send(value, ref errMsg);
  169. }
  170. /// <summary>
  171. /// 发送消息,string类型
  172. /// </summary>
  173. /// <param name="message"></param>
  174. /// <param name="errMsg"></param>
  175. /// <returns></returns>
  176. public bool Send(string message)
  177. {
  178. if (string.IsNullOrEmpty(message))
  179. {
  180. return false;
  181. }
  182. try
  183. {
  184. if (!_connection.IsOpen)
  185. {
  186. CreateConn();
  187. }
  188. using (var channel = _connection.CreateModel())
  189. {
  190. //推送消息
  191. byte[] bytes = Encoding.UTF8.GetBytes(message);
  192. IBasicProperties properties = channel.CreateBasicProperties();
  193. properties.DeliveryMode = Convert.ToByte(RabbitConfig.DurableMessage ? 2 : 1); //支持可持久化数据
  194. if (string.IsNullOrEmpty(RabbitConfig.ExchangeName))
  195. {
  196. //使用自定义的路由
  197. channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableMessage, false, null);
  198. channel.BasicPublish("", RabbitConfig.QueueName, properties, bytes);
  199. }
  200. else
  201. {
  202. //申明消息队列,且为可持久化的,如果队列的名称不存在,系统会自动创建,有的话不会覆盖
  203. channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
  204. channel.BasicPublish(RabbitConfig.ExchangeName, RabbitConfig.RoutingKey, properties, bytes);
  205. }
  206. return true;
  207. }
  208. }
  209. catch (Exception ex)
  210. {
  211. Library.function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "发送MQ队列消息异常");
  212. return false;
  213. }
  214. }
  215. #endregion
  216. }
  217. public class RabbitBasicService : BaseService
  218. {
  219. /// <summary>
  220. /// 构造函数
  221. /// </summary>
  222. /// <param name="config"></param>
  223. public RabbitBasicService(RabbitMqConfigModel config)
  224. : base(config)
  225. { }
  226. /// <summary>
  227. /// 接受消息,使用Action进行处理
  228. /// </summary>
  229. /// <typeparam name="T"></typeparam>
  230. /// <param name="method"></param>
  231. public void Receive()
  232. {
  233. try
  234. {
  235. using (var channel = _connection.CreateModel())
  236. {
  237. //申明队列
  238. channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
  239. //使用路由
  240. if (!string.IsNullOrEmpty(RabbitConfig.ExchangeName))
  241. {
  242. //申明路由
  243. channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableQueue);
  244. //队列和交换机绑定
  245. channel.QueueBind(RabbitConfig.QueueName, RabbitConfig.ExchangeName, RabbitConfig.RoutingKey);
  246. }
  247. //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
  248. channel.BasicQos(0, 1, false);
  249. //在队列上定义一个消费者
  250. // var customer = new QueueingBasicConsumer(channel);
  251. var customer = new EventingBasicConsumer(channel);
  252. EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
  253. consumer.Received += (a, e) =>
  254. {
  255. string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
  256. channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
  257. };
  258. //消费队列,并设置应答模式为程序主动应答
  259. channel.BasicConsume(RabbitConfig.QueueName, false, customer);
  260. // while (true)//timer
  261. // {
  262. // //阻塞函数,获取队列中的消息
  263. // ProcessingResultsEnum processingResult = ProcessingResultsEnum.Retry;
  264. // ulong deliveryTag = 0;
  265. // try
  266. // {
  267. // //Thread.Sleep(10);
  268. // var ea = customer.Queue.Dequeue();
  269. // deliveryTag = ea.DeliveryTag;
  270. // byte[] bytes = ea.Body;
  271. // string body = Encoding.UTF8.GetString(bytes);
  272. // // T info = JsonConvert.DeserializeObject<T>(body);
  273. // method(body);
  274. // processingResult = ProcessingResultsEnum.Accept;
  275. // }
  276. // catch (Exception ex)
  277. // {
  278. // processingResult = ProcessingResultsEnum.Reject; //系统无法处理的错误
  279. // }
  280. // finally
  281. // {
  282. // switch (processingResult)
  283. // {
  284. // case ProcessingResultsEnum.Accept:
  285. // //回复确认处理成功
  286. // channel.BasicAck(deliveryTag,
  287. // false);//处理单挑信息
  288. // break;
  289. // case ProcessingResultsEnum.Retry:
  290. // //发生错误了,但是还可以重新提交给队列重新分配
  291. // channel.BasicNack(deliveryTag, false, true);
  292. // break;
  293. // case ProcessingResultsEnum.Reject:
  294. // //发生严重错误,无法继续进行,这种情况应该写日志或者是发送消息通知管理员
  295. // channel.BasicNack(deliveryTag, false, false);
  296. // //写日志
  297. // break;
  298. // }
  299. // }
  300. // }
  301. }
  302. }
  303. catch (Exception ex)
  304. {
  305. }
  306. }
  307. }
  308. #endregion
  309. }