RabbitMQClient.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. using System;
  2. using System.Text;
  3. using RabbitMQ.Client;
  4. using RabbitMQ.Client.Events;
  5. namespace MySystem
  6. {
  /// <summary>
  /// Singleton helper that publishes text messages to, and consumes them from, a RabbitMQ broker.
  /// Two modes are offered: one-to-one (a named queue reached through the default exchange)
  /// and one-to-many (a fanout exchange with queues bound to it).
  /// </summary>
  public class RabbitMQClient
  {
      /// <summary>The single shared instance; the constructor is private.</summary>
      public static readonly RabbitMQClient Instance = new RabbitMQClient();

      // Broker connection settings, kept in one place instead of being repeated in every method.
      // NOTE(review): credentials are hard-coded in source; consider loading them from configuration.
      private const string BrokerUserName = "guest";
      private const string BrokerPassword = "123456";
      private const string BrokerHostName = "localhost";
      private const int BrokerPort = 5672; // RabbitMQ's default port

      // Encoding applied to message bodies in both directions, so sender and receiver always agree.
      // NOTE(review): Encoding.Default varies with runtime/platform; a fixed encoding (e.g. UTF-8)
      // would be more predictable, but switching changes the wire format - confirm with every
      // other producer/consumer of these queues before changing it.
      private static readonly Encoding MessageEncoding = Encoding.Default;

      private RabbitMQClient()
      {
      }

      /// <summary>Builds a connection factory configured with the broker settings above.</summary>
      private static ConnectionFactory CreateConnectionFactory()
      {
          return new ConnectionFactory()
          {
              UserName = BrokerUserName,
              Password = BrokerPassword,
              HostName = BrokerHostName,
              Port = BrokerPort,
          };
      }

      /// <summary>
      /// Attaches an event-driven consumer to <paramref name="queueName"/> on the given channel.
      /// Each received body is decoded, written to the log, and then acknowledged manually
      /// (the consumer is registered with automatic acknowledgement turned off).
      /// </summary>
      /// <param name="channel">Open channel on which the queue has already been declared.</param>
      /// <param name="queueName">Queue to consume from.</param>
      private static void StartConsuming(IModel channel, string queueName)
      {
          var consumer = new EventingBasicConsumer(channel);
          consumer.Received += (sender, e) =>
          {
              Library.function.WriteLog(MessageEncoding.GetString(e.Body.ToArray()), "接收到的MQ消息");

              // Acknowledge only this delivery (multiple: false). Acknowledging with
              // multiple: true would also confirm any earlier delivery whose handler failed
              // before reaching its own ack, and the broker would then drop that message.
              channel.BasicAck(e.DeliveryTag, false);
          };
          channel.BasicConsume(queueName, false, consumer);
      }

      #region One-to-one send
      /// <summary>
      /// Declares the queue (durable, non-exclusive, not auto-deleted) and publishes
      /// <paramref name="content"/> to it through the default exchange, using the queue name
      /// as the routing key. A fresh connection and channel are opened per call and are
      /// released even when declaring or publishing throws.
      /// </summary>
      /// <param name="content">Message text; encoded with the class-wide message encoding.</param>
      /// <param name="QueueName">
      /// Target queue. NOTE(review): the default empty string is passed through unchanged to
      /// both the declare and the publish call - confirm callers always supply a real name.
      /// </param>
      public void SendMsg(string content, string QueueName = "")
      {
          // Nested using: the channel is disposed first, then the connection.
          using (var conn = CreateConnectionFactory().CreateConnection())
          using (var channel = conn.CreateModel())
          {
              channel.QueueDeclare(QueueName, true, false, false);
              channel.BasicPublish("", QueueName, null, MessageEncoding.GetBytes(content));
          }
      }
      #endregion

      #region One-to-one receive
      /// <summary>
      /// Opens a connection and channel, declares the queue (durable, non-exclusive, not
      /// auto-deleted) and starts consuming from it; every message is logged and acknowledged.
      /// The connection and channel are intentionally left open after this method returns so
      /// that the consumer keeps receiving; they are released only if setup fails.
      /// </summary>
      /// <param name="QueueName">Queue to consume from.</param>
      public void StartReceive(string QueueName)
      {
          var conn = CreateConnectionFactory().CreateConnection();
          IModel channel = null;
          try
          {
              channel = conn.CreateModel();
              channel.QueueDeclare(QueueName, true, false, false);
              StartConsuming(channel, QueueName);
          }
          catch
          {
              // Setup failed, so no consumer will ever use these: release them and rethrow.
              if (channel != null)
              {
                  channel.Dispose();
              }
              conn.Dispose();
              throw;
          }
      }
      #endregion

      #region One-to-many send
      /// <summary>
      /// Declares a durable, non-auto-deleted fanout exchange and publishes
      /// <paramref name="content"/> to it with an empty routing key. A fresh connection and
      /// channel are opened per call and are released even when declaring or publishing throws.
      /// </summary>
      /// <param name="content">Message text; encoded with the class-wide message encoding.</param>
      /// <param name="Exchange">
      /// Exchange to publish to. NOTE(review): the default empty string is passed through
      /// unchanged to the declare call - confirm callers always supply a real exchange name.
      /// </param>
      public void SendMsgToExchange(string content, string Exchange = "")
      {
          // Nested using: the channel is disposed first, then the connection.
          using (var conn = CreateConnectionFactory().CreateConnection())
          using (var channel = conn.CreateModel())
          {
              channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
              channel.BasicPublish(Exchange, "", null, MessageEncoding.GetBytes(content));
          }
      }
      #endregion

      #region One-to-many receive
      /// <summary>
      /// Opens a connection and channel, declares the queue and the fanout exchange, binds the
      /// queue to the exchange with an empty routing key, and starts consuming from the queue;
      /// every message is logged and acknowledged. The connection and channel are intentionally
      /// left open after this method returns so that the consumer keeps receiving; they are
      /// released only if setup fails.
      /// </summary>
      /// <param name="QueueName">Queue to declare, bind and consume from.</param>
      /// <param name="Exchange">Fanout exchange the queue is bound to.</param>
      public void StartReceiveFromExchange(string QueueName = "", string Exchange = "")
      {
          var conn = CreateConnectionFactory().CreateConnection();
          IModel channel = null;
          try
          {
              channel = conn.CreateModel();

              // Declare the queue.
              channel.QueueDeclare(QueueName, true, false, false);
              // Declare the exchange.
              channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
              // Bind the queue to the exchange.
              channel.QueueBind(QueueName, Exchange, "");

              StartConsuming(channel, QueueName);
          }
          catch
          {
              // Setup failed, so no consumer will ever use these: release them and rethrow.
              if (channel != null)
              {
                  channel.Dispose();
              }
              conn.Dispose();
              throw;
          }
      }
      #endregion
  }
  101. }