【Springboot+Redis】Springboot+Redis实现消息队列(生产者/消费者、发布订阅模式)

您所在的位置:网站首页 redis发布订阅模式是否支持多消费者购买行为 【Springboot+Redis】Springboot+Redis实现消息队列(生产者/消费者、发布订阅模式)

【Springboot+Redis】Springboot+Redis实现消息队列(生产者/消费者、发布订阅模式)

2024-07-16 20:16:11| 来源: 网络整理| 查看: 265

 

转载自,原文格式清晰:https://blog.csdn.net/niuchenliang524/article/details/81326238

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。

定义:         生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。         发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。

Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。 从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。

Redis List的主要操作为lpush/lpop/rpush/rpop四种,分别代表从头部和尾部的push/pop,除此之外List还提供了两种pop操作的阻塞版本blpop/brpop,用于阻塞获取一个对象。

Redis通常都被用做一个处理各种后台工作或消息任务的消息服务器。 一个简单的队列模式就是:生产者把消息放入一个列表中,等待消息的消费者用 RPOP 命令(用轮询方式), 或者用 BRPOP 命令(如果客户端使用阻塞操作会更好)来得到这个消息。

1、Redis 生产者/消费者模式实现消息队列的简单例子(此外用的是jedisCluster连接redis)

springboot的application-dev.yml中redis的配置如下:

redis:     database: 0     password: # Redis服务器连接密码(默认为空)     timeout: 0 # 连接超时时间(毫秒)     commandTimeout: 5000     cluster:         nodes: 10.201.1.27: 7001, 10.201.1.27: 7002, 10.201.1.27: 7003, 10.201.1.27: 7004, 10.201.1.27: 7005, 10.201.1.27: 7006

redis放入Spring容器中:

import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import java.util.HashSet; import java.util.Set; @Configuration public class RedisHostsConfig {     @Value( "${spring.redis.cluster.nodes}")     private String hosts; /** * 获取jedis链接 * @return */ @Bean public JedisCluster redisCluster(){     try{         Set nodes = new HashSet();         String[] hostArray = hosts.split( ",");         for(String host : hostArray){             String[] ipAndPort = host.split( ":");             nodes.add( new HostAndPort(ipAndPort[ 0], Integer.parseInt(ipAndPort[ 1])));         }         JedisCluster cluster = new JedisCluster(nodes);         return cluster;     } catch(Exception e){         e.printStackTrace();         return null;     } } }

 

生产者代码:

public class RedisProducer {     @Autowired     JedisCluster jedisCluster;     public static void main(String[] args){         for( int i = 0;i< 10;i++) {             jedisCluster.lpush( "listingList", "value_" + i);           }     }   }

 

import lombok.extern.slf4j.Slf4j; import redis.clients.jedis.JedisCluster; import java.util.List; @Slf4j public class NetListingConsumer extends Thread {     private static JedisCluster redisCluster;     private static JedisCluster getRedisCluster(){         if (redisCluster == null){             redisCluster = SpringContextHolder.getBean( "redisCluster");         }         return redisCluster;     }     @Override     public void run() {         JedisCluster redis = getRedisCluster();         while ( true){             //阻塞式brpop,List中无数据时阻塞,参数0表示一直阻塞下去,直到List出现数据             List listingList = redis.brpop( 0, "listingList");             log.info( "线程取数据:{}", listingList.get( 1));         }     } }

 

消费者代码:

运行消费者:

public class RedisProducer {      public static void main(String[] args){         PriceConsumer redisConsumer = new PriceConsumer();         redisConsumer.start();     } }

spring-redis使用RedisMessageListenerContainer进行消息监听,客户程序需要自己实现MessageListener,并以指定的topic注册到RedisMessageListenerContainer,这样,在指定的topic上如果有消息,RedisMessageListenerContainer便会通知该MessageListener。2、Redis 发布/订阅模式实现消息队列的简单例子(此外用的是redisTemplater操作redis)

在此,我是监听了四个topic,为每一个topic写了一个消息处理方法。

import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import java.util.concurrent.CountDownLatch; @Configuration public class RedisMessageListener { /** * redis消息监听器容器 * @param connectionFactory * @param priceListenerAdapter 价格趋势和市场消息订阅处理器 * @param listingListenerAdapter 挂牌案例消息订阅处理器 * @param caseListenerAdapter 交易案例消息订阅处理器 * @param foreclosureListenerAdapter 法拍案例消息订阅处理器 * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,                 MessageListenerAdapter priceListenerAdapter,                 MessageListenerAdapter listingListenerAdapter,                 MessageListenerAdapter caseListenerAdapter,                 MessageListenerAdapter foreclosureListenerAdapter) {     RedisMessageListenerContainer container = new RedisMessageListenerContainer();     container.setConnectionFactory(connectionFactory);     //监听价格趋势和市场情况主题并绑定消息订阅处理器     container.addMessageListener(priceListenerAdapter, new PatternTopic( "PRICE_TOPIC"));     //监听挂牌案例主题并绑定消息订阅处理器     container.addMessageListener(listingListenerAdapter, new PatternTopic( "LISTING_TOPIC"));     //监听交易案例主题并绑定消息订阅处理器     container.addMessageListener(caseListenerAdapter, new PatternTopic( "CASE_TOPIC"));     //监听法拍案例主题并绑定消息订阅处理器     container.addMessageListener(foreclosureListenerAdapter, new PatternTopic( "FORECLOSURE_TOPIC"));     return container; } /** * 价格趋势和市场消息订阅处理器,并指定处理方法 * @param receiver * @return */ @Bean MessageListenerAdapter priceListenerAdapter(ReceiverRedisMessage receiver) {     MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "priceReceive");     Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = this.jacksonSerializer();     messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);     return messageListenerAdapter; } /** * 挂牌案例消息订阅处理器,并指定处理方法 * @param receiver * @return */ @Bean MessageListenerAdapter listingListenerAdapter(ReceiverRedisMessage receiver) {     MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "listingReceive");     Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = this.jacksonSerializer();     messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);     return messageListenerAdapter; } /** * 交易案例消息订阅处理器,并指定处理方法 * @param receiver * @return */ @Bean MessageListenerAdapter caseListenerAdapter(ReceiverRedisMessage receiver) {     MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "caseReceive");     Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = this.jacksonSerializer();     messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);     return messageListenerAdapter; } /** * 法拍案例消息订阅处理器,并指定处理方法 * @param receiver * @return */ @Bean MessageListenerAdapter foreclosureListenerAdapter(ReceiverRedisMessage receiver) {     MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "foreclosureReceive");     Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = this.jacksonSerializer();     messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);     return messageListenerAdapter; } @Bean ReceiverRedisMessage receiver(CountDownLatch latch) {     return new ReceiverRedisMessage(latch); } @Bean CountDownLatch latch() {     return new CountDownLatch( 1); } @Bean RedisTemplate template(RedisTemplate redisTemplate) {     return redisTemplate; } private Jackson2JsonRedisSerializer jacksonSerializer(){     Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);     ObjectMapper objectMapper = new ObjectMapper();     objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);     objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);     jackson2JsonRedisSerializer.setObjectMapper(objectMapper);     return jackson2JsonRedisSerializer; } }  

 

import com.alibaba.fastjson.JSON; import com.cindata.esp.domian.casehistory.CaseHistory; import com.cindata.esp.domian.foreclosure.ForeclosureHistory; import com.cindata.esp.domian.netlisting.NetListingHistory; import com.cindata.esp.domian.pricetendency.PriceTendencySituation; import com.cindata.esp.service.ICaseService; import com.cindata.esp.service.IForeclosureService; import com.cindata.esp.service.INetListingService; import com.cindata.esp.service.IPirceTendencyService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.CountDownLatch; @Slf4j public class ReceiverRedisMessage {     private CountDownLatch latch;     @Autowired     IPirceTendencyService priceTendencyService;     @Autowired     INetListingService netListingService;     @Autowired     ICaseService caseService;     @Autowired     IForeclosureService foreclosureService;     @Autowired     public ReceiverRedisMessage(CountDownLatch latch) {         this.latch = latch;     } /** * 价格趋势和市场队列消息接收方法 * @param jsonMsg */ public void priceReceive( String jsonMsg) {     log.info( "[开始消费REDIS消息队列PRICE_TOPIC数据...]");     try{         PriceTendencySituation priceTendencySituation = JSON.parseObject(jsonMsg, PriceTendencySituation.class);         priceTendencyService.save(priceTendencySituation);         log.info( "[消费REDIS消息队列PRICE_TOPIC数据成功.]");     } catch(Exception e){         log.error( "[消费REDIS消息队列PRICE_TOPIC数据失败,失败信息:{}]", e.getMessage());     }     latch.countDown(); } /** * 挂牌案例队列消息接收方法 * @param jsonMsg */ public void listingReceive( String jsonMsg) {     log.info( "[开始消费REDIS消息队列LISTING_TOPIC数据...]");     try{         NetListingHistory netListingHistory = JSON.parseObject(jsonMsg, NetListingHistory.class);         netListingService.save(netListingHistory);         log.info( "[消费REDIS消息队列LISTING_TOPIC数据成功.]");     } catch(Exception e){         log.error( "[消费REDIS消息队列LISTING_TOPIC数据失败,失败信息:{}]", e.getMessage());     }     latch.countDown(); } /** * 交易案例队列消息接收方法 * @param jsonMsg */ public void caseReceive( String jsonMsg) {     log.info( "[开始消费REDIS消息队列CASE_TOPIC数据...]");     try{         CaseHistory caseHistory = JSON.parseObject(jsonMsg, CaseHistory.class);         caseService.save(caseHistory);         log.info( "[消费REDIS消息队列CASE_TOPIC数据成功.]");     } catch(Exception e){         log.error( "[消费REDIS消息队列CASE_TOPIC数据失败,失败信息:{}]", e.getMessage());     }     latch.countDown(); } /** * 法拍案例队列消息接收方法 * @param jsonMsg */ public void foreclosureReceive( String jsonMsg) {     log.info( "[开始消费REDIS消息队列FORECLOSURE_TOPIC数据...]");     try{         ForeclosureHistory foreclosureHistory = JSON.parseObject(jsonMsg, ForeclosureHistory.class);         foreclosureService.save(foreclosureHistory);         log.info( "[消费REDIS消息队列FORECLOSURE_TOPIC数据成功.]");     } catch(Exception e){         log.error( "[消费REDIS消息队列FORECLOSURE_TOPIC数据失败,失败信息:{}]", e.getMessage());     }     latch.countDown(); } }

这样,应用启动时,消息的订阅方(subscriber)就注册好了。这时候只要使用一个简单的程序,模拟publisher,向指定topic发布消息,RedisMessageListener就可以接收到消息,spring-redis的写法是这样:

redisTemplate.convertAndSend("PRICE_TOPIC", "hello world!");

发布订阅模式,在集群化部署时会重复消费数据,暂时还没找到一个类似消费kafka数据手工维护偏移量的方法。



【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭