Flink 源码之数据写入HBase 您所在的位置:网站首页 hbase怎么写入数据 Flink 源码之数据写入HBase

Flink 源码之数据写入HBase

2023-04-01 05:53| 来源: 网络整理| 查看: 265

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

前言

近期有同事询问Flink写入数据到HBase的方法。借着这个机会分析下Flink 写入HBase相关部分的源代码。

HBaseSinkFunction

HBaseSinkFunction是Flink写入数据到HBase的官方功能实现。

当然,作为一个数据落地端,HBaseSinkFunction毫无疑问需要实现SinkFunction接口。SinkFunction的使用参见:Flink 源码之两阶段提交。

作为一个有状态的数据落地端,HBaseSinkFunction继承自RichSinkFunction。我们从它的open方法开始,分析下Flink如何做到:

如何初始化和关闭HBase连接 如何高效率将数据插入HBase 如何处理checkpoint问题 open方法

我们查看下它的open方法,如下所示:

@Override public void open(Configuration parameters) throws Exception { LOG.info("start open ..."); // 获取HBase的配置 org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration(); try { // 初始化mutationConverter // mutationConverter负责转换Flink元素为HBase Mutation this.mutationConverter.open(); // 记录尚未flush到HBase的数据条数 this.numPendingRequests = new AtomicLong(0); // 创建HBase连接 if (null == connection) { this.connection = ConnectionFactory.createConnection(config); } // create a parameter instance, set the table name and custom listener reference. // 创建BufferedMutator构建参数 // 包含写入HBase的目标表名称和ExceptionListener BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this); // 设置BufferedMutator的写缓存字节数 if (bufferFlushMaxSizeInBytes > 0) { params.writeBufferSize(bufferFlushMaxSizeInBytes); } // 根据前面的参数,创建一个BufferedMutator this.mutator = connection.getBufferedMutator(params); // 如果flush间隔大于0ms,且最大待flush数据条数不等于1时 // 创建一个定时flush BufferedMutator的任务 if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { this.executor = Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher")); this.scheduledFuture = this.executor.scheduleWithFixedDelay( () -> { if (closed) { return; } try { flush(); } catch (Exception e) { // fail the sink and skip the rest of the items // if the failure handler decides to throw an exception // 保存发生的异常,等到检查的时候抛出 failureThrowable.compareAndSet(null, e); } }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS); } } catch (TableNotFoundException tnfe) { LOG.error("The table " + hTableName + " not found ", tnfe); throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe); } catch (IOException ioe) { LOG.error("Exception while creating connection to HBase.", ioe); throw new RuntimeException("Cannot create connection to HBase.", ioe); } LOG.info("end open."); }

由以上分析可知,open方法创建了HBase的BufferedMutator,用于缓存待插入的数据,以批量方式将这些数据插入。相比依次插入单条数据而言,极大的提升了操作HBase的效率。

获取HBase配置的过程

这部分逻辑位于prepareRuntimeConfiguration方法。

获取HBase配置文件的逻辑如下列表所示。按照从上到下的顺序查找hbase-site.xml文件,无论找到与否,都会尝试整个列表。列表后者找到的配置项会覆盖前者。

首先从classpath中查找hbase-site.xml和hbase-default.xml文件。 从HBASE_HOME环境变量的conf目录查找hbase-site.xml和hbase-default.xml文件。 从HBASE_CONF_DIR环境变量查找hbase-site.xml和hbase-default.xml文件。

prepareRuntimeConfiguration方法代码如下所示:

private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException { // create default configuration from current runtime env (`hbase-site.xml` in classpath) // first, // and overwrite configuration using serialized configuration from client-side env // (`hbase-site.xml` in classpath). // user params from client-side have the highest priority // 获取Configuration org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration( serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration()); // do validation: check key option(s) in final runtime configuration // 检查获取到的HBase配置 // 如果配置中不包含hbase.zookeeper.quorum配置项(HBase必须),抛出异常 if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) { LOG.error( "Can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM); throw new IOException( "Check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!"); } return runtimeConfig; }

获取配置信息的具体逻辑位于HBaseConfigurationUtil.getHBaseConfiguration()。我们继续跟踪。

public static Configuration getHBaseConfiguration() { // Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from // the classpath. // 默认方式,从classpath读取hbase-default.xml和hbase-site.xml Configuration result = HBaseConfiguration.create(); boolean foundHBaseConfiguration = false; // We need to load both hbase-default.xml and hbase-site.xml to the hbase configuration // The properties of a newly added resource will override the ones in previous resources, so // a configuration // file with higher priority should be added later. // Approach 1: HBASE_HOME environment variables String possibleHBaseConfPath = null; // 读取HBASE_HOME环境变量 final String hbaseHome = System.getenv("HBASE_HOME"); // 如果该环境变量存在,尝试从${HBASE_HOME}/conf读取配置 if (hbaseHome != null) { LOG.debug("Searching HBase configuration files in HBASE_HOME: {}", hbaseHome); possibleHBaseConfPath = hbaseHome + "/conf"; } if (possibleHBaseConfPath != null) { foundHBaseConfiguration = addHBaseConfIfFound(result, possibleHBaseConfPath); } // Approach 2: HBASE_CONF_DIR environment variable // 读取HBASE_CONF_DIR环境变量,如果该变量存在,尝试从该路径下读取配置 String hbaseConfDir = System.getenv("HBASE_CONF_DIR"); if (hbaseConfDir != null) { LOG.debug("Searching HBase configuration files in HBASE_CONF_DIR: {}", hbaseConfDir); foundHBaseConfiguration = addHBaseConfIfFound(result, hbaseConfDir) || foundHBaseConfiguration; } if (!foundHBaseConfiguration) { LOG.warn( "Could not find HBase configuration via any of the supported methods " + "(Flink configuration, environment variables)."); } return result; } Invoke方法

HBaseSinkFunction每次接收到一个数据,都会调用invoke方法。该方法负责将数据转换为HBase的mutation,放入到BufferedMutator中。当BufferedMutator累积的mutation数超过bufferFlushMaxMutations时候,强制flush到HBase中。

@Override public void invoke(T value, Context context) throws Exception { // 检查是否有异常发生 checkErrorAndRethrow(); // 使用mutationConverter转换数据为HBase的mutation // mutationConverter在后面章节介绍 // 把这个mutation加入到BufferedMutator中 mutator.mutate(mutationConverter.convertToMutation(value)); // flush when the buffer number of mutations greater than the configured max size. // numPendingRequests记录了未发送的写入请求个数 // 如果numPendingRequests大于bufferFlushMaxMutations,强制flush到HBase if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { flush(); } }

接下来分析下强制把数据写入HBase的flush方法。

private void flush() throws IOException { // BufferedMutator is thread-safe // 调用BufferedMutator的flush方法 mutator.flush(); // 重置pendingRequest计数器为0 numPendingRequests.set(0); // 检查是否有异常发生 checkErrorAndRethrow(); }

到此为止,HBaseSinkFunction中有关RichSinkFunction的方法已经分析完毕。除了一个close方法,包含关闭BufferedMutator,Connection和周期性flush HBase的定时任务,不再详细分析。

checkpoint相关方法实现

HBaseSinkFunction在checkpoint的时候,如果有尚未写入HBase的数据,必须flush到HBase中。以防止从checkpoint恢复的时候数据丢失。

@Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // 如果pendingRequest不为0,即有缓存的数据 // 一直执行flush操作,直到数据全存入HBase为止 while (numPendingRequests.get() != 0) { flush(); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { // 从checkpoint中恢复,什么都不用做 // nothing to do. } MutationConverter

MutationConverter用于将Flink的数据转换为HBase的Mutation。

该接口有两个方法:

open:初始化converter。 convertToMutation:将record转换为Mutation。 @Internal public interface HBaseMutationConverter extends Serializable { /** Initialization method for the function. It is called once before conversion method. */ void open(); /** * Converts the input record into HBase {@link Mutation}. A mutation can be a {@link Put} or * {@link Delete}. */ Mutation convertToMutation(T record); }

HBaseMutationConverter有两个实现类:

LegacyMutationConverter:转换Tuple2类型的数据。如果boolean为true,则创建Put操作,如果为false创建Delete操作。 RowDataToMutationConverter:转换RowData类型数据。通过RowKind来控制创建出Put操作还是Delete操作。

除此之外,这两个类中大量的逻辑为从HBaseTableSchema获取列族,qualifier和charset等信息,然后从数据解析出每个cf:qualifier对应的value,构造出Put或Delete对象。此处不再一一分析。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有