RocketMQ中的Consumer及ConsumerGroup详解 您所在的位置:网站首页 rocketmq分组作用 RocketMQ中的Consumer及ConsumerGroup详解

RocketMQ中的Consumer及ConsumerGroup详解

2024-04-27 08:28| 来源: 网络整理| 查看: 265

一、基本概念消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

集群消费(Clustering)

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

广播消费(Broadcasting)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

二、订阅关系一致

订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

背景信息

消息队列RocketMQ版里的一个消费者Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例。

由于消息队列RocketMQ版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的Consumer实例需在以下方面均保持一致:

订阅的Topic必须一致,例如:Consumer1订阅TopicA和TopicB,Consumer2也必须订阅TopicA和TopicB,不能只订阅TopicA、只订阅TopicB或订阅TopicA和TopicC。 订阅的同一个Topic中的Tag必须一致,包括Tag的数量和Tag的顺序,例如:Consumer1订阅TopicB且Tag为Tag1||Tag2,Consumer2订阅TopicB的Tag也必须是Tag1||Tag2,不能只订阅Tag1、只订阅Tag2或者订阅Tag2||Tag1。

正确的订阅关系如下,多个Group ID分别订阅了不同的Topic,但是同一个Group ID下的多个Consumer实例C1、C2、C3订阅的Topic和Tag都一致。

三、订阅关系不一致

当RocketMQ订阅关系不一致时,消息消费的逻辑就会混乱,甚至导致消息丢失,接下来我们结合案例及源码做进一步验证分析。

RocketMQ Version : 4.8.0

消费者组内消费者订阅相同的Topic不同的Tag

RocketMQ相同消费者组内订阅相同Topic不同Tag的测试基本信息:

Topic名称: SUBSCRIBE_TEST Topic读队列数量:4 Topic写队列数量:4 Topic读写权限:6 消费者组名称: SUBSCRIBE_TEST_CONSUMER_GROUP 消息TAG类型: TagA,TagB TagA消息消费者: ConsumerForTagA TagB消息消费者: ConsumerForTagB 消费者 ConsumerForTagA订阅信息: 订阅TopicSUBSCRIBE_TEST 和tagA ConsumerForTagB订阅信息: 订阅TopicSUBSCRIBE_TEST 和tagB 消费者代码示例:public class ConsumerForTagA { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SUBSCRIBE_TEST_CONSUMER_GROUP"); consumer.setNamesrvAddr("172.16.1.15:9876"); consumer.subscribe("SUBSCRIBE_TEST", "tagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("ConsumerForTagA started!"); } } public class ConsumerForTagB { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SUBSCRIBE_TEST_CONSUMER_GROUP"); consumer.setNamesrvAddr("172.16.1.15:9876"); consumer.subscribe("SUBSCRIBE_TEST", "tagB"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("ConsumerForTagB started!"); } } 先启动ConsumerForTagA再启动ConsumerForTagB生产者

生产者向SUBSCRIBE_TESTTopic中写入8条消息,前4条消息的TAG为tagA,后4条消息的TAG为tagB.

生产者代码示例public class SyncProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("SUBSCRIBE_TEST_PRODUCER_GROUP"); producer.setNamesrvAddr("172.16.1.15:9876"); producer.setSendMsgTimeout(15000); producer.start(); for (int i = 0; i


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有