消息发送模式实战之发布订阅模式
1. 前言
Hello,大家好。在上一小节中,我们对 RabbitMQ 中的消息模式之直接模式和主题模式,做了基础概念的回顾,以及对两种不同的消息发送模式进行了代码实操,让同学们可以清楚地知道实际的代码应该怎么编写。
本小节将继续介绍 RabbitMQ 中其他消息发送模式的代码实操内容,同样地,在正式介绍代码实操部分内容之前,我们还是需要回顾一下,相应消息发送模式的基础概念。
本节主要内容:
-
发布订阅模式基础概念回顾;
-
发布订阅模式代码实操。
2.发布订阅模式基础概念回顾
在之前的小节中,我们对发布订阅模式做了详细的介绍,现在让我们来回顾一下我们对发布订阅模式做过的一些基础性的介绍。
定义:
发布订阅模式,即生产者发布消息,消费者通过订阅的方式来消费消息。
其实,发布订阅模式的本质还是生产者生产消息,消费者从消息队列中获取消息并进行消费。
所谓的发布订阅模式,只不过是给传统的发送和接收起一个高大上的名字罢了,本质上仍热是消息的生产和消费,只不过这种模式更像于发布和订阅,因此得名发布订阅模式。
描述:
在 RabbitMQ 的消息发送模式中,发布订阅模式对应的只有一种实际的业务场景,我们把他称为群发模式。
如上图所示,此场景是发布订阅模式中的一种,但是这种模式没有任何存在意义,因为在这种模式下,生产者生产出一条消息之后,将消息直接发送到了交换机上,之后的流程就没有了。
大家注意看,此时的交换机上没有绑定任何消息队列,所以,此时位于交换机上的消息将丢失,消费者无法拿到消息进行消费。这种模式场景并没有任何实际意义,因为我们生产的消息无法被消费掉,我们的业务流程也就不能正常流转。
接下来让我们看看具有实际意义的发布订阅模式:
生产者生产一条消息后,将消息首先发送到交换机上,交换机进行检测,发现存在两个队列都绑定在自身上面,于是,将消息全部投递到所绑定的队列上面,最后再由消费者接收消息并消费。
发布订阅模式的特点,就是一个生产者、一个交换机、多个队列、多个消费者, 由于生产者生产出来的消息会发送到绑定在交换机上的所有队列上,这种场景很类似于我们给很多人群发消息,所以,这种模式被叫做群发模式。
3 发布订阅模式代码实操
3.1 第一种业务场景
在上述发布订阅消息发送模式中,我们介绍了发布订阅模式的基础概念,同时,引入了两种发布订阅模式的业务场景,并且对不同的业务场景做了基本的介绍,下面让我们来看一下如何使用代码来实现这两种不同的业务场景。
实现代码:
// 发布订阅模式-第一种业务场景
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.close();
connection.close();
代码解释:
第 1-5 行,我们使用了 RabbitMQ 的 ConnectionFactory 连接工厂,来初始化了一些获取 RabbitMQ 连接的必要参数。
第 6 行,我们从 RabbitMQ 的连接工厂中,获取到了一个 RabbitMQ Conneciton 连接实例。
第 7 行,我们通过 RabbitMQ 连接实例,创建了一个 channel 通道,为之后的消息通信提供媒介。
第 8 行,我们使用了 channel 通道中的 exchangeDeclare 方法,来为我们的 channel 通道绑定了一个 exchange 交换机。
由于在第一种业务场景中,我们不会在 exchange 交换机上绑定任何的消息队列,所以在上述代码中,我们看不到 channel 通道与 queue 消息队列进行绑定的方法。
在此种业务场景下,当生产者生产出来消息之后,在将消息发布到我们的 exchange 交换机上,整个流程就结束了,没有消费者可以拿到这一消息,所以,这种业务场景毫无意义。
最后,让我们来看第二种业务场景。
3.2 第二种业务场景
我们来看主体模式的代码实现:
实现代码:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicConsume(QUEUE_NAME, false, consumer);
channel.close();
connection.close();
代码解释:
第 1-7 行,代码和第一种业务场景相似,这里不再赘述。
第 8 行,我们使用 channel 的 queueBind 方法来将消息队列和通道进行绑定,这里注意,我们并没有为我们的消息队列指定一个 Routing Key 值,如果这里指定了,就不是发布订阅模式了。
第 9 行,我们使用 channel 的 basicConsume 方法,来获取我们的消息,并对消息进行一个消费。
第 10-11 行,代码和第一种业务场景相似,这里不再赘述。
通过上述代码段,我们可以看到,我们在 channel 通道上,绑定了一个消息队列,这样我们的消息就可以被消费者进行消费了。
Tips:
1. 我们在实现发布订阅模式的时候,在生产者端,我们需要将 exchange 交换机的类型,声明为 fanout 类型,这种类型才是我们说的发布订阅模式;
2. 发布订阅模式更多的应用场景,是用在消息通知群发、批量传送数据等业务场景,消息传送效率还是很高的。
4. 小结
本小节为同学们详细介绍了 RabbitMQ 消息发送模式之发布订阅模式的代码实操等内容,包括发布订阅模式的基础概念等内容的回顾,以及发布订阅模式常见的两种业务场景的代码实现,两种业务场景的不同之处及使用场景等内容,同学们需要理清代码实现的思路和步骤。