java实现Redis消息发布订阅 | 您所在的位置:网站首页 › redis实现发布订阅 › java实现Redis消息发布订阅 |
Redis发布订阅架构
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。 发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。 Redis发布订阅功能(1)发送消息 Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。 这里写图片描述 (2)订阅某个频道 Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。 上代码:首先引入相关依赖 redis.clients jedis 2.9.0定义消息发布者类-Publisher package com.cicc.config.management.subsciber; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; public class Publisher extends Thread{ private final JedisPool jedisPool; public Publisher(JedisPool jedisPool) { this.jedisPool = jedisPool; } @Override public void run() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接 while (true) { try { jedis.publish("mychannel", reader.readLine()); //从 mychannel 的频道上推送消息 } catch (IOException e) { e.printStackTrace(); } } } }定义消息订阅类-SubThread package com.cicc.config.management.subsciber; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); private final String channel = "mychannel"; public SubThread(JedisPool jedisPool) { super("SubThread"); this.jedisPool = jedisPool; } @Override public void run() { System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel)); Jedis jedis = null; try { jedis = jedisPool.getResource(); //取出一个连接 jedis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名 } catch (Exception e) { System.out.println(String.format("subsrcibe channel error, %s", e)); } finally { if (jedis != null) { jedis.close(); } } } }定义订阅消息处理类-Subscriber 继承了JedisPubSub,其中onMessage可以根据业务需求来重写 package com.cicc.config.management.subsciber; import redis.clients.jedis.JedisPubSub; public class Subscriber extends JedisPubSub { public Subscriber(){} @Override public void onMessage(String channel, String message) { //收到消息会调用 System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) { //订阅了频道会调用 System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", channel, subscribedChannels)); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅 会调用 System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", channel, subscribedChannels)); } }编写测试类:PSTest package com.cicc.config.management.test; import com.cicc.config.management.subsciber.Publisher; import com.cicc.config.management.subsciber.SubThread; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class PSTest{ public static void main( String[] args ) { // 连接本地redis服务端 JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379); Publisher publisher = new Publisher(jedisPool); //发布者 publisher.start(); SubThread subThread = new SubThread(jedisPool); //订阅者 subThread.start(); } } |
CopyRight 2018-2019 实验室设备网 版权所有 |