RabbitMQ的基本概念和七种队列模式 您所在的位置:网站首页 rabbitmq消费多个队列 RabbitMQ的基本概念和七种队列模式

RabbitMQ的基本概念和七种队列模式

2023-08-19 03:12| 来源: 网络整理| 查看: 265

I. RabbitMQ的基本概念 1. 生产者/消费者

生产者(Producer) 消息的创建者。 负责创建和推送数据到消息服务器。

消费者(Consumer) 消息的接收方。 负责接收消息和处理数据。

 

2. 消息队列(Queue)

消息队列是RabbitMQ的内部对象,用于存储生产者的消息直到发送给消费者,它是消费者接收消息的地方。

消息队列的重要属性:

持久性 broker重启前都有效。 自动删除 在所有消费者停止使用之后自动删除。 惰性 没有主动声明队列,调用会导致异常。 排他性 -一旦启用,声明它的消费者才能使用。

 

3. 交换机(Exchange)

交换机用于接收,分配消息。

1. 生产者要先指定一个routing key,然后将消息发送到交换机。 2. routing key需要与exchange type和binding key联合使用才能最终生效。 3. 交换机将消息路由到一个或多个队列中,或丢弃。

交换机包含4中类型: direct, topic, fanout, headers。

direct(直连交换机) 具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列。

先匹配,再投送。

Direct Exchange是RabbitMQ的默认交换机模式。 这是最简单的模式。 它根据routing key全文匹配去寻找队列。

在绑定队列时会设定一个routing key(通常是队列的名字)。 只有在消息的routing key与队列匹配时,消息才会被交换机投送到绑定的队列中。 topic(主题交换机) 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,*代表一个单词,#代表多个单词。 按规则转发消息。

主题交换机(Topic Exchange)主要根据通配符转发消息。 这种方式最灵活。 交换机和队列的绑定会定义一种路由模式。 路由键(routing key)和路由模式匹配后,交换机才能转发消息。

在这种交换机模式下,路由键(routing key)必须是一串字符,用"."隔开。 路由模式必须包含一个星号"*", 主要用于匹配路由键指定位置的一个单词。 * 匹配一个单词。 # 匹配0个或多个单词。 eg: binding key: *.com.# 匹配的routing key: cn.com, us.com.aa 不匹配: com.bb headers(首部交换机) 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则。 根据应用程序消息的特定属性进行匹配。 fanout(扇形交换机) 广播消息到所有队列,没有任何处理,速度最快。 消息广播的模式。

这种方式将消息广播到所有绑定到它的队列中。 不考虑routing key的值,即使配置了路由键,依然会被忽略。

 

4. 消息确认

消息确认是指当一个消息从队列中投递给消费者(consumer)后,消费者会通知一下消息代理(broker)。

消息确认可以自动,也可以由处理消息的开发者手动执行。 当启用消息确认后,消息代理需要收到来自消费者的确认回执后,才完全将消息从队列中删除。

   

II. 七种队列模式 1. 简单模式(Hello World)

做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B。 单生产者,单消费者,单队列。

应用场景:

将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。

 

2. 工作队列模式(Work queues)

在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者。 适用于资源密集型任务, 单个消费者处理不过来,需要多个消费者进行处理的场景。 单生产者,多消费者,单队列。

应用场景:

一个订单的处理需要10s,有多个订单可以同时放到消息队列,

然后让多个消费者同时并行处理,而不是单个消费者的串行消费。

 

3. 发布订阅模式(Publish/Subscribe)

一次向许多消费者发送消息,将消息将广播到所有的消费者。 单生产者,多消费者,多队列。

应用场景:

更新商品库存后需要通知多个缓存和多个数据库。

结构如下:

一个fanout类型交换机扇出两个消息队列,分别为缓存消息队列、数据库消息队列 一个缓存消息队列对应着多个缓存消费者 一个数据库消息队列对应着多个数据库消费者

 

4. 路由模式(Routing)

路由模式(Routing)

根据Routing Key有选择地接收消息。 多消费者,选择性多队列,每个队列通过routing key全文匹配。

发送消息到交换机并且要指定路由键(Routing key) 。 消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息。

应用场景:

在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12 promote, 只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息。

 

5. 主题模式(Topics)

主题模式(Topics)

主题交换机方式接收消息,将routing key和模式进行匹配。 多消费者,选择性多队列,每个队列通过模式匹配。

队列需要绑定在一个模式上。 #匹配一个词或多个词,*只匹配一个词。

应用场景:

iphone促销活动可以接收主题为多种iPhone的消息,如iphone12、iphone13等。

 

6. 远程过程调用(RPC)

远程过程调用(RPC)

在远程计算机上运行功能并等待结果。

应用场景:

需要等待接口返回数据,如订单支付。

 

7. 发布者确认(Publisher Confirms) 与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。

在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

应用场景:

对于消息可靠性要求较高,比如钱包扣款。

   

III. 实战代码 1. 准备工作

首先,我们需要加入rabbitmq的amqp client依赖

com.rabbitmq amqp-client 5.12.0

其次,我们需要编写一个连接mq和通道的工具类ConnectionUtils,如下:

package com.mcp.lab.mq.rabbit.common.util; import com.mcp.lab.mq.rabbit.common.domain.ConnInfo; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtils { /** * 连接器(默认) * * @return * @throws IOException * @throws TimeoutException */ public static Connection getConnection() throws IOException, TimeoutException { ConnInfo connInfo = new ConnInfo.Builder() .setHost("Your RabbitMQ Broker Host") .setPort(5672) .setVirtualHost("Your Virtual Host(自定义)") .setUsername("your rabbit admin user") .setPassword("your rabbit admin password") .build(); return getConnection(connInfo); } }

 

2. 简单模式实例 生产者(Producer)代码 ConsoleSender import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class ConsoleSender { private static final String QUIT = "Q"; public static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); // 创建队列声明 // queue:队列名 // durable:是否持久化 // exclusive:是否排外 即只允许该channel访问该队列 一般等于true的话用于一个队列只能有一个消费者来消费的场景 // autoDelete:是否自动删除 消费完删除 // arguments:其他属性 AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 输入发送的消息 Scanner input = new Scanner(System.in); String msg = ""; while (true) { System.out.print("请输入发送的消息: "); msg = input.nextLine(); if (QUIT.equals(msg.toUpperCase())) { break; } // exchange,队列,参数,消息字节体 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Producer发送的消息: " + msg); } // 清理工作 channel.close(); connection.close(); } } 消费者(Consumer)代码 SimpleReceiver import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class SimpleReceiver { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Consumer] Received from queue - '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } 运行结果: 1. 首先运行结果生产者(ConsoleSender) 我们在控制台的输入如下: 请输入发送的消息: 789 Producer发送的消息: 789 请输入发送的消息: 111 Producer发送的消息: 111 请输入发送的消息: q 2. 其次运行消费者(SimpleReceiver) 显示如下: [Consumer] Received from queue - 'simple_queue':'789' [Consumer] Received from queue - 'simple_queue':'111'

 

3. 工作队列模式实例 生产者(Producer)代码 WorkQueueSender import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueueSender { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { String message = "work mode message" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[Producer] Sent '" + message + "'"); Thread.sleep(i * 10); } channel.close(); connection.close(); } } 消费者代码(模拟2个消费者) WorkQueueReceiver1 import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class WorkQueueReceiver1 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Work Consumer 1] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }

WorkQueueReceiver2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class WorkQueueReceiver2 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Work Consumer 2] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } 运行结果: 1. Producer运行: [Producer] Sent 'work mode message0' ... [Producer] Sent 'work mode message99' --------------------------------------------------- 2. Consumer1运行: [Work Consumer 1] Received 'work_queue':'work mode message0' [Work Consumer 1] Received 'work_queue':'work mode message2' ... [Work Consumer 1] Received 'work_queue':'work mode message98' --------------------------------------------------- 3. Consumer2运行: [Work Consumer 2] Received 'work_queue':'work mode message1' [Work Consumer 2] Received 'work_queue':'work mode message3' ... [Work Consumer 2] Received 'work_queue':'work mode message99'

注: 从上面结果可以看出,2个消费者以抢占的方式消费消息且不重复。

 

4. 发布订阅模式实例 生产者(Producer)代码 ConsolePublishSender import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.nio.charset.StandardCharsets; import java.util.Scanner; public class ConsolePublishSender { private static final String QUIT = "Q"; private static final String EXCHANGE_NAME = "publish_logs"; public static void main(String[] argv) throws Exception { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 输入发送的消息 Scanner input = new Scanner(System.in); String msg = ""; while (true) { System.out.print("请输入发送的消息: "); msg = input.nextLine(); if (QUIT.equals(msg.toUpperCase())) { break; } channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println(" [Publisher] Sent '" + msg + "'"); } channel.close(); connection.close(); } } 消费者代码(模拟2个消费者) SubscribeReceive1 import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class SubscribeReceive1 { private static final String EXCHANGE_NAME = "publish_logs"; public static void main(String[] argv) throws Exception { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 订阅消息的回调函数 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Subscriber 1] Received '" + message + "'"); }; // 消费者,有消息时触发订阅回调函数 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }

SubscribeReceive2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class SubscribeReceive2 { private static final String EXCHANGE_NAME = "publish_logs"; public static void main(String[] argv) throws Exception { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 订阅消息的回调函数 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Subscriber 2] Received '" + message + "'"); }; // 消费者,有消息时触发订阅回调函数 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } 运行结果: 1. Producer运行: 请输入发送的消息: topic 1 [Publisher] Sent 'topic 1' 请输入发送的消息: topic 2 [Publisher] Sent 'topic 2' 请输入发送的消息: haha [Publisher] Sent 'haha' 请输入发送的消息: q --------------------------------------------------- 2. Consumer1运行: [*] Waiting for messages. To exit press CTRL+C [Subscriber 1] Received '发布的主题信息' [Subscriber 1] Received 'topic 1' [Subscriber 1] Received 'topic 2' [Subscriber 1] Received 'haha' --------------------------------------------------- 3. Consumer2运行: [*] Waiting for messages. To exit press CTRL+C [Subscriber 2] Received '发布的主题信息' [Subscriber 2] Received 'topic 1' [Subscriber 2] Received 'topic 2' [Subscriber 2] Received 'haha'

注: 多个接收者接收到一模一样的消息。该模式用于多个消费者订阅同一个主题。

 

5. 路由模式实例 生产者(Producer)代码

ConsoleRouteSender

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class ConsoleRouteSender { private static final String QUIT = "Q"; private final static String EXCHANGE_NAME = "exchange_direct"; private final static String EXCHANGE_TYPE = "direct"; public static void main(String[] args) throws IOException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); // 交换机声明 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); // 输入发送的消息 Scanner input = new Scanner(System.in); String msg = ""; while (true) { System.out.print("请输入发送的消息: "); msg = input.nextLine(); if (QUIT.equals(msg.toUpperCase())) { break; } // 只有routingKey相同的才会消费 channel.basicPublish(EXCHANGE_NAME, "key2", null, msg.getBytes()); //channel.basicPublish(EXCHANGE_NAME, "key", null, msg.getBytes()); System.out.println("[Route Producer] Sent '" + msg + "'"); } channel.close(); connection.close(); } } 消费者代码(模拟2个消费者)

RouteReceiver1

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RouteReceiver1 { private final static String QUEUE_NAME = "queue_routing"; private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 指定路由的key,接收key和key2 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Route Consumer 1] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }

RouteReceiver2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RouteReceiver2 { private final static String QUEUE_NAME = "queue_routing2"; private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 仅接收key2 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Route Consumer 2] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } 运行结果: 1. Producer运行: 请输入发送的消息: abc [Route Producer] Sent 'abc' 请输入发送的消息: test [Route Producer] Sent 'test' 请输入发送的消息: q --------------------------------------------------- 2. Consumer1运行: [Route Consumer 1] Received 'key2':'abc' [Route Consumer 1] Received 'key2':'test' --------------------------------------------------- 3. Consumer2运行: [Route Consumer 2] Received 'key2':'abc' [Route Consumer 2] Received 'key2':'test' 如果把sender中的key2改成key,运行结果如下: 请输入发送的消息: 123 [Route Producer] Sent '123' 请输入发送的消息: 456 [Route Producer] Sent '456' 请输入发送的消息: 789 [Route Producer] Sent '789' 请输入发送的消息: q [Route Consumer 1] Received 'key':'123' [Route Consumer 1] Received 'key':'456' [Route Consumer 1] Received 'key':'789' consumer2没有数据,因为route key没有匹配。

 

6. 主题模式实例 生产者(Producer)代码

SimpleTopicSender

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class SimpleTopicSender { private final static String EXCHANGE_NAME = "exchange_topic"; private final static String EXCHANGE_TYPE = "topic"; public static void main(String[] args) throws IOException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); String message = "topics model message with key.1"; channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes()); System.out.println("[Producer] Sent '" + message + "'"); String message2 = "topics model message with key.1.2"; channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes()); System.out.println("[Producer] Sent '" + message2 + "'"); channel.close(); connection.close(); } } 消费者代码(模拟2个消费者)

TopicReceiver1

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class TopicReceiver1 { private final static String QUEUE_NAME = "queue_topic"; private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 可以接收key.1 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Consumer 1] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }

TopicReceiver2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class TopicReceiver2 { private final static String QUEUE_NAME = "queue_topic2"; private final static String EXCHANGE_NAME = "exchange_topic"; private final static String EXCHANGE_TYPE = "topic"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // 获取一个连接 Connection connection = ConnectionUtils.getConnection(); // 从连接获取一个通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // *号代表单个单词,可以接收key.1 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // #号代表多个单词,可以接收key.1.2 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Consumer 2] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } 运行结果: 1. Producer运行: [Producer] Sent 'topics model message with key.1' [Producer] Sent 'topics model message with key.1.2' --------------------------------------------------- 2. Consumer1运行: [Consumer 1] Received 'key.1':'topics model message with key.1' --------------------------------------------------- 3. Consumer2运行: [Consumer 2] Received 'key.1':'topics model message with key.1' [Consumer 2] Received 'key.1.2':'topics model message with key.1.2'


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有