14 | 您所在的位置:网站首页 › pgsql行转列连续日期 › 14 |
目 录 1 DLH简介 1.1 DLH概述 1.2 DLH架构 1.3 功能特点 1.4 应用场景 2 快速入门 2.1 组件安装 2.1.1 查看组件的日志信息 2.2 运行状态监控 2.2.1 查看组件详情 2.2.2 组件检查 2.3 快速使用指导 2.3.1 非Kerberos环境 2.3.2 Kerberos环境 2.4 快速链接使用 2.4.1 配置组件快速链接 2.4.2 访问监控页面 3 使用指南 3.1 离线查询&交互式查询 3.1.1 语法介绍 3.1.2 配置说明 3.1.3 示例 3.2 外部数据源 3.2.1 语法介绍 3.2.2 Hive数据源 3.2.3 SeaSQL MPP数据源 3.2.4 DataEngine MPP数据源 3.2.5 MySQL数据源 3.2.6 HBase数据源 3.2.7 跨源分析 3.3 Hudi存储格式 3.3.1 Hudi存储格式相关术语 3.3.2 文件组织形式 3.3.3 存储类型和视图 3.3.4 Hudi属性说明 3.3.5 SQL操作 3.4 流SQL操作 3.4.1 保留关键字 3.4.2 流SQL UDF 3.4.3 创建表 3.4.4 数据插入 3.4.5 数据查询 3.5 MLSQL 3.5.1 监督算法 3.5.2 非监督算法 3.5.3 模型选择 3.5.4 特征工程 3.5.5 Pipeline机制 3.6 列加密 3.6.1 公钥加密 3.6.2 私钥加密 3.7 权限访问控制 3.7.1 权限说明 3.7.2 权限使用操作示例 3.8 添加/删除进程 3.8.1 添加进程 3.8.2 删除进程 4 配置说明 4.1 DLH常用配置 4.1.1 sparrow-default配置 4.1.2 sparrow-thrift-sparkconf配置 4.1.3 sparrow-hive-site-override配置 4.1.4 dlh-site配置 4.2 Atlas常用配置 4.2.1 application-properties配置 5 常见问题解答 1 DLH简介 1.1 DLH概述 DLH(数据湖仓库)结合数据湖和数据仓库的优势,在数据湖存储上实现了与数据仓库类似的数据结构和数据管理功能,提供“湖仓一体化”的能力。 DLH以HDFS和ONEStor作为数据湖的集中存储库,能够存储结构化、半结构化和非结构化的数据。借助数据集成服务能够将外部系统中数据接入到数据湖内,建立统一管理的数据目录、元数据信息及血缘关系展示等。同时,通过统一SQL接口能够对湖内数据进行离线查询、交互式分析、跨源分析、实时流计算以及机器学习算法训练等。 1.2 DLH架构DLH架构图如图1-1所示,说明如下: · DLH基于Hive提供统一SQL入口,不同场景下SQL语句底层执行引擎可自动进行切换,所有计算任务统一由YARN进行资源调度。 · 存储端支持多种存储格式和数据增量插入、增量查询等能力,并提供简单便捷的数据入湖工具。 · 通过统一的元数据管理界面,能够可视化管理库表结构信息及表数据量或外部数据源大小,并提供血缘关系展示等功能。 · 依赖大数据平台的认证/权限管理、加密管理、审计管理等模板保证DLH组件安全可靠。 图1-1 DLH架构图 1.3 功能特点 DLH的功能特点如下: · 支持存储多种数据类型 能够存储结构化、半结构化和非结构化数据以及流式数据访问。 · 完全兼容Hive标准 Hive作为Hadoop生态SQL事实标准,具有广泛的用户。数据湖仓库服务完全兼容Hive使用方式和语法,原有业务不需要做任何改动。 · 元数据管理 DLH对湖内元数据进行统一管理,能够以可视化的方式管理库表结构信息及表数据量或外部数据源大小等信息,并提供血缘关系展示等功能。 · 流批交互式融合 通过Hive做为SQL统一入口,融合交互式查询、离线查询、流计算等引擎,并扩展SQL优化规则智能选择最佳执行引擎,提供大数据的能力、数据库的体验。 · 协同分析 能够将湖内数据与外部数据源(如MPP数据库、关系型数据库、HBase等)数据进行协同计算,无需搬迁数据即可进行联合分析。 · 数据安全管理 依靠Kerberos和Ranger构建完善的企业级数据安权认证和权限管控。 1.4 应用场景· 交互式分析 在集群计算资源充足的情况下,DLH提供TB级别的数据交互式查询及分析能力。适用于如报表或大盘查询等高吞吐低时延的场景。 · 跨源查询分析 DLH支持对不同数据中心的多种数据源进行跨源查询及联合分析,避免数据迁移。目前支持对Hive、HBase、DataEngine MPP、SeaSQL MPP、MySQL进行不同集群的跨源分析。 · 数据仓库 数据仓库关注的是数据使用效率、大规模下的数据处理及管理能力。为保证数据和计算在湖和仓之间自由流动,DLH基于Hive进行产品设计及功能开发,完全兼容Hive语法,基于Hive的数仓方案无需进行迁移适配。 · 大数据实时流计算 DLH兼容Flink并实现FlinkSQL远程提交运行,目前DLH支持Flink流SQL通过统一入口直接下发Flink集群运行且无性能损耗。 · 机器学习 DLH集成Spark MLLib和GraphX模块并实现MLSQL和GraphSQL,可以方便的利用DLH计算能力对湖和仓中的数据进行机器学习算法训练和数据挖掘分析。 2 快速入门 2.1 组件安装 · 关于在Hadoop集群中,安装DLH时的注意事项和操作指导以及部署过程中相关的参数说明等,详情请参见产品安装部署手册和在线联机帮助。 · DLH依赖Hadoop、Zookeeper、Kafka、HBase、Flink、Presto以及Infra_Solr组件,安装DLH时要求完成上述组件的安装且组件状态正常,DLH组件安装过程无其他特殊要求。DLH安装完成后会自动同时安装Atlas、Hudi组件。 · 关于Presto组件操作的相关说明和指导,详情请参见Presto用户手册。 2.1.1 查看组件的日志信息 DLH安装完成后会自动同时安装Atlas组件,所以进行DLH管理时可能会需要查看这2个组件的日志。 表2-1 组件日志路径说明 组件 日志路径 DLH · DLH Server服务日志路径:/var/de_log/dlh/hive · DLH流SQL服务日志路径:/var/de_log/flink-sql-gateway/user_hdfs Atlas Atlas元数据服务日志路径:/var/de_log/atlas 2.2 运行状态监控 2.2.1 查看组件详情 因DLH组件依赖Atlas和Hudi组件,所以进行DLH管理时可能会需要访问这3个组件的组件详情页面。 1. 查看DLH组件详情进入DLH组件详情页面,如图2-1所示。组件详情页面主要展示基本信息、部署拓扑、配置和配置修改历史等相关信息,同时可对组件或组件进程执行相关管理操作,可查看或修改组件的各配置项信息,也可查看组件的配置修改历史及当前使用配置版本。 主要功能如下: (1) 基本信息:展示组件的基本配置信息,比如:组件内各服务进程的状态(鼠标悬停在各进程名上时,可查看进程的安装地址)、DLH JDBC连接URL(支持点击URL直接进行复制)。 (2) 部署拓扑:在组件详情的[部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情,并可对组件执行停止、重启、删除等相关操作。 【说明】进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。 (3) 配置:在组件详情的[配置]页签,可查看或修改组件各配置项的信息。 (4) 配置修改历史:在组件详情的[配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。 (5) 组件操作:在组件详情页面右上角,可对组件执行相关管理操作。比如:重启组件、添加进程、下载Client、访问快速链接、查看操作记录等。 图2-1 DLH组件详情 2. 查看Atlas组件详情 进入Atlas组件详情页面,如图2-2所示。组件详情页面主要展示基本信息、部署拓扑、配置和配置修改历史等相关信息,同时可对组件或组件进程执行相关管理操作,可查看或修改组件的各配置项信息,也可查看组件的配置修改历史及当前使用配置版本。 主要功能如下: (1) 基本信息:展示组件的基本配置信息,比如:组件内各服务进程的状态(鼠标悬停在各进程名上时,可查看进程的安装地址)。 (2) 部署拓扑:在组件详情的[部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情,并可对组件执行停止、重启等相关操作。 【说明】进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。 (3) 配置:在组件详情的[配置]页签,可查看或修改组件各配置项的信息。 (4) 配置修改历史:在组件详情的[配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。 (5) 组件操作:在组件详情页面右上角,可对组件执行相关管理操作。比如:重启组件、添加进程、访问快速链接、查看操作记录等。 图2-2 Atlas组件详情 3. 查看Hudi组件详情 进入Hudi组件详情页面,如图2-3所示。组件详情页面主要展示基本信息、部署拓扑、配置和配置修改历史等相关信息,同时可对组件或组件进程执行相关管理操作,可查看或修改组件的各配置项信息,也可查看组件的配置修改历史及当前使用配置版本。 主要功能如下: (1) 基本信息:展示组件的基本配置信息,比如:组件内各服务进程的状态(鼠标悬停在各进程名上时,可查看进程的安装地址)。 (2) 部署拓扑:在组件详情的[部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情,并可对组件执行删除等相关操作。 【说明】进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。 (3) 配置:在组件详情的[配置]页签,可查看或修改组件各配置项的信息。 (4) 配置修改历史:在组件详情的[配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。 (5) 组件操作:在组件详情页面右上角,可对组件执行相关管理操作。比如:重启组件、添加进程、查看操作记录等。 图2-3 Hudi组件详情 2.2.2 组件检查 因DLH组件依赖Atlas组件,所以进行DLH管理时可能会需要对这2个组件执行组件检查操作。 1. DLH组件检查集群在使用过程中,根据实际需要,可对DLH组件执行组件检查操作。 (1) 对DLH组件执行组件检查的方式有以下三种,任选其一即可: ¡ 在[集群管理/集群列表]页面,单击某集群名称可跳转至对应的集群详情页面。 - 在集群详情页面选择[组件]页签,单击组件列表中DLH组件对应的按钮。 - 在集群详情页面选择[组件]页签,单击组件列表中DLH组件名称进入组件详情页面,在右上角组件操作的下拉框中选择按钮。 ¡ 在组件管理的组件详情页面右上角组件操作的下拉框中选择按钮。 (2) 组件检查结束后,检查窗中会显示组件检查成功或失败的状态。如图2-4所示,组件检查成功则表示该组件可正常使用。 图2-4 组件检查
(3) 组件检查结束后,在组件详情页面单击按钮,进入操作记录窗口。可查看“DLH Service Check”组件操作执行的详细信息以及操作日志详情,根据操作日志可判断组件检查的具体情况。 图2-5 组件检查日志详情 2. Altas组件检查 集群在使用过程中,根据实际需要,可对Altas组件执行组件检查操作。 (1) 对Altas组件执行组件检查的方式有以下两种,任选其一即可: ¡ 在[集群管理/集群列表]页面,单击某集群名称可跳转至对应的集群详情页面。 - 在集群详情页面选择[组件]页签,单击组件列表中Altas组件对应的按钮。 - 在集群详情页面选择[组件]页签,单击组件列表中Altas组件名称进入组件详情页面,在右上角组件操作的下拉框中选择按钮。 (2) 组件检查结束后,检查窗中会显示组件检查成功或失败的状态。如图2-6所示,组件检查成功则表示该组件可正常使用。 图2-6 组件检查
(3) 组件检查结束后,在组件详情页面单击按钮,进入操作记录窗口。可查看“ATLAS Service Check”组件操作执行的详细信息以及操作日志详情,根据操作日志可判断组件检查的具体情况。 图2-7 组件检查日志详情 2.3 快速使用指导 DLH既可以通过集群用户访问,又可以通过组件超级用户访问。其中: · 集群用户:指在大数据集群的[集群权限/用户管理]页面可查看到的用户,包括集群超级用户和集群普通用户。其中: ¡ 集群超级用户:仅Hadoop集群拥有集群超级用户。新建Hadoop集群成功后,集群超级用户会自动同步到[集群权限/用户管理]页面,且对应描述为“集群超级用户”。 ¡ 集群普通用户:指在[集群权限/用户管理]页面新建的用户。开启权限管理后,普通用户绑定角色后即可拥有该角色所具有的权限;不开权限管理时,普通用户缺省仅拥有各组件原生的用户权限。 · 组件超级用户:指组件内部的最高权限用户,如Hive组件的hive用户。大数据集群中安装组件时均会缺省创建组件内置超级用户,也是集群用户的一种。 2.3.1 非Kerberos环境非Kerberos环境下,用户不需要做身份认证即可直接连接DLH执行相关管理操作。
非Kerberos环境下,通过控制台执行DLH SQL操作前需连接DLHServer2,连接操作支持“通过dlh命令连接”和“通过beeine脚本连接”两种方式。 1. 通过dlh命令连接登录集群内的任一节点(默认所有节点均安装了DLH Client),直接执行dlh命令即可进行连接。通过该方式连接DLHServer2时会自动填入连接信息,连接用户使用当前用户。 图2-8 dlh命令连接 2. 通过beeine脚本连接 执行连接操作的用户需具备相应的脚本执行权限。
登录集群内的任一节点(默认所有节点均安装了DLH Client),执行如下命令: /usr/hdp/3.0.1.0-187/dlh/hive/bin/beeline -u "{jdbc_url}" -n hive -p "" 【说明】 · 在beeline命令中: ¡ -u:指定连接DLHServer2的IP地址,端口号默认为13000。 ¡ -n:指定连接所需用户名。 ¡ -p:指定连接所需密码(可为空值)。 · DLH JDBC连接URL信息获取包括以下两种方式: ¡ 在DLH组件详情页面获取 访问大数据平台管理系统,进入DLH组件详情页面,在基本信息模块可查看DLHServer2 JDBC URL信息,如图2-9所示,直接拷贝即可。 图2-9 DLH组件JDBC URL
¡ 手动拼接(未开启Kerberos) jdbc:hive2://:13000/ 其中: - jdbc:hive2://:为固定前缀。 - 为DLHServer2所在的机器IP地址或主机名(在集群外通过主机名连接HiveServer2时必须配置本地hosts文件)。 - 13000:为DLHServer2的默认端口号。 2.3.2 Kerberos环境Kerberos环境下,用户需要做身份认证才可直接连接DLH执行相关管理操作,认证方式请参见2.3.2 1. Kerberos环境下用户身份认证。
Kerberos环境下,通过控制台执行DLH SQL操作前需执行以下两步操作: (1) 首先进行用户身份认证,才可以正常访问DLH。 (2) 连接DLHServer2,连接操作支持“通过dlh命令连接”和“通过beeine脚本连接”两种方式。 1. Kerberos环境下用户身份认证如果大数据集群开启Kerberos,若想操作DLH,则必须首先进行用户身份认证。根据用户类型不同,分为以下两类: · 集群用户身份认证 · 组件用户身份认证 (一)集群用户身份认证· 集群用户指在大数据集群的[集群权限/用户管理]页面可查看到的用户,包括集群超级用户和集群普通用户。 · Kerberos环境下,集群用户的认证文件可在[集群权限/用户管理]页面,单击用户列表中用户对应的按钮进行下载。
DLH还可以通过集群用户访问。在开启Kerberos的大数据集群中进行集群用户(以user1用户示例)身份认证的方式,包括以下两种(根据实际情况任选其一即可): (1) 方式一:(此方式不要求知道用户密码,直接使用keytab文件进行认证) a. 将用户user1的认证文件(即keytab配置包)解压后,上传至访问节点的/etc/security/keytabs/目录下,然后将keytab文件的所有者修改为user1,命令如下: chown user1 /etc/security/keytabs/user1.keytab b. 使用klist命令查看user1.keytab的principal名称,命令如下: klist -k user1.keytab 【说明】如图2-10所示,红框内容即为user1.keytab的principal名称。 图2-10 认证文件的principal名称
c. 切换至用户user1,并执行身份验证的命令如下: su user1 kinit -kt user1.keytab [email protected] 【说明】其中:user1.keytab为用户user1的keytab文件,[email protected]为user1.keytab的principal名称。 d. 输入klist命令可查看认证结果。 图2-11 查看认证结果
(2) 方式二:(此方式要求用户密码已知,通过密码直接进行认证) a. 切换至用户user1进行认证,输入以下命令: su user1 kinit user1 b. 根据提示输入密码Password for [email protected]: c. 输入klist命令可查看认证结果。 图2-12 查看认证结果 (二)组件超级用户身份认证 DLH可以通过组件超级用户访问,在开启Kerberos的集群中进行组件超级用户(以hive用户示例)认证的步骤如下: (1) 在集群内节点的/etc/security/keytabs/目录下,查找hive的认证文件“hive.service.keytab”。 【说明】在DLH Client节点上,需要将hive的认证文件“hive.service.keytab”上传节点的/etc/security/keytabs/目录下进行认证。 (2) 使用klist命令查看hive.service.keytab的principal名称,命令如下: klist -k hive.service.keytab 【说明】如图2-13所示,红框内容即为hive.service.keytab的principal名称。 图2-13 认证文件的principal名称
(3) 切换至用户hive,并执行身份验证的命令如下: su hive kinit -kt hive.service.keytab hive/[email protected] 【说明】其中:hive.service.keytab为hive的认证文件,hive/[email protected]为hive.service.keytab的principal名称。 (4) 输入klist命令可查看认证结果。 图2-14 查看认证结果 2. 通过dlh命令连接 登录集群内的任一节点(默认所有节点均安装了DLH Client),直接执行dlh命令即可进行连接。通过该方式连接DLHServer2时会自动填入连接信息,连接用户使用当前用户。 图2-15 dlh命令连接 3. 通过beeine脚本连接 执行连接操作的用户需具备相应的脚本执行权限。
登录集群内的任一节点(默认所有节点均安装了DLH Client),执行如下命令: /usr/hdp/3.0.1.0-187/dlh/hive/bin/beeline -u "{jdbc_url}" -n hive -p "" 【说明】 · 在beeline命令中: ¡ -u:指定连接DLHServer2的IP地址,端口号默认为13000。 ¡ -n:指定连接所需用户名。 ¡ -p:指定连接所需密码(可为空值)。 · DLH JDBC连接URL信息获取包括以下两种方式: ¡ 在DLH组件详情页面获取 访问大数据平台管理系统,进入DLH组件详情页面,在基本信息模块可查看DLHServer2 JDBC URL信息,如图2-16所示,直接拷贝即可。 图2-16 DLH组件JDBC URL
¡ 手动拼接URL(开启Kerberos) jdbc:hive2:// :13000/;principal=hive/[email protected] 其中: - -u用于指定连接DLHServer2的地址和端口号。 - DLHServer2的端口号默认为13000,principal名称为hive/[email protected](其中_HOST是DLHServer2进程所在主机的域名地址,principal获取方式请参考下面的说明),示例:hive/[email protected]。 - -n用于指定连接所需的用户名(可为空值)。 - -p用于指定连接所需的密码(可为空值)。 关于principal的配置值获取,可以登录大数据平台管理系统,进入DLH组件详情页面,在组件详情的[配置]页签,可查看配置文件dlh-site.xml中“hive.server2.authentication.kerberos.principal”的配置项获取。 2.4 快速链接使用 因DLH组件依赖Atlas组件,所以进行DLH管理时可能会需要访问这2个组件的快速链接页面。 1. DLH快速链接DLH快速链接包括DLHServer监控和HistroyServer任务监控,分别访问不同的监控页面可以查看不同的信息。如图2-17所示,在DLH组件详情页面的右上角[快速链接]的下拉框中,可以获取不同监控页面的访问入口信息。 【说明】当集群开启高可用时,DLH组件会同步开启HA,此时部分监控页面有两个访问入口,任选其一即可。 图2-17 DLH快速链接 2. Atlas快速链接 Atlas快速链接为Atlas UI,提供了基于DLH库、表、列及血缘等元数据管理功能。如图2-18所示,在Atlas组件详情页面的右上角[快速链接]的下拉框中,可以获取Atlas UI访问入口信息。 【说明】当集群开启高可用时,Atlas组件会同步开启HA,此时组件UI页面有两个访问入口,任选其一即可。 图2-18 Atlas快速链接 2.4.1 配置组件快速链接 大数据集群部署完成后,若想要访问DLH或Atlas监控页面,首先需要修改本地hosts文件,用以确保组件的快速链接页面通过域名访问能够顺利跳转。 修改本地hosts文件的方法如下: (1) 登录大数据集群中任意一节点,查看当前集群的hosts文件(Linux环境下位置为/etc/hosts)。 (2) 将集群的hosts文件信息添加到本地hosts文件中。若本地电脑是Windows环境,则hosts文件位于C:\Windows\System32\drivers\etc\hosts,修改该hosts文件并保存。 (3) 在本地hosts文件中配置主机域名信息完成后,即可访问组件的快速链接。 2.4.2 访问监控页面 1. DLHServer监控在DLH组件详情页面的右上角[快速链接]的下拉框中选择DLHServer(如果多个快速链接,任选其一即可),此时不需要通过用户名和密码,即可直接跳转至DLHServer监控UI页面。 如图2-19所示,DLHServer主页监控页面显示的是当前链接的会话,包括:IP、用户名、当前执行的查询数量、链接总时长、空闲时长等。如果有会话执行查询,页面中的Queries模块会显示查询的语句、执行耗时等。 图2-19 访问DLHServer UI页面 2. HistroyServer任务监控 在DLH组件详情页面的右上角[快速链接]的下拉框中选择HistoryServer,根据集群是否开启Kerberos,访问HistoryServer快速链接分为两种情况: · 若集群没有开启Kerberos认证,则此时点击访问入口链接,不需要通过用户名和密码,即可直接跳转访问HistoryServer任务监控UI页面。 · 若集群开启了Kerberos认证,则此时点击访问入口链接,需要输入用户名和密码进行认证(可以使用集群创建时填写的超级用户,也可以使用用户管理中创建的用户),然后才可跳转访问对应的UI页面。 在HistoryServer监控页面,可查看如下信息: (1) 如图2-20所示,History Server页面默认展示已完成的历史任务列表。单击页面左下角“Show incomplete applications”可跳转到正在运行的任务列表。 图2-20 访问HistroyServer UI页面
(2) 单击任务列表中的App ID,可进入该任务的Jobs列表页面,可对Completed Jobs已完成的任务查看详情。 图2-21 Spark Jobs列表页面
(3) 在Spark Jobs列表页面,单击“Completed Jobs”列表中的任意Job,可进入该Job的详情页面,查看Stage列表。 图2-22 Job详情页面
(4) 在Job详情页面,单击“Completed Stages”列表中任意Stage,可进入该Stage的详情页面,查看每个task的执行时间、GC时间等。 图2-23 Stage详细页面 3. Atlas元数据管理 DLH基于Atlas提供了Hive元数据管理功能,对DLH内部的数据库、数据表、数据流、数据列以及列血缘等元数据提供了查看基本属性(在基本属性中可以在业务层面增加属性以及标签)、血缘分析以及业务类别分类等能力。 访问DLH的Atlas元数据管理页面的方式如下: (1) 在集群管理页面的左侧导航树中选择[集群列表],进入集群列表页面,单击集群名称可跳转至集群详情页面。 (2) 在集群详情页面选择[组件]页签,单击组件列表中Atlas组件名进入Atlas组件详情页面。 (3) 在Atlas组件详情页面的右上角[快速链接]的下拉框中,选择Atlas(如果多个快速链接,任选其一即可),可跳转至DLH对应的Atlas元数据管理登录页面,此时需要通过admin用户和密码(admin用户的密码为CloudOS5#DE3@Atlas),才可访问Atlas元数据管理UI页面。 本章节仅对Atlas元数据管理UI页面进行简单说明,更多关于Atlas原生UI界面的功能特性说明请参见atlas官网。
下面对Atlas元数据管理UI页面进行说明: (1) Atlas元数据管理UI页面主要分为左右两部分,左部分为功能模块菜单,包括Entities、Classification、Business Metadata、Glossaries和Custom Filters,右部分为功能展示区。 (2) 如图2-24所示,在Entities菜单中,单击具体的实体元数据(如:hive_table(4))则显示对应表的元数据信息。如图2-25所示,在表的元数据信息中再单击具体的表Name可以跳转至表的详情页面。上面中间的搜索框可以对Entities菜单中的各类实体元数据进行全局搜索。 图2-24 Entities菜单
图2-25 通过表Name查看表的详情
(3) 如图2-26所示,在Classifications菜单中,可以添加类别,供Entities中具体的元数据使用。 图2-26 Classifications菜单
(4) 如图2-27所示,在Business Metadata菜单中,可以添加业务元数据、枚举值等进行管理,业务元数据可以供Entities中具体的元数据使用。 图2-27 Business Metadata菜单 (5) 如图2-28所示,在Glossaries菜单中,可以管理词汇表,词汇表可以运用在Entities中具体的元数据中。 图2-28 Glossaries菜单
(6) 如图2-29所示,在Custom Filters菜单中,显示保存的自定义查询,在此处可以进行重命名、删除以及快速定位原来查询条件的查询结果。 图2-29 Custom Filters菜单 3 使用指南 3.1 离线查询&交互式查询 DLH离线查询功能完全兼容Hive,同时支持基于Presto的交互式查询功能。并且,当前版本中,已实现离线查询和交互式查询引擎的自动选择功能。 3.1.1 语法介绍 1. 离线查询语法 DLH基于Hive开发,离线查询语法完全兼容Hive语法。 2. 交互式查询语法DLH基于Presto提供交互式查询能力,SQL语句兼容Presto基本查询语法。另外,Presto自身也支持CLI和JDBC接口进行查询。 · Presto查询语法请参见Trino官网(v316)。 · 外部数据源表示数据源和DLH组件不属于同一个大数据集群。DLH不支持对外部数据源的库表查询引擎切换。 3.1.2 配置说明 DLH对非外部数据源的库表进行SQL查询时,可根据业务需要选择合适的执行引擎,引擎选择开关由表3-1中所示配置决定。 表3-1 离线查询&交互式查询执行引擎相关配置 配置项 默认值 说明 dlh.query.engine hive 执行引擎选项支持hive、presto、auto。该值为hive表示执行引擎固定为Hive,该值为presto表示执行引擎固定为Presto,该值为auto时默认执行引擎为Presto(若使用Presto执行失败或表大小超设定阈值则转交至Hive执行) hive.presto.table.threshold 10737418240 表大小默认阈值为10GB,单位为B。当执行引擎为auto时,若表容量小于该阈值则查询走presto执行,若表容量大于该阈值则查询走hive执行 hive.presto.http-server.http.port - Presto的协调器节点的地址,比如:http://localhost:18089。若集群开启了HA,可配置为虚拟主机名及代理端口,比如:http://testclr-vserver.vip.hde.com:18090,这部分信息可以从Presto组件的自定义config配置http.proxy.url获取 3.1.3 示例 (1) 默认配置下,DLH非外部数据源的库表查询默认由Hive执行 0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set dlh.query.engine; +----------------------+ | set | +----------------------+ | dlh.query.engine=hive| +----------------------+ 1 row selected (0.017 seconds) (2) 设置DLH非外部数据源的库表查询引擎为Presto,此时SQL查询由交互式查询引擎执行 0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set dlh.query.engine = presto; No rows affected (0.01 seconds) (3) 执行SQL查询,此时查询任务由Presto完成(可通过DLH组件管理界面的快速链接查看对应任务) 0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> select count(*) from reason; +-----+ | c0 | +-----+ | 35 | +-----+ 1 row selected (2.877 seconds) (4) 设置DLH非外部数据源的库表查询引擎为auto,表大小阈值为1GB(表大小默认阈值为10GB),此时超过1G的表走Hive执行,小于1G的表走Presto执行 0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set dlh.query.engine = auto; No rows affected (0.01 seconds) 0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set hive.presto.table.threshold=10485760; No rows affected (0.01 seconds) 3.2 外部数据源· 外部数据源表示数据源和DLH组件不属于同一个大数据集群。 · 外部数据源配置存放资源文件的路径时(如:hdfs-site.xml、core-site.xml或认证文件krb5.conf、keytab等),仅支持/etc、/opt、/usr/hdp、/tmp、/apps/presto及其子目录。 · 外部数据源分析基于Presto实现,在DLH中执行需要把执行引擎设置为presto。
DLH组件支持对Hive、SeaSQL MPP、DataEngine MPP、MySQL、HBase等外部数据源进行跨源查询,避免数据迁移。 3.2.1 语法介绍· 注册数据源 create catalog (if not exists) using with properties ("" = "", "" = "", ....); 其中: ¡ catalogType支持Hive、HBase、SeaSQL MPP、DataEngine MPP、MySQL等,不区分大小写。 ¡ 注册数据源时,不同的数据源需要的配置项不同,详情请参见3.2.2 ~3.2.6 章节。比如,注册hive数据源时,命令如下: create catalog if not exists default using hive with properties ( "hive.metastore.uri" = "thrift://node1.hde.com:19083","hive.config.resources"="/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml","hive.allow-drop-table"="true","hive.allow-rename-table"="true","hive.parquet.use-column-names"="true"); · 获取所有的数据源信息 show catalogs; · 切换数据源 use catalog ; · 删除数据源 drop catalog ; · 切换到数据源下的某个数据库(或schema) use .; · 查看某个数据源的创建数据源语句 show create catalog ; · 查看某个数据源下面的所有库列表 show databases in 或show databases from ; · 查看某个数据源下面的所有表列表 show tables in .; · 读取某个数据源下的某个库下的某个表数据 select * from ..; 3.2.2 Hive数据源Hive数据源支持通过DLH组件进行创库、建表、查询的操作。 1. 基础配置表3-2 Hive数据源相关配置 配置项 默认值 说明 hive.metastore.uri - 使用Thrift协议连接Hive元存储的URL。如果提供了多个URL,则默认使用第一个URL。该属性必选。 示例:thrift://192.0.2.3:9083或thrift://192.0.2.3:9083,thrift://192.0.2.4:9083 hive.config.resources - HDFS配置文件列表,以逗号分隔。该文件必须存在于部署DLH组件的所有节点上。 示例:/etc/hdfs-site.xml hive.storage-format ORC 创建新表时采用的默认文件格式,支持ORC、PARQUET、AVRO、RCBINARY、SEQUENCEFILE、JSON、TEXTFILE、CSV hive.parquet.use-column-names false 配置为true时,查询读取Parquet存储格式的表(如hudi表)时,使用列名而非下标进行组装数据,可避免元数据schema修改后列名与数据对应错位的问题 2. 安全相关配置 通过在Hive目录属性文件中设置hive.security属性可以启用Hive的授权检查,hive.security属性必须是表3-3中的值。 表3-3 Hive数据源安全相关配置 属性值 说明 legacy(默认值) 授权检查很少执行,因此大多数操作都是允许的。使用配置属性hive.allow-drop-table、hive.allow-rename-table、hive.allow-add-column、hive.allow-drop-column和hive.allow-rename-column来限定数据源是否进行相应操作 read-only 允许读数据和读取元数据的操作(如SELECT),不允许写数据或者写元数据的操作(如CREATE、INSERT或DELETE) 3. Kerberos相关配置 Hive数据源开启Kerberos时,需要的额外配置如表3-4所示。 表3-4 Kerberos环境下额外配置 配置项 默认值 说明 hive.metastore.authentication.type NONE Hive元存储身份验证类型,可选择:NONE或KERBEROS hive.metastore.service.principal - Hive元存储服务的Kerberos主体,Hive数据源开启Kerberos时需要。此属性值中可使用_HOST占位符 示例:hive/[email protected] hive.metastore.client.principal - 在连接到Hive元存储服务时将使用的Kerberos主体,Hive数据源开启Kerberos时需要 hive.metastore.client.keytab - Hive元存储客户端的keytab位置,Hive数据源开启Kerberos时需要 hive.metastore.krb5.conf.path - Kerberos服务器地址,Hive数据源开启Kerberos时需要 hive.hdfs.authentication.type - 身份验证类型,可选择:NONE或KERBEROS hive.hdfs.presto.principal - HDFS客户端Kerberos主体,Hive数据源开启Kerberos时需要 hive.hdfs.presto.keytab - HDFS客户端Kerberos认证文件,Hive数据源开启Kerberos时需要
Hive数据源开启Kerberos时: · 若想通过DLH集群访问Hive外部数据源,需要进行相关互信配置,详情请参见3.2.2 4. (2)示例2:创建并访问开启Kerberos的Hive数据源示例。 · 通过DLH组件查看外部Hive数据源的库表时,与通过Hive查看时数据类型有所不同。比如:show databases时,会多显示一个information_schema数据库。 4. 示例 (1) 示例1:创建并访问未开启Kerberos的Hive外部数据源 a. 将Hive外部数据源对应的HDFS配置文件:core-site.xml文件和hdfs-site.xml文件,拷贝到安装DLH组件的所有节点上(示例:拷贝至/opt/190目录下) b. 将Hive外部数据源集群节点的映射关系(hosts文件内容)添加到安装DLH组件的所有节点上的/etc/hosts文件中 c. 后台登录DLH控制台,创建名为hive2数据源,该数据源访问集群中xldwhj190.hde.com节点的hive create catalog hive2 using hive with properties ( "hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083", "hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml" ); d. 切换数据源为hive2 use catalog hive2; e. 创建名为db2的数据库 create database db2; f. 切换到hive2.db2的数据库下 use hive2.db2; g. 创建t11表 create table t11(id int, name string); h. 查看t11表的建表语句 show create table t11; i. 插入一条记录 insert into t11 values(0, 'a'); j. 查询t11表中数据 select * from t11;
(2) 示例2:创建并访问开启Kerberos的Hive数据源 a. 将Hive外部数据源对应的HDFS配置文件:core-site.xml文件和hdfs-site.xml文件,拷贝到安装DLH组件的所有节点上 b. 将Hive外部数据源集群节点的映射关系(hosts文件内容)添加到安装DLH组件的所有节点上的/etc/hosts文件中 c. 登录大数据集群的管理系统,在[集群权限/用户管理]页面下载对应外部集群用户的认证文件 d. 将下载的外部集群用户认证文件,拷贝到安装DLH组件的所有节点上(路径为创建catalog时指定的路径) e. 配置krb5.conf文件,示例如图3-1所示,有两种方式: - 方式一:将目的数据源集群的krb5.conf文件拷贝到安装DLH组件的所有节点上(路径为创建catalog时指定的路径)。 - 方式二:是将目的数据源集群的krb5.conf文件中并入到数据湖所在集群的/etc/krb5.conf文件,realms内容直接拷贝,domain信息需要将目的集群和本集群的机器名逐一采用机器名=集群名的格式进行编辑(仅修改本地集群即可)。 图3-1 配置/etc/krb5.conf文件
f. 后台登录DLH控制台,创建一个名为hive190的数据源,该数据源可以访问目的集群的Hive create catalog hive190 using hive with properties ( "hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083",-- 外部数据源HiveMetastore地址,多个英文逗号隔开 "hive.allow-drop-table" = "true", "hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml", "hive.hdfs.authentication.type" = "KERBEROS", "hive.metastore.authentication.type" = "KERBEROS", "hive.metastore.service.principal" = "hive/[email protected]",--固定格式,域名为目的集群的realm "hive.hdfs.presto.principal" = "[email protected]",-- 目的集群用户principal "hive.hdfs.presto.keytab" = "/opt/190/admin123.keytab",-- 目的集群用户keytabl "hive.metastore.krb5.conf.path" = "/etc/krb5.conf", "hive.metastore.client.keytab" = "/opt/190/admin123.keytab",-- 目的集群用户keytab "hive.metastore.client.principal" = "[email protected]"--目的集群用户principal ); (3) 示例3:创建指定存储格式的数据源(根据需要添加开启Kerberos或未开启Kerberos所需要的参数项) 其中:"hive.storage-format" = "PARQUET"属性用于指定存储格式。 create catalog hive3 using hive with properties ( "hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083", "hive.storage-format" = "PARQUET", "hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml" …… ); 验证建表后,查看表存储格式是否为parquet: create table t12(id int, name string); show create table t12; (4) 示例4:创建安全模式为legacy的数据源(根据需要添加开启Kerberos或未开启Kerberos所需要的参数项) 其中:"hive.allow-drop-table" = "true","hive.allow-drop-column"="true","hive.allow-rename-column"="true","hive.allow-add-column"="true","hive.allow-rename-table"="true"等属性用于指定安全模式为legacy。 …… create catalog hive4 using hive with properties ( "hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083", "hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml", "hive.allow-drop-table" = "true", "hive.allow-drop-column"="true", "hive.allow-rename-column"="true", "hive.allow-add-column"="true", "hive.allow-rename-table"="true" …… ); 验证删除表、删除列、表重命名、添加列、列重命名是否可行(True表示支持): alter table t11 RENAME TO t11new; alter table t11 add column age string; alter table t11 RENAME COLUMN name TO namenew ; alter table t11 DROP COLUMN id; drop table t11new; (5) 示例5:创建安全模式为read-only的数据源(根据需要添加开启Kerberos或未开启Kerberos所需要的参数项) 其中:"hive.security" = "read-only"属性用于指定安全模式为read-only。 create catalog hive3 using hive with properties ( "hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083", "hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml", "hive.security" = "read-only" …… ); 验证insert、alter等操作是否已经不可用: insert into t11 values(0, 'a'); 3.2.3 SeaSQL MPP数据源SeaSQL MPP数据源支持通过DLH组件进行查询和创建表。 SeaSQL MPP数据源不支持的功能包括:CREATE DATABASE、CREATE SCHEMA、UPDATE、DELETE、GRANT、REVOKE、SHOW GRANTS、SHOW ROLES、SHOW ROLE GRANTS等语法。 1. 数据源配置 通过表3-5所示参数,可以配置SeaSQL MPP数据源连接器。 表3-5 SeaSQL MPP数据源相关配置 配置项 默认值 说明 connection-url - 连接SeaSQL MPP的地址,比如:jdbc:postgresql://example.net:5432/database connection-user - 连接SeaSQL MPP的用户名,比如:root connection-password - 连接SeaSQL MPP的密码,比如:secret 2. 示例 创建SeaSQL MPP数据源并对数据源进行查看数据库、创建表等操作。 · 创建一个名叫seqsql1的数据源 create catalog seqsql1 using seasql with properties ( "connection-url"="jdbc:postgresql://10.121.88.114:5434/postgres", "connection-user"="ssadmin", "connection-password"="passwd" ); · 查看seqsql1下的数据库 show databases from seqsql1; · 查看seqsql1.public下的表 show tables from seqsql1.public; · 创建一个t1的表 create table seqsql1.public.t1 (i int , s string); · 向表t1中插入一条记录 insert into seqsql1.public.t1 values(0, 'a'); · 查询表t1中数据 select * from seqsql1.public.t1; 3.2.4 DataEngine MPP数据源DataEngine MPP数据源支持通过DLH组件进行查询和创建表。 DataEngine MPP数据源不支持的功能包括:CREATE DATABASE、CREATE SCHEMA、UPDATE、DELETE、GRANT、REVOKE、SHOW GRANTS、SHOW ROLES、SHOW ROLE GRANTS等语法。 1. 数据源配置 通过表3-6所示参数,可以配置DataEngine MPP数据源连接器。 表3-6 DataEngine MPP数据源相关配置 配置项 默认值 说明 connection-url - 连接DataEngine MPP的地址,比如:jdbc:vertica://example.net:5433/database connection-user - 连接DataEngine MPP的用户名,比如:root connection-password - 连接DataEngine MPP的密码,比如:secret 2. 示例 创建DataEngine MPP数据源并对数据源进行查看数据库、创建表等操作。 · 创建一个名叫test01的数据源 create catalog test01 using vertica with properties ( "connection-url"="jdbc:vertica://10.121.66.152:5433/Database", "connection-user"="dbadmin", "connection-password"="passwd" ); · 查看test01下的模式 show databases from test01; · 查看test01.public下的表 show tables from test01.public; · 切换到test01.public模式下 use test01.public; · 创建一个test1的表 create table test1(i int ,s string); · 向表test1中插入一条记录 insert into test1 values(0, 'a'); · 查询表test1中数据 select * from vertica1.public. test1; 3.2.5 MySQL数据源MySQL数据源支持通过DLH组件进行查询和创建表。 MySQL数据源不支持的功能包括:CREATE DATABASE、CREATE SCHEMA、UPDATE、DELETE、GRANT、REVOKE、SHOW GRANTS、SHOW ROLES、SHOW ROLE GRANTS等语法。 1. 数据源配置 通过表3-7所示参数,可以配置MySQL数据源连接器。 表3-7 MySQL数据源相关配置 配置项 说明 connection-url 连接MySQL的地址,比如:jdbc:mysql://example.net:3306 connection-user 连接MySQL的用户名,比如:root connection-password 连接MySQL的密码,比如:secret 2. 示例 创建MySQL数据源并对数据源进行查看数据库、创建表等操作。 · 创建一个名叫mysql12的数据源 create catalog mysql12 using mysql with properties ( "connection-url"="jdbc:mysql://10.121.65.63:53306", "connection-user"="user01", "connection-password"="passwd" ); · 查看mysql12下的数据库 show databases from mysql12; · 查看mysql12.ambari_hivedebug0908下的表 show tables from mysql12.hivedebug0908; · 创建一个a的表 create table mysql12.hivedebug0908.a(a string); · 向表a中插入一条记录 insert into mysql12.hivedebug0908.a values('test'); · 查询表a中数据 select * from mysql12.hivedebug0908.a; 3.2.6 HBase数据源支持通过DLH在HBase实例上查询和创建表。用户可以在HBase连接器中创建表,并映射到HBase Cluster中已有的表(或创建新表),支持insert、select和delete操作。HBase连接器维护着一个元存储,用于持久化HBase元数据,目前此元存储采用MySQL。 · 基本上支持所有的SQL语句,包括创建、查询、删除模式、添加、删除、修改表、插入数据、删除行等。但不支持对表进行重命名、不支持删除列。 · HBase没有提供接口来检索表的元数据,所以show tables只能显示用户已与HBase数据源建立关联的表。 · 支持10种数据类型:VARCHAR、TINYINT、SMALLINT、INTEGER、BIGINT、DOUBLE、BOOLEAN、TIME、DATE和TIMESTAMP。 · HBase连接器支持下推大部分运算符,比如基于RowKey的点查询、基于RowKey的范围查询等。此外,还支持通过谓词条件进行下推,比如:=、>=、>、 表属性 > dlh-site.xml > 默认值。 表3-13 dlh-site属性说明 属性 描述 hoodie.upsert.shuffle.parallelism upsert并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.insert.shuffle.parallelism insert并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.blukinsert.shuffle.parallelism blukinsert并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.delete.shuffle.parallelism delete并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.datasource.write.hive_style_partitioning 默认true hoodie.datasource.write.partitionpath.urlencode 默认true
表3-14 表属性说明 属性 描述 hoodie.datasource.write.table.type Hudi表类型,可以设置COPY_ON_WRITE或MERGE_ON_READ两种类型,缺失时默认为COPY_ON_WRITE hoodie.datasource.write.recordkey.field Hudi表记录主键,value值需要在表的列中 hoodie.datasource.write.precombine.field 数据集插入前合并记录的判断字段,在recode字段值相当的情况下,默认取该字段数值大的记录进行插入 hoodie.table.name Hudi表名,缺省时默认取Hive表名 hoodie.mor.isRealTimeInputFormat hoodie.datasource.write.table.type=MERGE_ON_READ时,该属性设置为true,创建mor类型的rt表,否则创建ro表
表3-15 会话属性说明 属性 描述 hoodie.upsert.shuffle.parallelism upsert并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.insert.shuffle.parallelism insert并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.blukinsert.shuffle.parallelism blukinsert并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.delete.shuffle.parallelism delete并发数,设置为10,可通过表属性或会话设置覆盖 hoodie.datasource.write.operation Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete 3.3.5 SQL操作 1. 建表 遵循Hive原生建表语句,通过扩展stored类型和使用表属性来控制Hudi表的创建特性。 功能描述: · 支持分区表、内部表、外部表语法。 · 建表语句可缺失内置字段,但是建表后查看表可以看到Hudi内置字段。建表语句如果包含内置字段,类型必须为String类型。 · 建表时需首先明确表明stored as hudi来标明表为Hudi表。 · 建表时可以通过表属性hoodie.table.name设置hoodie表名,可以与Hive表使用不同的名称。 · 建表时可以通过表属性hoodie.datasource.write.table.type设置Hudi表存储类型,默认缺失时为COPY_ON_WRITE类型。 · 可以通过设置表属性hoodie.datasource.write.table.type=MERGE_ON_READ和hoodie.mor.isRealTimeInputFormat=true 创建hudi存储类型为MERGE_ON_READ的rt视图表。 · MERGE_ON_READ存储类型的Hudi表,如果需要通过Hive SQL进行读优化和近实时视图查询,则需要同时建立2个Hive表同时读取同一套Hudi数据,此时需要保证Hive建表语句中只有hoodie.mor.isRealTimeInputFormat=true有差异,且hoodie.table.name必须配置。 · 建表时表属性hoodie.datasource.write.recordkey.field与hoodie.datasource.write.precombine.field的值必须在表字段中,且不是hudi内置字段。
语法: CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type [COMMENT col_comment], ... [constraint_specification])] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] STORED AS HUDI [LOCATION hdfs_path] [TBLPROPERTIES (property_name=property_value, ...)] [AS select_statement];
表3-16 参数说明 表属性 描述 hoodie.datasource.write.table.type Hudi表类型,可以设置COPY_ON_WRITE或MERGE_ON_READ两种类型,缺失时默认为COPY_ON_WRITE hoodie.datasource.write.recordkey.field Hudi表记录主键,value值需要在表的列中,支持多主键,以逗号分隔 hoodie.datasource.write.precombine.field 数据集插入前合并记录的判断字段,在recode字段值相当的情况下,默认取该字段数值大的记录进行插入,value值需要在表的列中 hoodie.table.name Hudi表名,缺省时默认取Hive表名 hoodie.mor.isRealTimeInputFormat hoodie.datasource.write.table.type=MERGE_ON_READ时,该属性设置为true,创建mor类型的rt表,否则创建ro表
示例: · 创建内部表 CREATE TABLE IF NOT EXISTS `hudi_inner`( `fare` bigint, `rider` string, `ts` double, `uuid` string) STORED AS hudi TBLPROPERTIES( 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 创建外部表 CREATE EXTERNAL TABLE IF NOT EXISTS `hudi_outer`( `fare` bigint, `rider` string, `ts` double, `uuid` string) STORED AS hudi TBLPROPERTIES( 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 创建分区表 CREATE EXTERNAL TABLE IF NOT EXISTS `hudi_part`( `fare` bigint, `rider` string, `ts` double, `uuid` string) PARTITIONED BY (`driver` string,`name` string) STORED AS hudi TBLPROPERTIES( 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 缺失表属性hoodie.datasource.write.table.type 创建cow类型表 CREATE EXTERNAL TABLE IF NOT EXISTS `hudi_trips_cow`( `fare` bigint, `rider` string, `ts` double, `uuid` string) PARTITIONED BY (`driver` string,`name` string) STORED AS hudi LOCATION '/tmp/hudi_trips_cow' TBLPROPERTIES( 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 指定表属性'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE',创建cow类型表 CREATE EXTERNAL TABLE IF NOT EXISTS `hudi_trips_cow`( `fare` bigint, `rider` string, `ts` double, `uuid` string) PARTITIONED BY (`driver` string,`name` string) STORED AS hudi LOCATION '/tmp/hudi_trips_cow' TBLPROPERTIES( 'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 指定表属性'hoodie.table.name' = xxx',创建cow类型表 CREATE EXTERNAL TABLE IF NOT EXISTS `hudi_trips_cow`( `fare` bigint, `rider` string, `ts` double, `uuid` string) PARTITIONED BY (`driver` string,`name` string) STORED AS hudi LOCATION '/tmp/hudi_trips_cow' TBLPROPERTIES( 'hoodie.table.name' = 'hudi_trips_cow1 ', 'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 创建mor的ro表 CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`hudi_trips_mor`( `fare` bigint, `rider` string, `ts` double, `uuid` string) PARTITIONED BY (`driver` string) STORED AS hudi LOCATION '/tmp/hudi_trips_mor' TBLPROPERTIES ( 'hoodie.table.name' = 'hudi_trips_mor ', 'hoodie.datasource.write.table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 创建mor的rt表 CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`hudi_trips_mor_rt`( `fare` bigint, `rider` string, `ts` double, `uuid` string) PARTITIONED BY (`driver` string) STORED AS hudi LOCATION '/tmp/hudi_trips_mor' TBLPROPERTIES( 'hoodie.table.name' = 'hudi_trips_mor ', 'hoodie.mor.isRealTimeInputFormat' = 'true', 'hoodie.datasource.write.table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field'= 'uuid', 'hoodie.datasource.write.precombine.field'='ts'); · 创建多主键表 CREATE TABLE IF NOT EXISTS `hudi_inner_multi`( `fare` bigint, `rider` string, `ts` double, `uuid` string) STORED AS hudi TBLPROPERTIES( 'hoodie.datasource.write.recordkey.field'= 'uuid,fare', 'hoodie.datasource.write.precombine.field'='ts'); · 复制表结构 create table `t1` like `hudi_trips_cow`; 2. 插入 功能说明: · 支持insert into values语法 · 支持insert into select 语法 · 支持insert overwrite values 语法 · 支持insert overwrite select 语法 · 支持分区表动静结合使用 · 可以通过设置hoodie.datasource.write.operation定义hoodie的写操作方式,默认为upsert · 支持通过hoodie属性调整并发数等
表3-17 参数说明 表属性 描述 hoodie.datasource.write.operation Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete hoodie.upsert.shuffle.parallelism upsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖 hoodie.insert.shuffle.parallelism insert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖 hoodie.blukinsert.shuffle.parallelism blukinsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖
示例: · 插入表数据 insert into hudi_trips_cow partition (driver='driver-213',name) values(58,'sdf',1.8,25,'nihao'); · 使用会话重置属性,插入表数据 set hoodie.datasource.write.operation=upsert; set hoodie.upsert.shuffle.parallelism=2; insert into hudi_trips_cow partition (driver='driver-213',name) values(58,'sdf',1.8,25,'nihao'); · 将查询结果插入表中 insert into table hudi_trips_cow partition (driver,name) select fare,rider,ts,uuid,'driver-214',name from hudi_trips_cow_test2; · 覆盖插入 Insert overwrite table hudi_trips_cow partition (driver='driver-213',name) values(56,'sdf',1.8,25,'sdf'),(57,'sdf',1.9,25,'nihao'); · 指定字段插入 insert into hudi_inner (uuid,ts) values('uid1', 22.0); · 插入多主键表 insert into hudi_inner_multi(uuid,ts,fare) values('uid1', 22.0, 100); 3. 更新 功能说明: · 遵循Hive标准update语法,需要手动开启DLH的事务特性(DLH默认是关闭Hive的事务特性的),开启方式如下: ¡ 设置dlh-site中的hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager ¡ 设置dlh-site中的hive.support.concurrency=true 配置修改完成后,重启DLH组件。
表3-18 参数说明 表属性 描述 hoodie.datasource.write.operation Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete hoodie.upsert.shuffle.parallelism upsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖 hoodie.insert.shuffle.parallelism insert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖 hoodie.blukinsert.shuffle.parallelism blukinsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖
示例: · 更新表数据 update hudi_trips_cow set rider='test' where uuid='25'; · 使用UDF函数更新表数据 update hudi_trips_cow set fare=length('test') where uuid='25'; · 使用会话重置属性,更新表数据 set hoodie.datasource.write.operation=upsert; set hoodie.upsert.shuffle.parallelism=3; update hudi_trips_cow set fare=length('test') where uuid='25'; 4. 删除 功能说明: · 遵循Hive标准delete语法,需要手动开启DLH的事务特性(DLH默认是关闭Hive的事务特性的),开启方式如下: ¡ 设置dlh-site中的hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager ¡ 设置dlh-site中的hive.support.concurrency=true 配置修改完成后,重启DLH组件。
表3-19 参数说明 表属性 描述 hoodie.datasource.write.operation Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete hoodie.upsert.shuffle.parallelism upsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖 hoodie.insert.shuffle.parallelism insert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖 hoodie.blukinsert.shuffle.parallelism blukinsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖
示例: · 删除表数据 delete from hudi_trips_cow where uuid='25'; · 重置删除相关属性,删除表数据 set hoodie.delete.shuffle.parallelism=3; delete from hudi_trips_cow where uuid='25'; 5. 新增列 功能说明: · cow表支持,mor表不支持 · 遵循Hive标准add COLUMNS语法
示例: · 表hudi_trips_cow新增列名为test,类型String alter table hudi_trips_cow add COLUMNS (`test` String); 6. 新增分区 功能说明: · 遵循Hive标准add partition语法
示例: · 新增表hudi_trips_cow分区driver='driver-213',name="sdf" alter table hudi_trips_cow add partition(driver='driver-213',name="sdf") location '/tmp/hudi_trips_cow/driver=driver-213/name=sdf'; 7. 删除分区 功能说明: · 删除分区要求Hudi表是分区表,且是外部表。当Hudi表是分区表,且是内部表时,删除分区不支持
示例: · 删除表hudi_trips_cow分区driver='driver-213',name="sdf" alter table hudi_trips_cow drop partition(driver='driver-213',name="sdf"); 8. 表重命名 功能说明: · 只修改Hive表的名字,Hudi表名不变
示例: · 重命名表名hudi_trips_cow为hudi_trips_cow1 alter table hudi_trips_cow rename to hudi_trips_cow1; 9. 近实时查询 功能说明: · 支持COPY_ON_WRITE存储类型的Hudi表 · 支持MERGE_ON_READ存储类型的Hudi表,且Hive表必须是rt类型表
表3-20 参数说明 表属性 描述 hoodie.hudi_trips_cow.consume.mode 查询方式:SNAPSHOT、INCREMENTAL。近实时查询只能配置为SNAPSHOT
示例: · 查询cow表 set hoodie.hudi_trips_cow.consume.mode=SNAPSHOT; select * from hudi_trips_cow; · 查询mor表(rt表) set hoodie.hudi_trips_mor.consume.mode=SNAPSHOT; select * from hudi_trips_mor_rt; 【说明】hudi_trips_mor为Hudi表名,hudi_trips_mor_rt为Hive表名,二者不一定相同,Hudi表名可以通过“show create table +Hive表名”进行查看。 10. 增量查询 功能说明: · 查询前需要设置消费模式为INCREMENTAL · 支持通过设置开始和增量值进行增量查询
表3-21 参数说明 表属性 描述 hoodie.hudi_trips_cow.consume.mode 查询方式:SNAPSHOT、INCREMENTAL。增量查询只能配置为INCREMENTAL hoodie.hudi_trips_cow.consume.start.timestamp INCREMENTAL模式下使用 hoodie.hudi_trips_cow.consume.max.commits INCREMENTAL模式下使用
示例: 前提条件:查询时需确保消费模式为INCREMENTAL类型。 · 查询cow表 set hoodie.hudi_trips_cow.consume.mode=INCREMENTAL; set hoodie.hudi_trips_cow.consume.start.timestamp=20201019165434; set hoodie.hudi_trips_cow.consume.max.commits=1; select * from hudi_trips_cow where `_hoodie_commit_time` > '20201019165434'; · 查询mor表(rt表) set hoodie.hudi_trips_mor.consume.mode=INCREMENTAL; set hoodie.hudi_trips_mor.consume.start.timestamp=20201019165434; set hoodie.hudi_trips_mor.consume.max.commits=1; select * from hudi_trips_mor_rt where `_hoodie_commit_time` > '20201019165434'; 11. 读优化查询 功能说明: · 查询前需要设置消费模式为SNAPSHOT · 读优化查询只读取parquet数据。对于cow类型表,读优化查询就是实时查询;但是对于mor表,读优化查询只能查询到插入的数据,对于更新参数的log数据则无法查询。
表3-22 参数说明 表属性 描述 hoodie.hudi_trips_cow.consume.mode 查询方式:SNAPSHOT、INCREMENTAL。读优化查询只能配置为SNAPSHOT
示例: · 查询mor表(ro表) set hoodie.hudi_trips_mor.consume.mode=SNAPSHOT; select * from hudi_trips_mor; 3.4 流SQL操作如果集群手动开启kerberos认证,需要在flink自定义配置sql-gateway-flink-conf新增配置项目security.kerberos.login.keytab、security.kerberos.login.principal,具体值为集群超级用户对应的keytab和principal,如:/etc/security/keytabs/admin123.keytab、[email protected]。 3.4.1 保留关键字 DLH流SQL兼容Flink SQL能力,虽然Flink SQL 特性并未完全实现,但是一些字符串及其组合已经被预留为关键字以备未来使用,如表3-23所示。如果您希望使用以下字符串作为字段名,请在使用时使用反引号将该字段名包起来(示例`value`, `count` )。 表3-23 预留字符串 A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, RAW, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE 3.4.2 流SQL UDF 当前版本中,DLH流SQL暂不支持自定义UDF,DLH流SQL的内置UDF兼容Flink1.12版本中的内置UDF,Flink1.12的内置UDF请参见:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html。 3.4.3 创建表 DLH流表的建表语法完全兼容开源FlinkSQL的建表语法。但当前版本中,DLH流表连接器的数据源端仅支持Kafka,数据目的端仅支持Kafka、Mysql、PostgreSQL、Elasticsearch、Hive。其中,Kafka连接器支持的数据格式为csv或json,Hive连接器支持的数据格式为orc、parquet或textfile, Elasticsearch支持的数据格式为json,Mysql和PostgreSQL连接器不区分数据格式。 当前版本中,DLH流表的数据源端支持事件时间类型表、处理时间类型表。 1. DLH流表的建表语句图3-3 DLH流表的建表语句中需设置的基本属性值 属性 默认值 是否必选 说明 connector 无 是 连接器类型。在当前版本中,DLH仅支持kafka、elasticsearch-7、jdbc topic 无 是 连接器所连接的Kafka topic properties.group.id 无 是 消费Kafka数据时指定的groupID。作为source表时此属性必填,作为sink表时此属性可不填 scan.startup.mode group-offsets 否 Kafka消费端offset配置。可选值:latest-offset、earliest-offset、group-offsets properties.bootstrap.servers 无 是 连接Kafka必需的boostrap.server配置 format 无 是 数据流格式。在当前版本中,DLH仅支持csv、json csv.field-delimiter 无 是 数据流字符串分隔符。csv格式下必填 properties.security.protocol 无 否 Kerberos环境下,连接Kafka所需的安全协议,值为:SASL_PLAINTEXT properties.sasl.mechanism 无 否 Kerberos环境下,连接Kafka所需的认证机制,值为:GSSAPI properties.sasl.kerberos.service.name 无 否 Kerberos环境下,连接Kafka所需的认证服务名称,值为:kafka 2. 示例 场景1:在数据源端创建表 · 创建CSV表 ¡ Kafka(csv)连接器事件时间下的建表语句: CREATE TABLE csv_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'csv_source', 'properties.group.id' = 'flink1', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'csv', 'csv.field-delimiter' = '|' ); ¡ Kafka(csv)连接器处理时间下的建表语句: 注意:ts为额外的处理时间字段,数据源中如果存在会被覆盖 CREATE TABLE csv_source( user_id INT, product STRING, amount INT, ts as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'csv_source', 'properties.group.id' = 'flink1', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'csv', 'csv.field-delimiter' = '|' ); · 创建json表 ¡ Kafka(json)连接器事件时间下的建表语句: CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'json' ); ¡ Kafka(json)连接器处理时间下的建表语句: 【注意】ts为额外的处理时间字段,数据源中如果存在会被覆盖。 CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'json' );
场景2:在数据目的端创建表 · Kafka(csv)连接器的建表语句: CREATE TABLE csv_source( user_id INT, product STRING, amount INT ) WITH ( 'connector' = 'kafka', 'topic' = 'csv_source', 'properties.group.id' = 'flink1', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'csv', 'csv.field-delimiter' = '|' ); · Kafka(json)连接器的建表语句: CREATE TABLE json_source( user_id INT, product STRING, amount INT ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'json' ); · Elasticsearch(json)连接器的建表语句: CREATE TABLE sk_es ( user_id INT, product STRING, amount INT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://qcj36:9200', 'index' = 'users' ); · Mysql连接器的建表语句: CREATE TABLE sk_jdbc_mysql ( user_id INT, product STRING, amount INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.121.65.53:53306/mydatabase', 'table-name' = 'users', 'username' = 'cluster', 'password' = 'CloudOS5#DE3@Cluster' ); · PostgreSQL连接器的建表语句: CREATE TABLE sk_jdbc_postgresql ( user_id INT, product STRING, amount INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://10.121.47.114:5432/mydatabase', 'table-name' = 'users', 'username' = 'postgres', 'password' = 'postgres' ); · Hive(textfile)连接器的建表语句: CREATE TABLE hive_txt( user_id INT, product STRING, amount INT )PARTITIONED BY(dt STRING, hr STRING) stored as TEXTFILE TBLPROPERTIES ( 'sink.rolling-policy.file-size'='1MB', 'sink.rolling-policy.rollover-interval'='1 min', 'sink.rolling-policy.check-interval'='1 s', 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file' ); · Hive(orc)连接器的建表语句: CREATE TABLE hive_orc( user_id INT, product STRING, amount INT )PARTITIONED BY(dt STRING, hr STRING) stored as orc TBLPROPERTIES ( 'sink.rolling-policy.file-size'='1MB', 'sink.rolling-policy.rollover-interval'='1 min', 'sink.rolling-policy.check-interval'='1 s', 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file' ); · Hive(parquet)连接器的建表语句: CREATE TABLE hive_parquet( user_id INT, product STRING, amount INT )PARTITIONED BY(dt STRING, hr STRING) stored as parquet TBLPROPERTIES ( 'sink.rolling-policy.file-size'='1MB', 'sink.rolling-policy.rollover-interval'='1 min', 'sink.rolling-policy.check-interval'='1 s', 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file' ); 3.4.4 数据插入 1. 插入语句DLH流表的数据插入语句兼容FlinkSQL的insert into……select……语句: INSERT INTO tablename1(col1,col2,col3,…)SELECT col1,col2,col3,…From tablename2 【注意】 · 非Kerberos环境下,可直接执行DLH流表的数据插入语句。 · Kerberos环境下,执行DLH流表的数据插入语句,需满足以下条件: ¡ 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 ¡ 确保在Flink的所有节点上,都存在上述用户对应的keytab文件,且需要保证该用户为该keytab文件的所有者。 ¡ 修改Flink的配置:security.kerberos.login.keytab= ;security.kerberos.login.principal= ,以上两个配置项的值修改完成并保存后,重启Flink。 2. 窗口类型DLH支持FlinkSQL所有窗口类型:tumble、hop、session、over等。 3. 示例(1) 示例1:非Kerberos环境下进行DLH流表的数据插入(CSV),步骤如下: a. 创建Kafka topic:csv_source和csv_sink ./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_sink --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE csv_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'csv_source', 'properties.group.id' = 'flink_csv', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'csv', 'csv.field-delimiter' = '|' ); c. 创建数据输出表 CREATE TABLE csv_sink ( user_id INT, product STRING, amount INT, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'csv_sink', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'csv' ); d. 提交流任务 INSERT INTO csv_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM csv_source; e. 向Kafka的topic:csv_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例csv_input.txt)。 表3-24 示例数据 1|beer|3|2019-12-12 00:00:01 1|diaper|4|2019-12-12 00:00:02 2|pen|3|2019-12-12 00:00:04 2|rubber|3|2019-12-12 00:00:06 3|rubber|2|2019-12-12 00:00:05 4|beer|1|2019-12-12 00:00:08
向Kafka的topic:csv_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --topic csv_source < csv_input.txt f. 消费Kafka的topic:csv_sink数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic csv_sink --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的csv_source、csv_sink表 drop table csv_source; drop table csv_sink; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。
(2) 示例2:非Kerberos环境下进行DLH流表的数据插入(JSON),步骤如下: a. 创建Kafka topic:json_source和json_sink ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'json' ); c. 创建数据输出表 CREATE TABLE json_sink (user_id INT,product STRING,amount INT,ts TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'json_sink', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'format' = 'json' ); d. 提交流任务 INSERT INTO json_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM json_source; e. 向Kafka的topic:json_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。 表3-25 示例数据 {"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"} {"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"} {"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"} {"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"} {"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"} {"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}
向Kafka的topic:json_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --topic json_source < json_input.txt f. 消费Kafka的topic:json_sink数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的json_source、json_sink表 drop table json_source; drop table json_sink; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。
(3) 示例3:Kerberos环境下进行DLH流表的数据插入(CSV),步骤如下: a. 创建Kafka topic:csv_source和csv_sink ./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_sink --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE csv_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'csv_source', 'properties.group.id' = 'flink_csv', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'csv', 'csv.field-delimiter' = '|' ); c. 创建数据输出表 CREATE TABLE csv_sink (user_id INT,product STRING,amount INT,ts TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'csv_sink', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'csv' ); d. 提交流任务 前置条件: - 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 - 确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。 - 修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。 以上前置条件全部满足之后,才可以提交流任务SQL: INSERT INTO csv_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM csv_source; e. 向Kafka的topic:csv_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例csv_input.txt)。 表3-26 示例数据 1|beer|3|2019-12-12 00:00:01 1|diaper|4|2019-12-12 00:00:02 2|pen|3|2019-12-12 00:00:04 2|rubber|3|2019-12-12 00:00:06 3|rubber|2|2019-12-12 00:00:05 4|beer|1|2019-12-12 00:00:08
向Kafka的topic:csv_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --producer-property security.protocol=SASL_PLAINTEXT --topic csv_source < csv_input.txt f. 消费Kafka的topic:csv_sink数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic csv_sink --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的csv_source、csv_sink表 drop table csv_source; drop table csv_sink; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。
(4) 示例4:Kerberos环境下进行DLH流表的数据插入(JSON),步骤如下: a. 创建Kafka topic:json_source和json_sink ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); c. 创建数据输出表 CREATE TABLE json_sink (user_id INT,product STRING,amount INT,ts TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'json_sink', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); d. 提交流任务 前置条件: - 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 - 确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。 - 修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。 以上前置条件全部满足之后,才可以提交流任务SQL: INSERT INTO json_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM json_source; e. 向Kafka的topic:json_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。 表3-27 示例数据 {"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"} {"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"} {"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"} {"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"} {"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"} {"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}
向Kafka的topic:json_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt f. 消费Kafka的topic:json_sink数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的json_source、json_sink表 drop table json_source; drop table json_sink; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。
(5) 示例5:Kerberos环境下进行DLH流表的数据插入(tumble窗口),步骤如下: a. 创建Kafka topic:json_source和json_sink_tumble ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_tumble --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); c. 创建数据输出表 CREATE TABLE json_sink_tumble ( cnt BIGINT, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'json_sink_tumble', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); d. 提交流任务 前置条件: - 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 - 确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。 - 修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。 以上前置条件全部满足之后,才可以提交流任务SQL: INSERT INTO json_sink_tumble SELECT count(product), tumble_start(ts, interval '1' minute) FROM json_source group by tumble(ts, interval '1' minute); e. 向Kafka的topic:json_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。 表3-28 示例数据 {"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"} {"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"} {"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"} {"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"} {"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"} {"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}
向Kafka的topic:json_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt f. 消费Kafka的topic:json_sink_tumble数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_tumble --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的json_source、json_sink_tumble表 drop table json_source; drop table json_sink_tumble; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。
(6) 示例6:Kerberos环境下进行DLH流表的数据插入(hop窗口),步骤如下: a. 创建Kafka topic:json_source和json_sink_hop ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_hop --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); c. 创建数据输出表 CREATE TABLE json_sink_hop ( win_start TIMESTAMP(3), win_end TIMESTAMP(3), win_rowtime TIMESTAMP(3), user_id INT, product_cnt bigint ) WITH ( 'connector' = 'kafka', 'topic' = 'json_sink_hop', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); d. 提交流任务 前置条件: - 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 - 确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。 - 修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。 以上前置条件全部满足之后,才可以提交流任务SQL: INSERT INTO json_sink_hop SELECT hop_start(ts, interval '2' second, interval '5' second),hop_end(ts, interval '2' second, interval '5' second), hop_rowtime(ts, interval '2' second, interval '5' second),user_id,count(product) FROM json_source group by hop(ts, interval '2' second, interval '5' second),user_id; e. 向Kafka的topic:json_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。 表3-29 示例数据 {"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"} {"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"} {"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"} {"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"} {"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"} {"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}
向Kafka的topic:json_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt f. 消费Kafka的topic:json_sink_hop数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_hop --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的json_source、json_sink_hop表 drop table json_source; drop table json_sink_hop; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。
(7) 示例7:Kerberos环境下进行DLH流表的数据插入(session窗口),步骤如下: a. 创建Kafka topic:json_source和json_sink_session ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_session --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); c. 创建数据输出表 CREATE TABLE json_sink_session ( session_start TIMESTAMP(3), session_end TIMESTAMP(3), session_rowtime TIMESTAMP(3), user_id INT, product_cnt bigint ) WITH ( 'connector' = 'kafka', 'topic' = 'json_sink_session', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); d. 提交流任务 前置条件: - 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 - 确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。 - 修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。 以上前置条件全部满足之后,才可以提交流任务SQL: INSERT INTO json_sink_session SELECT session_start(ts, interval '5' second),session_end(ts, interval '5' second), session_rowtime(ts, interval '5' second),user_id,count(product) FROM json_source group by session(ts, interval '5' second),user_id; e. 向Kafka的topic json_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。 表3-30 示例数据 {"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"} {"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"} {"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"} {"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"} {"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"} {"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}
向Kafka的topic json_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt f. 消费Kafka的topic:json_sink_session数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_session --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的json_source、json_sink_session表 drop table json_source; drop table json_sink_session; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。
(8) 示例8:Kerberos环境下进行DLH流表的数据插入(over窗口),步骤如下: a. 创建Kafka topic:json_source和json_sink_over ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2 ./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_over --create --partitions 3 --replication-factor 2 b. 连接DLH beeline并创建数据源表 CREATE TABLE json_source( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'json_source', 'properties.group.id' = 'flink_json', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); c. 创建数据输出表 CREATE TABLE json_sink_over ( user_id INT, product STRING, amount INT, maxAmount INT ) WITH ( 'connector' = 'kafka', 'topic' = 'json_sink_over', 'properties.group.id' = 'flink2', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '10.121.65.7:6667', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name' = 'kafka', 'format' = 'json' ); d. 提交流任务 前置条件: - 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 - 确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。 - 修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。 以上前置条件全部满足之后,才可以提交流任务SQL: insert into json_sink_over select user_id,product,amount,max(amount) over(partition by product order by ts rows between 2 preceding and current row) as maxAmount from json_source; e. 向Kafka的topic:json_source生产数据 【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。 可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。 表3-31 示例数据 {"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"} {"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"} {"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"} {"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"} {"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"} {"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}
向Kafka的topic:json_source生产数据,命令如下: ./kafka-console-producer.sh --broker-list zyf07:6667 --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt f. 消费Kafka的topic:json_sink_over数据 ./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_over --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning g. 测试结束,停止流任务 yarn application –list 获取对应的applicationId yarn application --kill h. 删除已创建的json_source、json_sink_over表 drop table json_source; drop table json_sink_over; 【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。 3.4.5 数据查询· 流计算任务为长驻任务。在查询SQL执行过程中,如果出现Ctrl+C或其他操作导致查询异常中止,Flink查询任务也无法自动停止。在此情况下,若需要终止Flink查询任务,则需要根据查询SQL的开始执行时间在YARN上查询对应的Flink任务并手动终止。 · 在当前版本中,DLH会话中窗口查询结果不在控制台直接打印,实际计算结果需在对应的Flink作业监控页面进行查询。实际业务场景下,多数是通过insert into select from 语句来执行窗口操作。 1. 查询语法 DLH流表的数据查询语句兼容FlinkSQL的streaming模式下的查询语句。 · 非Kerberos环境下,可直接执行DLH流表的数据查询语句。 · Kerberos环境下,执行DLH流表的数据查询语句,需满足以下条件: ¡ 准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。 ¡ 确保在Flink的所有节点上,都存在上述用户对应的keytab文件,且需要保证该用户为该keytab文件的所有者。 ¡ 修改Flink的配置:security.kerberos.login.keytab= ;security.kerberos.login.principal= ,以上两个配置项的值修改完成并保存后,重启Flink。 2. 示例(1) 创建Kafka topic:test001,用于数据源产生(若开启kerberos则需追加--command-config kerberos参数) ./kafka-topics.sh --create --bootstrap-server rzt237:6667,rzt238:6667,rzt239:6667 --replication-factor 1 --partitions 1 --topic test001 [--command-config kerberos] (2) 创建流表flink55 【说明】DLH流表的建表语句中需设置的基本属性值请参考3.4.3章节。 CREATE TABLE flink55 ( user_id INT, product STRING, amount INT, ts TIMESTAMP(3), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'test001', 'properties.group.id' = 'flink22', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = '10.121.64.238:6667', 'format' = 'csv', 'csv.field-delimiter' = '|' ); (3) 向Kafka的topic:test001中生产数据 【注意】数据格式务必与建表语句中format.type和formmat.field-delimiter属性保持一致。 表3-32 测试数据 1|diaper|4|2019-12-12 00:00:02 2|pen|3|2019-12-12 00:00:04 2|rubber|3|2019-12-12 00:00:06 3|rubber|2|2019-12-12 00:00:05 4|beer|1|2019-12-12 00:00:08 1|diaper|4|2019-12-12 00:00:02 2|pen|3|2019-12-12 00:00:04 2|rubber|3|2019-12-12 00:00:06 3|rubber|2|2019-12-12 00:00:05
(4) 进行流表数据查询 ¡ select语句 执行SQL: select * from flink55; ¡ select as语句 执行SQL: select user_id,amout as ddd from flink55; 【注意】自定义字段名需避免使用保留关键字。 ¡ 过滤查询select * from [tab] where [filter_condition]语句 执行SQL: select * from flink55 where user_id > 2; ¡ 查询嵌套常用udf 执行SQL: select UPPER(product) from flink55; ¡ 分组聚合 执行SQL: select user_id,sum(amount) as cnt from flink55 group by user_id ¡ 去重语法 执行SQL: select DISTINCT user_id from flink55; ¡ having语法 执行SQL: select sum(amount) from flink55 group by user_id having sum(amount) > 5; ¡ join语法 前置条件: - 创建kafka topic:test002,并向test002中生产部分模拟数据。 - 创建新的Flink表(表中字段与topic test002中字段格式需保持一致)。 以上前置条件全部满足之后,执行SQL: select flink55.user_id,flink55.product from flink55 inner join flink55_2 on flink55.user_id = flink55_2.user_id; 【注意】此情况下,时间字段不能作为查询过滤条件及查询返回结果字段。 ¡ 分组窗口聚合 在当前版本中,DLH会话中窗口查询结果不在控制台直接打印,实际计算结果需在对应的Flink作业监控页面进行查询。实际业务场景下,多数是通过insert into select from 语句来执行窗口操作。 3.5 MLSQLMLSQL是基于MLlib的SQL中间件。MLlib是Spark的机器学习库,目标是让机器学习更加容易且更具扩展性,使用Mllib时需要编写MLlib应用,操作复杂。但是通过MLSQL,用户可直接通过SQL语句使用MLlib中的算法,使操作的便利性得到了很大提高。 MLSQL目前支持监督算法(比如:分类算法、回归算法)、非监督算法(比如:聚类算法)、特征工程算法、模型选择、Pipeline机制。 3.5.1 监督算法 1. 语法· 训练语法 SELECT TrainAlgorithmName(’model_result_table’,label,col1,col2,…,[’parms’]) from TrainTableName; · 预测语法 SELECT PredictAlgorithmName(‘model_result_table’,col1,col2,col3,…,’id’,’predict_result_table’) from PredictTableName; 2. 语法说明表3-33 训练语法说明 参数 说明 常见配置选项 TrainAlgorithmName 训练算法名称,必须指定,支持的训练算法见算法说明:训练算法部分 无 model_result_table 算法训练结果模型表,必须指定 无 TrainTableName 训练数据表名,用作算法的数据源,必须指定 无 label 训练数据表中的列,用作算法的参数,作为标签列,必须指定 无 col1,col2,… 训练数据表中的列,用作算法的参数,作为特征列,必须指定 无 parms 算法的其它参数,根据算法需要,可以不指定(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …),参数说明 无
表3-34 预测语法说明 参数名称 参数解释 备注 PredictAlgorithmName 预测算法名称,必须指定,算法说明:预测算法部分 无 model_result_table 模型表名,必须指定 用来指定使用哪个模型进行预测,对应训练中的model_result_table PredictTableName 预测数据表名,用来预测的数据,必须指定 无 id 预测数据表的id列,用作算法的参数,作为标签列,必须指定 无 col1,col2,… 表的列,用作算法的参数,必须指定 无 predict_result_table 使用模型进行预测后的结果表 临时表 3. 算法说明 表3-35 训练算法 算法名 算法说明 备注 LogisticRegression 逻辑回归训练函数 分类 DecisionTreeClassifier 决策树分类训练函数 [注意]:参与训练的数据属性列的值不能相同 分类 GBTClassifier GBT分类训练函数 [注意]:参与训练的数据属性列的值不能相同 分类(二分类) RandomForestClassifier 随机森林分类训练函数 [注意]:参与训练的数据属性列的值不能相同 分类 MultilayerPerceptronClassifier 多层感知机训练函数 分类 NaiveBayes 朴素贝叶斯训练函数 分类 OneVsRest Onevsrest训练函数:一般用于多分类,需要设置基础二分类分类器 分类 LinearRegression 线性回归训练函数 回归 GeneralizedLinearRegression 广义线性回归训练函数 回归 DecisionTreeRegressor 决策树回归训练函数 [注意]:参与训练的数据属性列的值不能相同 回归 RandomForestRegressor 随机森林回归训练函数 [注意]:参与训练的数据属性列的值不能相同 回归 GBTRegressor GBT回归训练函数 [注意]:参与训练的数据属性列的值不能相同 回归 IsotonicRegression 保序回归训练函数 回归 AFTSurvivalRegression AFT-Survival回归训练函数 回归
表3-36 预测算法 算法名 算法说明 备注 LogisticRegressionPrediction 逻辑回归预测函数 分类预测 DecisionTreeClassificationPrediction 决策树分类预测函数 分类预测 GBTClassificationPrediction GBT分类预测函数 分类预测 RandomForestClassificationPrediction 随机森林分类预测函数 分类预测 MultilayerPerceptronPrediction 多层感知机分类预测函数 分类预测 NaiveBayesPrediction 朴素贝叶斯分类预测函数 分类预测 OneVsRestPrediction Onevsrest分类预测函数 分类预测 LinearRegressionPrediction 线性回归预测函数 回归预测 GeneralizedLinearRegressionPrediction 广义线性回归预测函数 回归预测 DecisionTreeRegressionPrediction 决策树回顾预测函数 回归预测 GBTRegressorPrediction GBT回归预测函数 回归预测 RandomForestRegressorPrediction 随机森林预测函数 回归预测 IsotonicRegressionPrediction 保序回归预测函数 回归预测 AFTSurvivalRegressionPrediction Aft-Survival回归预测函数 回归预测 4. 参数说明 表3-37 参数说明 参数名 参数说明 支持算法 aggregationdepth 设置分布式统计时的层数,主要用在treeAggregate中,数据量越大,可适当加大这个值,默认是2,int型 logisticregression、aftsurvivalregression、linearregression standardization 训练模型前是否需要对训练特征进行标准化处理,boolean型 Logisticregression、linearregression threshold 二分类预测的阈值,范围[0,1],double型 Logisticregression thresholds 多分类预测的阈值,array[double]型 Logisticregression fitintercept 是否训练intercept,boolean型 Logisticregression、aftsurvivalregression、linearregression、generalizedlinearregression tol 优化算法迭代求解过程中的收敛阈值,默认值:1e-4,不能为负数,double型 Logisticregression、multilayerperceptronclassifier、aftsurvivalregression、generalizedlinearregression、linearregression weightcol 列权重,string型 Logisticregression、naivebayes、isotonicregression、generalizedlinearregression、linearregression regparam 正则化参数(>=0),调节拟合程度,越小-过拟合,越大-欠拟合;double型 Logisticregression、generalizedlinearregression、linearregression elasticnetparam 弹性网络混合参数(范围[0,1]),用于调节L1和L2之间的比例,两种正则化比例加起来是1,默认是0,即只使用L2正则化,设置为1,即只使用L1正则化;double型 Logisticregression、linearregression family 模型中使用的误差分布类型 · Logisticregression可选:auto,binomial,multinomial,不写默认使用auto · GeneralizedLinearregression可选:gaussian,binomial,poisson,gamma Logisticregression、generalizedlinearregression seed 随机种子,long型 Decisiontreeclassifier、gbtclassifier、randomforestclassifier、multilayerperceptronclassifier、 decisiontreeregressor、gbtregressor、randomforestregressor maxdepth 树的最大高度,int型 Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor maxbins 每个特征分裂时,最大划分(桶)数量;int型 Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor mininstancespernode 需保证节点分割出的子节点的最少样本数达到这个值,int型 Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor mininfogain 最小信息增益,当前节点的所有属性分割带来的信息增益都要比这个值大,double型 Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor subsamplingrate 二次抽样率,指定每棵树训练的数据比例,double型 gbtclassifier、randomforestclassifier、gbtregressor、randomforestregressor maxiter 优化算法求解的最大迭代次数,默认值100,int型 LinearRegression、 Logisticregression、Gbtclassifier、multilayerperceptronclassifier、 AFTSurvivalRegression、GBTRegressor、GeneralizedLinearRegression stepsize 每次迭代优化步长,double型,取值范围:(0,1] Gbtclassifier、multilayerperceptronclassifier、gbtregressor losstype 使用哪种损失函数 可选用:hinge、logistic、squared Gbtclassifier、gbtregressor blocksize 该参数被前馈网络训练器用来将训练样本数据的每个分区都按照blocksize大小分成不同组,并且每个组内的每个样本都会叠加成一个向量,以便在各种优化算法间传递;int型 multilayerperceptronclassifier layers 必须设置,注意输入/输出的层数 multilayerperceptronclassifier solver 优化的求解算法 · Multilayerperceptronclassifier可选:l-bfgs,gd · Linearregression可选:auto,l-bfgs,normal · Generalizedlinearregression可选:irls Multilayerperceptronclassifier、generalizedlinearregression、linearregression modeltype 模型类型(Multinomial,Bernoulli),实际上依据特征的分布不同,朴素贝叶斯又划分为多个子类别,string型 naivebayes smoothing 一般采用拉普拉斯平滑进行处理,double型 naivebayes classifier 设置基础二分类分类器,在onevsrest算法中必须要指定的 onevsrest featuresubsetstrategy 特征子集策略,每棵树中使用那些特征做分裂,这个参数可以用小数比例形式指定,减少这个值会加速训练,但太小会影响效果 Randomforestclassifier、randomforestregressor numtrees 随机森林中决策树个数 Randomforestclassifier、randomforestregressor censorcol 设置中心列 aftsurvivalregression isotonic 递增保序回归或递减保序回归,boolean型 isotonicregression link 连接函数名,描述线性预测器和分布函数均值之间关系 · family为gaussian时可选:identity、log、inverse · family为binomial时可选:logit、probit、cloglog · family为poisson时可选:log、identity、sqrt · family为gamma时可选:inverse、identity、log generalizedlinearregression linkpredictioncol 连接函数(线性预测器列名) generalizedlinearregression 5. 示例 · LogisticRegression算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,0.0,2.3,4.5,5.6); Insert into mltable values(2,1.0,6.5,3.8,4.9); 运行语句示例: ------------------训练----------------------- drop table if exists lor_model; select LogisticRegression('lor_model',label,col1,col2,col3,'-MaxIter 20') from mltable; select * from lor_model; --------------------预测--------------------- select LogisticRegressionPrediction('lor_model',col1,col2,col3,'id','lor_pred') from mltable; select * from lor_pred a,mltable b where a.id = b.id; · DecisionTreeClassifier算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,0.0,5.7,5.9,7.9); Insert into mltable values(2,1.0,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists dtc_model; select DecisionTreeClassifier('dtc_model',label,col1,col2,col3) from mltable; select * from dtc_model; --------------------预测--------------------- select DecisionTreeClassificationPrediction('dtc_model',col1,col2,col3,'id','dtc_pred') from mltable; select * from dtc_pred a,mltable b where a.id = b.id; · GBTClassifie算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,0.0,5.7,5.9,7.9); Insert into mltable values(2,1.0,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists gbtc_model; select GBTClassifier('gbtc_model',label,col1,col2,col3) from mltable; select * from gbtc_model; --------------------预测--------------------- select GBTClassificationPrediction('gbtc_model',col1,col2,col3,'id', 'gbtc_pred') from mltable; select * from gbtc_pred a,mltable b where a.id = b.id; · RandomForestClassifier算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,0.0,5.7,5.9,7.9); Insert into mltable values(2,1.0,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists rfc_model; select RandomForestClassifier('rfc_model',label,col1,col2,col3) from mltable; select * from rfc_model; --------------------预测--------------------- select RandomForestClassificationPrediction('rfc_model',col1,col2,col3,'id', 'rfc_pred') from mltable; select * from rfc_pred a,mltable b where a.id = b.id; · MultilayerPerceptronClassifier算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,0.0,5.7,5.9,7.9); Insert into mltable values(2,1.0,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists mpc_model; select MultilayerPerceptronClassifier('mpc_model',label,col1,col2,col3, '-Layers 3 3 2') from mltable; select * from mpc_model; --------------------预测--------------------- select MultilayerPerceptronPrediction('mpc_model',col1,col2,col3,'id', 'mpc_pred') from mltable; select * from mpc_pred a,mltable b where a.id = b.id; · NaiveBayes算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,0.0,5.7,5.9,7.9); Insert into mltable values(2,1.0,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists nbc_model; select NaiveBayes('nbc_model',label,col1,col2,col3) from mltable; select * from nbc_model; --------------------预测--------------------- select NaiveBayesPrediction('nbc_model',col1,col2,col3,'id', 'nbc_pred') from mltable; select * from nbc_pred a,mltable b where a.id = b.id; · OneVsRest算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,0.0,5.7,5.9,7.9); Insert into mltable values(2,1.0,7.8,2.6,12.1); 语句示例: ---------------------训练----------------------- drop table if exists ovrc_model; select OneVsRest('ovrc_model',label,col1,col2,col3,'-Classifier NaiveBayes') from mltable; select * from ovrc_model; --------------------预测------------------------ select OneVsRestPrediction('ovrc_model',col1,col2,col3,'id', 'ovrc_pred') from mltable; select * from ovrc_pred a,mltable b where a.id = b.id; · LinearRegression算法 数据准备: Create table mltable2 (id int,label double,col1 double,col2 double,col3 double); Insert into mltable2 values(1,8.6,5.7,5.9,7.9); Insert into mltable2 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists lr_model; select LinearRegression('lr_model',label,col1,col2,col3) from mltable2; select * from lr_model; --------------------预测--------------------- select LinearRegressionPrediction('lr_model',col1,col2,col3,'id', 'lr_pred') from mltable2; select * from lr_pred a,mltable2 b where a.id = b.id; · GeneralizedLinearRegression算法 数据准备: Create table mltable2 (id int,label double,col1 double,col2 double,col3 double); Insert into mltable2 values(1,8.6,5.7,5.9,7.9); Insert into mltable2 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists glr_model; select GeneralizedLinearRegression('glr_model',label,col1,col2,col3) from mltable2; select * from glr_model; --------------------预测--------------------- select GeneralizedLinearRegressionPrediction('glr_model',col1,col2,col3,'id', 'glr_pred') from mltable2; select * from glr_pred a,mltable2 b where a.id = b.id; · DecisionTreeRegression算法 数据准备: Create table mltable2 (id int,label double,col1 double,col2 double,col3 double); Insert into mltable2 values(1,8.6,5.7,5.9,7.9); Insert into mltable2 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists dtr_model; select DecisionTreeRegressor('dtr_model',label,col1,col2,col3) from mltable2; select * from dtr_model; -------------------预测--------------------- select DecisionTreeRegressionPrediction('dtr_model',col1,col2,col3,'id', 'dtr_pred') from mltable2; select * from dtr_pred a,mltable2 b where a.id = b.id; · RandomForestRegression算法 数据准备: Create table mltable2 (id int,label double,col1 double,col2 double,col3 double); Insert into mltable2 values(1,8.6,5.7,5.9,7.9); Insert into mltable2 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists rfr_model; select RandomForestRegressor('rfr_model',label,col1,col2,col3) from mltable2; select * from rfr_model; -------------------预测--------------------- select RandomForestRegressorPrediction('rfr_model',col1,col2,col3,'id', 'rfr_pred') from mltable2; select * from rfr_pred a,mltable2 b where a.id = b.id; · GBTRegression算法 数据准备: Create table mltable2 (id int,label double,col1 double,col2 double,col3 double); Insert into mltable2 values(1,8.6,5.7,5.9,7.9); Insert into mltable2 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists gbtr_model; select GBTRegressor('gbtr_model',label,col1,col2,col3) from mltable2; select * from gbtr_model; -------------------预测--------------------- select GBTRegressorPrediction('gbtr_model',col1,col2,col3,'id', 'gbtr_pred') from mltable2; select * from gbtr_pred a,mltable2 b where a.id = b.id; · IsotonicRegression算法 数据准备: Create table mltable2 (id int,label double,col1 double,col2 double,col3 double); Insert into mltable2 values(1,8.6,5.7,5.9,7.9); Insert into mltable2 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists gbtr_model; select IsotonicRegression('isor_model',label,col1,col2,col3) from mltable2; select * from isor_model; -------------------预测--------------------- select IsotonicRegressionPrediction('isor_model',col1,col2,col3,'id', 'isor_pred') from mltable2; select * from isor_pred a,mltable2 b where a.id = b.id; · AFTSurvivalRegressioin算法 数据准备: Create table afts_table (id int,label double,censor double,col1 double,col2 double); Insert into afts_table values(1,8.6,1.0 ,5.9,7.9); Insert into afts_table values(2,7.8,0.0,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists aftsr_model; select AFTSurvivalRegression('aftsr_model',label,censor,col1,col2) from afts_table; select * from aftsr_model; -------------------预测--------------------- select AFTSurvivalRegressionPrediction('aftsr_model',col1,col2,'id', 'aftsr_pred') from afts_table; select * from aftsr_pred a,afts_table b where a.id = b.id; 3.5.2 非监督算法 1. 语法· 训练语法 SELECT TrainAlgorithmName(’model_result_table’,label,col1,col2,…,[’parms’]) from TrainTableName; · 预测语法 SELECT PredictAlgorithmName(‘model_result_table’,col1,col2,col3,…,’id’,’predict_result_table’) from PredictTableName; 2. 语法说明表3-38 训练语法说明 参数 说明 常见配置选项 TrainAlgorithmName 训练算法名称,必须指定,支持的训练算法见算法说明:训练算法部分 无 model_result_table 算法训练结果模型表,必须指定 无 TrainTableName 训练数据表名,用作算法的数据源,必须指定 无 label 训练数据表中的列,用作算法的参数,作为标签列,必须指定 无 col1,col2,… 训练数据表中的列,用作算法的参数,作为特征列,必须指定 无 parms 算法的其它参数,根据算法需要,可以不指定(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …),详细解释见参数说明 无
表3-39 预测语法说明 参数名称 参数解释 备注 PredictAlgorithmName 预测算法名称,必须指定,支持的预测算法见算法说明:预测算法部分 无 model_result_table 模型表名,必须指定 用来指定使用哪个模型进行预测,对应训练中的model_result_table PredictTableName 预测数据表名,用来预测的数据,必须指定 无 id 预测数据表的id列,用作算法的参数,作为标签列,必须指定 无 col1,col2,… 表的列,用作算法的参数,必须指定 无 predict_result_table 使用模型进行预测后的结果表 临时表 3. 算法说明 表3-40 训练算法 算法名 算法说明 备注 kmeans Kmeans聚类训练函数 聚类 lda Lda训练函数 聚类 gaussianmixture 高斯混合训练函数 聚类 als Als协同过滤训练函数,算法必须加参数-ratingcol,-usercol,-itemcol 协同过滤
表3-41 预测算法 算法名 算法说明 备注 kmeansprediction Kmeans聚类预测函数 聚类预测 ldaprediction Lda预测函数 聚类预测 gmprediction 高斯混合预测函数 聚类预测 alsprediction als预测函数 协同过滤预测 4. 参数说明 表3-42 参数说明 参数名 参数说明 支持算法 k 设置聚类K值,int型 Kmeans、lda、gaussianmixture initmode 初始中心点选择方式,“random”“k-means” Kmeans initsteps 设置初始化步长 Kmeans docconcentrations 文档中心,array[double]型 lda docconcentration Dirichlet分布的参数a,文档在主题上分布的先验参数(超参数a)。当前必须大于1,值越大,推断出的分布越平滑。默认为-1,自动设置 lda optimizedocconcentration 是否优化文档中心列 lda optimizer 优化器,用来学习LDA模型,一般是EMLDAOptimizer或OnlineLDAOptimizer;可选值:em或online lda alpha ALS隐式反馈变化率 als implicitprefs 使用显式反馈ALS变量或隐式反馈 als itemcol 设置item列,必须要设置 als nonnegative 是否支持非负 als numblocks 并行计算的block数(-1为自动配置) als numitemblocks Item的block数 als numuserblocks User的block数 als rank 对应ALS模型中的隐藏因子数,即矩阵分解出的两个矩阵的新的行/列数 als ratingcol Rating列,必须要设置 als usercol User列,必须要设置 als tol 优化算法迭代求解过程中的收敛阈值,默认值是1e-4,不能为负数 Kmeans、gaussianmixture maxiter 优化算法求解的最大迭代次数(>=0),默认值100 Kmeans、lda、gaussianmixture、als regparam 控制模型的正则化过程,从而控制模型的过拟合情况 als checkpointinterval 检查点间隔,当maxiter很大的时候,检查点可以帮助减少shuffle文件大小并且可以帮助故障恢复;int型 Lda、als seed 随机种子,long型 Kmeans、lda、gaussianmixture、als 5. 示例 · KMeans算法 数据准备: Create table mltable1 (id int,col1 double,col2 double,col3 double,col4 double); Insert into mltable1 values(1,8.6,5.7,5.9,7.9); Insert into mltable1 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists km_result; select KMeans('km_result',col1,col2,col3,col4,'-MaxIter 20 -K 2') from mltable1; select * from km_result; -------------------预测--------------------- select KMeansPrediction('km_result',col1,col2,col3,col4,'id','km_pred') from mltable1; select * from km_pred a,mltable1 b where a.id = b.id; · LDA算法 数据准备: Create table mltable1 (id int,col1 double,col2 double,col3 double,col4 double); Insert into mltable1 values(1,8.6,5.7,5.9,7.9); Insert into mltable1 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists lda_model; select LDA('lda_model',col1,col2,col3,col4) from mltable1; select * from lda_model; -------------------预测--------------------- select LDAPrediction('lda_model',col1,col2,col3,col4,'id','lda_pred') from mltable1; select * from lda_pred a,mltable1 b where a.id = b.id; · GaussianMixture算法 数据准备: Create table mltable1 (id int,col1 double,col2 double,col3 double,col4 double); Insert into mltable1 values(1,8.6,5.7,5.9,7.9); Insert into mltable1 values(2,7.8,7.8,2.6,12.1); 语句示例: ------------------训练----------------------- drop table if exists gm_model; select GaussianMixture('gm_model',col1,col2,col3,col4) from mltable1; select * from gm_model; -------------------预测--------------------- select GMPrediction('gm_model',col1,col2,col3,col4,'id','gm_pred') from mltable1; select * from gm_pred a,mltable1 b where a.id = b.id; · ALS算法 数据准备: Create table als_table (userid int,movieid int,rating double,timestamp bigint); Insert into als_table values(0,2,3,1424380312); Insert into als_table values(1,36,2,1424380312); Insert into als_table values(2,4,3,1424380312); 语句示例: ------------------训练----------------------- drop table if exists als_model; select ALS('als_model',userId,movieId,rating,'-UserCol userId -ItemCol movieId -RatingCol rating') from als_table; select * from als_model; -------------------预测-------------------- select alsprediction('als_model',userId,movieId,rating,'userId', 'p_als_0') from als_table; select * from p_als_0; 3.5.3 模型选择 1. 语法· 训练语法 SELECT ModelSelectTypeAlgorithmName(label,col1,col2,col3,…,’model_result_table’, ’-Estimator TrainAlgorithmName [params1]’, ‘[params2]’, ‘-Evaluator EvaluatorName [params3]’) from TrainTableName; · 预测语法 SELECT ModelSelectTypePredictAlgorithmName(‘model_result_table’,col1,col2,col3,…,’id’,’predict_result_table’) from PredictTableName; 2. 语法说明表3-43 训练语法说明 参数 说明 常见配置选项 TrainAlgorithmName 训练算法名称,必须指定,支持的训练算法见算法说明:训练算法部分 无 model_result_table 算法训练结果模型表,必须指定 无 TrainTableName 训练数据表名,用作算法的数据源,必须指定 无 label 训练数据表中的列,用作算法的参数,作为标签列,必须指定 无 col1,col2,… 训练数据表中的列,用作算法的参数,作为特征列,必须指定 无 EvaluatorName 模型选择所使用的评估器,可选:regression、multiclassclassification、binaryclassification 若使用二分类训练算法,则使用binaryclassification评估器;若使用多分类训练算法,则使用multiclassclassification评估器;若使用回归训练算法,则使用regression回归器; params1 模型选择所使用的训练算法的参数,根据算法需要,可以不指定(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …),详细解释见参数说明 无 params2 模型选择的网格参数设置,(内容格式:-N MultilayerPerceptronClassifierumFolds n -G_Param G_ParamValue …) 待选择的参数组,可以不指定,详细解释见参数说明 其中NumFolds参数必须指定 params3 评估算法设定的参数,可以不指定,(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …) 评估器参数ParamKey可选:metricname、labelcol、rawpredictioncol、predictioncol
表3-44 预测语法说明 参数名称 参数解释 备注 PredictAlgorithmName 预测算法名称,必须指定,支持的预测算法见算法说明:预测算法部分 无 model_result_table 模型表名,必须指定 用来指定使用哪个模型进行预测,对应训练中的model_result_table PredictTableName 预测数据表名,用来预测的数据,必须指定 无 id 预测数据表的id列,用作算法的参数,作为标签列,必须指定 无 col1,col2,… 表的列,用作算法的参数,必须指定 无 predict_result_table 使用模型进行预测后的结果表 临时表 3. 算法说明 表3-45 训练算法 算法名 算法说明 备注 ModelSelect_CrossValidation 使用十则交叉的方式进行模型选择 十则交叉
表3-46 预测算法 算法名 算法说明 备注 CrossValidationPrediction 十则交叉模型选择预测函数 十则交叉 4. 参数说明 表3-47 参数说明 参数名 参数说明 备注 regParam 控制模型的正则化过程,从而控制模型的过拟合情况 无 maxIter 优化算法求解的最大迭代次数(>=0),默认值100 无 threshold 二分类预测的阈值,范围[0,1],double型 无 thresholds 多分类预测的阈值,array[double]型 无 checkpointInterval 检查点间隔,当maxiter很大的时候,检查点可以帮助减少shuffle文件大小并且可以帮助故障恢复,int型 无 fitIntercept 是否训练intercept,boolean型 无 standardization 训练模型前是否需要对训练特征进行标准化处理,boolean型 无 seed 随机种子 无 elasticNetParam 弹性网络混合参数(范围[0,1]),用于调节L1和L2之间的比例,两种正则化比例加起来是1,默认是0,即只使用L2正则化,设置为1,即只使用L1正则化,double型 无 tol 优化算法迭代求解过程中的收敛阈值,默认值:1e-4,不能为负数,double型 无 stepSize 每次迭代优化步长,double型 无 solver 优化的求解算法 无 aggregationDepth 设置分布式统计时的层数,主要用在treeAggregate中,数据量越大,可适当加大这个值,默认是2,int型 无 featuresCol 设置特征列 无 labelCol 设置标签列 无 predictionCol 设置预测列 无 inputCol 设置输入列,string型 无 inputCols 设置输入列,array[string]型 无 outputCol 设置输出列 无 weightCol 设置权重列 无 5. 示例 · 十则交叉模型选择 数据准备: Create table test_table (id int,label double,col1 double,col2 double,col3 double); Insert into test_table values(1,8.6,5.7,5.9,7.9); Insert into test_table values(2,7.8,7.8,2.6,12.1); Insert into test_table values(3,8.8,9.6,4.6,32.1); Insert into test_table values(4,11.9,12.5,8.6,62.1); Insert into test_table values(5,16.9,17.5,6.6,32.1); Insert into test_table values(6,21.9,23.4,10.6,43.2); 语句示例: select ModelSelect_CrossValidation(label,col1,col2,col3,'model_table1','-Estimator LinearRegression -MaxIter 10000','-NumFolds 3 -G_elasticNetParam 0.4 0.5 -G_regParam 0.1 0.2','-Evaluator Regression -MetricName mse') from test_table; select CrossValidationPrediction('model_table1',col1,col2,col3,'id','res_table1') from test_table; select * from res_table1; 3.5.4 特征工程 1. 语法· 不包含训练过程的特征工程语法 SELECT FeatureAlgorithmName(’transform_result_table’,’id’,col1,…,[’parms’]) from TableName; · 包含训练过程的特征工程语法 SELECT FeatureAlgorithmName(‘model_result_table’,’transform_result_table’,’id’,col1,…,[’parms’]) from TableName; 2. 语法说明表3-48 不包含训练过程的特征工程语法说明 参数 说明 常见配置选项 FeatureAlgorithmName 特征工程算法名称,必须指定 无 transform_result_table 转换结果表名,必须指定 无 TableName 表名,用作算法的数据源,必须指定 无 id 表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定 无 col1,,… 表的列,用作算法的参数,必须指定 无 parms 算法的其它参数,根据算法需要,可以不指定(格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …) 无
表3-49 包含训练过程的特征工程语法说明 参数 说明 常见配置选项 FeatureAlgorithmName 特征工程算法名称,必须指定 无 model_result_table 训练结果模型表,必须指定 无 transform_result_table 转换结果表名,必须指定 无 tablename 表名,用作算法的数据源,必须指定 无 id 表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定 无 col1,,… 表的列,用作算法的参数,必须指定 无 parms 算法的其它参数,根据算法需要,可以不指定(格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …) 无 3. 算法说明 表3-50 包含训练的特征工程算法 算法名 算法说明 支持参数 Word2vec 一种嵌入方法,可计算每个单词在给定语料库环境下的分布式词向量 InputCol:String //设置输入列 OutputCol:String //设置输出列 MaxIter:Int //设置最大迭代次数 MaxSentenceLength:Int //设置最大句子长度 MinCount:Int //设置 NumPartitions:Int //设置分区数 Seed:Long //设置种子 StepSize:Double //设置步长 VectorSize:Int //设置转换后的向量的长度 WindowSize:Int //设置 CountVectorizer 通过计数来将一个文档转换为向量 Binary:Boolean // InputCol:String //设置输入列 MinDF:Double // MinTF:Double // OutputCol:String //设置输出列 VocabSize:Int //设置词汇量 Pca 主成分分析,用来降低特征维度 InputCol:String //设置输入列—指定为features OutputCol:String //设置输出列 K:Int //设置主成分数 stringindexer 将字符串标签列编码为标签索引 InputCol:String //设置输入列 OutputCol:String //设置输出列 vectorindexer 解决向量数据集中的类别特征索引 MaxCategories:Int//最大类别数,小于它的认为是类别特征,否则被认为是连续特征 InputCol:String//设置输入列 OutputCol:String//设置输出列 standardscaler 用来转换一个向量行数据集,规范化每一个特征,使其标准差、均值为0 InputCol:String//设置输入列 OutputCol:String//设置输出列 WithStd:Boolean//设置是否使用标准差 WithMean:Boolean//设置是否使用均值 minmaxscaler 可将向量行缩放到范围[0,1] InputCol:String//设置输入列 OutputCol:String//设置输出列 maxabsscaler 该特征的绝对值的最大值转换向量行,使其特征在范围[-1,1]内 InputCol:String//设置输入列 OutputCol:String//设置输出列 quantileDiscretizer 连续特征转化为类别特征,类别数通过numBuckets参数来设置 RelativeError:Double//近似的精度控制 NumBuckets:Int//箱化数 InputCol:String//设置输入列 OutputCol:String//设置输出列 RFormula 通过形如clicked ~ country + hour这个关系式,得到新的特征和标签列 FeaturesCol:String //设置特征列 LabelCol:String//设置标签列 Formula:Sring//设置关系式 ChiSqSelector 卡方选择器 FeaturesCol:String//设置特征列 LabelCol:String//设置标签列 OutputCol:Int//输出的属性 NumTopFeatures:Int//选中排在前几个的属性 Percentile:Double//选中排在前百分之几的属性 SelectorType:String//设置卡方选择器的类型
表3-51 不包含训练的特征工程算法 算法名 算法说明 支持参数 Tfidf 文本挖掘中使用的特征向量化方法 Binary:Boolean InputCol:String//设置输入列 OutputCol:String//设置输出列 NumFeatures:Int//设置特征数 MinDocFreq:Int tokenizer 将文本(如:一个句子)分割成一个个独立的词(通常是一个单词) InputCol:String//设置输入列 OutputCol:String//设置输出列 stopwordsremover 从数据中删除停用词 CaseSensitive:Boolean//设置大小写敏感 InputCol:String//设置输入列 OutputCol:String//设置输出列 StopWords:Array[String]//设置停用词 ngram n-gram是一个序列,将输入特征转化为n-gram形式 N:Int //每个ngram中含有的元素个数 InputCol:String//设置输入列 OutputCol:String//设置输出列 binarize 二值化通过阈值将特征映射为(0/1) InputCol:String//设置输入列 OutputCol:String//设置输出列 Threshold:Double//设置阈值 polynomialExpansion 多项式扩展是将特征映射到多项式空间的过程 Degree:Int InputCol:String//设置输入列 OutputCol:String//设置输出列 onehotencoder 将标签索引列映射为二进制向量列 InputCol:String//设置输入列 OutputCol:String//设置输出列 DropLast:Boolean//是否删除最后类别 normalizer 可以转换向量行的数据集,规范化每一向量(0到1范围内) P:Double//设置P值 InputCol:String//设置输入列 OutputCol:String//设置输出列 elementwiseproduct 将每一个向量和权重向量使用元素对应相乘 InputCol:String//设置输入列 OutputCol:String//设置输出列 ScalingVec:Vector//设置乘数向量 vectorSlicer 输入特征向量,输出为新的特征向量,新的特征向量是原始特征向量的子集 Indices:Array[Int]//设置需要映射的索引 InputCol:String//设置输入列 OutputCol:String//设置输出列 Interaction 获取向量或实型列产生单一的向量列,向量列中包含输入向量中所有连接的结果 InputCols::Array[String]//设置输入列 OutputCol::String//设置输出列 VectorAssembler 可以将多个列合并为一个列 InputCols::Array[String] //设置输入列 OutputCol:String//设置输出列 4. 示例 · TFIDF算法 数据准备: Create table tfidf_table (id int,sentence string); Insert into tfidf_table values(1, 'test'); Insert into tfidf_table values(2, 'mytest'); 运行语句: select TFIDF('tfidf_trans_res','id',sentence ,'-InputCol sentence -OutputCol features') from tfidf_table; select * from tfidf_trans_res; select * from tfidf_trans_res a,tfidf_table b where a.id = b.id; · Word2Vec算法 数据准备: Create table word2vec_table (id int,text array); Insert into word2vec_table select 1,array('sparrow', 'mysparrow'); Insert into word2vec_table select 2,array('test','mytest'); 运行语句: drop table if exists w2v_mod_res; select Word2Vec ('w2v_mod_res','w2v_trans_res','id',text,'-VectorSize 3 -MinCount 0 -InputCol text -OutputCol result') from word2vec_table; select * from w2v_mod_res; select * from w2v_trans_res; · CountVectorizer算法 数据准备: Create table countvectorizer_table (id int,words array); Insert into countvectorizer_table select 1,array('sparrow', 'mysparrow'); Insert into countvectorizer_table select 2,array('test', 'mytest'); 运行语句: drop table if exists countvec_mod_res; select CountVectorizer('countvec_mod_res','countvec_trans_res','id',words,'-VocabSize 3 –MinDF 0 -InputCol words -OutputCol features') from countvectorizer_table; select * from countvec_mod_res; select * from countvec_trans_res; · PCA 算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.7,2.3,4.5,5.6); Insert into mltable values(2,8.6,6.5,3.8,4.9); 运行语句: drop table if exists pca_mod_res; select pca('pca_mod_res','pca_trans_res','id',col1,col2,col3,'-K 3 -InputCol features -OutputCol pcafeatures') from mltable; select * from pca_mod_res; select * from pca_trans_res; · Tokenizer算法 数据准备: Create table tokenizer_table (id int,sentence string); Insert into tokenizer_table values(1, 'test'); Insert into tokenizer_table values(2, 'mytest'); 运行语句: select Tokenizer('tokenizer_trans_res','id',sentence,'-InputCol sentence -OutputCol words') from tokenizer_table; select * from tokenizer_trans_res; · StopWordRemover算法 数据准备: Create table stopwordremover_table (id int,raw array); Insert into stopwordremover_table select 1,array('sparrow', 'mysparrow'); Insert into stopwordremover_table select 2, array('test', 'mytest'); 运行语句: select StopWordsRemover(' swr_trans_res ','id',raw,'-InputCol raw -OutputCol filtered') from stopwordremover_table; select * from swr_trans_res ; · Ngram算法 数据准备: Create table ngram_table (id int,words array); Insert into ngram_table select 1,array('sparrow', 'mysparrow'); Insert into ngram_table select 2, array('test', 'mytest'); 运行语句: select NGram (' ngram_trans_res ','id', words,'-N 2 -InputCol words -OutputCol ngrams') from ngram_table; select * from ngram_trans_res; · Binarizer算法 数据准备: Create table binarizer_table (id int,feature double); Insert into binarizer_table values(1,5.7); Insert into binarizer_table values(2,2.6); 运行语句: select binarize('bin_trans_res','id', feature,'-Threshold 0.5 -InputCol feature -OutputCol binarized_feature') from binarizer_table; select * from bin_trans_res; · PolynomialExpansion算法 数据准备: Create table polynominalexpansion (id int,col1 double,col2 double); Insert into polynominalexpansion values(1,5.7,11.8); Insert into polynominalexpansion values(2,2.6,13.6); 运行语句: select PolynomialExpansion ('pe_trans_res','id',col1,col2,'-Degree 3 -InputCol features -OutputCol polyFeatures') from polynominalexpansion; select * from pe_trans_res; · StringIndex算法 数据准备: Create table stringindex_table (id int,category string); Insert into stringindex_table values(1, 'test'); Insert into stringindex_table values(2, 'mytest'); 运行语句: select StringIndexer ('si_mod_res',' si_trans_res ','id', category,' -InputCol category -OutputCol categoryIndex') from stringindex_table; select * from si_mod_res; select * from si_trans_res; · OneHotEncoder算法 数据准备: Create table onehotencoder_table (id int,category_index double); Insert into onehotencoder_table values(1,5); Insert into onehotencoder_table values(2,6); 运行语句: select OneHotEncoder ('onehot_trans_res','id',category_index,' -InputCol category_index -OutputCol categoryVec') from onehotencoder_table; select * from onehot_trans_res; · VectorIndexer算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句: select VectorIndexer ('vi_mod_res','vi_trans_res','id',col1,col2,col3,'- MaxCategories 10 -InputCol features -OutputCol indexed') from mltable; select * from vi_mod_res; select * from vi_trans_res; · Normalizer算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句: select Normalizer('norm_trans_res','id',col1,col2,col3,'-P 1.0 -InputCol features -OutputCol normFeatures') from mltable; select * from norm_trans_res; · StandardScaler算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句: select StandardScaler ('standard_mod_res','standard_trans_res','id',col1,col2,col3,'-WithStd true -WithMean false -InputCol features -OutputCol scaledFeatures') from mltable; select * from standard_mod_res; select * from standard_trans_res; · MinMaxScaler算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句: select MinMaxScaler('minmax_mod_res','minmax_trans_res','id',col1,col2,col3,'-InputCol features -OutputCol scaledFeatures') from mltable; select * from minmax_mod_res; select * from minmax_trans_res; · MaxAbsScaler算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句: select MaxAbsScaler('maxabs_mod_res','maxabs_trans_res','id',col1,col2,col3,'-InputCol features -OutputCol scaledFeatures') from mltable;
select * from maxabs_mod_res; select * from maxabs_trans_res; · ElementwiseProduct算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句: select ElementwiseProduct('ep_trans_res','id',col1,col2,col3,'-ScalingVec 0.0 1.0 2.0 -InputCol vector -OutputCol transformedVector') from mltable; select * from ep_trans_res; · QuantileDiscretizer算法 数据准备: Create table quantile_discret_table (id int,hour double); Insert into quantile_discret_table values(1,5.6); Insert into quantile_discret_table values(2,7.8); 运行语句: select QuantileDiscretizer('qd_mod_res','qd_trans_res','id',hour,'-NumBuckets 3 -InputCol hour -OutputCol result') from quantile_discret_table; select * from qd_mod_res; select * from qd_trans_res; · VectorSlicer算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句: select VectorSlicer('trans_res','id',col1,col2,col3,'-Indices 1 2 -InputCol userFeatures -OutputCol features') from mltable; select * from trans_res; · RFormula算法 数据准备: Create table rformula_table (id int,counter string,hour double,clicked double); Insert into rformula_table values(1, 'henan',2.3,5.6); Insert into rformula_table values(2, 'hangzhou',6.5,3.8); 运行语句: select RFormula('rf_mod_res','rf_trans_res','id',counter,hour,clicked,'-Formula clicked~counter+hour -FeaturesCol features -LabelCol label') from rformula_table; select * from rf_mod_res; select * from rf_trans_res; · ChiSqSelector算法 数据准备: Create table mltable (id int,label double,col1 double,col2 double,col3 double); Insert into mltable values(1,5.6,2.3,4.5,5.6); Insert into mltable values(2,7.8,6.5,3.8,4.9); 运行语句示例: Select ChiSqSelector('css_mod_res','css_trans_res','id',col1,col2,col3,label,'-NumTopFeatures 1 -FeaturesCol features -LabelCol label -OutputCol selectedFeatures') from mltable; select * from css_mod_res; select * from css_trans_res; · Interaction算法 数据准备: Create table vectorassemble_table (id int,col1 double,col2 double,col3 double,col4 double,col5 double ,col6 double); Insert into vectorassemble_table values(1,5.6,2.3,4.5,5.6,7.8,9.2); Insert into vectorassemble_table values(2,7.8,6.5,3.8,4.9,8.7,2.6); 运行语句示例: select Interaction('inter_trans_res','id',col1,col2,col3,col4,col5,col6,'-InputCols vec1 vec2 -OutputCol interactedCol') from vectorassemble_table; Select * from inter_trans_res; · VectorAssembler算法 数据准备: Create table vectorassemble_table (id int,col1 double,col2 double,col3 double,col4 double,col5 double ,col6 double); Insert into vectorassemble_table values(1,5.6,2.3,4.5,5.6,7.8,9.2); Insert into vectorassemble_table values(2,7.8,6.5,3.8,4.9,8.7,2.6); 运行语句示例: Select VectorAssembler('va_trans_res','id',col1,col2,col3,'-InputCols col1 col2 col3 -OutputCol vec1') from vectorassemble_table; select * from va_trans_res; 3.5.5 Pipeline机制 1. 语法· 训练语法 SELECT pipelinetrain(‘pipeline_model_table’,’stage_algAndparms’,col1,col2,..) from TableName; · 预测语法 SELECT pipelineprediction(‘pipeline_model_table’ ,col1,col2,..,’id’, ’predict_result_table’) from TableName; 2. 语法说明表3-52 训练语法说明 参数 说明 常见配置选项 pipelinetrain 指定为pipeline机制训练,且固定 无 pipeline_modelt_table 训练结果模型表,必须指定 无 Stage_algAndparms Pipeline各阶段使用的算法和其参数 无 id 表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定 无 col1,,… 表的列,用作算法的参数,必须指定 无 TableName 表名,用作算法的数据源,必须指定 无
表3-53 预测语法说明 参数 说明 常见配置选项 pipelineprediction 指定为pipeline机制预测,必须指定 无 pipeline_modelt_table 预测使用的模型表,必须指定 无 col1,,… 表的列,用作算法的参数,必须指定 无 id 表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定 无 predict_result_table 预测结果表名,将预测结果存储在该表中,必须指定 无 tablename 表名,用作算法的数据源,必须指定 无 3. 参数说明 Stage_algAndparms: 通过-alg来指定该阶段使用的算法(算法说明,算法说明_1和算法说明_2),其他参数参考对应算法中的说明。 4. 示例· 创建训练数据 create table pipeline_train_table (id int,text string,label double); insert into pipeline_train_table values (0,"a b c d e spark",1.0); insert into pipeline_train_table values (1,"b d",0.0); insert into pipeline_train_table values (2,"spark f g h",1.0); insert into pipeline_train_table values (3,"hadoop mapreduce",0.0); · 使用SQL调用pipeline机制,并查看训练模型存储信息 select piplinetrain('pipeline_model', '-alg tokenizer -inputcol text -outputcol words', '-alg hashingtf -inputcol words -outputcol features', '-alg logisticregression', id,text,label) from pipeline_train_table; select * from pipeline_model; · 使用SQL预测新数据,并查看预测结果 create table pipeline_test_table (id int,text string); insert into pipeline_test_table values (4,"spark i j k"), (5,"l m n"), (6,"spark hadoop spark"), (7,"apache hadoop"); select pipelineprediction('pipeline_model',text,'id','pipeline_res_table') from pipeline_test_table; select * from pipeline_res_table; 3.6 列加密DLH支持对String数据类型的列数据进行加密,支持AES、DES和SM4三种加密算法及公钥和私钥两种加密方式。 · 目前DLH只支持对text、orc、parquet格式的表进行列加密。 · DLH不支持以load的方式将数据导入加密表。 · 当前版本中,暂不支持复杂类型的加密嵌套。
用户需要在建表时指定列加密相关参数。列加密参数如表3-54所示,加密参数一旦设置就不允许再进行修改。 表3-54 列加密参数 参数名 说明 encrypt.columns 该参数指定加密列,列必须是String类型 encrypt.algorithm 该参数指定加密算法,支持AES、DES和SM4 encrypt.type 该参数指定加密方式,支持public和private。可以不指定,默认public encrypt.key 该参数指定密钥。当设置encrypt.type为private时才需要设置此参数。DES加密算法要求密钥的长度大于8位,SM4加密算法要求长度为16 encrypt.pattern 该参数指定加密模式,支持CBC或ECB。在加密算法为SM4时设置 3.6.1 公钥加密 使用公钥加密,会将数据加密后再存储。只有使用DLH查询才能得到真实的数据,直接查看数据文件或者通过其他方式查询,只能查看到加密数据。 示例: · 在DLH中建立ORC格式表,并使用公钥加密和DES加密算法对表的列name、passwd进行数据加密。 create table employee(id int,name string,passwd string) tblproperties("encrypt.columns"="name,passwd","encrypt.algorithm"="DES"); insert into employee values(1,'zhangsan','43211'); select * from employee; +-----+-----------+---------+ | id | name | passwd | +-----+-----------+---------+ | 1 | zhangsan | 43211 | +-----+-----------+---------+ · 在DLH中建立parquet格式的表,并使用公钥加密方式和AES加密算法对列salary进行数据加密。 create table salary(id int,name string,salary string) stored as parquet tblproperties ("encrypt.columns"="salary","encrypt.algorithm"="AES"); insert into salary values(1,'zhangsan','8999.5'); select * from salary; +-----+-----------+---------+ | id | name | salary | +-----+-----------+---------+ | 1 | zhangsan | 8999.5 | +-----+-----------+---------+ · 在DLH中建立textfile格式的表,并使用公钥加密方式和SM4加密算法的CBC模式对列salary进行数据加密。 create table employeeinfo(id int,city string,address string) stored as textfile tblproperties("encrypt.columns"="city,address","encrypt.algorithm"="SM4","encrypt.pattern"="CBC"); insert into employeeinfo values (1,'hnzz','yulanjie'); select * from employeeinfo; +-----+-------+-----------+ | id | city | address | +-----+-------+-----------+ | 1 | hnzz | yulanjie | +-----+-------+-----------+ · 配置Hive连接DLH使用的metastore地址(如:thrift://sharedev1.hde.com:19083,thrift://sharedev2.hde.com:19083),通过Hive查看这两个表,只能看到加密数据。 [root@node2~]# beeline --hiveconf hive.metastore.uris=thrift://sharedev1.hde.com:19083,thrift://sharedev2.hde.com:19083 beeline> !connect jdbc:hive2://node2:10000 Connecting to jdbc:hive2://node2:10000 Enter username for jdbc:hive2://node2:10000: hdfs Enter password for jdbc:hive2://node2:10000: Connected to: Apache Hive (version 2.1.1-cdh6.2.0) Driver: Hive JDBC (version 2.1.1-cdh6.2.0) Transaction isolation: TRANSACTION_REPEATABLE_READ 0: jdbc:hive2://node2:10000> select * from employee; +--------------+---------------------------+------------------+--+ | employee.id | employee.name | employee.passwd | +--------------+---------------------------+------------------+--+ | 1 | OYuK0vlc7KHZo7+6lh609A== | xSIrlXaRby4= | +--------------+---------------------------+------------------+--+ 1 row selected (3.923 seconds) 0: jdbc:hive2://node2:10000> select * from salary; +------------+--------------+---------------------------+--+ | salary.id | salary.name | salary.salary | +------------+--------------+---------------------------+--+ | 1 | zhangsan | 0K51RX+6P2FWJ6Rmd7A8Dg== | +------------+--------------+---------------------------+--+ 1 row selected (1.519 seconds)
0: jdbc:hive2://node2:10000> select * from employeeinfo; +------------------+-------------------------+---------------------------+--+ | employeeinfo.id | employeeinfo.name | employeeinfo.salary | +------------------+-------------------------+---------------------------+--+ | 1 | eCccpUM/EjhaKlzkIl3qg== | P3YPNJ8bm4Mc3lbdK2vr8w== | +------------------+-------------------------+---------------------------+--+ 1 row selected (1.519 seconds) 3.6.2 私钥加密私钥加密也会将数据加密后再存储,同时要求在查询或者插入数据之前先设置私钥,否则认为操作是不被允许的。设置的私钥是对当前Session有效的。 设置私钥的语法如下: set encrypt.key.db_name.table_name=encrypt_key; 示例: · 在DLH中建立orc格式的表,并指定私钥加密和DES加密算法对数据表列name、passwd进行数据加密。 create table employee1(id int,name string,passwd string) tblproperties("encrypt.columns"="name,passwd","encrypt.algorithm"="DES","encrypt.type"="private","encrypt.key"="qwertyui"); · 当不设置私钥时,插入和查询都是不允许的。 insert into employee1 values(1,'zhangsan','43211'); java.lang.Exception: encrypt.key is not right · 设置私钥后可以正常插入和查询。 set encrypt.key.default.employee1=qwertyui; insert into employee1 values(1,'zhangsan','43211'); select * from employee1; +-----+-----------+---------+ | id | name | passwd | +-----+-----------+---------+ | 1 | zhangsan | 43211 | +-----+-----------+---------+ Time taken: 0.111 seconds, Fetched 1 row(s) · 查看表信息时,encrypt.key也已经被加密。 sparrow> show create table employee1; CREATE TABLE `employee1`(`id` int, `name` string, `passwd` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ( 'numFiles' = '1', 'transient_lastDdlTime' = '1536046320', 'totalSize' = '40', 'encrypt.algorithm' = 'DES', 'encrypt.key' = 'FumPB0oW5xSwdVxa/MkAnQ==', 'encrypt.columns' = 'name,passwd', 'encrypt.type' = 'private', 'COLUMN_STATS_ACCURATE' = 'true' ) 3.7 权限访问控制根据集群是否开启“安全管理/权限管理”功能,集群中新建用户获取DLH组件权限的方式不同: · 当集群未开启权限管理时,集群中新建用户可直接对DLH组件执行库表的创建、修改、插入、删除等操作,无需进行授权操作。 · 当集群开启权限管理时,集群中新建用户的DLH组件权限需要在[集群权限/角色管理]页面进行配置,集群用户绑定已配置DLH权限的角色后才可执行相关操作。
权限管理是集群安全管理的重要组成部分,在开启权限管理的集群中,权限基于角色进行统一管理,角色是权限的集合。 为集群中用户赋予权限的整体流程如下: (1) 新建角色,并为角色配置权限。 (2) 新建用户,并将角色分配用户,用户即拥有角色所具有的权限。 3.7.1 权限说明在开启权限管理的集群中,用户对DLH组件的操作需被赋予相关权限后才能执行。 DLH支持对数据库表和数据库UDF配置权限(支持使用通配符*模糊适配),并且提供数据脱敏和行级过滤功能。 · 数据库的配置权限:数据库表可对数据库、数据表、列配置权限,权限包括:select、update、create、drop、alter、index、use,其中all表示配置所有权限。 · 数据库UDF的配置权限:数据库UDF可对数据库、UDF配置权限,权限包括create、drop。 · 数据脱敏:是指按照某种规则对指定列进行脱敏,保证数据安全性,DLH数据脱敏策略如表表3-56所示。 · 行级过滤:是指对表配置行级过滤策略,仅展示满足过滤策略的数据(若过滤策略配置为空则不过滤)。行级过滤类似where子句,示例:配置策略id>1时,查询表时会自动获取id大于1的数据 DLH操作所需权限对应关系如表3-55所示,DLH数据脱敏策略如表3-56所示。 表3-55 DLH权限说明 权限类型 对应的组件常用操作 select 库表查询等相关操作,如:select、export、show、describe等 update 插入或更新库表等相关操作,如:insert、insert overwrite、delete、update、load等 create 创建库表等相关操作,如create table、create database等 【说明】库表的创建者默认不拥有库表的使用权限,但是通过“权限管理”功能可授予其使用权限 drop 删除库表等相关操作,如:drop table等 alter 修改库表等相关操作,如:alter table等 index 索引等相关操作,如:create index等 use 执行show database、use database等操作 all 支持以上所有操作
表3-56 DLH数据脱敏策略说明 数据脱敏策略 策略规则 详细说明 MASK 对字母和数字脱敏 · string:x替代小写字母,X替代大写字母,n替代数字 · int:1替代数字 · date:"1900-01-01"替代原值 · decimal:NULL值替代原值 MASK_SHOW_LAST_4 后四位不脱敏 · string:仅显示后四位,其他用x代替 · int:仅显示后四位,其他用1代替; · date:"1900-01-01"替代原值 · decimal:NULL值替代原值 MASK_SHOW_FIRST_4 前四位不脱敏 · string:仅显示前四位,其他用x代替 · int:仅显示前四位,其他用1代替 · date:"1900-01-01"替代原值 · decimal:NULL值替代原值 MASK_HASH 显示HASH值 · string:Hash值替代原值 · int/date/decimal:NULL值替代原值 MASK_NULL 显示NULL值 NULL值替代原值 MASK_NONE 显示原值 显示原值 MASK_DATE_SHOW_YEAR 脱敏时间信息 · string:x替换数字 · int:1替代数字 · date:显示日期年份,月份和日期均用01代替 · decimal:NULL值替代原值
· 若DLH要使用数据脱敏和行级过滤功能,必须在组件详情页面[配置]页签下的高级配置中,配置dlh.query.engine=hive;dlh.query.engine默认值为hive,若切换为dlh.query.engine=presto则不支持数据脱敏和行级过滤功能。 · 新建拥有DLH权限的角色时,若要使用数据脱敏和行级过滤则必须首先配置数据库表的select权限。 3.7.2 权限使用操作示例 1. 数据库权限控制 表3-57 数据库授权配置 操作 权限要求(数据库示例为dlhtest) create database 数据库:*或dlhtest,数据表:*,列:*,权限:create use database 数据库:*或dlhtest,数据表:*,列:*,权限:use drop database 数据库:*或dlhtest,数据表:*,列:*,权限:drop
下面以授予create权限为例,对dlhtest数据库授予create权限(create权限默认拥有use权限),介绍数据库的权限访问控制(其它权限的授予方式与create权限的操作类似,不再重复说明)。操作步骤如下: (1) 新建用户 在[集群权限/用户管理]页面,新建dlhuser01用户,授权前执行创建dlhtest数据库操作。如图3-4所示,此时显示dlhuser01用户没有dlhtest数据库的create权限。 图3-4 无执行权限
(2) 新建角色 在[集群权限/角色管理]页面,新建dlh01角色,授予dlh01角色dlhtest数据库的create权限。 图3-5 新建角色
(3) 用户绑定角色 在[集群权限/用户管理]页面,单击dlhuser01用户对应的按钮,为其绑定dlh01角色。 图3-6 为用户绑定角色进行授权
(4) 使用dlhuser01用户重新执行创建dlhtest数据库的操作。如图3-7所示,此时显示dlhtest数据库创建成功。 图3-7 执行成功 2. 表权限控制 表3-58 表授权配置 操作 权限要求(数据库示例为dlhtest,表示例为dlhtab1) create table 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:create alter table 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:alter insert into table · Hive 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:update · YARN 队列:*或default,权限:submit-app delete from table(针对ACID表) · Hive 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:update · YARN 队列:*或default,权限:submit-app update table(针对ACID表) · Hive 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:update · YARN 队列:*或default,权限:submit-app select * from table · Hive 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:select · YARN 队列:*或default,权限:submit-app create index 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:index drop table 数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:drop
示例1:下面以授予update权限为例,对dlhtest数据库dlhtab1数据表授予update权限,介绍数据表的权限访问控制。操作步骤如下: (1) 新建用户 在[集群权限/用户管理]页面,新建dlhuser21用户,授权前执行插入dlhtest.dlhtab1数据表的操作。如图3-8所示,此时显示dlhuser21用户没有dlhtest.dlhtab1数据表的update权限。 图3-8 无执行权限
(2) 新建角色 在[集群权限/角色管理]页面,新建dlh21角色,授予dlh21角色dlhtest.dlhtab1数据表的update权限。 图3-9 新建角色
(3) 用户绑定角色 在[集群权限/用户管理]页面,单击dlhuser21用户对应的按钮,为其绑定角dlh21色。 (4) 使用dlhuser21用户重新执行插入dlhtest.dlhtab1数据表的操作。如图3-10所示,此时显示dlhuser21用户没有提交到yarn default队列的权限。 图3-10 无执行权限
(5) 修改角色授权 在[集群权限/角色管理]页面,单击dlh21角色对应的按钮,修改该角色的组件权限设置,授予dlh21角色default队列的submit-app权限。 图3-11 编辑角色
(6) 使用dlhuser21用户重新执行插入dlhtest.dlhtab1数据表的操作。如图3-12所示,此时显示dlhtest. dlhtab1数据表插入数据成功。 图3-12 执行成功
示例2:下面以授予select权限为例,对dlhtest数据库dlhtab1数据表授予select权限,介绍数据表的权限访问控制。操作步骤如下: (1) 继续使用dlhuser21用户执行查询dlhtest.dlhtab1数据表的操作。如图3-13所示,此时显示dlhuser21用户没有dlhtest. dlhtab1数据表的select权限。 图3-13 无执行权限
(2) 修改角色授权 在[集群权限/角色管理]页面,单击dlh21角色对应的按钮,修改该角色的组件权限设置,授予dlh21角色dlhtest. dlhtab1数据表的select权限。 图3-14 编辑角色
(3) 使用dlhuser21用户重新执行查询dlhtest. dlhtab1数据表的操作。如图3-15所示,此时显示查询dlhtest. dlhtab1数据表数据成功。 图3-15 执行成功 3. 列级别权限控制 表3-59 授权配置 操作 权限要求(数据库示例为dlhtest,数据表示例为dlhtab1,列示例为name) insert into table · Hive 数据库:*或dlhtest,数据表:*或dlhtab1,列:name,权限:update · YARN 队列:*或default,权限:submit-app select name from table 数据库:*或dlhtest,数据表:*或dlhtab1,列:name,权限:select
示例1:下面以授予select权限为例,对dlhtest数据库dlhtab1数据表的name列授予select权限,介绍数据列的权限访问控制。操作步骤如下: (1) 新建用户 在[集群权限/用户管理]页面,新建dlhuser02用户,授权前执行在dlhtest.dlhtab1数据表中查询name列的操作。如图3-16所示,此时显示dlhuser02用户没有dlhtab1数据表的查询权限。 图3-16 无执行权限
(2) 新建角色 在[集群权限/角色管理]页面,新建dlh02角色,授权dlhtest数据库dlhtab1数据表的name列的select权限。 图3-17 新建角色
(3) 用户绑定角色 在[集群权限/用户管理]页面,单击dlhuser02用户对应的按钮,为其绑定dlh02角色。 (4) 使用dlhuser02用户重新执行在dlhtest.dlhtab1数据表中查询name列的操作。如图3-18所示,此时显示查询操作执行成功。 图3-18 执行成功
示例2:下面以授予update权限为例,对dlhtest数据库dlhtab1数据表的name列授予update权限,介绍数据列的权限访问控制。操作步骤如下: (5) 继续使用dlhuser02用户在dlhtest. dlhtab1数据表的name列执行插入的操作。如图3-19所示,此时显示没有update权限。 图3-19 无执行权限
(6) 修改角色授权 在[集群权限/角色管理]页面,单击dlh02角色对应的按钮,修改该角色的组件权限设置,授予dlh02角色dlhtest. dlhtab1中name列的update权限,及YARN的default队列的submit-app权限。 图3-20 编辑角色
(7) 使用dlhuser02用户重新在dlhtest. dlhtab1数据表的name列执行插入的操作。如图3-21和图3-22所示,此时显示插入操作执行成功。 图3-21 执行成功
图3-22 查询插入结果 4. UDF权限控制 表3-60 授权配置 操作 权限要求(数据库示例为dlhtest,UDF示例为mylowerperm1) create function 数据库:*或dlhtest,UDF:*或mylowerperm1,权限:create drop function 数据库:*或dlhtest,UDF:*或mylowerperm1,权限:drop select function 不需要授权 show functions 不需要授权
(1) 创建函数类文件 package com.dlh.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text;
public final class LowerFunc extends UDF{ public Text evaluate(final Text s){ if(s == null){return null;} return new Text(s.toString().toLowerCase()); } } 其中pom.xml内容如下: 4.0.0
org.example dlhUdfTest 1.0
org.apache.hive hive-exec 2.1.1-cdh6.2.0
org.apache.maven.plugins maven-shade-plugin 2.4.3
org.apache.maven.plugins maven-compiler-plugin 3.0
1.7 1.7
(2) 将上述文件编译打包后上传到测试集群中,本示例中打包后名字为dlhUdfTest-1.0.jar。 (3) 新建用户 在[集群权限/用户管理]页面,新建udfuser03用户。如图3-23所示,将udf测试jar包上传到HDFS的/tmp/udflib目录下,在授权前执行create操作。如图3-24所示,此时显示用户没有dlhtest数据库下UDF mylowerperm1的create权限。 图3-23 将udf测试jar包上传到HDFS
图3-24 无执行权限
(4) 如图3-25所示,修改HDFS的/tmp/udflib目录权限为777,目的是当其他用户调用该函数时拥有该jar包的访问权限。 图3-25 修改HDFS的/tmp/udflib目录权限
(5) 新建角色 在[集群权限/角色管理]页面,新建udf03角色,授权dlhtest数据库下UDF mylowerperm1的create权限。 图3-26 新建角色
(6) 用户绑定角色 在[集群权限/用户管理]页面,单击udfuser03用户对应的按钮,为其绑定udf03角色。 (7) 使用udfuser03用户重新执行dlhtest数据库下UDF mylowerperm1的create操作,如图3-27和图3-28所示,此时显示create操作执行成功。 图3-27 执行成功
图3-28 查看创建的内容
在新的Session中执行show functions命令时,可能查看不到新建函数,这是因为之前创建function时连接的DLHServer2和当前连接不同。执行reload functions命令或重启当前连接的DLHServer2,即可重新读取数据库的函数并加载到缓存,此时重新执行show functions命令即可查看到新建函数。 5. 数据脱敏配置 数据脱敏是指按照某种规则对指定列进行脱敏,保证数据安全性。配置数据脱敏规则需先配置数据库表的select权限,然后进行数据脱敏配置。下面简单介绍数据脱敏配置的整体流程。 【示例一】 以配置脱敏规则为MASK为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下: (1) 配置数据库表mask01.t01的select权限,即对数据库mask01数据表t01授予select权限。新建用户dlhmask01和角色dlhmask01,为角色授予select权限后,将该用户绑定到dlhmask01角色,并执行查询表操作。详细过程请参考3.7.2 2. 章节,查询结果如图3-29所示。 图3-29 mask01.t01原始数据
(2) 配置脱敏规则MASK 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则为MASK,即通过规则“x替代小写字母,X替代大写字母,n替代数字”来实现对列name的数据脱敏。 图3-30 配置脱敏规则MASK
(3) 执行查询表mask01.t01操作。如图3-31所示,信息表明对mask01库下t01表的列name脱敏成功。 图3-31 MASK脱敏成功
【示例二】 以配置脱敏规则为MASK_SHOW_LAST_4为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下: (1) 配置脱敏规则MASK_SHOW_LAST_4 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_SHOW_LAST_4,即通过规则“仅显示后四位,其他用x代替”来实现对列name的数据脱敏。 图3-32 配置脱敏规则MASK_SHOW_LAST_4
(2) 执行查询表mask01.t01操作。如图3-33所示,信息表明对mask01库下t01表的列name脱敏成功。 图3-33 MASK_SHOW_LAST_4脱敏成功
【示例三】 以配置脱敏规则为MASK_SHOW_FIRST_4为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下: (1) 配置脱敏规则MASK_SHOW_FIRST_4 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_SHOW_FIRST_4,即通过规则“仅显示前四位,其他用x代替”来实现对列name的数据脱敏。 图3-34 配置脱敏规则MASK_SHOW_FIRST_4
(2) 执行查询表mask01.t01操作。如图3-35所示,信息表明对mask01库下t01表的列name脱敏成功。 图3-35 MASK_SHOW_FIRST_4脱敏成功
【示例四】 以配置脱敏规则为MASK_HASH为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下: (1) 配置脱敏规则MASK_HASH 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_HASH,即通过规则“Hash值替代原值”来实现对列name的数据脱敏。 图3-36 配置脱敏规则MASK_HASH
(2) 执行查询表mask01.t01操作。如图3-37所示,信息表明对mask01库下t01表的列name脱敏成功。 图3-37 MASK_HASH脱敏成功
【示例五】 以配置脱敏规则为MASK_NULL为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下: (1) 配置脱敏规则MASK_NULL 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_NULL,通过规则“NULL值替代原值”来实现对列name的数据脱敏。 图3-38 配置脱敏规则MASK_NULL
(2) 执行查询表mask01.t01操作。如图3-39所示,信息表明对mask01库下t01表的列name脱敏成功。 图3-39 MASK_NULL脱敏成功
【示例六】 以配置脱敏规则为MASK_NONE为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下: (1) 配置脱敏规则MASK_NONE 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_NONE,即“显示原值”。 图3-40 配置脱敏规则MASK_NONE
(2) 执行查询表mask01.t01操作。如图3-41所示,数据显示原值。 图3-41 MASK_NONE脱敏成功。
【示例七】 以配置脱敏规则为MASK_DATE_SHOW_YEAR为例,对数据库mask01数据表t03的birth列进行数据脱敏,操作步骤如下: (1) 配置数据库表mask01.t03的select权限,即对数据库mask01数据表t03授予select权限,执行查询表操作。详细过程请参考3.7.2 2. 章节,查询结果如图3-42所示。 图3-42 mask01.t03原始数据
(2) 配置脱敏规则MASK_DATE_SHOW_YEAR 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列birth的脱敏规则的脱敏规则为MASK_DATE_SHOW_YEAR。 图3-43 配置脱敏规则MASK_DATE_SHOW_YEAR
(3) 执行查询表mask01. t03操作。如图3-44所示,若birth列为date类型,通过规则“显示日期年份,月份和日期均用01代替”来实现对列birth的数据脱敏。 图3-44 MASK_DATE_SHOW_YEAR脱敏成功
注意:若birth列为string类型,配置脱敏规则MASK_DATE_SHOW_YEAR时,年月日均用x替换来实现脱敏。 6. 行级过滤配置配置行级过滤规则需先配置数据库表的select权限,然后配置行级过滤权限。对数据库mask01数据表t01配置行级过滤策略,操作步骤如下: (1) 配置数据库表mask01.t01的select权限,即对数据库mask01数据表t03授予select权限,执行查询表操作。详细过程请参考3.7.2 2. 章节(本章仍以dlhmask01用户和dlhmask01角色为例进行说明),当前若已完成该配置,则跳过此步。 (2) 配置行级过滤规则 在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的的过滤规则为“id>123 and name like '%mask%'”(若配置为空则不过滤)。 图3-45 配置行级过滤规则
(3) 执行查询表mask01. t01操作。如图3-46所示,即为“id>123 and name like '%mask%'”过滤后的结果。 图3-46 行级过滤结果 3.8 添加/删除进程 3.8.1 添加进程 DLH支持添加DLH Metastore、DLHServer2进程。在并发查询任务较多的情况下,可以考虑增加DLH Metastore和DLHServer2进程,分摊查询负载,提高并发数。 1. 操作示例· 本章节仅以添加DLH Metastore进程为例进行说明,其它进程操作类似不再进行说明。 · 若集群中所有节点均已安装DLH Metastore,添加DLH Metastore进程前则需要先在集群中添加主机,然后再执行添加DLH Metastore进程的操作。如果集群中已有添加进程所需要的主机,则可直接执行添加DLH Metastore进程的操作。
添加DLH Metastore进程的操作步骤如下: (1) 在DLH组件详情页面,在右上角组件操作的下拉框中选择按钮。 (2) 弹出添加进程窗口,如图3-47所示。 a. 选择进程及主机 在选择进程项的下拉列表中选择可添加的组件进程,在选择主机项的主机列表中勾选进程安装在哪一个主机上(支持多选)。 b. 部署进程 选择结束后单击下一步部署进程,直至部署进度条结束(部署过程中不支持中止)。 c. 启动进程 部署进程结束后单击下一步启动进程,直至启动进度条结束(启动过程中不支持中止)。 图3-47 添加进程
(3) 查看进程变化 DLH Metastore添加完成之后,在组件详情页面[部署拓扑]页签中可以查看DLH Metastore进程的数量变化以及状态。 (4) 重启组件(根据实际情况选择) 进入集群详情页面,选择[组件]页签,需根据页面提示重新启动相关组件。 3.8.2 删除进程DLH支持删除DLH Metastore、DLHServer2进程。 · 在并发任务较少的情况下,可以考虑减少DLH Metastore和DLHServer2进程,节省集群资源。 · 删除进程后DLH Metastore、DLHServer2分别对应进程的个数均不能少于2个。 1. 操作示例· 删除进程操作不仅可以在组件详情页面的[部署拓扑]页签下执行,也可以在[集群管理/主机管理/主机监控]下的主机详情页面执行。本章节仅以“在组件详情页面的[部署拓扑]页签下执行删除DLH Metastore进程操作”为例进行说明,其它进程操作类似不再进行说明。 · 执行删除DLH Metastore进程操作前,请确认DLH Metastore是否有运行的任务,如果有运行的任务时,删除进程会影响任务执行。
删除DLH Metastore进程的操作步骤如下: (1) 在DLH组件详情页面[部署拓扑]页签下,选择要删除DLH Metastore进程的主机,然后单击该进程右侧操作中的按钮,停止DLH Metastore。 (2) 删除DLH Metastore 待DLH Metastore停止成功后,如图3-48所示,在该进程右侧操作中单击按钮,即可完成删除DLH Metastore。 图3-48 删除进程
(3) 查看进程变化 DLH Metastore进程删除完成之后,在组件详情页面[部署拓扑]页签中可以查看DLH Metastore进程的数量变化以及状态。 (4) 重启组件(根据实际情况选择) 进入集群详情页面,选择[组件]页签,需根据页面提示重新启动相关组件。 4 配置说明 4.1 DLH常用配置 4.1.1 sparrow-default配置 表4-1 sparrow-defaults常用配置 参数名称 参数说明 默认值 spark.eventLog.dir Sparrow事件日志目录 hdfs:///sparrow-history/ spark.eventLog.enabled Sparrow事件日志启用标志 true spark.history.fs.logDirectory Sparrow历史信息日志目录 hdfs:///sparrow-history/ spark.history.ui.port Sparrow历史信息UI端口 18082 spark.yarn.historyServer.address HistoryServer UI的地址 无 spark.yarn.applicationMaster.waitTries YARN模式下application等待Master的次数 10 spark.yarn.driver.memoryOverhead YARN模式下driver占用的内存 384 spark.yarn.max.executor.failures YARN模式下executor最大失败次数 3 spark.yarn.submit.file.replication YARN模式下应用程序上载到HDFS上的最大复制数 3 spark.security.authorization.enable 是否开启权限验证 false 4.1.2 sparrow-thrift-sparkconf配置 表4-2 sparrow-thrift-sparkconf常用配置 参数名称 参数说明 默认值 spark.security.authorization.enable JDBC连接是否开启权限验证 false spark.master ThriftServer的模式 当YARN的NodeManager只有一个时,该参数为local[4];否则为yarn-client spark.yarn.security.tokens.hbase.enabled 安全环境中,ThriftServer YARN模式下是否token认证HBase组件 true spark.yarn.security.credentials.hbase.enabled 安全环境中,ThriftServer YARN模式下是否使用HBase组件凭证 true spark.yarn.queue 使用的YARN队列 default spark.yarn.submit.file.replication 文件上传到HDFS的复本数 3 spark.yarn.driver.memoryOverhead 每个driver可以分配的非堆存储内存,这些内存用于如VM,字符串常量池以及其他本地额外开销等,单位:M 384 spark.yarn.executor.memoryOverhead 每个executor可以分配的非堆存储内存,这些内存用于如VM,字符串常量池以及其他本地额外开销等,单位:M 384 4.1.3 sparrow-hive-site-override配置 表4-3 sparrow-hive-site-override常用配置 参数名称 参数说明 默认值 hive.server2.thrift.port JDBC连接的端口号 10017 hive.server2.transport.mode 传输模式,分为http和binary binary hive.metastore.client.socket.timeout socket连接Hive Metastore的超时时长 1800 hive.security.authorization.manager Metastore授权管理类的名称 org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider 4.1.4 dlh-site配置 表4-4 dlh-site常用配置 参数名称 参数说明 默认值 hive.input.format 输入格式,默认HoodieCombineHiveInputFormat适配hudi表 org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat hive.server2.thrift.port Dlh组件JDBC连接的端口号 13000 hive.metastore.uris Dlh组件元数据库对应的地址 - hive.presto.enable 是否启用presto true hive.strict.checks.cartesian.product 是否开启笛卡尔积sql校验 true hive.flink.enable 是否启用flink true hive.fetch.task.conversion 是否将简单的查询简化为直接从文件中读取。none:禁用该功能;minimal:select * ,limit, filter在一个表所属的分区表上操作,直接从文件中读取数据;more:select,filter,limit 都直接从文件中读取数据 more hive.execution.engine Dlh组件默认执行引擎 spark hive.server2.webui.port Dlh组件UI页面端口 13002 hive.presto.table.threshold 表大小默认阈值为10GB,单位为B。当执行引擎为auto时,若表容量小于该阈值则查询走presto执行,若表容量大于该阈值则查询走hive执行 10737418240 hive.presto.http-server.http.port Presto的协调器节点的地址,比如:http://localhost:18089。若集群开启了HA,可配置为虚拟主机名及代理端口,比如:http://testclr-vserver.vip.hde.com:18090,这部分信息可以从Presto组件的自定义config配置http.proxy.url获取 - hive.compute.query.using.stats 是否使用元数据统计信息返回count统计值 true 4.2 Atlas常用配置 4.2.1 application-properties配置 表4-5 application-properties常用配置 参数名称 参数说明 默认值 atlas.rest.address Atlas服务Rest接口地址 无 atlas.server.http.port Atlas服务Rest接口服务端口 31000 atlas.notification.topics Atlas使用Kafka topics ATLAS_HOOK,ATLAS_ENTITIES atlas.notification.replicas Atlas使用Kafka topic的副本数 1 atlas.kafka.zookeeper.connect Atlas使用Kafka的zookeeper地址 无 atlas.kafka.bootstrap.servers Atlas使用Kafka的broker地址 无 atlas.graph.index.search.solr.zookeeper-url Atlas使用Infra-solr的zookeeper地址 无 atlas.graph.index.search.solr.mode Atlas使用Infra-Solr的集群模式 Cloud atlas.audit.hbase.tablename Atlas审计使用HBase的表名 ATLAS_ENTITY_AUDIT_EVENTS atlas.graph.storage.hbase.table Atlas图结构数据存储使用的HBase的表名 atlas_janus atlas.audit.hbase.zookeeper.quorum Atlas使用HBase的zookeeper地址 无 atlas.graph.storage.hostname Atlas使用HBase的主机名地址 无 5 常见问题解答 1. DLH执行简单count任务时,如 select count(1) from XXX,获取统计值不准确 · 原因分析:DLH中默认hive.compute.query.using.stats=true,为加快查询速度,使用元数据统计信息返回count统计值,但该统计信息会由于写表方式不同,导致统计不准确。 · 解决方法:DLH引擎执行count任务前设置参数set hive.compute.query.using.stats =false,启动count统计任务。
|
今日新闻 |
推荐新闻 |
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 |