记一次线上kafka重复消费的问题解决及思考 您所在的位置:网站首页 kafka多消费者造成重复消费 记一次线上kafka重复消费的问题解决及思考

记一次线上kafka重复消费的问题解决及思考

2023-09-04 07:42| 来源: 网络整理| 查看: 265

目录 问题排查原因生产者原因消费者可能原因1. 重复消息被分到同一消费者的同一批次处理2. 重复消息被分到同一消费者的不同批次处理3. 重复消息被几乎同时分到不同消费者处理 解决方案1.对批量消息进行去重2. 对分配到不同消费者的消息进行redis去重3. 生产者在发送消息前先进行路由分区 消费超时发送重平衡导致重复消费kafka消费原理简介enable.auto.commitauto.commit.interval.mssession.timeout.msmax.poll.interval.msmax.poll.records重复消费原因解决办法 其他poll(5000)中5000的含义如果poll处理时发生异常怎么办?

问题

线上ELK日志发现kafka消费者消费到重复消息

排查原因 生产者原因

由于生产方本身就发送了重复的消息,导致消费到重复消息

消费者可能原因

消费方采用的是循环poll的模式,具体是在多线程分租户去批量处理的消息

while(true) { ConsumerRecords consumerRecords; do { consumerRecords = this.consumer.poll(60000L); } while(consumerRecords == null); if (!consumerRecords.isEmpty()) { long beginTime = System.currentTimeMillis(); Map recordMap = this.consumerHandler.slice(consumerRecords); List sliceFutureList = this.submitToExecutorService(recordMap); this.checkReinstanceConsumerExecutorService(unDoneSliceFutures); sliceFutureList.forEach((item) -> { unDoneSliceFutures.add(item.getFuture()); }); if (this.checkSliceFuture(beginTime, sliceFutureList, unDoneSliceFutures)) { this.commitOffset(this.consumer, consumerRecords); LOGGER.info("消费消息(kafka-core)- 异常时提交偏移量"); } else { this.commitOffset(this.consumer, consumerRecords); } } }

已有的幂等处理:在对poll出来的同一批次消息做分组时(按照同一租户),根据消息唯一业务字段标志sessionId去数据库中查询是否已有改消息,如果有就不处理。 消费业务逻辑处理时间:假设为5s,最后一步才对消息落库。

发生重复消费的原因可能有以下这几种情况

1. 重复消息被分到同一消费者的同一批次处理

重复的消息在某一个消费者同一poll的消息批次。假如该消息之前都没有入过库,那么这个时候根据sessionId去数据库查询,是查询不到的,所以重复消息的幂等校验就失效了,会造成重复消费。

2. 重复消息被分到同一消费者的不同批次处理

重复的消息在某一个消费者不同poll的消息批次,即两条消息是第一条执行完后第二条才执行的。由于第一条已经入库,所以上述的幂等校验就起作用了,第二条消息消费时,不会重复消费。

3. 重复消息被几乎同时分到不同消费者处理

重复的消息被分到不同的消费者,并且几乎同时处理消息。这样会造成,上述的幂等校验失效,因为查询数据库时,此时消息都未落库(该消息之前尚未被消费过),根据sessionId去过滤,查不到,所以两个消费者都会对该消息做处理。

解决方案 1.对批量消息进行去重

针对同一消费者同一批次的重复消息,在分组时先进行根据唯一标志sessionId进行去重。

public Map transferMealResourceDeduct(List eventList) { if (CollectionUtils.isEmpty(eventList)) { return Collections.emptyMap(); } //做幂等去重 eventList = eventList.stream() .filter(e -> StringUtils.isNotBlank(e.getSessionId())) .collect( Collectors.collectingAndThen( Collectors.toCollection( () -> new TreeSet(Comparator.comparing(MealResourceTextRobotEventDto::getSessionId))), ArrayList::new)); } 2. 对分配到不同消费者的消息进行redis去重

消费者执行消费前,先把唯一标识sessionId,放入到redis,如果已存在,那么就不进行处理

3. 生产者在发送消息前先进行路由分区

生产者发送消息按照唯一标识相关字段进行分区,如租户id,这样可以保证重复的消息都在同一分区。由于patition只能被同一消费者组的某一个消费者消费,所以可以保证重复消息不会被多个消费者消费。这种情况如果是多个消费者组就不适用了。

消费超时发送重平衡导致重复消费

以上都是项目实际情况,产生原因是因为生产者发送了重复消息,而消费方未对各种情况做幂等校验。如果生产者本身没有发送重复消息,消费者会不会发送重复消费呢?答案是也有可能。

kafka消费原理简介

先介绍几个关键的配置参数

enable.auto.commit

是否自动提交,默认为true

auto.commit.interval.ms

如果设置了 enable.auto.commit的值为true,则该值定义了消费者偏移量向Kafka提交的频率。默认为5000ms,即5s。kafka consumer 是在每次 poll () 之前去判断:是否已经到了要 commint 的时间( commitTime consumerRecords = this.consumer.poll(60000L); } while(consumerRecords == null); if (!consumerRecords.isEmpty()) { long beginTime = System.currentTimeMillis(); Map recordMap = this.consumerHandler.slice(consumerRecords); //提交线程池,并返回执行结果Future List sliceFutureList = this.submitToExecutorService(recordMap); this.checkReinstanceConsumerExecutorService(unDoneSliceFutures); sliceFutureList.forEach((item) -> { unDoneSliceFutures.add(item.getFuture()); }); if (this.checkSliceFuture(beginTime, sliceFutureList, unDoneSliceFutures)) { this.commitOffset(this.consumer, consumerRecords); LOGGER.info("消费消息(kafka-core)- 异常时提交偏移量"); } else { this.commitOffset(this.consumer, consumerRecords); } } }

上述的例子中就是线程池多线程处理,如果消费线程发生异常,也会执行commitOffset。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有