如何使用spring boot整合kafka和延迟启动消费者 | 您所在的位置:网站首页 › kafka消费延时 › 如何使用spring boot整合kafka和延迟启动消费者 |
如何使用 Spring Boot 整合 Kafka 并延迟启动消费者
发布时间:2021-08-09 11:43:24
来源:亿速云
阅读:274
作者:小新
栏目:开发技术
这篇文章给大家分享的是有关如何使用 Spring Boot 整合 Kafka 并延迟启动消费者的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
Spring Boot 整合 Kafka,延迟启动消费者
Spring Boot 整合 Kafka 的时候一般使用 @KafkaListener 来设置消费者,但是这种方式在 Spring 启动的时候就会立即开启消费者。如果需要根据配置信息延迟开启指定的消费者,就不能使用这种方式。
为了方便使用,我自定义了一个注解:
import org.springframework.kafka.annotation.TopicPartition; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target({ ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface DelayKafkaConsumer { String id() default ""; String[] topics() default {}; String errorHandler() default ""; String groupId() default ""; TopicPartition[] topicPartitions() default {}; String beanRef() default "__listener"; }
配合注解使用的 factory:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.Advised; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.ObjectFactory; import org.springframework.beans.factory.config.*; import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.MethodIntrospector; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.core.convert.converter.Converter; import org.springframework.core.convert.converter.GenericConverter; import org.springframework.format.Formatter; import org.springframework.format.FormatterRegistry; import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.kafka.annotation.KafkaListenerConfigurer; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; import 
org.springframework.kafka.config.KafkaListenerEndpoint; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.messaging.converter.GenericMessageConverter; import org.springframework.messaging.handler.annotation.support.*; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; @Service public class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware { private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class); private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar; private final AtomicInteger counter = new AtomicInteger(); private BeanFactory beanFactory; private BeanExpressionResolver resolver = new StandardBeanExpressionResolver(); private BeanExpressionContext expressionContext; private final ListenerScope listenerScope = new ListenerScope(); private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory = new KafkaHandlerMethodFactoryAdapter(); @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { this.kafkaListenerEndpointRegistrar = registrar; addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService); } public void startConsumer(KafkaListenerEndpoint endpoint){ kafkaListenerEndpointRegistrar.registerEndpoint(endpoint); } public void 
startConsumer(Object target){ logger.info("start consumer {} ...",target.getClass()); Class targetClass = AopUtils.getTargetClass(target); Map annotatedMethods = MethodIntrospector.selectMethods(targetClass, new MethodIntrospector.MetadataLookup() { @Override public Set inspect(Method method) { Set listenerMethods = findListenerAnnotations(method); return (!listenerMethods.isEmpty() ? listenerMethods : null); } }); if (annotatedMethods.size()==0) throw new IllegalArgumentException(target.getClass()+" need have method with @DelayKafkaConsumer"); for (Map.Entry entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); logger.info("find message listen handler method : {} , object : {}",method.getName(),target.getClass()); for (DelayKafkaConsumer listener : entry.getValue()) { if(listener.topics().length==0) { logger.info("topics value is empty , will skip it , method : {} , target object : {}",method.getName(),target.getClass()); continue; } processKafkaListener(listener,method,target); logger.info("register method {} success , target object : {}",method.getName(),target.getClass()); } } logger.info("{} consumer start complete .",target.getClass()); } protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) { Method methodToUse = checkProxy(method, bean); MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint(); endpoint.setMethod(methodToUse); endpoint.setBeanFactory(this.beanFactory); String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler"); if (StringUtils.hasText(errorHandlerBeanName)) { endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class)); } processListener(endpoint, kafkaListener, bean, methodToUse); } protected void processListener(MethodKafkaListenerEndpoint endpoint, DelayKafkaConsumer kafkaListener, Object bean, Object adminTarget) { String beanRef = kafkaListener.beanRef(); if 
(StringUtils.hasText(beanRef)) { this.listenerScope.addListener(beanRef, bean); } endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(kafkaListener)); endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId())); endpoint.setTopics(resolveTopics(kafkaListener)); endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener)); kafkaListenerEndpointRegistrar.registerEndpoint(endpoint); if (StringUtils.hasText(beanRef)) { this.listenerScope.removeListener(beanRef); } } private String getEndpointId(DelayKafkaConsumer kafkaListener) { if (StringUtils.hasText(kafkaListener.id())) { return resolve(kafkaListener.id()); } else { return "Custom-Consumer" + this.counter.getAndIncrement(); } } private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) { String groupId = null; if (StringUtils.hasText(kafkaListener.groupId())) { groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId"); } if (groupId == null && StringUtils.hasText(kafkaListener.id())) { groupId = id; } return groupId; } private String[] resolveTopics(DelayKafkaConsumer kafkaListener) { String[] topics = kafkaListener.topics(); List result = new ArrayList(); if (topics.length > 0) { for (int i = 0; i 0) { for (TopicPartition topicPartition : topicPartitions) { result.addAll(resolveTopicPartitionsList(topicPartition)); } } return result.toArray(new TopicPartitionInitialOffset[result.size()]); } private List resolveTopicPartitionsList(TopicPartition topicPartition) { Object topic = resolveExpression(topicPartition.topic()); Assert.state(topic instanceof String, "topic in @TopicPartition must resolve to a String, not " + topic.getClass()); Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty"); String[] partitions = topicPartition.partitions(); PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets(); Assert.state(partitions.length > 0 || 
partitionOffsets.length > 0, "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'"); List result = new ArrayList(); for (int i = 0; i |
Copyright 2018-2019 实验室设备网 版权所有