using System; using System.Collections.Generic; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; using Library; using System.Threading; using LitJson; namespace MySystem { public class RabbitMQClient { public readonly static RabbitMQClient Instance = new RabbitMQClient(); string UserName,Password,HostName,VirtualHostName; private RabbitMQClient() { UserName = ConfigurationManager.AppSettings["MqUserName"].ToString(); Password = ConfigurationManager.AppSettings["MqPassword"].ToString(); HostName = ConfigurationManager.AppSettings["MqHostName"].ToString(); VirtualHostName = ConfigurationManager.AppSettings["MqVirtualHostName"].ToString(); } #region 单对单发送 public void SendMsg(string content, string QueueName, uint delayMilliseconds = 0) { if(delayMilliseconds > 0) { // 设置消息的延迟时间 var properties = _channel_send.CreateBasicProperties(); properties.Headers = new Dictionary { { "x-delay", delayMilliseconds } // 延迟5秒 }; _channel_send.BasicPublish("kxs_direct_ranch", QueueName, properties, Encoding.Default.GetBytes(content)); } else { _channel_send.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content)); } function.WriteLog(DateTime.Now.ToString() + "\n" + QueueName + "\n" + content + "\n\n\n", "SendMsg2"); } #endregion #region 单对单接收 public static IModel _channel_send; public void CreateConn(string QueueName) { var factory = new ConnectionFactory() { UserName = UserName, Password = Password, AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接 TopologyRecoveryEnabled = true, //连接恢复后,连接的交换机,队列等是否一同恢复 VirtualHost = VirtualHostName, }; List p = new List(); string[] HostNames = HostName.Split(','); foreach (string subHostName in HostNames) { string[] subHostNameData = subHostName.Split(':'); p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1]))); } var conn = factory.CreateConnection(p); _channel_send = conn.CreateModel(); _channel_send.ExchangeDeclare("kxs_direct_ranch", "x-delayed-message", true); _channel_send.QueueDeclare(QueueName, true, false, false); string routingKey = "delayed_routing_key"; IDictionary arguments = new Dictionary { { "x-delayed-type", "direct" } // 这里可以指定不同的交换机类型 }; _channel_send.QueueBind(QueueName, "kxs_direct_ranch", routingKey, arguments); } public static IConnection _connection; public void CreateConn() { var factory = new ConnectionFactory() { UserName = UserName, Password = Password, AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接 TopologyRecoveryEnabled = true, //连接恢复后,连接的交换机,队列等是否一同恢复 RequestedHeartbeat = TimeSpan.FromMinutes(1), VirtualHost = VirtualHostName, }; List p = new List(); string[] HostNames = HostName.Split(','); foreach (string subHostName in HostNames) { string[] subHostNameData = subHostName.Split(':'); p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1]))); } _connection = factory.CreateConnection(p); } public void StartReceive(string QueueName) { if (_connection == null) { CreateConn(); } else if (!_connection.IsOpen) { CreateConn(); } var channel = _connection.CreateModel(); channel.QueueBind(QueueName, "kxs_direct_ranch", QueueName); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { string MsgContent = Encoding.Default.GetString(e.Body.ToArray()); if(QueueName == "KXS_DEPOSIT_QUEUE") { if(ChangePosFeeQueue.Instance.ChangePosDeposit(MsgContent)) { channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息 } } if(QueueName == "KXS_FEE_QUEUE") { if(SetDepositPostService.Instance.ChangeFee(MsgContent)) { channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息 } } }; channel.BasicConsume(QueueName, false, consumer); } #endregion } }