
Writing Flink Data to a Redis Cluster: Rewriting the Flink Redis Connector


Background: when working with Flink you can hardly avoid Redis, and most of us have used flink-connector-redis for that. But when I tried to use RedisSink to write to a Redis cluster, I found that it does not support password authentication at all, hence this note.

 

Here is how it went. I wanted to write data from Flink into Redis, so as usual I pulled in the flink-connector-redis dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

Then I happily wrote the following code:

package org.cube.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

import java.net.InetSocketAddress
import java.util.HashSet

/**
 * @Author : Lawrence
 * @Date : 2022/7/24 23:11
 * @Description : Write Flink results into a Redis cluster
 * @Version : 1.0.0
 * @Modification_Record:
 * Version    Date        Remark
 * v1.0.0     2022/7/24   First Create
 */
object RedisClusterSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    import org.apache.flink.api.scala._
    val source = env.fromElements("1 hadoop", "2 spark", "3 flink", "4 hive", "5 redis", "6 hbase")

    // process
    val tupleValue = source.map(_.split(" ")).map(x => (x(0), x(1)))

    // redis config
    val builder = new FlinkJedisPoolConfig.Builder
    builder.setHost("cube01").setPort(7001).setPassword("123456")
    val redisConf: FlinkJedisPoolConfig = builder.build()

    // sink
    val redisSink = new RedisSink[(String, String)](redisConf, new MyRedisMapper())
    tupleValue.addSink(redisSink)

    env.execute("RedisClusterSink")
  }
}

class MyRedisMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }

  override def getKeyFromData(t: (String, String)): String = t._1

  override def getValueFromData(t: (String, String)): String = t._2
}

I hit Run in high spirits, but the console greeted me with a screen of red.

The last entry in the stack trace said:

Caused by: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9842 192.168.20.132:7003

Ah, so that is because my Redis runs in cluster mode.

No problem.

All I had to do was swap FlinkJedisPoolConfig for FlinkJedisClusterConfig and everything would be fine.

// redis config
val builder = new FlinkJedisClusterConfig.Builder
val inetSocketAddress = new InetSocketAddress("cube01", 7001)
val nodeSet = new HashSet[InetSocketAddress]()
nodeSet.add(inetSocketAddress)
builder.setNodes(nodeSet).setPassword("123456")
val redisConf: FlinkJedisClusterConfig = builder.build()

Except that this class has no setPassword method; in fact, it does not even have a "password" field.

Still no problem.

Surely I could just leave the password out for now?

Well, no. The console gave me another screen of red,

and this time it said:

Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.

Heh, still no problem.

My instinct was to look in the Maven repository for a newer version of flink-connector-redis.

However, a search showed that 1.1.5 was already the latest release.

Still no problem.

FlinkJedisPoolConfig can set a password, right?

And FlinkJedisClusterConfig can talk to a cluster, right?

So what if I merged the code of the two? That should do it.

So I copied FlinkJedisClusterConfig into a new MyFlinkJedisClusterConfig class and added a password field with the corresponding getter and setter.

package org.cube.flink;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 21:14
 * @Description : FlinkJedisClusterConfig with password support
 * @Version : 1.0.0
 * @Modification_Record:
 * Version    Date        Remark
 * v1.0.0     2022/7/25   First Create
 */

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Protocol;

import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class MyFlinkJedisClusterConfig extends FlinkJedisConfigBase {
    private static final long serialVersionUID = 1L;

    private final Set<InetSocketAddress> nodes;
    private final int maxRedirections;
    private int soTimeout;
    // added field: the cluster password
    private String password;

    private MyFlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int soTimeout,
                                      int maxRedirections, int maxTotal, int maxIdle, int minIdle, String password) {
        super(connectionTimeout, maxTotal, maxIdle, minIdle);
        Preconditions.checkNotNull(nodes, "Node information should be presented");
        Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
        this.nodes = new HashSet<>(nodes);
        this.soTimeout = soTimeout;
        this.maxRedirections = maxRedirections;
        this.password = password;
    }

    public Set<HostAndPort> getNodes() {
        Set<HostAndPort> ret = new HashSet<>();
        Iterator<InetSocketAddress> it = this.nodes.iterator();
        while (it.hasNext()) {
            InetSocketAddress node = it.next();
            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
        }
        return ret;
    }

    public int getMaxRedirections() {
        return this.maxRedirections;
    }

    public int getSoTimeout() {
        return this.soTimeout;
    }

    protected String getPassword() {
        return this.password;
    }

    public String toString() {
        return "JedisClusterConfig{nodes=" + this.nodes + ", timeout=" + this.connectionTimeout
                + ", maxRedirections=" + this.maxRedirections + ", maxTotal=" + this.maxTotal
                + ", maxIdle=" + this.maxIdle + ", minIdle=" + this.minIdle + '}';
    }

    public static class Builder {
        private Set<InetSocketAddress> nodes;
        private int timeout = Protocol.DEFAULT_TIMEOUT;
        private int maxRedirections = 5;
        // added field
        private int soTimeout = Protocol.DEFAULT_TIMEOUT;
        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
        // added field
        private String password;

        public Builder() {
        }

        public MyFlinkJedisClusterConfig.Builder setNodes(Set<InetSocketAddress> nodes) {
            this.nodes = nodes;
            return this;
        }

        public MyFlinkJedisClusterConfig.Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public MyFlinkJedisClusterConfig.Builder setSoTimeout(int soTimeout) {
            this.soTimeout = soTimeout;
            return this;
        }

        public MyFlinkJedisClusterConfig.Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        public MyFlinkJedisClusterConfig.Builder setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
            return this;
        }

        public MyFlinkJedisClusterConfig.Builder setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
            return this;
        }

        public MyFlinkJedisClusterConfig.Builder setMinIdle(int minIdle) {
            this.minIdle = minIdle;
            return this;
        }

        // added method: carry the cluster password
        public MyFlinkJedisClusterConfig.Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        public MyFlinkJedisClusterConfig build() {
            return new MyFlinkJedisClusterConfig(this.nodes, this.timeout, this.soTimeout, this.maxRedirections,
                    this.maxTotal, this.maxIdle, this.minIdle, this.password);
        }
    }
}

However, the red text came back with a reminder:

Caused by: java.lang.IllegalArgumentException: Jedis configuration not found

It turns out that when the Flink job runs, it calls the open() method of RedisSink:

public void open(Configuration parameters) throws Exception {
    this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
}

The problem is that the RedisCommandsContainerBuilder.build method it calls only recognizes the stock config classes; since MyFlinkJedisClusterConfig is not one of them, it falls through to the "Jedis configuration not found" branch:

public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase)

So these two classes also had to be rewritten:

MyRedisSink:

package org.cube.flink;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 23:52
 * @Description : A copy of RedisSink whose open() uses MyRedisCommandsContainerBuilder
 * @Version : 1.0.0
 * @Modification_Record :
 * Version    Date        Remark
 * v1.0.0     2022/7/25   First Create
 */

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class MyRedisSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.streaming.connectors.redis.RedisSink.class);

    private String additionalKey;
    private RedisMapper<IN> redisSinkMapper;
    private RedisCommand redisCommand;
    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;

    public MyRedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
        Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }

    @Override
    public void invoke(IN input) throws Exception {
        String key = this.redisSinkMapper.getKeyFromData(input);
        String value = this.redisSinkMapper.getValueFromData(input);
        switch (this.redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(this.additionalKey, key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + this.redisCommand);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // the only real change: build the container through the rewritten builder
        this.redisCommandsContainer = MyRedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
    }

    @Override
    public void close() throws IOException {
        if (this.redisCommandsContainer != null) {
            this.redisCommandsContainer.close();
        }
    }
}

MyRedisCommandsContainerBuilder:

package org.cube.flink;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 21:30
 * @Description : RedisCommandsContainerBuilder that supports the password-aware MyFlinkJedisClusterConfig
 * @Version : 1.0.0
 * @Modification_Record :
 * Version    Date        Remark
 * v1.0.0     2022/7/25   First Create
 */

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;

public class MyRedisCommandsContainerBuilder {

    public MyRedisCommandsContainerBuilder() {
    }

    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
            return build(flinkJedisPoolConfig);
        } else if (flinkJedisConfigBase instanceof MyFlinkJedisClusterConfig) {
            // dispatch on the new password-aware cluster config instead of FlinkJedisClusterConfig
            MyFlinkJedisClusterConfig flinkJedisClusterConfig = (MyFlinkJedisClusterConfig) flinkJedisConfigBase;
            return build(flinkJedisClusterConfig);
        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
            return build(flinkJedisSentinelConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), jedisPoolConfig.getPort(),
                jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), jedisPoolConfig.getDatabase());
        return new RedisContainer(jedisPool);
    }

    public static RedisCommandsContainer build(MyFlinkJedisClusterConfig jedisClusterConfig) {
        Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());

        JedisCluster jedisCluster;
        if (null == jedisClusterConfig.getPassword()) {
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
                    jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
        } else {
            // this constructor, which accepts a password, requires Jedis 2.9+
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
                    jedisClusterConfig.getSoTimeout(), jedisClusterConfig.getMaxRedirections(),
                    jedisClusterConfig.getPassword(), genericObjectPoolConfig);
        }
        return new RedisClusterContainer(jedisCluster);
    }

    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
        Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
                jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, jedisSentinelConfig.getConnectionTimeout(),
                jedisSentinelConfig.getSoTimeout(), jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
        return new RedisContainer(jedisSentinelPool);
    }
}

However, while rewriting MyRedisCommandsContainerBuilder you will be surprised to find that JedisCluster does not accept a password either.

Don't let habit push you into rewriting JedisCluster as well,

because this time it really is a version problem.

So this is still no problem:

just upgrade the redis.clients jedis dependency to version 2.9 or later, excluding the older jedis pulled in by the connector:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
    <exclusions>
        <exclusion>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </exclusion>
    </exclusions>
</dependency>
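With all the pieces in place, the driver only needs two small changes: build the password-aware MyFlinkJedisClusterConfig instead of the stock cluster config, and add a MyRedisSink instead of RedisSink. Below is a minimal sketch of that sink section, assuming the three classes above sit in org.cube.flink next to the RedisClusterSink object and reusing the MyRedisMapper from the first example:

// redis config: cluster node(s) plus the password the stock connector could not carry
val nodeSet = new HashSet[InetSocketAddress]()
nodeSet.add(new InetSocketAddress("cube01", 7001))
val builder = new MyFlinkJedisClusterConfig.Builder
builder.setNodes(nodeSet).setPassword("123456")
val redisConf: MyFlinkJedisClusterConfig = builder.build()

// sink: MyRedisSink routes the config through MyRedisCommandsContainerBuilder
val redisSink = new MyRedisSink[(String, String)](redisConf, new MyRedisMapper())
tupleValue.addSink(redisSink)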

And with that, we are finally done.

 

The code works, but it leaves one question:

why has such a simple requirement never made it into an updated jar?

All I want is to write Flink data into a password-protected Redis cluster. Is that too much to ask?

It is not. So why?

My guess is the following.

First, ask yourself what kind of data we write to Redis in stream processing.

Usually state and intermediate results, and since Flink itself provides state, caching, and broadcast mechanisms, the need for Redis has shrunk.

Second, big-data jobs usually run on machines inside a private network, and the hosts in such a cluster are typically configured for passwordless login between one another, so putting a password only on Redis feels a little redundant.

Third, Redis's password mechanism is said to be rather weak; for security, ports are more often restricted with firewalls, so for ease of administration many Redis clusters run without a password at all.

Fourth, human laziness being what it is, once you find that RedisSink does not support passwords, the path of least resistance may well be to give up on the password.

 

Well, the code is written and the thinking is done; time to call it a day.

Good night, and see you in the next post.

PS: Feel free to copy the code as-is, but treat it as a stopgap. The more robust approach is to modify the original classes and repackage the connector, rather than keeping two near-identical classes around, which only invites confusion.


