实践数据湖iceberg:flink + iceberg CDC场景(版本问题,测试失败) 您所在的位置:网站首页 precode获取失败 实践数据湖iceberg:flink + iceberg CDC场景(版本问题,测试失败)

实践数据湖iceberg:flink + iceberg CDC场景(版本问题,测试失败)

2023-04-04 03:38| 来源: 网络整理| 查看: 265

概要测试flink cdc, 以及数据变化时update数据是如何落地flink1.14.3iceberg0.13.0cdc: 2.2设计测试场景:1. mysql数据准备1.1 准备数据(初始化)create database xxzh_stock character set utf8;CREATE TABLE `stock_basic` ( `i` bigint(20) DEFAULT NULL, `ts_code` varchar(10), `symbol` varchar(10), `name` varchar(10), `area` varchar(20), `industry` varchar(20), `list_date` varchar(10), `actural_controller` varchar(100) DEFAULT NULL, KEY `ix_stock_basic_index` (`i`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Records of stock_basic-- ----------------------------INSERT INTO `stock_basic` VALUES ('0', '000001.SZ', '000001', '平安银行', '深圳', '银行', '19910403', null);INSERT INTO `stock_basic` VALUES ('1', '000002.SZ', '000002', '万科A', '深圳', '全国地产', '19910129', null);INSERT INTO `stock_basic` VALUES ('2', '000004.SZ', '000004', '国华网安', '深圳', '软件服务', '19910114', '李映彤');1.2 开启mysql binlog修改mysql配置,/etc/my.cnf,开启binlog, 并记录binlog-do-db=xxzh_stock 这个库的变化文件末尾,增加如下配置server-id=1log-bin=mysql-binbinlog_format=rowbinlog-do-db=xxzh_stock重启mysql,让配置生效[root@hadoop103 conf]# service mysqld restart12 flink表定义2.1 启动flink-sql命令带上icerbg,kafka,hive,cdc的包[root@hadoop101 ~]# sql-client.sh embedded -j /opt/software/iceberg0.13/iceberg-flink-runtime-1.14-0.13.0.jar -j /opt/software/iceberg0.13/flink-sql-connector-hive-2.3.6_2.12-1.14.3.jar -j /opt/software/flink-sql-connector-kafka_2.12-1.14.3.jar -j /opt/software/flink-connector-mysql-cdc-1.4.0.jar shellcdc的下载地址: https://repo.maven.apache.org/maven2/com/alibaba/ververica/flink-connector-mysql-cdc/1.4.0/flink-connector-mysql-cdc-1.4.0.jar2.2 准备mysql的表source表CREATE TABLE stock_basic_source( `i` INT NOT NULL, `ts_code` INT NOT NULL, `symbol` CHAR(10) NOT NULL, `name` char(10) NOT NULL, `area` CHAR(20) NOT NULL, `industry` CHAR(20) NOT NULL, `list_date` CHAR(10) NOT NULL, `actural_controller` CHAR(100) NOT NULL, PRIMARY KEY(i) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'hadoop103', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'xxzh_stock', 'table-name' = 'stock_basic');PRIMARY KEY(i) NOT ENFORCED需要加上主键,否则会报Flink SQL> select * from stock_basic_source;[ERROR] Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'2.3 准备iceberg的sink表CREATE CATALOG hive_catalog6 WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://hadoop101:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://user/hive/warehouse/hive_catalog6');use catalog hive_catalog6;CREATE DATABASE xxzh_stock_mysql_db;USE xxzh_stock_mysql_db;CREATE TABLE stock_basic_iceberg_sink( `i` INT NOT NULL, `ts_code` INT NOT NULL, `symbol` CHAR(10) NOT NULL, `name` char(10) NOT NULL, `area` CHAR(20) NOT NULL, `industry` CHAR(20) NOT NULL, `list_date` CHAR(10) NOT NULL, `actural_controller` CHAR(100) NOT NULL, PRIMARY KEY(i) NOT ENFORCED);3.通过flink从mysq写入iceberg3.1 数据由mysql写入到icebergFlink SQL> insert into hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink select * from stock_basic_source;[INFO] Submitting SQL update statement to the cluster...[INFO] SQL update statement has been successfully submitted to the cluster:Job ID: c83f55f408926ad987375f4fa3bf7df44. catalog路径说明catalog切换回default_catalog才能查soure表,或者带default_catalog的绝对路径4.1 切换catalog方式Flink SQL> select * from stock_basic_source;[ERROR] Could not execute SQL statement. Reason:org.apache.calcite.sql.validate.SqlValidatorException: Object 'stock_basic_source' not foundFlink SQL> show current catalog;+----------------------+| current catalog name |+----------------------+| hive_catalog6 |+----------------------+1 row in setFlink SQL> show catalogs;+-----------------+| catalog name |+-----------------+| default_catalog || hive_catalog6 |+-----------------+2 rows in setFlink SQL> use catalog default_catalog;[INFO] Execute statement succeed.Flink SQL> select * from stock_basic_source;4.2 绝对路径方式参考3.25 报错处理5.1 cdc包的问题查询解决方法:准备这个包 flink-connector-mysql-cdc-1.4.0.jarFlink SQL> select * from stock_basic_source;[ERROR] Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.Available factory identifiers are:blackholedatagenfilesystemicebergkafkaprintupsert-kafka5.2 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder2022-02-17 16:24:47,934 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend2022-02-17 16:24:47,934 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@2772b6952022-02-17 16:24:47,936 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@16ba57372022-02-17 16:24:47,936 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4ae44fbc2022-02-17 16:24:47,936 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend2022-02-17 16:24:47,936 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@276604d12022-02-17 16:24:47,936 INFO org.apache.flink.runtime.taskmanager.Task [] - SinkMaterializer -> IcebergStreamWriter (1/1)#3 (f31a1159e5464010d0ea8c9325e3db32) switched from DEPLOYING to INITIALIZING.2022-02-17 16:24:47,941 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, stock_basic_source]], fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) -> NotNullEnforcer(fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) (1/1)#3 (3b4e39607a907745d88639c7300b088a) switched from DEPLOYING to INITIALIZING.2022-02-17 16:24:47,942 WARN org.apache.flink.metrics.MetricGroup [] - The operator name NotNullEnforcer(fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) exceeded the 80 characters length limit and was truncated.2022-02-17 16:24:47,943 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c), deploy into slot with allocation id 6fa395ed1d6e1678faaa162efd8ca0ff.2022-02-17 16:24:47,946 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, stock_basic_source]], fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) exceeded the 80 characters length limit and was truncated.2022-02-17 16:24:47,946 INFO com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction [] - Consumer subtask 0 has no restore state.2022-02-17 16:24:47,947 INFO org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c) switched from CREATED to DEPLOYING.2022-02-17 16:24:47,947 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c) [DEPLOYING].2022-02-17 16:24:47,947 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5d60868d2022-02-17 16:24:47,947 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@595f09f92022-02-17 16:24:47,948 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend2022-02-17 16:24:47,948 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@7406169e2022-02-17 16:24:47,948 INFO org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c) switched from DEPLOYING to INITIALIZING.2022-02-17 16:24:47,955 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, stock_basic_source]], fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) -> NotNullEnforcer(fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) (1/1)#3 (3b4e39607a907745d88639c7300b088a) switched from INITIALIZING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:166) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748)问题分析:cdc目前支持到flink1.13.3,具体看github pom, https://github.com/ververica/flink-cdc-connectors/blob/master/pom.xml解决方法:降级或者编译一个cdc,参考 http://betheme.net/news/txtlist_i66236v.html碰到这种问题,很难讲心情好。。。下载源码 : https://github.com/ververica/flink-cdc-connectors尝试编译:发现1.14.3的blink包是没有的,好吧,还是放弃好了。blink-planner没有,未来吃太多坑,找社区的人问(github上有个cdc的社区服务钉钉群),也不建议使用flink1.14。降级!最新版flink1.13的子版本是flink1.13.5,cdc的flink1.13.3, 重新编译一个cdccdc的pom.xml中修改flink和scala版本。/[INFO] ------------------------------------------------------------------------[INFO] Reactor Summary for flink-cdc-connectors 2.2-SNAPSHOT:[INFO] [INFO] flink-cdc-connectors ............................... SUCCESS [ 11.822 s][INFO] flink-connector-debezium ........................... SUCCESS [ 6.094 s][INFO] flink-connector-test-util .......................... SUCCESS [ 0.812 s][INFO] flink-connector-mysql-cdc .......................... SUCCESS [ 11.715 s][INFO] flink-connector-postgres-cdc ....................... SUCCESS [ 1.001 s][INFO] flink-connector-oracle-cdc ......................... SUCCESS [ 1.162 s][INFO] flink-connector-mongodb-cdc ........................ SUCCESS [ 1.119 s][INFO] flink-connector-sqlserver-cdc ...................... SUCCESS [ 0.818 s][INFO] flink-sql-connector-mysql-cdc ...................... SUCCESS [ 6.187 s][INFO] flink-sql-connector-postgres-cdc ................... SUCCESS [ 4.314 s][INFO] flink-sql-connector-mongodb-cdc .................... SUCCESS [ 4.149 s][INFO] flink-sql-connector-oracle-cdc ..................... SUCCESS [ 6.307 s][INFO] flink-sql-connector-sqlserver-cdc .................. SUCCESS [ 3.716 s][INFO] flink-format-changelog-json ........................ SUCCESS [ 0.321 s][INFO] flink-cdc-e2e-tests ................................ SUCCESS [ 0.906 s][INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------[INFO] Total time: 01:00 min[INFO] Finished at: 2022-02-17T21:00:10+08:00[INFO] ------------------------------------------------------------------------总结一开始,没有把版本全面考虑,现在又要回滚版本,过程痛苦

内容来源于网络,如侵删。

近日,袋鼠云重磅发布《数据治理行业实践白皮书》,白皮书基于袋鼠云在数据治理领域的8年深厚积累与实践服务经验,从专业视角逐步剖析数据治理难题,阐述数据治理的概念内涵、目标价值、实施路线、保障体系与平台工具,并借助行业实践案例解析,为广大读者提供一种数据治理新思路。

扫码下载《数据治理行业实践白皮书》,下载地址:https://fs80.cn/4w2atuhttp://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4404ca01208ea9dd9be6d88714ca37bc..png

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有