两种实现方式

您所在的位置:网站首页 怎么看苹果id的消费记录清单 两种实现方式

两种实现方式

2024-07-02 14:01:12| 来源: 网络整理| 查看: 265

版本:kafka_2.11-1.1.0

本文提供两种方式来查看消费者组的消费情况,分别通过命令行和 java api 的方式来消费 __consumer_offsets 。

一、通过命令行来查看消费情况

查看 kafka 消费者组列表:

代码语言:javascript复制./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --list

查看 kafka 中某一个消费者组的消费情况:

代码语言:javascript复制./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group test-17 --describe

test-17 是消费者组。

在上面这张图中,我们可以看到该消费者组消费的 topic、partition、当前消费到的 offset 、最新 offset 、LAG(消费进度) 等等。如果消费者的 offset 很长时间没有提交导致 LAG 越来越大,则证明消费 Kafka 的服务异常。

消费者组消费 topic 的元数据信息,在旧版本里面是存储在 zookeeper 中,但由于 zookeeper 并不适合大批量的频繁写入操作,新版 kafka 已将消费者组的元数据信息保存在 kafka 内部的 topic 中,即 __consumer_offsets ,并提供了 kafka-console-consumer.sh 脚本供用户查看消费者组的元数据信息。

那么如何使用 kafka 提供的脚本查询某消费者组的元数据信息呢?

__consumer_offsets 默认有 50 个 partition,kafka 会根据 group.id 的 hash 值选择往哪个 partition 里面存放该 group 的元数据信息。计算 group.id 对应的 partition 的公式为:

代码语言:javascript复制Math.abs(groupID.hashCode()) % numPartitions

举例:Math.abs("test-17".hashCode()) % 50,其中 test-17 是 group.id 。

找到 group.id 对应的 partition 后,就可以指定分区消费了:

代码语言:javascript复制./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --partition 45

kafka 0.11.0.0 版本(含)之前需要使用 formatter 为 kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter , 0.11.0.0 版本以后(含)使用上面脚本中使用的 Class 。

脚本执行后输出的元数据信息有:

代码语言:javascript复制[test-17,topic-test,0]::[OffsetMetadata[36672,NO_METADATA],CommitTime 1599746660064,ExpirationTime 1599833060064]

[消费者组 : 消费的topic : 消费的分区] :: [offset位移], [offset提交时间], [元数据过期时间]

二、通过 java api 来查看消费情况,方便做告警监控使用 java api 来查看 __consumer_offsets 元数据信息,更加灵活方便。比如我就用了以下方式做了消费者组消费的告警监控。

pom 依赖:

代码语言:javascript复制 org.apache.kafka kafka_2.11 1.1.0 kafka-clients org.apache.kafka slf4j-log4j12 org.slf4j org.apache.kafka kafka-clients 1.1.0

消费者配置:

__consumer_offsets 中的数据需要用字节来转码,所以消费者配置中需要设置 ByteArrayDeserializer 序列化和反序列化:

代码语言:javascript复制props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

消费者示例代码:

代码语言:javascript复制/** * 指定分区消费__consumer_offsets */ public void consumerTopic() { consumer.assign(Arrays.asList(new TopicPartition("__consumer_offsets", 45))); ConsumerRecords records; while (true) { records = consumer.poll(500); for (ConsumerRecord record : records) { BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key())); if (key instanceof OffsetKey) { GroupTopicPartition partition = (GroupTopicPartition) key.key(); String topic = partition.topicPartition().topic(); String group = partition.group(); log.info("consumer group: [{}], consumer topic: [{}], key: [{}]", group, topic, key.toString()); } else if (key instanceof GroupMetadataKey){ log.info("groupMetadataKey: [{}]", key.key()); //第一个参数为group id,先将key转换为GroupMetadataKey类型,再调用它的key()方法就可以获得group id GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey) key).key(), ByteBuffer.wrap(record.value())); log.info("GroupMetadata: [{}]", groupMetadata.toString()); } } } }

只要消费者组提交位移,__consumer_offsets 里面就会增加对应的元数据信息。我们可以通过指定分区消费 __consumer_offsets ,来监控某消费者组的消费情况,避免在生产环境中消费程序假死而不自知。

小结

1、解析 __consumer_offsets 元数据信息需要设置 ByteArrayDeserializer 序列化和反序列化。

2、通过 kafka.coordinator.GroupMetadataManager 来解析元数据信息

3、__consumer_offsets 的元数据信息有两种:

[key:OffsetKey,value:OffsetAndMetadata]:保存了消费者组各 partition 的 offset 元数据信息。[key:GroupMetadataKey,value:GroupMetadata]:保存了消费者组中各个消费者的信息。


【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭