java实现Redis消息发布订阅 您所在的位置:网站首页 redis实现发布订阅 java实现Redis消息发布订阅

java实现Redis消息发布订阅

2023-09-19 22:43| 来源: 网络整理| 查看: 265

Redis发布订阅架构

Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。 发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。

Redis发布订阅功能

(1)发送消息 Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。 这里写图片描述 (2)订阅某个频道 Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。

上代码:

首先引入相关依赖

redis.clients jedis 2.9.0

定义消息发布者类-Publisher

package com.cicc.config.management.subsciber; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; public class Publisher extends Thread{ private final JedisPool jedisPool; public Publisher(JedisPool jedisPool) { this.jedisPool = jedisPool; } @Override public void run() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接 while (true) { try { jedis.publish("mychannel", reader.readLine()); //从 mychannel 的频道上推送消息 } catch (IOException e) { e.printStackTrace(); } } } }

定义消息订阅类-SubThread

package com.cicc.config.management.subsciber; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); private final String channel = "mychannel"; public SubThread(JedisPool jedisPool) { super("SubThread"); this.jedisPool = jedisPool; } @Override public void run() { System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel)); Jedis jedis = null; try { jedis = jedisPool.getResource(); //取出一个连接 jedis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名 } catch (Exception e) { System.out.println(String.format("subsrcibe channel error, %s", e)); } finally { if (jedis != null) { jedis.close(); } } } }

定义订阅消息处理类-Subscriber 继承了JedisPubSub,其中onMessage可以根据业务需求来重写

package com.cicc.config.management.subsciber; import redis.clients.jedis.JedisPubSub; public class Subscriber extends JedisPubSub { public Subscriber(){} @Override public void onMessage(String channel, String message) { //收到消息会调用 System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) { //订阅了频道会调用 System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", channel, subscribedChannels)); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅 会调用 System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", channel, subscribedChannels)); } }

编写测试类:PSTest

package com.cicc.config.management.test; import com.cicc.config.management.subsciber.Publisher; import com.cicc.config.management.subsciber.SubThread; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class PSTest{ public static void main( String[] args ) { // 连接本地redis服务端 JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379); Publisher publisher = new Publisher(jedisPool); //发布者 publisher.start(); SubThread subThread = new SubThread(jedisPool); //订阅者 subThread.start(); } }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有