这篇博客介绍订阅、路由和通配符模式,之所以放在一起介绍,是因为这三种模式都是用了Exchange交换机,消息没有直接发送到队列,而是发送到了交换机,经过队列绑定交换机到达队列。

性能排序:fanout > direct >> topic。比例大约为11:10:6     

一、订阅模式(Fanout Exchange):

   一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。

      

         

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

1.可以理解为路由表的模式

2.这种模式不需要RouteKey

3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

示例代码:

生产者:

[java]  

  1. public class Send {  

  2.   

  3.     private final static String EXCHANGE_NAME = "test_exchange_fanout";  

  4.   

  5.     public static void main(String[] argv) throws Exception {  

  6.         // 获取到连接以及mq通道  

  7.         Connection connection = ConnectionUtil.getConnection();  

  8.         //从连接中创建通道  

  9.         Channel channel = connection.createChannel();  

  10.   

  11.         // 声明exchange  

  12.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  

  13.   

  14.         // 消息内容  

  15.         String message = "商品已经新增,id = 1000";  

  16.         channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes());  

  17.           

  18.         System.out.println(" [x] Sent '" + message + "'");  

  19.   

  20.         channel.close();  

  21.         connection.close();  

  22.     }  

  23. }  

消费者1:

[java]  

  1. public class Recv {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_fanout_1";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_fanout";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 获取到连接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 声明队列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 绑定队列到交换机  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");  

  18.   

  19.         // 同一时刻服务器只会发一条消息给消费者  

  20.         channel.basicQos(1);  

  21.   

  22.         // 定义队列的消费者  

  23.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  24.         // 监听队列,手动返回完成  

  25.         channel.basicConsume(QUEUE_NAME, true, consumer);  

  26.   

  27.         // 获取消息  

  28.         while (true) {  

  29.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  30.             String message = new String(delivery.getBody());  

  31.             System.out.println(" 前台系统: '" + message + "'");  

  32.             Thread.sleep(10);  

  33.   

  34.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  35.         }  

  36.     }  

  37. }  

  消费者2的代码和消费者1的代码大致相同,只是队列的名称不一样,这样两个消费者有自己的队列,都可以接收到生产者发送的消息

  但是如果生产者有新增商品,修改商品,删除商品的消息,消费者包快前台系统和搜索系统,要求前台系统接收修改和删除商品的消息,搜索系统接收新增商品、修改商品和删除商品的消息。所以使用这种订阅模式实现商品数据的同步并不合理。因此我们介绍下一种模式:路由模式。

二、路由模式(Direct Exchange)

  这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就可以接收到需要接收的消息。

       

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue

1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。

2.这种模式下不需要将Exchange进行任何绑定(binding)操作

3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。

4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

示例代码:

生产者:

[java]  

  1. public class Send {  

  2.   

  3.     private final static String EXCHANGE_NAME = "test_exchange_direct";  

  4.   

  5.     public static void main(String[] argv) throws Exception {  

  6.         // 获取到连接以及mq通道  

  7.         Connection connection = ConnectionUtil.getConnection();  

  8.         Channel channel = connection.createChannel();  

  9.   

  10.         // 声明exchange  

  11.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  

  12.   

  13.         // 消息内容  

  14.         String message = "删除商品, id = 1001";  

  15.         channel.basicPublish(EXCHANGE_NAME, "delete"null, message.getBytes());  

  16.         System.out.println(" [x] Sent '" + message + "'");  

  17.   

  18.         channel.close();  

  19.         connection.close();  

  20.     }  

  21. }  

消费者1:接收更新和删除消息

[java]  

  1. public class Recv {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_direct_1";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_direct";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 获取到连接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 声明队列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 绑定队列到交换机  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");  

  18.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");  

  19.   

  20.         // 同一时刻服务器只会发一条消息给消费者  

  21.         channel.basicQos(1);  

  22.   

  23.         // 定义队列的消费者  

  24.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  25.         // 监听队列,手动返回完成  

  26.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  27.   

  28.         // 获取消息  

  29.         while (true) {  

  30.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  31.             String message = new String(delivery.getBody());  

  32.             System.out.println(" 前台系统: '" + message + "'");  

  33.             Thread.sleep(10);  

  34.   

  35.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  36.         }  

  37.     }  

  38. }  

消费者2:接收insert,update,delete的消息

[java]  

  1. public class Recv2 {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_direct_2";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_direct";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 获取到连接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 声明队列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 绑定队列到交换机  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");  

  18.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");  

  19.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");  

  20.   

  21.         // 同一时刻服务器只会发一条消息给消费者  

  22.         channel.basicQos(1);  

  23.   

  24.         // 定义队列的消费者  

  25.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  26.         // 监听队列,手动返回完成  

  27.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  28.   

  29.         // 获取消息  

  30.         while (true) {  

  31.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  32.             String message = new String(delivery.getBody());  

  33.             System.out.println(" 搜索系统: '" + message + "'");  

  34.             Thread.sleep(10);  

  35.   

  36.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  37.         }  

  38.     }  

  39. }  

  如果生产者发布了insert消息,那么消费者2可以收到,消费者 1收不到,如果发布了update或者delete消息,两个消费者都可以收到。如果发布ABC消息两个消费者都收不到,因为没有绑定这个键值。这种模式基本满足了我们的需求,但是还不够灵活,下面介绍另外一个模式。

三、通配符模式(Topic Exchange)

   基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词

       

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。

2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。

4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

示例代码:

生产者:

[java]  

  1. public class Send {  

  2.   

  3.     private final static String EXCHANGE_NAME = "test_exchange_topic";  

  4.   

  5.     public static void main(String[] argv) throws Exception {  

  6.         // 获取到连接以及mq通道  

  7.         Connection connection = ConnectionUtil.getConnection();  

  8.         Channel channel = connection.createChannel();  

  9.   

  10.         // 声明exchange  

  11.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  

  12.   

  13.         // 消息内容  

  14.         String message = "删除商品,id = 1001";  

  15.         channel.basicPublish(EXCHANGE_NAME, "item.delete"null, message.getBytes());  

  16.         System.out.println(" [x] Sent '" + message + "'");  

  17.   

  18.         channel.close();  

  19.         connection.close();  

  20.     }  

  21. }  

消费者1:

[java]  

  1. public class Recv {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_topic_1";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_topic";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 获取到连接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 声明队列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 绑定队列到交换机  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");  

  18.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");  

  19.   

  20.         // 同一时刻服务器只会发一条消息给消费者  

  21.         channel.basicQos(1);  

  22.   

  23.         // 定义队列的消费者  

  24.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  25.         // 监听队列,手动返回完成  

  26.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  27.   

  28.         // 获取消息  

  29.         while (true) {  

  30.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  31.             String message = new String(delivery.getBody());  

  32.             System.out.println(" 前台系统: '" + message + "'");  

  33.             Thread.sleep(10);  

  34.   

  35.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  36.         }  

  37.     }  

  38. }  

消费者2:

[java]  

  1. public class Recv2 {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_topic_2";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_topic";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 获取到连接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 声明队列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 绑定队列到交换机  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");  

  18.   

  19.         // 同一时刻服务器只会发一条消息给消费者  

  20.         channel.basicQos(1);  

  21.   

  22.         // 定义队列的消费者  

  23.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  24.         // 监听队列,手动返回完成  

  25.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  26.   

  27.         // 获取消息  

  28.         while (true) {  

  29.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  30.             String message = new String(delivery.getBody());  

  31.             System.out.println(" 搜索系统: '" + message + "'");  

  32.             Thread.sleep(10);  

  33.   

  34.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  35.         }  

  36.     }  

  37. }  

  消费者1是按需索取,并没有使用通配符模式,而是用的完全匹配,消费者2使用通配符模式,这样以item.开头的消息都会全部接收。

小结:

  1.与简单模式和work模式对比,前面两种同一个消息只能被一个消费者获取,而今天的这三种模式,可以实现一个消息被多个消费者 获取。

  2.fanout这种模式没有加入路由器,队列与exchange绑定后,就会接收到所有的消息,其余两种增加了路由键,并且第三种增加通配符,更加便利。

本文出自https://blog.csdn.net/ww130929/article/details/72842234