123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- using System;
- using System.Collections.Generic;
- using System.Text;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using System.Threading;
- using Library;
namespace MySystem
{
    /// <summary>
    /// Singleton helper around the RabbitMQ .NET client. It can publish to and consume from a
    /// named queue (through the default exchange) and publish to and consume from fanout exchanges.
    /// Credentials and broker endpoints come from the <c>MqUserName</c>, <c>MqPassword</c> and
    /// <c>MqHostName</c> application settings; <c>MqHostName</c> is a comma-separated list of
    /// <c>host:port</c> entries.
    /// </summary>
    public class RabbitMQClient
    {
        /// <summary>The single instance of this class (the constructor is private).</summary>
        public static readonly RabbitMQClient Instance = new RabbitMQClient();

        /// <summary>
        /// Connection shared by <see cref="SendMsg"/> and <see cref="StartReceive"/>.
        /// It stays <c>null</c> until <see cref="CreateConn"/> has run.
        /// </summary>
        public static IConnection _connection;

        // NOTE(review): the original SendMsg started with an unconditional `return;`, so it never
        // published anything. Whether that was deliberate cannot be told from this file, so the
        // no-op behaviour is preserved behind this explicit switch instead of being re-enabled
        // silently. Set it to true once queue publishing is confirmed to be wanted.
        private static readonly bool QueuePublishingEnabled = false;

        // Encoding.Default is platform/runtime dependent. It is kept because existing producers
        // and consumers already exchange bytes in this encoding.
        // NOTE(review): consider switching every party to UTF-8.
        private static readonly Encoding MessageEncoding = Encoding.Default;

        private readonly string _userName;
        private readonly string _password;
        private readonly string _hostNames;

        private RabbitMQClient()
        {
            _userName = ReadSetting("MqUserName");
            _password = ReadSetting("MqPassword");
            _hostNames = ReadSetting("MqHostName");
        }

        /// <summary>
        /// Opens a new connection to the configured endpoints and stores it in
        /// <see cref="_connection"/>, replacing whatever was stored there before.
        /// </summary>
        /// <exception cref="FormatException">An entry of <c>MqHostName</c> is not <c>host:port</c>.</exception>
        public void CreateConn()
        {
            _connection = OpenConnection();
        }

        #region Point-to-point send

        /// <summary>
        /// Publishes <paramref name="content"/> to the durable queue <paramref name="QueueName"/>
        /// through the default exchange, using the shared connection.
        /// Currently a no-op: see <c>QueuePublishingEnabled</c>.
        /// </summary>
        /// <param name="content">Message text; encoded with <see cref="Encoding.Default"/>.</param>
        /// <param name="QueueName">Queue to declare and publish to.</param>
        public void SendMsg(string content, string QueueName = "")
        {
            if (!QueuePublishingEnabled)
            {
                return;
            }

            // Only the channel is disposed. The original also disposed the shared connection,
            // which would have closed the channel of any consumer started by StartReceive.
            using (IModel channel = EnsureConnection().CreateModel())
            {
                channel.QueueDeclare(QueueName, true, false, false);
                channel.BasicPublish("", QueueName, null, MessageEncoding.GetBytes(content));
            }
        }

        #endregion

        #region Point-to-point receive

        /// <summary>
        /// Declares the durable queue <paramref name="QueueName"/> on the shared connection and
        /// registers a consumer with manual acknowledgement. Messages are currently acknowledged
        /// without being processed: the only handler call in the original was commented out.
        /// </summary>
        /// <param name="QueueName">Queue to declare and consume from.</param>
        public void StartReceive(string QueueName)
        {
            IModel channel = EnsureConnection().CreateModel();
            try
            {
                channel.QueueDeclare(QueueName, true, false, false);
                AttachConsumer(channel, QueueName, msgContent =>
                {
                    if (QueueName == "SetRedisDataList")
                    {
                        // TODO: dispatch msgContent. The original call was commented out:
                        // ReceiveSycnTmpTableService.Instance.Start(MsgContent);
                    }
                });
            }
            catch
            {
                // The channel must stay open for the consumer, so release it only on failure.
                channel.Dispose();
                throw;
            }
        }

        #endregion

        #region Fanout (one-to-many) send

        /// <summary>
        /// Publishes <paramref name="content"/> to the durable fanout exchange
        /// <paramref name="Exchange"/> over a dedicated connection that is closed afterwards.
        /// </summary>
        /// <param name="content">Message text; encoded with <see cref="Encoding.Default"/>.</param>
        /// <param name="Exchange">Exchange to declare and publish to.</param>
        public void SendMsgToExchange(string content, string Exchange = "")
        {
            // `using` guarantees both objects are released even when declare/publish throws.
            using (IConnection connection = OpenConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
                channel.BasicPublish(Exchange, "", null, MessageEncoding.GetBytes(content));
            }
        }

        #endregion

        #region Fanout (one-to-many) receive

        /// <summary>
        /// Opens a dedicated connection, declares the durable queue and fanout exchange, binds
        /// them, and registers a consumer that writes every message to the log and then
        /// acknowledges it.
        /// </summary>
        /// <param name="QueueName">Queue to declare, bind and consume from.</param>
        /// <param name="Exchange">Fanout exchange the queue is bound to.</param>
        public void StartReceiveFromExchange(string QueueName = "", string Exchange = "")
        {
            IConnection connection = OpenConnection();
            try
            {
                IModel channel = connection.CreateModel();
                // Declare the queue.
                channel.QueueDeclare(QueueName, true, false, false);
                // Declare the exchange.
                channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
                // Bind the queue to the exchange.
                channel.QueueBind(QueueName, Exchange, "");
                AttachConsumer(channel, QueueName,
                    msgContent => Library.function.WriteLog(msgContent, "接收到的MQ消息"));
            }
            catch
            {
                // The connection must stay open for the consumer, so release it only on failure.
                connection.Dispose();
                throw;
            }
        }

        #endregion

        // Reads a required application setting and fails with the key name when it is absent.
        private static string ReadSetting(string key)
        {
            object raw = ConfigurationManager.AppSettings[key];
            if (raw == null)
            {
                throw new InvalidOperationException("Missing required application setting '" + key + "'.");
            }
            return raw.ToString();
        }

        // Returns the shared connection, (re)creating it when it is missing or not open.
        private IConnection EnsureConnection()
        {
            if (_connection == null || !_connection.IsOpen)
            {
                CreateConn();
            }
            return _connection;
        }

        // Opens a new connection to the configured endpoints.
        private IConnection OpenConnection()
        {
            return CreateFactory().CreateConnection(ParseEndpoints());
        }

        // Builds the connection factory used for every connection of this class.
        // NOTE(review): originally only CreateConn set RequestedHeartbeat; the exchange methods
        // relied on the client default. Confirm that one minute is acceptable for all of them.
        private ConnectionFactory CreateFactory()
        {
            return new ConnectionFactory
            {
                UserName = _userName,
                Password = _password,
                AutomaticRecoveryEnabled = true,  // reconnect when the connection drops
                TopologyRecoveryEnabled = true,   // re-declare exchanges, queues, etc. after recovery
                RequestedHeartbeat = TimeSpan.FromMinutes(1),
            };
        }

        // Parses the comma-separated "host:port" list from the MqHostName setting.
        // Surrounding whitespace and empty entries (e.g. a trailing comma) are ignored.
        private List<AmqpTcpEndpoint> ParseEndpoints()
        {
            var endpoints = new List<AmqpTcpEndpoint>();
            foreach (string rawEntry in _hostNames.Split(','))
            {
                string entry = rawEntry.Trim();
                if (entry.Length == 0)
                {
                    continue;
                }

                string[] parts = entry.Split(':');
                int port;
                if (parts.Length < 2 || !int.TryParse(parts[1].Trim(), out port))
                {
                    throw new FormatException("MqHostName entry '" + entry + "' is not in 'host:port' form.");
                }
                endpoints.Add(new AmqpTcpEndpoint(parts[0].Trim(), port));
            }
            return endpoints;
        }

        // Registers a manual-ack consumer on the channel: decodes each delivery, passes the text
        // to onMessage, then acknowledges the delivery so the broker removes it from the queue.
        private static void AttachConsumer(IModel channel, string queueName, Action<string> onMessage)
        {
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                onMessage(MessageEncoding.GetString(e.Body.ToArray()));
                // multiple = false: acknowledge this delivery only. The original passed true,
                // which also acknowledges every earlier unacknowledged delivery on the channel,
                // including ones whose handler threw before reaching its own ack.
                channel.BasicAck(e.DeliveryTag, false);
            };
            channel.BasicConsume(queueName, false, consumer);
        }
    }
}
|