123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- using System;
- using System.Text;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- namespace MySystem
- {
- public class RabbitMQClient
- {
- public readonly static RabbitMQClient Instance = new RabbitMQClient();
- private RabbitMQClient()
- {
- }
- #region 单对单发送
- public void SendMsg(string content, string QueueName = "")
- {
- //创建连接对象工厂
- var factory = new ConnectionFactory()
- {
- UserName = "guest",
- Password = "123456",
- HostName = "localhost",
- Port = 5672, //RabbitMQ默认的端口
- };
- var conn = factory.CreateConnection();
- var channel = conn.CreateModel();
- channel.QueueDeclare(QueueName, true, false, false);
- channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
- channel.Dispose();
- conn.Dispose();
- }
- #endregion
- #region 单对单接收
- public void StartReceive(string QueueName)
- {
- var factory = new ConnectionFactory()
- {
- UserName = "guest",
- Password = "123456",
- HostName = "localhost",
- Port = 5672,
- };
- var conn = factory.CreateConnection();
- var channel = conn.CreateModel();
- channel.QueueDeclare(QueueName, true, false, false);
- EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
- consumer.Received += (a, e) =>
- {
- Library.function.WriteLog(Encoding.Default.GetString(e.Body.ToArray()), "接收到的MQ消息");
- channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
- };
- channel.BasicConsume(QueueName, false, consumer);
- }
- #endregion
- #region 单对多发送
- public void SendMsgToExchange(string content, string Exchange = "")
- {
- //创建连接对象工厂
- var factory = new ConnectionFactory()
- {
- UserName = "guest",
- Password = "123456",
- HostName = "localhost",
- Port = 5672, //RabbitMQ默认的端口
- };
- var conn = factory.CreateConnection();
- var channel = conn.CreateModel();
- channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
- channel.BasicPublish(Exchange, "", null, Encoding.Default.GetBytes(content));
- channel.Dispose();
- conn.Dispose();
- }
- #endregion
- #region 单对多接收
- public void StartReceiveFromExchange(string QueueName = "", string Exchange = "")
- {
- var factory = new ConnectionFactory()
- {
- UserName = "guest",
- Password = "123456",
- HostName = "localhost",
- Port = 5672,
- };
- var conn = factory.CreateConnection();
- var channel = conn.CreateModel();
- //定义队列
- channel.QueueDeclare(QueueName, true, false, false);
- //定义交换机
- channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
- //绑定队列到交换机
- channel.QueueBind(QueueName, Exchange, "");
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (a, e) =>
- {
- Library.function.WriteLog(Encoding.Default.GetString(e.Body.ToArray()), "接收到的MQ消息");
- channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
- };
- channel.BasicConsume(QueueName, false, consumer);
- }
- #endregion
- }
- }
|