Spring Boot集成Kafka动态创建消费者并实现多消费者发布订阅模型 您所在的位置:网站首页 不能创建xmlhtrequest对象 Spring Boot集成Kafka动态创建消费者并实现多消费者发布订阅模型

Spring Boot集成Kafka动态创建消费者并实现多消费者发布订阅模型

2023-03-24 12:03| 来源: 网络整理| 查看: 265

在Spring Boot集成Kafka时,大家都知道可以使用@KafkaListener注解创建消费者。但是@KafkaListener注解是静态的,意味着在编译时就已经确定了消费者,无法动态地创建消费者。

不过事实上,使用Kafka提供的Java API,使用KafkaConsumer类就可以完成消费者的动态创建。

我们也知道在一个消费者组中,同一条消息只会被消费一次。而动态创建消费者的情景也通常是满足动态的发布订阅模型(一个发布者,但是可能有不定量的消费者),所以在这里我们使每个动态创建的消费者的消费者组也不一样即可。

下面,我们就来实现一下这个功能。

1,创建消费者对象

我们可以定义一个“消费者工厂”类,专门用于创建Kafka消费者对象,如下:

package com.gitee.swsk33.kafkadynamicconsumer.factory; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.Properties; @Component public class KafkaDynamicConsumerFactory { @Autowired private KafkaProperties kafkaProperties; @Value("${spring.kafka.consumer.key-deserializer}") private String keyDeSerializerClassName; @Value("${spring.kafka.consumer.value-deserializer}") private String valueDeSerializerClassName; /** * 创建一个Kafka消费者 * * @param topic 消费者订阅的话题 * @param groupId 消费者组名 * @return 消费者对象 */ public KafkaConsumer createConsumer(String topic, String groupId) throws ClassNotFoundException { Properties consumerProperties = new Properties(); // 设定一些关于新的消费者的配置信息 consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); // 设定新的消费者的组名 consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 设定反序列化方式 consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Class.forName(keyDeSerializerClassName)); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Class.forName(valueDeSerializerClassName)); // 新建一个消费者 KafkaConsumer consumer = new KafkaConsumer(consumerProperties); // 使这个消费者订阅对应话题 consumer.subscribe(Collections.singleton(topic)); return consumer; } } 复制代码

可见这里我们注入了配置文件中反序列化的配置,并用于新创建的消费者对象。

2,使用定时任务实现消费者实时订阅

上面仅仅是创建了消费者,但是消费者接收消息以及处理消息的操作,也是需要我们手动定义的。

如何让创建的消费者都去不停的接收并处理我们的消息呢?大致思路如下:

使用定时任务,在定时任务中使消费者不停地接收并处理消息与此同时,将每个定时任务和消费者都存起来,后面在消费者不需要的时候可以移除它们并关闭定时任务

这里,我们编写一个上下文类,用于存放所有的消费者定时任务,并编写增加和移除定时任务的方法:

package com.gitee.swsk33.kafkadynamicconsumer.context; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Map; import java.util.concurrent.*; /** * Kafka消费者任务上下文 */ public class KafkaConsumerContext { /** * 存放所有自己创建的Kafka消费者任务 * key: groupId * value: kafka消费者任务 */ private static final Map> scheduleMap = new ConcurrentHashMap(); /** * 任务调度器,用于定时任务 */ private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(24); /** * 添加一个Kafka消费者任务 * * @param groupId 消费者的组名 * @param consumer 消费者对象 * @param 消息键类型 * @param 消息值类型 */ public static void addConsumerTask(String groupId, KafkaConsumer consumer) { // 创建定时任务 // 每隔1s拉取消息并处理 ScheduledFuture future = executor.scheduleAtFixedRate(() -> { // 拉取消息 ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { // 自定义处理每次拉取的消息 System.out.println(record.value()); } }, 0, 1, TimeUnit.SECONDS); // 将任务和存入对应的列表以后续管理 consumerMap.put(groupId, consumer); scheduleMap.put(groupId, future); } /** * 移除Kafka消费者定时任务并关闭消费者订阅 * * @param groupId 消费者的组名 */ public static void removeConsumerTask(String groupId) { if (!consumerMap.containsKey(groupId)) { return; } // 取出对应的消费者与任务,并停止 KafkaConsumer consumer = consumerMap.get(groupId); ScheduledFuture future = scheduleMap.get(groupId); consumer.close(); future.cancel(true); // 移除列表中的消费者和任务 consumerMap.remove(groupId); scheduleMap.remove(groupId); } } 复制代码

在增加消费者定时任务的方法中,调用消费者对象的poll方法能够拉取一次消息,一次通常可能拉取到多条消息,遍历并处理即可。这样在定时任务中,我们每隔一段时间就拉取一次消息并处理,就实现了消费者实时订阅消息的效果。

除此之外,在使用定时任务时,即ScheduledExecutorService对象的scheduleAtFixedRate方法,可以实现每隔一定的时间执行一次任务,上述第一个参数传入Runnable接口的实现类,这里使用匿名内部类传入,即自定义的任务,第二个参数是启动延迟时间,第三个参数是每隔多长时间重复执行任务,第四个参数是时间单位。该方法返回一个任务对象,通过这个对象的cancel方法可以取消掉任务。

在移除消费者定时任务方法中,调用消费者对象的close方法即可关闭消费者取消订阅。

3,编写个API测试

现在编写一个API测试一下效果:

package com.gitee.swsk33.kafkadynamicconsumer.api; import com.gitee.swsk33.kafkadynamicconsumer.context.KafkaConsumerContext; import com.gitee.swsk33.kafkadynamicconsumer.factory.KafkaDynamicConsumerFactory; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 消息测试api */ @RestController @RequestMapping("/api/kafka") public class KafkaTestAPI { @Autowired private KafkaTemplate kafkaTemplate; @Autowired private KafkaDynamicConsumerFactory factory; @GetMapping("/send") public String send() { kafkaTemplate.send("my-topic", "hello!"); return "发送完成!"; } @GetMapping("/create/{groupId}") public String create(@PathVariable String groupId) throws ClassNotFoundException { // 这里统一使用一个topic KafkaConsumer consumer = factory.createConsumer("my-topic", groupId); KafkaConsumerContext.addConsumerTask(groupId, consumer); return "创建成功!"; } @GetMapping("/remove/{groupId}") public String remove(@PathVariable String groupId) { KafkaConsumerContext.removeConsumerTask(groupId); return "移除成功!"; } } 复制代码

现在依次访问/api/kafka/create/a和/api/kafka/create/b,就创建了两个消费者,然后访问/api/kafka/send发送消息,结果如下:

可见,两个消费者都接收到了消息。

4,总结

可见要动态地创建Kafka消费者,只需创建并设置好Kafka消费者对象,并使用定时任务使它们一直拉取消息,就可以实现发布订阅的效果。当然,我们要管理好创建的所有的消费者和定时任务,防止资源浪费。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有