cba赛程 cba



文章插图
cba赛程 cba

文章插图
在消息队列模型中 , 如何将消息广播到所有的消费者 , 这种模式称为“发布/订阅” 。本文主要以一个简单的小例子 , 简述通过fanout交换机 , 实现消息的发布与订阅 , 仅供学习分享使用 , 如有不足之处 , 还请指正 。
Fanout交换机模型
扇形交换机 , 采用广播模式 , 根据绑定的交换机 , 路由到与之对应的所有队列 。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上 。很像子网广播 , 每台子网内的主机都获得了一份复制的消息 。Fanout交换机转发消息是最快的 。
RabbitMQ控制台操作新增两个队列
在同一个Virtual host下新增两个队列Q1,Q2 , 如下图所示:
绑定fanout交换机
将两个队列绑定到系统默认的fanout交换机 , 如下所示:
示例效果图
生产者 , 采用Fanout类型交换机发布消息 , 如下图所示:
当生产者发布 一条消息时 , Q1,Q2两个队列均会收到 , 如下图所示:
当启动消费者后 , 两个消费者 , 均会订阅到相关消息 , 如下图所示:
【cba赛程 cba】核心代码消息发布
建立连接后 , 将通道声明类型为Fanout的交换机 , 如下所示:
1/// <summary> 2/// fanout类型交换机 , 发送消息 3/// </summary> 4public class RabbitMqFanoutSendHelper : RabbitMqHelper { 5/// <summary> 6/// 发送消息 7/// </summary> 8/// <param name="msg"></param> 9/// <returns></returns>10public bool SendMsg(string msg)11{12try13{14using (var conn = GetConnection("/Alan.hsiang"))15{16using (var channel = conn.CreateModel())17{18channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);1920var body = Encoding.UTF8.GetBytes(msg);21 22channel.BasicPublish(exchange: "amq.fanout",23routingKey: "",24basicProperties: null,25body: body);26 27//Console.WriteLine(" [x] Sent {0}", message);28};29};30return true;31}32catch (Exception ex)33{34throw ex;35}36}37}
消息订阅
建立连接后 , 通道声明类型为Fanout的交换机 , 并绑定队列进行订阅 , 如下所示:
1/// <summary> 2/// 扇形交换机接收消息 3/// </summary> 4public class RabbitMqFanoutReceiveHelper : RabbitMqHelper 5{ 6public RabbitMqReceiveEventHandler OnReceiveEvent; 78private IConnection conn; 9 10private IModel channel;11 12private EventingBasicConsumer consumer;13 14public bool StartReceiveMsg(string queueName)15{16try17{18conn = GetConnection("/Alan.hsiang");19 20channel = conn.CreateModel();21channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);22//此处随机取出交换机下的队列23//var queueName = channel.QueueDeclare().QueueName;24channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");25consumer = new EventingBasicConsumer(channel);26consumer.Received += (model, ea) =>27{28var body = ea.Body.ToArray();29var message = Encoding.UTF8.GetString(body);30//Console.WriteLine(" [x] Received {0}", message);31if (OnReceiveEvent != null)32{33OnReceiveEvent(queueName+"::"+message);34}35};36channel.BasicConsume(queue: queueName,37autoAck: true,38consumer: consumer);39return true;40}41catch (Exception ex)42{43throw ex;44}45}46}