Kafka 提交位移的正确姿势 您所在的位置:网站首页 kafka自动提交重复消费问题解决 Kafka 提交位移的正确姿势

Kafka 提交位移的正确姿势

2023-08-12 15:24| 来源: 网络整理| 查看: 265

kafka在创建topic的时候,可以指定分区数,然后Producer发送消息后,通过负载均衡将消息分配给一个分区,各个分区间的offset也是相互独立,consumer提交位移也是按照分区进行区分的;

比如,主题为foo,分区数为3,经过consumer提交唯一后,可能是如下的状态。

image.png

consumer默认采用自动提交位移,每隔5秒,consumer自动提交一次位移,但是由于自动提交位移可能导致消息丢失,一般都是关闭自动提交位移,采用手动提交的方式,手动提交的方式非常灵活,不受任何限制,broker端也会无脑的接受consumer提交的位移,即,如果我们实际消费的消息位移是15,但是我们手动提交了20,那么broker端就会认为offset前20条消息都已经消费成功,不会报错。

手动提交位移策略 关闭自动提交位移参数enable.auto.commit=false 手动提交位移api 同步 void commitSync(); void commitSync(Duration timeout); void commitSync(Map offsets); void commitSync(final Map offsets, final Duration timeout); 复制代码

调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。

我们可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。

异步 void commitAsync(); void commitAsync(OffsetCommitCallback callback); void commitAsync(Map offsets, OffsetCommitCallback callback); 复制代码

调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等

commitAsync 是否能够替代 commitSync 呢?

答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

同步异步结合提交方式

没有出现异常时,使用异步提交的方式,如果发生了异常,使用同步提价,利用同步提交的重试策略,帮我们自动进行重试。代码如下:

try { while (true) { ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100)); handlerRecord(records); kafkaConsumer.commitAsync(); } } catch ( Exception e) { errHandler(e); } finally { try { kafkaConsumer.commitSync(); } catch (Exception e) { kafkaConsumer.close(); } } 复制代码 细粒度提交offset

kafka默认是提交所有pool下来的消息,如果我们一次性拉取了大量的消息,我们可以分段提交位置,就像将一个大事务切分成一个小事务一个一个执行一样,这样做可以减小出错时的恢复时间。

最开始的时候提到kafka的offset管理是每个分区隔离的,所以我们可以对每个分区的消息处理树分别进行统计然后分别提交。

具体代码如下:

Map offsets = new HashMap(); AtomicInteger count = new AtomicInteger(); try { ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100)); records.forEach(record -> { handlerRecord(record); offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); count.getAndIncrement(); // 分区数处理的消息达到100后,进行提交 if (count.get() % 100 == 0) { kafkaConsumer.commitAsync(offsets, null); } }); } catch (Exception e) { errHandler(e); } finally { try { kafkaConsumer.commitSync(); } catch (Exception e) { kafkaConsumer.close(); } } 复制代码


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有