Flink CDC + Hudi + Hive + Presto 构建实时数据湖最佳实践 您所在的位置:网站首页 使用java操作hive的过程 Flink CDC + Hudi + Hive + Presto 构建实时数据湖最佳实践

Flink CDC + Hudi + Hive + Presto 构建实时数据湖最佳实践

2023-06-15 06:04| 来源: 网络整理| 查看: 265

原文:Flink CDC + Hudi + Hive + Presto 构建实时数据湖最佳实践

摘要:本文作者罗龙文,分享了如何通过 Flink CDC、Hudi、Hive、Presto 等构建数据湖。主要内容包括:

测试过程环境版本说明

集群服务器基础环境

Hudi 编译环境配置

Flink 环境配置

启动 Flink Yarn Session 服务

MySQL binlog 开启配置

Flink CDC sink Hudi 测试代码过程

Tips:点击「阅读原文」预约 FFA 2021~

一、测试过程环境版本说明

Flink 1.13.1

Scala 2.11

CDH 6.2.0

Hadoop 3.0.0

Hive 2.1.1

Hudi 0.10(master)

PrestoDB 0.256

Mysql 5.7

二、集群服务器基础环境

2.1 Maven 和 JDK 环境版本

f50669cd9d92458d5810f1e8ad461845.png

2.2 Hadoop 集群环境版本

7a9d14ba97a210da510c4c5463dc8529.png

2.3 HADOOP环境变量配置 export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop export HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`

三、Hudi 编译环境配置

3.1 Maven Home settings.xml 配置修改

说明:指定 aliyun maven 地址 (支持 CDH cloudera 依赖) mirror 库

4ef731e4f0344d329c5f8e8d61afe14d.png

alimaven central,!cloudera aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ 3.2 下载 Hudi 源码包 git clone https://github.com/apache/hudi.git

365f7fe61cd87302a610d52e099b066d.png

Hudi 社区建议版本适配

Hudi0.9 适配 Flink 1.12.2

Hudi0.10(master) 适配 Flink 1.13.X (说明 master 分支上版本还未 release)

3.3 Hudi 客户端命令行

a4eb4a26c9cdaf0adb65bc13e5219246.png

3.4 修改 Hudi 集成 Flink 和 Hive 编译依赖版本配置

hudi-master/packaging/hudi-flink-bundle

8c31cec885a112fe20f56b7c6d19ce25.png

pom.xml 文件 (笔者环境 CDH 6.2.0,Hive 2.1.1)

a8dd047baa6a4e3c01c3c5f9543b24bc.png

flink-bundle-shade-hive2 2.1.1-cdh6.2.0 compile ${hive.groupid} hive-service-rpc ${hive.version} ${flink.bundle.hive.scope} 3.5 编译 Hudi 指定 Hadoop 和 Hive 版本信息 mvn clean install -DskipTests -Drat.skip=true -Dscala-2.11 -Dhadoop.version=3.0.0 -Pflink-bundle-shade-hive2

(可加 –e –X 参数查看编译 ERROR 异常和 DEBUG 信息)

说明:默认 Scala 2.11、默认不包含 Hive 依赖

febeb70290ef6180c96b0b5ccf881136.png

首次编译耗时较长 笔者首次编译大概花费 50min+ (也和服务器网络有关)

后续编译会快一些 大约 15min 左右

3.6 Hudi 编译异常

807eceb011a5ba93386a559c820a5759.png

8680bebe01e7a17d6e2a4134cca95ec2.png

修改 Hudi master pom.xml 增加 CDH repository 地址

a1e08bcdae7b6020d4dd769a3d9513dc.png

3.7 Hudi 重新编译

133dcc8332dda86debf5b2019cc30b6a.png

3.8 Hudi 编译结果说明

hudi-master/packaging/hudi-flink-bundle/target

f5dd34b59f73004c106af8de8ab453c8.png

hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:hudi-flink-bundle jar 是 Flink 用来写入和读取数据

hudi-master/packaging/hudi-hadoop-mr-bundle/target

2a3d8b32adc8745005f2e0f0817e3c1f.png

hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

说明:hudi-mr-bundle jar 是 Hive 需要用来读 Hudi 数据

四、Flink 环境配置

版本说明:Flink 1.13.1,Scala 2.11 版本

4.1 FLINK_HOME 下 sql-client-defaults.yaml 配置

c9dd71611ad84a65c1e35c5ab7c819b2.png

4.2 flink-conf.yaml 配置修改

0fc80bd786a1a16b1e9c194eba916b6c.png

0d8fef53db78721831815deda001aa02.png

# state.backend: filesystem state.backend: rocksdb # 开启增量checkpoint state.backend.incremental: true # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints state.checkpoints.dir: hdfs://nameservice/flink/flink-checkpoints classloader.check-leaked-classloader: false classloader.resolve-order: parent-first 4.3 FLINK_HOME lib下添加依赖

3dd6585a989af3a0eb13828047d816a5.png

flink-sql-connector-mysql-cdc-1.4.0.jar flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar.BAK – oracle cdc 依赖 flink-format-changelog-json-1.4.0.jar flink-sql-connector-kafka_2.11-1.13.1.jar --- Hadoop home lib下copy过来 hadoop-mapreduce-client-common-3.0.0-cdh6.2.0.jar hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar hadoop-mapreduce-client-jobclient-3.0.0-cdh6.2.0.jar --- hudi编译jar copy过来 hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:目前 oracle cdc jar 和 mysql cdc jar 一起在 lib 下发现有冲突异常

五、启动 Flink Yarn Session 服务

5.1 FLINK_HOME shell 命令 $FLINK_HOME/bin/yarn-session.sh -s 2-jm 2048-tm 2048-nm ys-hudi01 -d

d8402c644ec8fd084b3325e8b377a4e8.png

5.2 Yarn Web UI

01e79aaaddca4c9e4afd0f6f6b859b90.png

5.3 Flinksql Client 启动命令 $FLINK_HOME/bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell

说明:-j 指定 hudi-flink 依赖 jar

cb1938e038b3952892280df4d514949d.png

Show table / show catalogs

ad522a2fdca223e83555251ebffaca20.png

六、MySQL binlog 开启配置

6.1 创建 binlog 日志存储路径 mkdir logs 6.2 修改目录属主和 group chown -R mysql:mysql /mysqldata/logs 6.3 修改 mysql 配置信息 vim /etc/my.cnf server-id=2 log-bin= /mysqldata/logs/mysql-bin binlog_format=row expire_logs_days=15 binlog_row_image=full 6.4 修改完,重启 mysql server service mysqld restart 6.5 客户端查看 binlog 日志情况

show master logs;

3adb44e282e9076d5036791b44a0778b.png

Mysql 版本:5.7.30

8af2ff3df25958b6792e42b081d4fb3f.png

6.6 创建 mysql sources 表 DDL create table users_cdc(id bigint auto_increment primary key,name varchar(20) null,birthday timestamp default CURRENT_TIMESTAMP notnull,ts timestamp default CURRENT_TIMESTAMP notnull );

00e92a5839bcf4d98f8fc79069fa69f3.png

七、Flink CDC sink Hudi 测试代码过程

7.1 Flink sql cdc DDL 语句:(具体参数说明可参考 Flink 官网)

CREATE TABLE mysql_users (id BIGINT PRIMARY KEY NOT ENFORCED ,name STRING,birthday TIMESTAMP(3),ts TIMESTAMP(3) ) WITH ( 'connector'= 'mysql-cdc', 'hostname'= '127.0.0.1', 'port'= '3306', 'username'= '', 'password'=’’, 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name'= 'luo', 'table-name'= 'users_cdc' );

a64345aa5baea7a5a4eb0ac7b128ebc8.png

7.2 查询 mysql cdc 表 Flink SQL> select * from mysql_users;

1c2f16b686ada3e719ac68808db5eb01.png

由于目前 MySQL users_cdc 表是空,所以 flinksql 查询没有数据 只有表结构;

0c76517bbb51bbd241a68723e334c5fc.png

Flink web UI:

33b76e03f2199da51414c536177b106a.png

4db72b467b5df0667c6341e4b7bbf0b3.png

7.3 创建一个临时视图,增加分区列方便后续同步 Hive 分区表 Flink SQL> create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as partition FROM mysql_users;

说明:partition 关键字需要 `` 引起来

e1e28d71fabc80c8c8b788a41d3cd219.png

查询视图数据也是空结构,但增加了分区字段:

Flink SQL> select * from mycdc_v;

f62aeeb9c03557de87c2689459fe292f.png

e0e8c62faa42a9b7834f549e3e63c1bb.png

Flink web UI:

50f8ed0bb9cd404858b088bdf26e42fc.png

7.4 设置 checkpoint 间隔时间,存储路径已在 flink-conf 配置设置全局路径

建议:测试环境 可设置秒级别(不能太小),生产环境可设置分钟级别。

Flink SQL> set execution.checkpointing.interval=30sec;

dd7bfcb1222f793862ae49356ac8b8a3.png

7.5 Flinksql 创建 cdc sink hudi 文件,并自动同步 Hive 分区表 DDL 语句 CREATE TABLE mysqlcdc_sync_hive01( id bigint , name string, birthday TIMESTAMP(3), ts TIMESTAMP(3), `partition` VARCHAR(20), primary key(id) not enforced --必须指定uuid 主键 ) PARTITIONED BY (`partition`) with( 'connector'='hudi', 'path'= 'hdfs://nameservice /luo/hudi/mysqlcdc_sync_hive01' , 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键 , 'write.precombine.field'= 'ts'-- 自动precombine的字段 , 'write.tasks'= '1' , 'compaction.tasks'= '1' , 'write.rate.limit'= '2000'-- 限速 , 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ , 'compaction.async.enabled'= 'true'-- 是否开启异步压缩 , 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩 , 'compaction.delta_commits'= '1'-- 默认为5 , 'changelog.enabled'= 'true'-- 开启changelog变更 , 'read.streaming.enabled'= 'true'-- 开启流读 , 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s , 'hive_sync.enable'= 'true'-- 开启自动同步hive , 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式 , 'hive_sync.metastore.uris'= 'thrift://hadoop:9083'-- hive metastore地址 -- , 'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop:10000'-- hiveServer地址 , 'hive_sync.table'= 'mysqlcdc_sync_hive01'-- hive 新建表名 , 'hive_sync.db'= 'luo'-- hive 新建数据库名 , 'hive_sync.username'= ''-- HMS 用户名 , 'hive_sync.password'= ''-- HMS 密码 , 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型 );

说明:Hudi 目前支持 MOR 和 COW 两种模式

Copy on Write:使用列式存储来存储数据 (例如:parquet),通过在写入期间执行同步合并来简单地更新和重现文件

Merge on Read:使用列式存储 (parquet) + 行式文件 (arvo) 组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。

COW:Copy on Write (写时复制),快照查询 + 增量查询

MOR:Merge on Read (读时合并),快照查询 + 增量查询 + 读取优化查询 (近实时)

使用场景上:

COW 适用写少读多的场景 ,MOR 适用写多读少的场景;

MOR 适合 CDC 场景,更新延迟要求较低,COW 目前不支持 changelog mode 不适合处理 cdc 场景;

ed227aa8be47c5a5de197f46fedc7aef.png

b46a61c7110394ae10644199409d6065.png

Flink web UI

68480c102882246e9ca8416e67cafaa0.png

7.6 Flink sql mysql cdc 数据写入 Hudi 文件数据 Flink SQL> insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,`partition` from mycdc_v;

2d2dc0c68fdc1f1d7c3f25b9c3226199.png

Flink web UI DAG 图:

0ea7808c0095b144b60fc38c22ad6c01.png

7.7 HDFS 上 Hudi 文件目录情况

20862e06b74bc714c9df3e1eac124bfd.png

2fad14a98a35391ac0c5624e4740c5c2.png

说明:目前还没写入测试数据,Hudi 目录只生成一些状态标记文件,还未生成分区目录以及 .log 和 .parquet 数据文件,具体含义可见 Hudi 官方文档。

7.8 Mysql 数据源写入测试数据 insert into users_cdc (name) values ('cdc01');

bfce7e0a262e7dcdc14811dd8d1fffdb.png

7.9 Flinksql 查询 mysql cdc insert 数据 Flink SQL> set execution.result-mode=tableau;[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.[INFO] Session property has been set.Flink SQL> select * from mysql_users; -- 查询到一条insert数据

52d7df4c5a0d1c475cbd6fe7cb556604.png

7.10 Flink web UI 页面可以看到 DAG 各个环节产生一条测试数据

50e82fd1ac0467792019c6f9d6875a5d.png

7.11 Flinksql 查询 sink 的 Hudi 表数据 Flink SQL> select * from mysqlcdc_sync_hive01; --已查询到一条insert数据

d5f8bb4efda415bf069341a0adece7c9.png

7.12 Hdfs 上 Hudi 文件目录变化情况

ce065c585e5b1d73c150c074e6a60c16.png

7.13 Hive 分区表和数据自动同步情况

bf14e67d27305261da399ad57df26266.png

7.14 查看自动创建 Hive 表结构 hive> show create table mysqlcdc_sync_hive01_ro;

3deb59dfd43897137f0eb264221dc87c.png

hive> show create table mysqlcdc_sync_hive01_rt;

8fd0d5a271199ca719fdcc4987dd1b3c.png

7.15 查看自动生成的表分区信息 hive> show partitions mysqlcdc_sync_hive01_ro; hive> show partitions mysqlcdc_sync_hive01_rt;

94739b10b9ac0eb9cc9cff16169fb05f.png

说明:已自动生产 Hudi MOR 模式的

mysqlcdc_sync_hive01_ro mysqlcdc_sync_hive01_rt

ro 表和 rt 表区别:

ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和 COW 表类似。设置完 hiveInputFormat 之后和普通的 Hive 表一样查询即可;

rt 表示增量视图,主要针对增量查询的 rt 表;

ro 表只能查 parquet 文件数据;rt 表 parquet 文件数据和 log 文件数据都可查。

7.16 Hive 访问 Hudi 数据

说明:需要引入 hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

引入 Hudi 依赖 jar 方式:

引入到 $HIVE_HOME/lib 下;

引入到 $HIVE_HOME/auxlib 自定义第三方依赖 修改 hive-site.xml 配置文件;

Hive shell 命令行引入 Session 级别有效;

其中(1)和(3)配置完后需要重启 hive-server 服务;

查询 Hive 分区表数据:

hive> select * from mysqlcdc_sync_hive01_ro; --已查询到mysq insert的一条数据

909caf17f0de7b9c79e0b48de1e00494.png

hive> select * from mysqlcdc_sync_hive01_rt; --已查询到mysq insert的一条数据

f84e07694b8245ff41f44423d4bb4ae8.png

Hive 条件查询:

hive> select name,ts from mysqlcdc_sync_hive01_ro where partition='20211109';

35cdff1118c86c651259496a21857538.png

Hive ro 表 count 查询

hive> select count(1) from mysqlcdc_sync_hive01_ro;

ffcbaf5b731d8acd73cd8d6e35135610.png

Hive Count 异常解决:

引入 hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar 依赖

hive> add jar hdfs://nameservice /luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

89a11720d2338216efd12f155a3bbf47.png

hive> select count(1) from mysqlcdc_sync_hive01_ro; --可正常count

23f8d885ebd9856a488fea4a3a257a6d.png

Hive rt 表 count 查询

hive> select count(1) from mysqlcdc_sync_hive01_rt;

534045c91c768c51c367cd7b40202bf6.png

说明:rt 表 count 还是异常,和 Hudi 社区人员沟通 Hudi master 目前还没 release 这块存在 bug 正在修复中

具体见:https://issues.apache.org/jira/browse/HUDI-2649

7.17 Mysql 数据源写入多条测试数据 insert into users_cdc (name) values ('cdc02'); insert into users_cdc (name) values ('cdc03'); insert into users_cdc (name) values ('cdc04'); insert into users_cdc (name) values ('cdc05'); insert into users_cdc (name) values ('cdc06');

1dfbb6caf58d59aafd7a2915e891b0ab.png

Flink web UI DAG 中数据链路情况:

1bf7dc54256f16260ae34e69917c30fe.png

7.18 Flinksql 中新写入数据查询情况

932acb3a6c2502b42f31e11766568255.png

Yarn web UI application_1626256835287_40351[1] 资源使用情况

8e9e4384a6c7b37a15cca10185e3934e.png

Hdfs 上 Hudi 文件目录变化情况

1bc4ef1d0e3cd05b6b3f75381c49bb6a.png

Hudi 状态文件说明:

requested:表示一个动作已被安排,但尚未启动

inflight:表示当前正在执行操作

completed:表示在时间线上完成了操作

Flink jobmanager log sync hive过程详细日志

137ac07bc27ac2e72f93238403f2ed0c.png

683b3c124a868e6797b5c812d9f606df.png

34307a95cd50a13b06a62e5e3ac979ec.png

7.19 Mysql 数据源更新数据 update users_cdc set name = 'cdc05-bj'where id = 5;

8728118c583cf916965060e95512ab3a.png

7.20 Flinksql 查询 cdc update 数据产生两条 binlog 数据

810c0744a0757362555f01422eb60606.png

说明:Flinksql 查询最终只有一条 +I 有效数据,且数据已更新

Flink web UI DAG 接受到两条 binlog 数据,但最终 compact 和 sink 只有一条有效数据

858f4c68e83a1f069e826e268e5384e4.png

7.21 MySQL 数据源 delete 一条数据 deletefrom users_cdc where id = 3;

6ce177c0ef8c81d5724b6fee40a76cba.png

Flink Web UI job DAG 中捕获一条新数据:

124893e1afb5d2392fa7fff6e01b98ae.png

Flinksql changlog delete 数据变化查询

2784029ad865d8550d9ab23604fb7308.png

HDFS 上 Hudi 数据文件生成情况

cdd809cc5440a29469966be764c53f02.png

921b020b021c76ccf457982bb74ed644.png

Hudi 文件类型说明:

commits:表示将一批数据原子性写入表中;

cleans:清除表中不在需要的旧版本文件的后台活动;

delta_commit:增量提交是指将一批数据原子性写入 MergeOnRead 类型的表中,其中部分或者所有数据可以写入增量日志中;

compaction:协调 Hudi 中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交;

rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据。

4be8d239f4c6c1a4490e85e40c9232d9.png

说明:Hudi 分区文件以及 .log 和 .parquet 文件都已生成

两种文件区别:Hudi 会在 DFS 分布式文件系统上的 basepath 基本路径下组织成目录结构。每张对应的表都会成多个分区,这些分区是包含该分区的数据文件的文件夹,与 Hive 的目录结构非常相似。在每个分区内,文件被组织成文件组,文件 id 为唯一标识。每个文件组包含多个切片,其中每个切片包含在某个提交 / 压缩即时时间生成的基本列文件 (parquet 文件),以及自生成基本文件以来对基本文件的插入 / 更新的一组日志文件 (*.log)。Hudi 采用 MVCC 设计,其中压缩操作会将日志和基本文件合并成新的文件片,清理操作会将未使用/较旧的文件片删除来回收 DFS 上的空间。

Flink 任务 checkpoint 情况:

设置 30s 一次

5617960d9a652f224a83fd335eb12e60.png

22939db4cddf5ecf954169f792b2cdb5.png

7.22 Hive shell 查询数据 update 和 delete 变化情况 hive> select * from mysqlcdc_sync_hive01_ro;

f9e45ad96ce17a11cc11cc1f841911ff.png

hive> select * from mysqlcdc_sync_hive01_rt;

80c18a53f0bfba88d21e3190b75bf2d1.png

7.23 Hudi Client 端操作 Hudi 表

进入 Hudi 客户端命令行

hudi-master/hudi-cli/hudi-cli.sh

连接 Hudi 表,查看表信息

hudi->connect --path hdfs://nameservice1/tmp/luo/hudi/mysqlcdc_sync_hive01

f915bd90418f1be2bd487c2cad65cb55.png

查看 Hudi commit 信息

hudi:mysqlcdc_sync_hive01->commits show --sortBy "CommitTime"

d8b8942927a8327ec5704cd4d4eb2968.png

查看 Hudi compactions 计划

hudi:mysqlcdc_sync_hive01->compactions show all

50a5bdb7d257da273853f23a2d2c0925.png

7.24 PrestoDB 查询 Hive 表 Hudi 数据

版本说明:PrestoDB 0.256 DBeaver7.0.4

PrestoDB 集群配置和 Hive 集成参考 PrestoDB 官网

presto-server-***/etc/catalog/hive.properties 配置 hive catalog

可通过 presto-cli 连接 hive metastore 开启查询,presto-cli 的设置参考 presto官方配置;

DBeaver 客户端查询 Hive ro 表数据:

ee4bc831c1e255857e31962821728951.png

Hive ro 表 count 正常:

9926d4b9c33daf955882a6167670d2b9.png

查询 Hive rt 表数据查询异常:

10d69dcd7552932750105b5c6eb0aaeb.png

Hive rt 表 count 异常:

7760028b1476ae068a0344b2e6220082.png

Presto Web UI:

3f2f330941b7680353baab0df59d1cf7.png

939dcb7ae7171b824c6e20a8d4a44cce.png

Flink Forward Asia 2021 

2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。

大会官网:

https://flink-forward.org.cn

大会线上观看地址 (记得预约哦):

https://developer.aliyun.com/special/ffa2021/live



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有