14 您所在的位置:网站首页 pgsql行转列连续日期 14

14

2023-05-19 10:28| 来源: 网络整理| 查看: 265

目  录

1 DLH简介

1.1 DLH概述

1.2 DLH架构

1.3 功能特点

1.4 应用场景

2 快速入门

2.1 组件安装

2.1.1 查看组件的日志信息

2.2 运行状态监控

2.2.1 查看组件详情

2.2.2 组件检查

2.3 快速使用指导

2.3.1 非Kerberos环境

2.3.2 Kerberos环境

2.4 快速链接使用

2.4.1 配置组件快速链接

2.4.2 访问监控页面

3 使用指南

3.1 离线查询&交互式查询

3.1.1 语法介绍

3.1.2 配置说明

3.1.3 示例

3.2 外部数据源

3.2.1 语法介绍

3.2.2 Hive数据源

3.2.3 SeaSQL MPP数据源

3.2.4 DataEngine MPP数据源

3.2.5 MySQL数据源

3.2.6 HBase数据源

3.2.7 跨源分析

3.3 Hudi存储格式

3.3.1 Hudi存储格式相关术语

3.3.2 文件组织形式

3.3.3 存储类型和视图

3.3.4 Hudi属性说明

3.3.5 SQL操作

3.4 流SQL操作

3.4.1 保留关键字

3.4.2 流SQL UDF

3.4.3 创建表

3.4.4 数据插入

3.4.5 数据查询

3.5 MLSQL

3.5.1 监督算法

3.5.2 非监督算法

3.5.3 模型选择

3.5.4 特征工程

3.5.5 Pipeline机制

3.6 列加密

3.6.1 公钥加密

3.6.2 私钥加密

3.7 权限访问控制

3.7.1 权限说明

3.7.2 权限使用操作示例

3.8 添加/删除进程

3.8.1 添加进程

3.8.2 删除进程

4 配置说明

4.1 DLH常用配置

4.1.1 sparrow-default配置

4.1.2 sparrow-thrift-sparkconf配置

4.1.3 sparrow-hive-site-override配置

4.1.4 dlh-site配置

4.2 Atlas常用配置

4.2.1 application-properties配置

5 常见问题解答

 

1 DLH简介 1.1  DLH概述

DLH(数据湖仓库)结合数据湖和数据仓库的优势,在数据湖存储上实现了与数据仓库类似的数据结构和数据管理功能,提供“湖仓一体化”的能力。

DLH以HDFS和ONEStor作为数据湖的集中存储库,能够存储结构化、半结构化和非结构化的数据。借助数据集成服务能够将外部系统中数据接入到数据湖内,建立统一管理的数据目录、元数据信息及血缘关系展示等。同时,通过统一SQL接口能够对湖内数据进行离线查询、交互式分析、跨源分析、实时流计算以及机器学习算法训练等。

1.2  DLH架构

DLH架构图如图1-1所示,说明如下:

·     DLH基于Hive提供统一SQL入口,不同场景下SQL语句底层执行引擎可自动进行切换,所有计算任务统一由YARN进行资源调度。

·     存储端支持多种存储格式和数据增量插入、增量查询等能力,并提供简单便捷的数据入湖工具。

·     通过统一的元数据管理界面,能够可视化管理库表结构信息及表数据量或外部数据源大小,并提供血缘关系展示等功能。

·     依赖大数据平台的认证/权限管理、加密管理、审计管理等模板保证DLH组件安全可靠。

图1-1 DLH架构图

 

1.3  功能特点

DLH的功能特点如下:

·     支持存储多种数据类型

能够存储结构化、半结构化和非结构化数据以及流式数据访问。

·     完全兼容Hive标准

Hive作为Hadoop生态SQL事实标准,具有广泛的用户。数据湖仓库服务完全兼容Hive使用方式和语法,原有业务不需要做任何改动。

·     元数据管理

DLH对湖内元数据进行统一管理,能够以可视化的方式管理库表结构信息及表数据量或外部数据源大小等信息,并提供血缘关系展示等功能。

·     流批交互式融合

通过Hive做为SQL统一入口,融合交互式查询、离线查询、流计算等引擎,并扩展SQL优化规则智能选择最佳执行引擎,提供大数据的能力、数据库的体验。

·     协同分析

能够将湖内数据与外部数据源(如MPP数据库、关系型数据库、HBase等)数据进行协同计算,无需搬迁数据即可进行联合分析。

·     数据安全管理

依靠Kerberos和Ranger构建完善的企业级数据安权认证和权限管控。

1.4  应用场景

·     交互式分析

在集群计算资源充足的情况下,DLH提供TB级别的数据交互式查询及分析能力。适用于如报表或大盘查询等高吞吐低时延的场景。

·     跨源查询分析

DLH支持对不同数据中心的多种数据源进行跨源查询及联合分析,避免数据迁移。目前支持对Hive、HBase、DataEngine MPP、SeaSQL MPP、MySQL进行不同集群的跨源分析。

·     数据仓库

数据仓库关注的是数据使用效率、大规模下的数据处理及管理能力。为保证数据和计算在湖和仓之间自由流动,DLH基于Hive进行产品设计及功能开发,完全兼容Hive语法,基于Hive的数仓方案无需进行迁移适配。

·     大数据实时流计算

DLH兼容Flink并实现FlinkSQL远程提交运行,目前DLH支持Flink流SQL通过统一入口直接下发Flink集群运行且无性能损耗。

·     机器学习

DLH集成Spark MLLib和GraphX模块并实现MLSQL和GraphSQL,可以方便的利用DLH计算能力对湖和仓中的数据进行机器学习算法训练和数据挖掘分析。

 

2 快速入门 2.1  组件安装

说明

·     关于在Hadoop集群中,安装DLH时的注意事项和操作指导以及部署过程中相关的参数说明等,详情请参见产品安装部署手册和在线联机帮助。

·     DLH依赖Hadoop、Zookeeper、Kafka、HBase、Flink、Presto以及Infra_Solr组件,安装DLH时要求完成上述组件的安装且组件状态正常,DLH组件安装过程无其他特殊要求。DLH安装完成后会自动同时安装Atlas、Hudi组件。

·     关于Presto组件操作的相关说明和指导,详情请参见Presto用户手册。

 

2.1.1  查看组件的日志信息

DLH安装完成后会自动同时安装Atlas组件,所以进行DLH管理时可能会需要查看这2个组件的日志。

表2-1 组件日志路径说明

组件

日志路径

DLH

·     DLH Server服务日志路径:/var/de_log/dlh/hive

·     DLH流SQL服务日志路径:/var/de_log/flink-sql-gateway/user_hdfs

Atlas

Atlas元数据服务日志路径:/var/de_log/atlas

 

2.2  运行状态监控 2.2.1  查看组件详情

因DLH组件依赖Atlas和Hudi组件,所以进行DLH管理时可能会需要访问这3个组件的组件详情页面。

1. 查看DLH组件详情

进入DLH组件详情页面,如图2-1所示。组件详情页面主要展示基本信息、部署拓扑、配置和配置修改历史等相关信息,同时可对组件或组件进程执行相关管理操作,可查看或修改组件的各配置项信息,也可查看组件的配置修改历史及当前使用配置版本。

主要功能如下:

(1)     基本信息:展示组件的基本配置信息,比如:组件内各服务进程的状态(鼠标悬停在各进程名上时,可查看进程的安装地址)、DLH JDBC连接URL(支持点击URL直接进行复制)。

(2)     部署拓扑:在组件详情的[部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情,并可对组件执行停止、重启、删除等相关操作。

【说明】进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。

(3)     配置:在组件详情的[配置]页签,可查看或修改组件各配置项的信息。

(4)     配置修改历史:在组件详情的[配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。

(5)     组件操作:在组件详情页面右上角,可对组件执行相关管理操作。比如:重启组件、添加进程、下载Client、访问快速链接、查看操作记录等。

图2-1 DLH组件详情

 

2. 查看Atlas组件详情

进入Atlas组件详情页面,如图2-2所示。组件详情页面主要展示基本信息、部署拓扑、配置和配置修改历史等相关信息,同时可对组件或组件进程执行相关管理操作,可查看或修改组件的各配置项信息,也可查看组件的配置修改历史及当前使用配置版本。

主要功能如下:

(1)     基本信息:展示组件的基本配置信息,比如:组件内各服务进程的状态(鼠标悬停在各进程名上时,可查看进程的安装地址)。

(2)     部署拓扑:在组件详情的[部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情,并可对组件执行停止、重启等相关操作。

【说明】进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。

(3)     配置:在组件详情的[配置]页签,可查看或修改组件各配置项的信息。

(4)     配置修改历史:在组件详情的[配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。

(5)     组件操作:在组件详情页面右上角,可对组件执行相关管理操作。比如:重启组件、添加进程、访问快速链接、查看操作记录等。

图2-2 Atlas组件详情

 

3. 查看Hudi组件详情

进入Hudi组件详情页面,如图2-3所示。组件详情页面主要展示基本信息、部署拓扑、配置和配置修改历史等相关信息,同时可对组件或组件进程执行相关管理操作,可查看或修改组件的各配置项信息,也可查看组件的配置修改历史及当前使用配置版本。

主要功能如下:

(1)     基本信息:展示组件的基本配置信息,比如:组件内各服务进程的状态(鼠标悬停在各进程名上时,可查看进程的安装地址)。

(2)     部署拓扑:在组件详情的[部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情,并可对组件执行删除等相关操作。

【说明】进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。

(3)     配置:在组件详情的[配置]页签,可查看或修改组件各配置项的信息。

(4)     配置修改历史:在组件详情的[配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。

(5)     组件操作:在组件详情页面右上角,可对组件执行相关管理操作。比如:重启组件、添加进程、查看操作记录等。

图2-3 Hudi组件详情

 

2.2.2  组件检查

因DLH组件依赖Atlas组件,所以进行DLH管理时可能会需要对这2个组件执行组件检查操作。

1. DLH组件检查

集群在使用过程中,根据实际需要,可对DLH组件执行组件检查操作。

(1)     对DLH组件执行组件检查的方式有以下三种,任选其一即可:

¡     在[集群管理/集群列表]页面,单击某集群名称可跳转至对应的集群详情页面。

-     在集群详情页面选择[组件]页签,单击组件列表中DLH组件对应的按钮。

-     在集群详情页面选择[组件]页签,单击组件列表中DLH组件名称进入组件详情页面,在右上角组件操作的下拉框中选择按钮。

¡     在组件管理的组件详情页面右上角组件操作的下拉框中选择按钮。

(2)     组件检查结束后,检查窗中会显示组件检查成功或失败的状态。如图2-4所示,组件检查成功则表示该组件可正常使用。

图2-4 组件检查

 

(3)     组件检查结束后,在组件详情页面单击按钮,进入操作记录窗口。可查看“DLH Service Check”组件操作执行的详细信息以及操作日志详情,根据操作日志可判断组件检查的具体情况。

图2-5 组件检查日志详情

 

2. Altas组件检查

集群在使用过程中,根据实际需要,可对Altas组件执行组件检查操作。

(1)     对Altas组件执行组件检查的方式有以下两种,任选其一即可:

¡     在[集群管理/集群列表]页面,单击某集群名称可跳转至对应的集群详情页面。

-     在集群详情页面选择[组件]页签,单击组件列表中Altas组件对应的按钮。

-     在集群详情页面选择[组件]页签,单击组件列表中Altas组件名称进入组件详情页面,在右上角组件操作的下拉框中选择按钮。

(2)     组件检查结束后,检查窗中会显示组件检查成功或失败的状态。如图2-6所示,组件检查成功则表示该组件可正常使用。

图2-6 组件检查

 

(3)     组件检查结束后,在组件详情页面单击按钮,进入操作记录窗口。可查看“ATLAS Service Check”组件操作执行的详细信息以及操作日志详情,根据操作日志可判断组件检查的具体情况。

图2-7 组件检查日志详情

 

2.3  快速使用指导

DLH既可以通过集群用户访问,又可以通过组件超级用户访问。其中:

·     集群用户:指在大数据集群的[集群权限/用户管理]页面可查看到的用户,包括集群超级用户和集群普通用户。其中:

¡     集群超级用户:仅Hadoop集群拥有集群超级用户。新建Hadoop集群成功后,集群超级用户会自动同步到[集群权限/用户管理]页面,且对应描述为“集群超级用户”。

¡     集群普通用户:指在[集群权限/用户管理]页面新建的用户。开启权限管理后,普通用户绑定角色后即可拥有该角色所具有的权限;不开权限管理时,普通用户缺省仅拥有各组件原生的用户权限。

·     组件超级用户:指组件内部的最高权限用户,如Hive组件的hive用户。大数据集群中安装组件时均会缺省创建组件内置超级用户,也是集群用户的一种。

2.3.1  非Kerberos环境

说明: 说明

非Kerberos环境下,用户不需要做身份认证即可直接连接DLH执行相关管理操作。

 

非Kerberos环境下,通过控制台执行DLH SQL操作前需连接DLHServer2,连接操作支持“通过dlh命令连接”和“通过beeine脚本连接”两种方式。

1. 通过dlh命令连接

登录集群内的任一节点(默认所有节点均安装了DLH Client),直接执行dlh命令即可进行连接。通过该方式连接DLHServer2时会自动填入连接信息,连接用户使用当前用户。

图2-8 dlh命令连接

 

2. 通过beeine脚本连接

注意

执行连接操作的用户需具备相应的脚本执行权限。

 

登录集群内的任一节点(默认所有节点均安装了DLH Client),执行如下命令:

/usr/hdp/3.0.1.0-187/dlh/hive/bin/beeline -u "{jdbc_url}" -n hive -p ""

【说明】

·     在beeline命令中:

¡     -u:指定连接DLHServer2的IP地址,端口号默认为13000。

¡     -n:指定连接所需用户名。

¡     -p:指定连接所需密码(可为空值)。

·     DLH JDBC连接URL信息获取包括以下两种方式:

¡     在DLH组件详情页面获取

访问大数据平台管理系统,进入DLH组件详情页面,在基本信息模块可查看DLHServer2 JDBC URL信息,如图2-9所示,直接拷贝即可。

图2-9 DLH组件JDBC URL

 

¡     手动拼接(未开启Kerberos)

jdbc:hive2://:13000/

其中:

-     jdbc:hive2://:为固定前缀。

-     为DLHServer2所在的机器IP地址或主机名(在集群外通过主机名连接HiveServer2时必须配置本地hosts文件)。

-     13000:为DLHServer2的默认端口号。

2.3.2  Kerberos环境

说明: 说明

Kerberos环境下,用户需要做身份认证才可直接连接DLH执行相关管理操作,认证方式请参见2.3.2  1. Kerberos环境下用户身份认证。

 

Kerberos环境下,通过控制台执行DLH SQL操作前需执行以下两步操作:

(1)     首先进行用户身份认证,才可以正常访问DLH。

(2)     连接DLHServer2,连接操作支持“通过dlh命令连接”和“通过beeine脚本连接”两种方式。

1. Kerberos环境下用户身份认证

如果大数据集群开启Kerberos,若想操作DLH,则必须首先进行用户身份认证。根据用户类型不同,分为以下两类:

·     集群用户身份认证

·     组件用户身份认证

(一)集群用户身份认证

说明: 说明

·     集群用户指在大数据集群的[集群权限/用户管理]页面可查看到的用户,包括集群超级用户和集群普通用户。

·     Kerberos环境下,集群用户的认证文件可在[集群权限/用户管理]页面,单击用户列表中用户对应的按钮进行下载。

 

DLH还可以通过集群用户访问。在开启Kerberos的大数据集群中进行集群用户(以user1用户示例)身份认证的方式,包括以下两种(根据实际情况任选其一即可):

(1)     方式一:(此方式不要求知道用户密码,直接使用keytab文件进行认证)

a.     将用户user1的认证文件(即keytab配置包)解压后,上传至访问节点的/etc/security/keytabs/目录下,然后将keytab文件的所有者修改为user1,命令如下:

chown user1 /etc/security/keytabs/user1.keytab

b.     使用klist命令查看user1.keytab的principal名称,命令如下:

klist -k user1.keytab

【说明】如图2-10所示,红框内容即为user1.keytab的principal名称。

图2-10 认证文件的principal名称

 

c.     切换至用户user1,并执行身份验证的命令如下:

su user1

kinit -kt user1.keytab [email protected]

【说明】其中:user1.keytab为用户user1的keytab文件,[email protected]为user1.keytab的principal名称。

d.     输入klist命令可查看认证结果。

图2-11 查看认证结果

 

(2)     方式二:(此方式要求用户密码已知,通过密码直接进行认证)

a.     切换至用户user1进行认证,输入以下命令:

su user1

kinit user1

b.     根据提示输入密码Password for [email protected]

c.     输入klist命令可查看认证结果。

图2-12 查看认证结果

 

(二)组件超级用户身份认证

DLH可以通过组件超级用户访问,在开启Kerberos的集群中进行组件超级用户(以hive用户示例)认证的步骤如下:

(1)     在集群内节点的/etc/security/keytabs/目录下,查找hive的认证文件“hive.service.keytab”。

【说明】在DLH Client节点上,需要将hive的认证文件“hive.service.keytab”上传节点的/etc/security/keytabs/目录下进行认证。

(2)     使用klist命令查看hive.service.keytab的principal名称,命令如下:

klist -k hive.service.keytab

【说明】如图2-13所示,红框内容即为hive.service.keytab的principal名称。

图2-13 认证文件的principal名称

 

(3)     切换至用户hive,并执行身份验证的命令如下:

su hive

kinit -kt hive.service.keytab hive/[email protected]

【说明】其中:hive.service.keytab为hive的认证文件,hive/[email protected]为hive.service.keytab的principal名称。

(4)     输入klist命令可查看认证结果。

图2-14 查看认证结果

 

2. 通过dlh命令连接

登录集群内的任一节点(默认所有节点均安装了DLH Client),直接执行dlh命令即可进行连接。通过该方式连接DLHServer2时会自动填入连接信息,连接用户使用当前用户。

图2-15 dlh命令连接

 

3. 通过beeine脚本连接

注意

执行连接操作的用户需具备相应的脚本执行权限。

 

登录集群内的任一节点(默认所有节点均安装了DLH Client),执行如下命令:

/usr/hdp/3.0.1.0-187/dlh/hive/bin/beeline -u "{jdbc_url}" -n hive -p ""

【说明】

·     在beeline命令中:

¡     -u:指定连接DLHServer2的IP地址,端口号默认为13000。

¡     -n:指定连接所需用户名。

¡     -p:指定连接所需密码(可为空值)。

·     DLH JDBC连接URL信息获取包括以下两种方式:

¡     在DLH组件详情页面获取

访问大数据平台管理系统,进入DLH组件详情页面,在基本信息模块可查看DLHServer2 JDBC URL信息,如图2-16所示,直接拷贝即可。

图2-16 DLH组件JDBC URL

 

¡     手动拼接URL(开启Kerberos)

jdbc:hive2:// :13000/;principal=hive/[email protected]

其中:

-     -u用于指定连接DLHServer2的地址和端口号。

-     DLHServer2的端口号默认为13000,principal名称为hive/[email protected](其中_HOST是DLHServer2进程所在主机的域名地址,principal获取方式请参考下面的说明),示例:hive/[email protected]

-     -n用于指定连接所需的用户名(可为空值)。

-     -p用于指定连接所需的密码(可为空值)。

说明

关于principal的配置值获取,可以登录大数据平台管理系统,进入DLH组件详情页面,在组件详情的[配置]页签,可查看配置文件dlh-site.xml中“hive.server2.authentication.kerberos.principal”的配置项获取。

 

2.4  快速链接使用

因DLH组件依赖Atlas组件,所以进行DLH管理时可能会需要访问这2个组件的快速链接页面。

1. DLH快速链接

DLH快速链接包括DLHServer监控和HistroyServer任务监控,分别访问不同的监控页面可以查看不同的信息。如图2-17所示,在DLH组件详情页面的右上角[快速链接]的下拉框中,可以获取不同监控页面的访问入口信息。

【说明】当集群开启高可用时,DLH组件会同步开启HA,此时部分监控页面有两个访问入口,任选其一即可。

图2-17 DLH快速链接

 

2. Atlas快速链接

Atlas快速链接为Atlas UI,提供了基于DLH库、表、列及血缘等元数据管理功能。如图2-18所示,在Atlas组件详情页面的右上角[快速链接]的下拉框中,可以获取Atlas UI访问入口信息。

【说明】当集群开启高可用时,Atlas组件会同步开启HA,此时组件UI页面有两个访问入口,任选其一即可。

图2-18 Atlas快速链接

 

2.4.1  配置组件快速链接

大数据集群部署完成后,若想要访问DLH或Atlas监控页面,首先需要修改本地hosts文件,用以确保组件的快速链接页面通过域名访问能够顺利跳转。

修改本地hosts文件的方法如下:

(1)     登录大数据集群中任意一节点,查看当前集群的hosts文件(Linux环境下位置为/etc/hosts)。

(2)     将集群的hosts文件信息添加到本地hosts文件中。若本地电脑是Windows环境,则hosts文件位于C:\Windows\System32\drivers\etc\hosts,修改该hosts文件并保存。

(3)     在本地hosts文件中配置主机域名信息完成后,即可访问组件的快速链接。

2.4.2  访问监控页面 1. DLHServer监控

在DLH组件详情页面的右上角[快速链接]的下拉框中选择DLHServer(如果多个快速链接,任选其一即可),此时不需要通过用户名和密码,即可直接跳转至DLHServer监控UI页面。

如图2-19所示,DLHServer主页监控页面显示的是当前链接的会话,包括:IP、用户名、当前执行的查询数量、链接总时长、空闲时长等。如果有会话执行查询,页面中的Queries模块会显示查询的语句、执行耗时等。

图2-19 访问DLHServer UI页面

 

2. HistroyServer任务监控

在DLH组件详情页面的右上角[快速链接]的下拉框中选择HistoryServer,根据集群是否开启Kerberos,访问HistoryServer快速链接分为两种情况:

·     若集群没有开启Kerberos认证,则此时点击访问入口链接,不需要通过用户名和密码,即可直接跳转访问HistoryServer任务监控UI页面。

·     若集群开启了Kerberos认证,则此时点击访问入口链接,需要输入用户名和密码进行认证(可以使用集群创建时填写的超级用户,也可以使用用户管理中创建的用户),然后才可跳转访问对应的UI页面。

在HistoryServer监控页面,可查看如下信息:

(1)     如图2-20所示,History Server页面默认展示已完成的历史任务列表。单击页面左下角“Show incomplete applications”可跳转到正在运行的任务列表。

图2-20 访问HistroyServer UI页面

 

(2)     单击任务列表中的App ID,可进入该任务的Jobs列表页面,可对Completed Jobs已完成的任务查看详情。

图2-21 Spark Jobs列表页面

 

(3)     在Spark Jobs列表页面,单击“Completed Jobs”列表中的任意Job,可进入该Job的详情页面,查看Stage列表。

图2-22 Job详情页面

 

(4)     在Job详情页面,单击“Completed Stages”列表中任意Stage,可进入该Stage的详情页面,查看每个task的执行时间、GC时间等。

图2-23 Stage详细页面

 

3. Atlas元数据管理

DLH基于Atlas提供了Hive元数据管理功能,对DLH内部的数据库、数据表、数据流、数据列以及列血缘等元数据提供了查看基本属性(在基本属性中可以在业务层面增加属性以及标签)、血缘分析以及业务类别分类等能力。

访问DLH的Atlas元数据管理页面的方式如下:

(1)     在集群管理页面的左侧导航树中选择[集群列表],进入集群列表页面,单击集群名称可跳转至集群详情页面。

(2)     在集群详情页面选择[组件]页签,单击组件列表中Atlas组件名进入Atlas组件详情页面。

(3)     在Atlas组件详情页面的右上角[快速链接]的下拉框中,选择Atlas(如果多个快速链接,任选其一即可),可跳转至DLH对应的Atlas元数据管理登录页面,此时需要通过admin用户和密码(admin用户的密码为CloudOS5#DE3@Atlas),才可访问Atlas元数据管理UI页面。

说明

本章节仅对Atlas元数据管理UI页面进行简单说明,更多关于Atlas原生UI界面的功能特性说明请参见atlas官网。

 

下面对Atlas元数据管理UI页面进行说明:

(1)     Atlas元数据管理UI页面主要分为左右两部分,左部分为功能模块菜单,包括Entities、Classification、Business Metadata、Glossaries和Custom Filters,右部分为功能展示区。

(2)     如图2-24所示,在Entities菜单中,单击具体的实体元数据(如:hive_table(4))则显示对应表的元数据信息。如图2-25所示,在表的元数据信息中再单击具体的表Name可以跳转至表的详情页面。上面中间的搜索框可以对Entities菜单中的各类实体元数据进行全局搜索。

图2-24 Entities菜单

 

图2-25 通过表Name查看表的详情

 

(3)     如图2-26所示,在Classifications菜单中,可以添加类别,供Entities中具体的元数据使用。

图2-26 Classifications菜单

 

(4)     如图2-27所示,在Business Metadata菜单中,可以添加业务元数据、枚举值等进行管理,业务元数据可以供Entities中具体的元数据使用。

图2-27 Business Metadata菜单

(5)     如图2-28所示,在Glossaries菜单中,可以管理词汇表,词汇表可以运用在Entities中具体的元数据中。

图2-28 Glossaries菜单

 

(6)     如图2-29所示,在Custom Filters菜单中,显示保存的自定义查询,在此处可以进行重命名、删除以及快速定位原来查询条件的查询结果。

图2-29 Custom Filters菜单

 

3 使用指南 3.1  离线查询&交互式查询

DLH离线查询功能完全兼容Hive,同时支持基于Presto的交互式查询功能。并且,当前版本中,已实现离线查询和交互式查询引擎的自动选择功能。

3.1.1  语法介绍 1. 离线查询语法 DLH基于Hive开发,离线查询语法完全兼容Hive语法。 2. 交互式查询语法

DLH基于Presto提供交互式查询能力,SQL语句兼容Presto基本查询语法。另外,Presto自身也支持CLI和JDBC接口进行查询。

说明: 说明

·     Presto查询语法请参见Trino官网(v316)。

·     外部数据源表示数据源和DLH组件不属于同一个大数据集群。DLH不支持对外部数据源的库表查询引擎切换。

 

3.1.2  配置说明

DLH对非外部数据源的库表进行SQL查询时,可根据业务需要选择合适的执行引擎,引擎选择开关由表3-1中所示配置决定。

表3-1 离线查询&交互式查询执行引擎相关配置

配置项

默认值

说明

dlh.query.engine

hive

执行引擎选项支持hive、presto、auto。该值为hive表示执行引擎固定为Hive,该值为presto表示执行引擎固定为Presto,该值为auto时默认执行引擎为Presto(若使用Presto执行失败或表大小超设定阈值则转交至Hive执行)

hive.presto.table.threshold

10737418240

表大小默认阈值为10GB,单位为B。当执行引擎为auto时,若表容量小于该阈值则查询走presto执行,若表容量大于该阈值则查询走hive执行

hive.presto.http-server.http.port

-

Presto的协调器节点的地址,比如:http://localhost:18089。若集群开启了HA,可配置为虚拟主机名及代理端口,比如:http://testclr-vserver.vip.hde.com:18090,这部分信息可以从Presto组件的自定义config配置http.proxy.url获取

 

3.1.3  示例

(1)     默认配置下,DLH非外部数据源的库表查询默认由Hive执行

0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set dlh.query.engine;

+----------------------+

|        set           |  

+----------------------+

| dlh.query.engine=hive|

+----------------------+

1 row selected (0.017 seconds)

(2)     设置DLH非外部数据源的库表查询引擎为Presto,此时SQL查询由交互式查询引擎执行

0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set dlh.query.engine = presto;

No rows affected (0.01 seconds)

(3)     执行SQL查询,此时查询任务由Presto完成(可通过DLH组件管理界面的快速链接查看对应任务)

0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> select count(*) from reason;

+-----+

| c0  |

+-----+

| 35  |

+-----+

1 row selected (2.877 seconds)

(4)     设置DLH非外部数据源的库表查询引擎为auto,表大小阈值为1GB(表大小默认阈值为10GB),此时超过1G的表走Hive执行,小于1G的表走Presto执行

0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set dlh.query.engine = auto;

No rows affected (0.01 seconds)

0: jdbc:hive2://qcj39.hde.com:2181,qcj40.hde.> set hive.presto.table.threshold=10485760;

No rows affected (0.01 seconds)

3.2  外部数据源

说明

·     外部数据源表示数据源和DLH组件不属于同一个大数据集群。

·     外部数据源配置存放资源文件的路径时(如:hdfs-site.xml、core-site.xml或认证文件krb5.conf、keytab等),仅支持/etc、/opt、/usr/hdp、/tmp、/apps/presto及其子目录。

·     外部数据源分析基于Presto实现,在DLH中执行需要把执行引擎设置为presto。

 

DLH组件支持对Hive、SeaSQL MPP、DataEngine MPP、MySQL、HBase等外部数据源进行跨源查询,避免数据迁移。

3.2.1  语法介绍

·     注册数据源

create catalog (if not exists) using with properties ("" = "", "" = "", ....);

其中:

¡     catalogType支持Hive、HBase、SeaSQL MPP、DataEngine MPP、MySQL等,不区分大小写。

¡     注册数据源时,不同的数据源需要的配置项不同,详情请参见3.2.2  ~3.2.6  章节。比如,注册hive数据源时,命令如下:

create catalog if not exists default using hive with properties ( "hive.metastore.uri" = "thrift://node1.hde.com:19083","hive.config.resources"="/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml","hive.allow-drop-table"="true","hive.allow-rename-table"="true","hive.parquet.use-column-names"="true");

·     获取所有的数据源信息

show catalogs;

·     切换数据源

use catalog ;

·     删除数据源

drop catalog ;

·     切换到数据源下的某个数据库(或schema)

use .;

·     查看某个数据源的创建数据源语句

show create catalog ;

·     查看某个数据源下面的所有库列表

show databases in 或show databases from ;

·     查看某个数据源下面的所有表列表

show tables in .;

·     读取某个数据源下的某个库下的某个表数据

select * from ..;

3.2.2  Hive数据源

Hive数据源支持通过DLH组件进行创库、建表、查询的操作。

1. 基础配置

表3-2 Hive数据源相关配置

配置项

默认值

说明

hive.metastore.uri

-

使用Thrift协议连接Hive元存储的URL。如果提供了多个URL,则默认使用第一个URL。该属性必选。

示例:thrift://192.0.2.3:9083或thrift://192.0.2.3:9083,thrift://192.0.2.4:9083

hive.config.resources

-

HDFS配置文件列表,以逗号分隔。该文件必须存在于部署DLH组件的所有节点上。

示例:/etc/hdfs-site.xml

hive.storage-format

ORC

创建新表时采用的默认文件格式,支持ORC、PARQUET、AVRO、RCBINARY、SEQUENCEFILE、JSON、TEXTFILE、CSV

hive.parquet.use-column-names

false

配置为true时,查询读取Parquet存储格式的表(如hudi表)时,使用列名而非下标进行组装数据,可避免元数据schema修改后列名与数据对应错位的问题

 

2. 安全相关配置

通过在Hive目录属性文件中设置hive.security属性可以启用Hive的授权检查,hive.security属性必须是表3-3中的值。

表3-3 Hive数据源安全相关配置

属性值

说明

legacy(默认值)

授权检查很少执行,因此大多数操作都是允许的。使用配置属性hive.allow-drop-table、hive.allow-rename-table、hive.allow-add-column、hive.allow-drop-column和hive.allow-rename-column来限定数据源是否进行相应操作

read-only

允许读数据和读取元数据的操作(如SELECT),不允许写数据或者写元数据的操作(如CREATE、INSERT或DELETE)

 

3. Kerberos相关配置

Hive数据源开启Kerberos时,需要的额外配置如表3-4所示。

表3-4 Kerberos环境下额外配置

配置项

默认值

说明

hive.metastore.authentication.type

NONE

Hive元存储身份验证类型,可选择:NONE或KERBEROS

hive.metastore.service.principal

-

Hive元存储服务的Kerberos主体,Hive数据源开启Kerberos时需要。此属性值中可使用_HOST占位符

示例:hive/[email protected]

hive.metastore.client.principal

-

在连接到Hive元存储服务时将使用的Kerberos主体,Hive数据源开启Kerberos时需要

示例:[email protected]

hive.metastore.client.keytab

-

Hive元存储客户端的keytab位置,Hive数据源开启Kerberos时需要

hive.metastore.krb5.conf.path

-

Kerberos服务器地址,Hive数据源开启Kerberos时需要

hive.hdfs.authentication.type

-

身份验证类型,可选择:NONE或KERBEROS

hive.hdfs.presto.principal

-

HDFS客户端Kerberos主体,Hive数据源开启Kerberos时需要

hive.hdfs.presto.keytab

-

HDFS客户端Kerberos认证文件,Hive数据源开启Kerberos时需要

 

说明: 注意

Hive数据源开启Kerberos时:

·     若想通过DLH集群访问Hive外部数据源,需要进行相关互信配置,详情请参见3.2.2  4. (2)示例2:创建并访问开启Kerberos的Hive数据源示例。

·     通过DLH组件查看外部Hive数据源的库表时,与通过Hive查看时数据类型有所不同。比如:show databases时,会多显示一个information_schema数据库。

 

4. 示例

(1)     示例1:创建并访问未开启Kerberos的Hive外部数据源

a.     将Hive外部数据源对应的HDFS配置文件:core-site.xml文件和hdfs-site.xml文件,拷贝到安装DLH组件的所有节点上(示例:拷贝至/opt/190目录下)

b.     将Hive外部数据源集群节点的映射关系(hosts文件内容)添加到安装DLH组件的所有节点上的/etc/hosts文件中

c.     后台登录DLH控制台,创建名为hive2数据源,该数据源访问集群中xldwhj190.hde.com节点的hive

create catalog hive2

using hive with properties (

"hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083",

"hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml"

);

d.     切换数据源为hive2

use catalog hive2;

e.     创建名为db2的数据库

create database db2;

f.     切换到hive2.db2的数据库下

use hive2.db2;

g.     创建t11表

create table t11(id int, name string);

h.     查看t11表的建表语句

show create table t11;

i.     插入一条记录

insert into t11 values(0, 'a');

j.     查询t11表中数据

select * from t11;

 

(2)     示例2:创建并访问开启Kerberos的Hive数据源

a.     将Hive外部数据源对应的HDFS配置文件:core-site.xml文件和hdfs-site.xml文件,拷贝到安装DLH组件的所有节点上

b.     将Hive外部数据源集群节点的映射关系(hosts文件内容)添加到安装DLH组件的所有节点上的/etc/hosts文件中

c.     登录大数据集群的管理系统,在[集群权限/用户管理]页面下载对应外部集群用户的认证文件

d.     将下载的外部集群用户认证文件,拷贝到安装DLH组件的所有节点上(路径为创建catalog时指定的路径)

e.     配置krb5.conf文件,示例如图3-1所示,有两种方式:

-     方式一:将目的数据源集群的krb5.conf文件拷贝到安装DLH组件的所有节点上(路径为创建catalog时指定的路径)。

-     方式二:是将目的数据源集群的krb5.conf文件中并入到数据湖所在集群的/etc/krb5.conf文件,realms内容直接拷贝,domain信息需要将目的集群和本集群的机器名逐一采用机器名=集群名的格式进行编辑(仅修改本地集群即可)。

图3-1 配置/etc/krb5.conf文件

 

f.     后台登录DLH控制台,创建一个名为hive190的数据源,该数据源可以访问目的集群的Hive

create catalog hive190 using hive with properties (

"hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083",-- 外部数据源HiveMetastore地址,多个英文逗号隔开

"hive.allow-drop-table" = "true",

"hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml",

"hive.hdfs.authentication.type" = "KERBEROS",

"hive.metastore.authentication.type" = "KERBEROS",

"hive.metastore.service.principal" = "hive/[email protected]",--固定格式,域名为目的集群的realm

"hive.hdfs.presto.principal" = "[email protected]",-- 目的集群用户principal

"hive.hdfs.presto.keytab" = "/opt/190/admin123.keytab",-- 目的集群用户keytabl

"hive.metastore.krb5.conf.path" = "/etc/krb5.conf",

"hive.metastore.client.keytab" = "/opt/190/admin123.keytab",-- 目的集群用户keytab

"hive.metastore.client.principal" = "[email protected]"--目的集群用户principal

);

(3)     示例3:创建指定存储格式的数据源(根据需要添加开启Kerberos或未开启Kerberos所需要的参数项)

其中:"hive.storage-format" = "PARQUET"属性用于指定存储格式。

create catalog hive3 using hive with properties (

"hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083",

"hive.storage-format" = "PARQUET",

"hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml"

……

);

验证建表后,查看表存储格式是否为parquet:

create table t12(id int, name string);

show create table t12;

(4)     示例4:创建安全模式为legacy的数据源(根据需要添加开启Kerberos或未开启Kerberos所需要的参数项)

其中:"hive.allow-drop-table" = "true","hive.allow-drop-column"="true","hive.allow-rename-column"="true","hive.allow-add-column"="true","hive.allow-rename-table"="true"等属性用于指定安全模式为legacy。

……

create catalog hive4 using hive with properties (

"hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083",

"hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml",

"hive.allow-drop-table" = "true",

"hive.allow-drop-column"="true",

"hive.allow-rename-column"="true",

"hive.allow-add-column"="true",

"hive.allow-rename-table"="true"

……

);

验证删除表、删除列、表重命名、添加列、列重命名是否可行(True表示支持):

alter table t11 RENAME TO t11new;

alter table t11 add column age string;

alter table t11 RENAME COLUMN name TO namenew ;

alter table t11 DROP COLUMN id;

drop table t11new;

(5)     示例5:创建安全模式为read-only的数据源(根据需要添加开启Kerberos或未开启Kerberos所需要的参数项)

其中:"hive.security" = "read-only"属性用于指定安全模式为read-only。

create catalog hive3 using hive with properties (

"hive.metastore.uri" = "thrift://xldwhj190.hde.com:9083",

"hive.config.resources" = "/opt/190/core-site.xml,/opt/190/hdfs-site.xml",

"hive.security" = "read-only"

……

);

验证insert、alter等操作是否已经不可用:

insert into t11 values(0, 'a');

3.2.3  SeaSQL MPP数据源

SeaSQL MPP数据源支持通过DLH组件进行查询和创建表。

说明: 注意

SeaSQL MPP数据源不支持的功能包括:CREATE DATABASE、CREATE SCHEMA、UPDATE、DELETE、GRANT、REVOKE、SHOW GRANTS、SHOW ROLES、SHOW ROLE GRANTS等语法。

 

1. 数据源配置

通过表3-5所示参数,可以配置SeaSQL MPP数据源连接器。

表3-5 SeaSQL MPP数据源相关配置

配置项

默认值

说明

connection-url

-

连接SeaSQL MPP的地址,比如:jdbc:postgresql://example.net:5432/database

connection-user

-

连接SeaSQL MPP的用户名,比如:root

connection-password

-

连接SeaSQL MPP的密码,比如:secret

 

2. 示例

创建SeaSQL MPP数据源并对数据源进行查看数据库、创建表等操作。

·     创建一个名叫seqsql1的数据源

create catalog seqsql1 using seasql with properties (

"connection-url"="jdbc:postgresql://10.121.88.114:5434/postgres", "connection-user"="ssadmin", "connection-password"="passwd"

);

·     查看seqsql1下的数据库

show databases from seqsql1;

·     查看seqsql1.public下的表

show tables  from seqsql1.public;

·     创建一个t1的表

create table seqsql1.public.t1 (i int , s string);

·     向表t1中插入一条记录

insert into seqsql1.public.t1 values(0, 'a');

·     查询表t1中数据

select * from seqsql1.public.t1;

3.2.4  DataEngine MPP数据源

DataEngine MPP数据源支持通过DLH组件进行查询和创建表。

说明: 注意

DataEngine MPP数据源不支持的功能包括:CREATE DATABASE、CREATE SCHEMA、UPDATE、DELETE、GRANT、REVOKE、SHOW GRANTS、SHOW ROLES、SHOW ROLE GRANTS等语法。

 

1. 数据源配置

通过表3-6所示参数,可以配置DataEngine MPP数据源连接器。

表3-6 DataEngine MPP数据源相关配置

配置项

默认值

说明

connection-url

-

连接DataEngine MPP的地址,比如:jdbc:vertica://example.net:5433/database

connection-user

-

连接DataEngine MPP的用户名,比如:root

connection-password

-

连接DataEngine MPP的密码,比如:secret

 

2. 示例

创建DataEngine MPP数据源并对数据源进行查看数据库、创建表等操作。

·     创建一个名叫test01的数据源

create catalog test01 using vertica with properties (

"connection-url"="jdbc:vertica://10.121.66.152:5433/Database", "connection-user"="dbadmin", "connection-password"="passwd"

);

·     查看test01下的模式

show databases from test01;

·     查看test01.public下的表

show tables  from test01.public;

·     切换到test01.public模式下

use test01.public;

·     创建一个test1的表

create table test1(i int ,s  string);

·     向表test1中插入一条记录

insert into  test1 values(0, 'a');

·     查询表test1中数据

select * from vertica1.public. test1;

3.2.5  MySQL数据源

MySQL数据源支持通过DLH组件进行查询和创建表。

说明: 注意

MySQL数据源不支持的功能包括:CREATE DATABASE、CREATE SCHEMA、UPDATE、DELETE、GRANT、REVOKE、SHOW GRANTS、SHOW ROLES、SHOW ROLE GRANTS等语法。

 

1. 数据源配置

通过表3-7所示参数,可以配置MySQL数据源连接器。

表3-7 MySQL数据源相关配置

配置项

说明

connection-url

连接MySQL的地址,比如:jdbc:mysql://example.net:3306

connection-user

连接MySQL的用户名,比如:root

connection-password

连接MySQL的密码,比如:secret

 

2. 示例

创建MySQL数据源并对数据源进行查看数据库、创建表等操作。

·     创建一个名叫mysql12的数据源

create catalog mysql12 using mysql with properties (

"connection-url"="jdbc:mysql://10.121.65.63:53306", "connection-user"="user01", "connection-password"="passwd"

);

·     查看mysql12下的数据库

show databases from mysql12;

·     查看mysql12.ambari_hivedebug0908下的表

show tables  from mysql12.hivedebug0908;

·     创建一个a的表

create table mysql12.hivedebug0908.a(a string);

·     向表a中插入一条记录

insert into mysql12.hivedebug0908.a values('test');

·     查询表a中数据

select * from mysql12.hivedebug0908.a;

3.2.6  HBase数据源

支持通过DLH在HBase实例上查询和创建表。用户可以在HBase连接器中创建表,并映射到HBase Cluster中已有的表(或创建新表),支持insert、select和delete操作。HBase连接器维护着一个元存储,用于持久化HBase元数据,目前此元存储采用MySQL。

说明: 注意

·     基本上支持所有的SQL语句,包括创建、查询、删除模式、添加、删除、修改表、插入数据、删除行等。但不支持对表进行重命名、不支持删除列。

·     HBase没有提供接口来检索表的元数据,所以show tables只能显示用户已与HBase数据源建立关联的表。

·     支持10种数据类型:VARCHAR、TINYINT、SMALLINT、INTEGER、BIGINT、DOUBLE、BOOLEAN、TIME、DATE和TIMESTAMP。

·     HBase连接器支持下推大部分运算符,比如基于RowKey的点查询、基于RowKey的范围查询等。此外,还支持通过谓词条件进行下推,比如:=、>=、>、 表属性 > dlh-site.xml > 默认值。

表3-13 dlh-site属性说明

属性

描述

hoodie.upsert.shuffle.parallelism

upsert并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.insert.shuffle.parallelism

insert并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.blukinsert.shuffle.parallelism

blukinsert并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.delete.shuffle.parallelism

delete并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.datasource.write.hive_style_partitioning

默认true

hoodie.datasource.write.partitionpath.urlencode

默认true

 

表3-14 表属性说明

属性

描述

hoodie.datasource.write.table.type

Hudi表类型,可以设置COPY_ON_WRITE或MERGE_ON_READ两种类型,缺失时默认为COPY_ON_WRITE

hoodie.datasource.write.recordkey.field

Hudi表记录主键,value值需要在表的列中

hoodie.datasource.write.precombine.field

数据集插入前合并记录的判断字段,在recode字段值相当的情况下,默认取该字段数值大的记录进行插入

hoodie.table.name

Hudi表名,缺省时默认取Hive表名

hoodie.mor.isRealTimeInputFormat

hoodie.datasource.write.table.type=MERGE_ON_READ时,该属性设置为true,创建mor类型的rt表,否则创建ro表

 

表3-15 会话属性说明

属性

描述

hoodie.upsert.shuffle.parallelism

upsert并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.insert.shuffle.parallelism

insert并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.blukinsert.shuffle.parallelism

blukinsert并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.delete.shuffle.parallelism

delete并发数,设置为10,可通过表属性或会话设置覆盖

hoodie.datasource.write.operation

Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete

 

3.3.5  SQL操作 1. 建表

遵循Hive原生建表语句,通过扩展stored类型和使用表属性来控制Hudi表的创建特性。

功能描述:

·     支持分区表、内部表、外部表语法。

·     建表语句可缺失内置字段,但是建表后查看表可以看到Hudi内置字段。建表语句如果包含内置字段,类型必须为String类型。

·     建表时需首先明确表明stored as hudi来标明表为Hudi表。

·     建表时可以通过表属性hoodie.table.name设置hoodie表名,可以与Hive表使用不同的名称。

·     建表时可以通过表属性hoodie.datasource.write.table.type设置Hudi表存储类型,默认缺失时为COPY_ON_WRITE类型。

·     可以通过设置表属性hoodie.datasource.write.table.type=MERGE_ON_READ和hoodie.mor.isRealTimeInputFormat=true 创建hudi存储类型为MERGE_ON_READ的rt视图表。

·     MERGE_ON_READ存储类型的Hudi表,如果需要通过Hive SQL进行读优化和近实时视图查询,则需要同时建立2个Hive表同时读取同一套Hudi数据,此时需要保证Hive建表语句中只有hoodie.mor.isRealTimeInputFormat=true有差异,且hoodie.table.name必须配置。

·     建表时表属性hoodie.datasource.write.recordkey.field与hoodie.datasource.write.precombine.field的值必须在表字段中,且不是hudi内置字段。

 

语法:

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name   

  [(col_name data_type [COMMENT col_comment], ... [constraint_specification])]

  [COMMENT table_comment]

  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]

  STORED AS HUDI

  [LOCATION hdfs_path]

  [TBLPROPERTIES (property_name=property_value, ...)]

[AS select_statement];  

 

表3-16 参数说明

表属性

描述

hoodie.datasource.write.table.type

Hudi表类型,可以设置COPY_ON_WRITE或MERGE_ON_READ两种类型,缺失时默认为COPY_ON_WRITE

hoodie.datasource.write.recordkey.field

Hudi表记录主键,value值需要在表的列中,支持多主键,以逗号分隔

hoodie.datasource.write.precombine.field

数据集插入前合并记录的判断字段,在recode字段值相当的情况下,默认取该字段数值大的记录进行插入,value值需要在表的列中

hoodie.table.name

Hudi表名,缺省时默认取Hive表名

hoodie.mor.isRealTimeInputFormat

hoodie.datasource.write.table.type=MERGE_ON_READ时,该属性设置为true,创建mor类型的rt表,否则创建ro表

 

示例:

·     创建内部表

CREATE TABLE IF NOT EXISTS `hudi_inner`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

  STORED AS hudi

 TBLPROPERTIES(  

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     创建外部表

CREATE EXTERNAL TABLE  IF NOT EXISTS `hudi_outer`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

 STORED AS hudi

 TBLPROPERTIES(  

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     创建分区表

CREATE EXTERNAL TABLE  IF NOT EXISTS `hudi_part`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

 PARTITIONED BY (`driver` string,`name` string)

 STORED AS hudi

 TBLPROPERTIES(  

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     缺失表属性hoodie.datasource.write.table.type 创建cow类型表

CREATE EXTERNAL TABLE  IF NOT EXISTS `hudi_trips_cow`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

 PARTITIONED BY (`driver` string,`name` string)

 STORED AS hudi

 LOCATION '/tmp/hudi_trips_cow'

 TBLPROPERTIES(  

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     指定表属性'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE',创建cow类型表

CREATE EXTERNAL TABLE  IF NOT EXISTS `hudi_trips_cow`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

 PARTITIONED BY (`driver` string,`name` string)

 STORED AS hudi

 LOCATION '/tmp/hudi_trips_cow'

 TBLPROPERTIES(

 'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE',   

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     指定表属性'hoodie.table.name' = xxx',创建cow类型表

CREATE EXTERNAL TABLE  IF NOT EXISTS `hudi_trips_cow`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

 PARTITIONED BY (`driver` string,`name` string)

 STORED AS hudi

 LOCATION '/tmp/hudi_trips_cow'

 TBLPROPERTIES(

'hoodie.table.name' = 'hudi_trips_cow1 ',

 'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE',   

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     创建mor的ro表

CREATE EXTERNAL TABLE  IF NOT EXISTS `default`.`hudi_trips_mor`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

 PARTITIONED BY (`driver` string)

 STORED AS hudi

 LOCATION '/tmp/hudi_trips_mor'

 TBLPROPERTIES (

'hoodie.table.name' = 'hudi_trips_mor ',

 'hoodie.datasource.write.table.type' = 'MERGE_ON_READ',

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     创建mor的rt表

CREATE EXTERNAL TABLE  IF NOT EXISTS `default`.`hudi_trips_mor_rt`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

 PARTITIONED BY (`driver` string)

 STORED AS hudi

 LOCATION '/tmp/hudi_trips_mor'

 TBLPROPERTIES(

'hoodie.table.name' = 'hudi_trips_mor ',

 'hoodie.mor.isRealTimeInputFormat' = 'true',

 'hoodie.datasource.write.table.type' = 'MERGE_ON_READ',

 'hoodie.datasource.write.recordkey.field'= 'uuid',

 'hoodie.datasource.write.precombine.field'='ts');

·     创建多主键表

CREATE TABLE IF NOT EXISTS `hudi_inner_multi`(

 `fare` bigint,

 `rider` string,

 `ts` double,

 `uuid` string)

  STORED AS hudi

 TBLPROPERTIES(  

 'hoodie.datasource.write.recordkey.field'= 'uuid,fare',

 'hoodie.datasource.write.precombine.field'='ts');

·     复制表结构

create table `t1` like `hudi_trips_cow`;

 

2. 插入

功能说明:

·     支持insert into values语法

·     支持insert into select 语法

·     支持insert overwrite values 语法

·     支持insert overwrite select 语法

·     支持分区表动静结合使用

·     可以通过设置hoodie.datasource.write.operation定义hoodie的写操作方式,默认为upsert

·     支持通过hoodie属性调整并发数等

 

表3-17 参数说明

表属性

描述

hoodie.datasource.write.operation

Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete

hoodie.upsert.shuffle.parallelism

upsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

hoodie.insert.shuffle.parallelism

insert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

hoodie.blukinsert.shuffle.parallelism

blukinsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

 

示例:

·     插入表数据

insert into hudi_trips_cow partition (driver='driver-213',name) values(58,'sdf',1.8,25,'nihao');

·     使用会话重置属性,插入表数据

set hoodie.datasource.write.operation=upsert;

set hoodie.upsert.shuffle.parallelism=2;

insert into hudi_trips_cow partition (driver='driver-213',name) values(58,'sdf',1.8,25,'nihao');

·     将查询结果插入表中

insert into table hudi_trips_cow partition (driver,name) select fare,rider,ts,uuid,'driver-214',name from hudi_trips_cow_test2;

·     覆盖插入

Insert overwrite table hudi_trips_cow partition (driver='driver-213',name) values(56,'sdf',1.8,25,'sdf'),(57,'sdf',1.9,25,'nihao');

·     指定字段插入

insert into hudi_inner (uuid,ts) values('uid1', 22.0);

·     插入多主键表

insert into hudi_inner_multi(uuid,ts,fare) values('uid1', 22.0, 100);

 

3. 更新

功能说明:

·     遵循Hive标准update语法,需要手动开启DLH的事务特性(DLH默认是关闭Hive的事务特性的),开启方式如下:

¡     设置dlh-site中的hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

¡     设置dlh-site中的hive.support.concurrency=true

配置修改完成后,重启DLH组件。

 

表3-18 参数说明

表属性

描述

hoodie.datasource.write.operation

Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete

hoodie.upsert.shuffle.parallelism

upsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

hoodie.insert.shuffle.parallelism

insert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

hoodie.blukinsert.shuffle.parallelism

blukinsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

 

示例:

·     更新表数据

update hudi_trips_cow set rider='test' where uuid='25';

·     使用UDF函数更新表数据

update hudi_trips_cow set fare=length('test') where uuid='25';

·     使用会话重置属性,更新表数据

set hoodie.datasource.write.operation=upsert;

set hoodie.upsert.shuffle.parallelism=3;

update hudi_trips_cow set fare=length('test') where uuid='25';

 

4. 删除

功能说明:

·     遵循Hive标准delete语法,需要手动开启DLH的事务特性(DLH默认是关闭Hive的事务特性的),开启方式如下:

¡     设置dlh-site中的hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

¡     设置dlh-site中的hive.support.concurrency=true

配置修改完成后,重启DLH组件。

 

表3-19 参数说明

表属性

描述

hoodie.datasource.write.operation

Hudi表插入方式,取值为upsert、insert、blukinsert、delete等,缺失时默认为upsert。删除操作时,该值默认为delete

hoodie.upsert.shuffle.parallelism

upsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

hoodie.insert.shuffle.parallelism

insert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

hoodie.blukinsert.shuffle.parallelism

blukinsert并发数,全局(hive-site.xml)设置为10,可通过表属性或会话设置覆盖

 

示例:

·     删除表数据

delete from hudi_trips_cow where uuid='25';

·     重置删除相关属性,删除表数据

set hoodie.delete.shuffle.parallelism=3;

delete from hudi_trips_cow where uuid='25';

 

5. 新增列

功能说明:

·     cow表支持,mor表不支持

·     遵循Hive标准add COLUMNS语法

 

示例:

·     表hudi_trips_cow新增列名为test,类型String

alter table hudi_trips_cow add COLUMNS (`test` String);

 

6. 新增分区

功能说明:

·     遵循Hive标准add partition语法

 

示例:

·     新增表hudi_trips_cow分区driver='driver-213',name="sdf"

alter table hudi_trips_cow add partition(driver='driver-213',name="sdf") location '/tmp/hudi_trips_cow/driver=driver-213/name=sdf';

 

7. 删除分区

功能说明:

·     删除分区要求Hudi表是分区表,且是外部表。当Hudi表是分区表,且是内部表时,删除分区不支持

 

示例:

·     删除表hudi_trips_cow分区driver='driver-213',name="sdf"

alter table hudi_trips_cow drop partition(driver='driver-213',name="sdf");

 

8. 表重命名

功能说明:

·     只修改Hive表的名字,Hudi表名不变

 

示例:

·     重命名表名hudi_trips_cow为hudi_trips_cow1

alter table hudi_trips_cow rename to hudi_trips_cow1;

 

9. 近实时查询

功能说明:

·     支持COPY_ON_WRITE存储类型的Hudi表

·     支持MERGE_ON_READ存储类型的Hudi表,且Hive表必须是rt类型表

 

表3-20 参数说明

表属性

描述

hoodie.hudi_trips_cow.consume.mode

查询方式:SNAPSHOT、INCREMENTAL。近实时查询只能配置为SNAPSHOT

 

示例:

·     查询cow表

set hoodie.hudi_trips_cow.consume.mode=SNAPSHOT;

select * from hudi_trips_cow;

·     查询mor表(rt表)

set hoodie.hudi_trips_mor.consume.mode=SNAPSHOT;

select * from hudi_trips_mor_rt;

【说明】hudi_trips_mor为Hudi表名,hudi_trips_mor_rt为Hive表名,二者不一定相同,Hudi表名可以通过“show create table +Hive表名”进行查看。

 

10. 增量查询

功能说明:

·     查询前需要设置消费模式为INCREMENTAL

·     支持通过设置开始和增量值进行增量查询

 

表3-21 参数说明

表属性

描述

hoodie.hudi_trips_cow.consume.mode

查询方式:SNAPSHOT、INCREMENTAL。增量查询只能配置为INCREMENTAL

hoodie.hudi_trips_cow.consume.start.timestamp

INCREMENTAL模式下使用

hoodie.hudi_trips_cow.consume.max.commits

INCREMENTAL模式下使用

 

示例:

前提条件:查询时需确保消费模式为INCREMENTAL类型。

·     查询cow表

set hoodie.hudi_trips_cow.consume.mode=INCREMENTAL;

set hoodie.hudi_trips_cow.consume.start.timestamp=20201019165434;

set hoodie.hudi_trips_cow.consume.max.commits=1;

select * from hudi_trips_cow where `_hoodie_commit_time` > '20201019165434';

·     查询mor表(rt表)

set hoodie.hudi_trips_mor.consume.mode=INCREMENTAL;

set hoodie.hudi_trips_mor.consume.start.timestamp=20201019165434;

set hoodie.hudi_trips_mor.consume.max.commits=1;

select * from hudi_trips_mor_rt where `_hoodie_commit_time` > '20201019165434';

 

11. 读优化查询

功能说明:

·     查询前需要设置消费模式为SNAPSHOT

·     读优化查询只读取parquet数据。对于cow类型表,读优化查询就是实时查询;但是对于mor表,读优化查询只能查询到插入的数据,对于更新参数的log数据则无法查询。

 

表3-22 参数说明

表属性

描述

hoodie.hudi_trips_cow.consume.mode

查询方式:SNAPSHOT、INCREMENTAL。读优化查询只能配置为SNAPSHOT

 

示例:

·     查询mor表(ro表)

set hoodie.hudi_trips_mor.consume.mode=SNAPSHOT;

select * from hudi_trips_mor;

3.4  流SQL操作

说明

如果集群手动开启kerberos认证,需要在flink自定义配置sql-gateway-flink-conf新增配置项目security.kerberos.login.keytab、security.kerberos.login.principal,具体值为集群超级用户对应的keytab和principal,如:/etc/security/keytabs/admin123.keytab、[email protected]

 

3.4.1  保留关键字

DLH流SQL兼容Flink SQL能力,虽然Flink SQL 特性并未完全实现,但是一些字符串及其组合已经被预留为关键字以备未来使用,如表3-23所示。如果您希望使用以下字符串作为字段名,请在使用时使用反引号将该字段名包起来(示例`value`, `count` )。

表3-23 预留字符串

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, RAW, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE

 

3.4.2  流SQL UDF

说明

当前版本中,DLH流SQL暂不支持自定义UDF,DLH流SQL的内置UDF兼容Flink1.12版本中的内置UDF,Flink1.12的内置UDF请参见:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html。

 

3.4.3  创建表

DLH流表的建表语法完全兼容开源FlinkSQL的建表语法。但当前版本中,DLH流表连接器的数据源端仅支持Kafka,数据目的端仅支持Kafka、Mysql、PostgreSQL、Elasticsearch、Hive。其中,Kafka连接器支持的数据格式为csv或json,Hive连接器支持的数据格式为orc、parquet或textfile, Elasticsearch支持的数据格式为json,Mysql和PostgreSQL连接器不区分数据格式。

当前版本中,DLH流表的数据源端支持事件时间类型表、处理时间类型表。

1. DLH流表的建表语句

图3-3 DLH流表的建表语句中需设置的基本属性值

属性

默认值

是否必选

说明

connector

连接器类型。在当前版本中,DLH仅支持kafka、elasticsearch-7、jdbc

topic

连接器所连接的Kafka topic

properties.group.id

消费Kafka数据时指定的groupID。作为source表时此属性必填,作为sink表时此属性可不填

scan.startup.mode

group-offsets

Kafka消费端offset配置。可选值:latest-offset、earliest-offset、group-offsets

properties.bootstrap.servers

连接Kafka必需的boostrap.server配置

format

数据流格式。在当前版本中,DLH仅支持csv、json

csv.field-delimiter

数据流字符串分隔符。csv格式下必填

properties.security.protocol

Kerberos环境下,连接Kafka所需的安全协议,值为:SASL_PLAINTEXT

properties.sasl.mechanism

Kerberos环境下,连接Kafka所需的认证机制,值为:GSSAPI

properties.sasl.kerberos.service.name

Kerberos环境下,连接Kafka所需的认证服务名称,值为:kafka

 

2. 示例

场景1:在数据源端创建表

·     创建CSV表

¡     Kafka(csv)连接器事件时间下的建表语句:

CREATE TABLE csv_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'csv_source',

'properties.group.id' = 'flink1',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'csv',

'csv.field-delimiter' = '|'

);

¡     Kafka(csv)连接器处理时间下的建表语句:

注意:ts为额外的处理时间字段,数据源中如果存在会被覆盖

CREATE TABLE csv_source(

user_id INT,

product STRING,

amount INT,

ts as proctime()

) WITH (

'connector' = 'kafka',

'topic' = 'csv_source',

'properties.group.id' = 'flink1',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'csv',

'csv.field-delimiter' = '|'

);

·     创建json表

¡     Kafka(json)连接器事件时间下的建表语句:

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'json'

);

¡     Kafka(json)连接器处理时间下的建表语句:

【注意】ts为额外的处理时间字段,数据源中如果存在会被覆盖。

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts as proctime()

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'json'

);

 

场景2:在数据目的端创建表

·     Kafka(csv)连接器的建表语句:

CREATE TABLE csv_source(

user_id INT,

product STRING,

amount INT

) WITH (

'connector' = 'kafka',

'topic' = 'csv_source',

'properties.group.id' = 'flink1',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'csv',

'csv.field-delimiter' = '|'

);

·     Kafka(json)连接器的建表语句:

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'json'

);

·     Elasticsearch(json)连接器的建表语句:

CREATE TABLE sk_es (

  user_id INT,

  product STRING,

  amount INT,

  PRIMARY KEY (user_id) NOT ENFORCED

) WITH (

  'connector' = 'elasticsearch-7',

  'hosts' = 'http://qcj36:9200',

  'index' = 'users'

);

·     Mysql连接器的建表语句:

CREATE TABLE sk_jdbc_mysql (

  user_id INT,

  product STRING,

  amount INT

) WITH (

   'connector' = 'jdbc',

   'url' = 'jdbc:mysql://10.121.65.53:53306/mydatabase',

   'table-name' = 'users',

   'username' = 'cluster',

   'password' = 'CloudOS5#DE3@Cluster'

);

·     PostgreSQL连接器的建表语句:

CREATE TABLE sk_jdbc_postgresql (

  user_id INT,

  product STRING,

  amount INT

) WITH (

   'connector' = 'jdbc',

   'url' = 'jdbc:postgresql://10.121.47.114:5432/mydatabase',

   'table-name' = 'users',

   'username' = 'postgres',

   'password' = 'postgres'

);

·     Hive(textfile)连接器的建表语句:

CREATE TABLE hive_txt(

  user_id INT,

  product STRING,

  amount INT

)PARTITIONED BY(dt STRING, hr STRING) stored as TEXTFILE TBLPROPERTIES (

  'sink.rolling-policy.file-size'='1MB',

  'sink.rolling-policy.rollover-interval'='1 min',

  'sink.rolling-policy.check-interval'='1 s',

  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',

  'sink.partition-commit.trigger'='partition-time',

  'sink.partition-commit.delay'='1 s',

  'sink.partition-commit.policy.kind'='metastore,success-file'

);

·     Hive(orc)连接器的建表语句:

CREATE TABLE hive_orc(

  user_id INT,

  product STRING,

  amount INT

)PARTITIONED BY(dt STRING, hr STRING) stored as orc TBLPROPERTIES (

  'sink.rolling-policy.file-size'='1MB',

  'sink.rolling-policy.rollover-interval'='1 min',

  'sink.rolling-policy.check-interval'='1 s',

  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',

  'sink.partition-commit.trigger'='partition-time',

  'sink.partition-commit.delay'='1 s',

  'sink.partition-commit.policy.kind'='metastore,success-file'

);

·     Hive(parquet)连接器的建表语句:

CREATE TABLE hive_parquet(

  user_id INT,

  product STRING,

  amount INT

)PARTITIONED BY(dt STRING, hr STRING) stored as parquet TBLPROPERTIES (

  'sink.rolling-policy.file-size'='1MB',

  'sink.rolling-policy.rollover-interval'='1 min',

  'sink.rolling-policy.check-interval'='1 s',

  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',

  'sink.partition-commit.trigger'='partition-time',

  'sink.partition-commit.delay'='1 s',

  'sink.partition-commit.policy.kind'='metastore,success-file'

);

3.4.4  数据插入 1. 插入语句

DLH流表的数据插入语句兼容FlinkSQL的insert into……select……语句:

INSERT INTO tablename1(col1,col2,col3,…)SELECT col1,col2,col3,…From tablename2

【注意】

·     非Kerberos环境下,可直接执行DLH流表的数据插入语句。

·     Kerberos环境下,执行DLH流表的数据插入语句,需满足以下条件:

¡     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

¡     确保在Flink的所有节点上,都存在上述用户对应的keytab文件,且需要保证该用户为该keytab文件的所有者。

¡     修改Flink的配置:security.kerberos.login.keytab= ;security.kerberos.login.principal= ,以上两个配置项的值修改完成并保存后,重启Flink。

2. 窗口类型

DLH支持FlinkSQL所有窗口类型:tumble、hop、session、over等。

3. 示例

(1)     示例1:非Kerberos环境下进行DLH流表的数据插入(CSV),步骤如下:

a.     创建Kafka topic:csv_source和csv_sink

./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_sink --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE csv_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'csv_source',

'properties.group.id' = 'flink_csv',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'csv',

'csv.field-delimiter' = '|'

);

c.     创建数据输出表

CREATE TABLE csv_sink (

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3)

) WITH (

'connector' = 'kafka',

'topic' = 'csv_sink',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'csv'

);

d.     提交流任务

INSERT INTO csv_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM csv_source;

e.     向Kafka的topic:csv_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例csv_input.txt)。

表3-24 示例数据

1|beer|3|2019-12-12 00:00:01

1|diaper|4|2019-12-12 00:00:02

2|pen|3|2019-12-12 00:00:04

2|rubber|3|2019-12-12 00:00:06

3|rubber|2|2019-12-12 00:00:05

4|beer|1|2019-12-12 00:00:08

 

向Kafka的topic:csv_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667  --topic csv_source < csv_input.txt

f.     消费Kafka的topic:csv_sink数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic csv_sink --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill  

h.     删除已创建的csv_source、csv_sink表

drop table csv_source;

drop table csv_sink;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

 

(2)     示例2:非Kerberos环境下进行DLH流表的数据插入(JSON),步骤如下:

a.     创建Kafka topic:json_source和json_sink

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'json'

);

c.     创建数据输出表

CREATE TABLE json_sink (user_id INT,product STRING,amount INT,ts TIMESTAMP(3)) WITH (

'connector' = 'kafka',

'topic' = 'json_sink',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'format' = 'json'

);

d.     提交流任务

INSERT INTO json_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM json_source;

e.     向Kafka的topic:json_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。

表3-25 示例数据

{"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"}

{"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"}

{"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"}

{"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"}

{"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"}

{"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}

 

向Kafka的topic:json_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667  --topic json_source < json_input.txt

f.     消费Kafka的topic:json_sink数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill 

h.     删除已创建的json_source、json_sink表

drop table json_source;

drop table json_sink;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

 

(3)     示例3:Kerberos环境下进行DLH流表的数据插入(CSV),步骤如下:

a.     创建Kafka topic:csv_source和csv_sink

./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic csv_sink --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE csv_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'csv_source',

'properties.group.id' = 'flink_csv',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'csv',

'csv.field-delimiter' = '|'

);

c.     创建数据输出表

CREATE TABLE csv_sink (user_id INT,product STRING,amount INT,ts TIMESTAMP(3)) WITH (

'connector' = 'kafka',

'topic' = 'csv_sink',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'csv'

);

d.     提交流任务

前置条件:

-     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

-     确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。

-     修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。

以上前置条件全部满足之后,才可以提交流任务SQL:

INSERT INTO csv_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM csv_source;

e.     向Kafka的topic:csv_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例csv_input.txt)。

表3-26 示例数据

1|beer|3|2019-12-12 00:00:01

1|diaper|4|2019-12-12 00:00:02

2|pen|3|2019-12-12 00:00:04

2|rubber|3|2019-12-12 00:00:06

3|rubber|2|2019-12-12 00:00:05

4|beer|1|2019-12-12 00:00:08

 

向Kafka的topic:csv_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667 --producer-property security.protocol=SASL_PLAINTEXT --topic csv_source < csv_input.txt

f.     消费Kafka的topic:csv_sink数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic csv_sink --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill 

h.     删除已创建的csv_source、csv_sink表

drop table csv_source;

drop table csv_sink;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

 

(4)     示例4:Kerberos环境下进行DLH流表的数据插入(JSON),步骤如下:

a.     创建Kafka topic:json_source和json_sink

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

c.     创建数据输出表

CREATE TABLE json_sink (user_id INT,product STRING,amount INT,ts TIMESTAMP(3)) WITH (

'connector' = 'kafka',

'topic' = 'json_sink',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

d.     提交流任务

前置条件:

-     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

-     确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。

-     修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。

以上前置条件全部满足之后,才可以提交流任务SQL:

INSERT INTO json_sink(user_id, product, amount, ts) SELECT user_id, product, amount, ts FROM json_source;

e.     向Kafka的topic:json_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。

表3-27 示例数据

{"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"}

{"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"}

{"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"}

{"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"}

{"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"}

{"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}

 

向Kafka的topic:json_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667  --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt

f.     消费Kafka的topic:json_sink数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill 

h.     删除已创建的json_source、json_sink表

drop table json_source;

drop table json_sink;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

 

(5)     示例5:Kerberos环境下进行DLH流表的数据插入(tumble窗口),步骤如下:

a.     创建Kafka topic:json_source和json_sink_tumble

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_tumble --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

c.     创建数据输出表

CREATE TABLE json_sink_tumble (

cnt BIGINT,

ts TIMESTAMP(3)

) WITH (

'connector' = 'kafka',

'topic' = 'json_sink_tumble',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

d.     提交流任务

前置条件:

-     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

-     确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。

-     修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。

以上前置条件全部满足之后,才可以提交流任务SQL:

INSERT INTO json_sink_tumble SELECT  count(product), tumble_start(ts, interval '1' minute) FROM json_source group by tumble(ts, interval '1' minute);

e.     向Kafka的topic:json_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。

表3-28 示例数据

{"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"}

{"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"}

{"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"}

{"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"}

{"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"}

{"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}

 

向Kafka的topic:json_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667  --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt

f.     消费Kafka的topic:json_sink_tumble数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_tumble --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill 

h.     删除已创建的json_source、json_sink_tumble表

drop table json_source;

drop table json_sink_tumble;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

 

(6)     示例6:Kerberos环境下进行DLH流表的数据插入(hop窗口),步骤如下:

a.     创建Kafka topic:json_source和json_sink_hop

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_hop --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

c.     创建数据输出表

CREATE TABLE json_sink_hop (

win_start TIMESTAMP(3),

win_end TIMESTAMP(3),

win_rowtime TIMESTAMP(3),

user_id INT,

product_cnt bigint

) WITH (

'connector' = 'kafka',

'topic' = 'json_sink_hop',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

d.     提交流任务

前置条件:

-     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

-     确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。

-     修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。

以上前置条件全部满足之后,才可以提交流任务SQL:

INSERT INTO json_sink_hop SELECT hop_start(ts, interval '2' second, interval '5' second),hop_end(ts, interval '2' second, interval '5' second), hop_rowtime(ts, interval '2' second, interval '5' second),user_id,count(product) FROM json_source group by hop(ts, interval '2' second, interval '5' second),user_id;

e.     向Kafka的topic:json_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。

表3-29 示例数据

{"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"}

{"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"}

{"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"}

{"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"}

{"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"}

{"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}

 

向Kafka的topic:json_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667  --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt

f.     消费Kafka的topic:json_sink_hop数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_hop --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill 

h.     删除已创建的json_source、json_sink_hop表

drop table json_source;

drop table json_sink_hop;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

 

(7)     示例7:Kerberos环境下进行DLH流表的数据插入(session窗口),步骤如下:

a.     创建Kafka topic:json_source和json_sink_session

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_session --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

c.     创建数据输出表

CREATE TABLE json_sink_session (

session_start TIMESTAMP(3),

session_end TIMESTAMP(3),

session_rowtime TIMESTAMP(3),

user_id INT,

product_cnt bigint

) WITH (

'connector' = 'kafka',

'topic' = 'json_sink_session',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

d.     提交流任务

前置条件:

-     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

-     确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。

-     修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。

以上前置条件全部满足之后,才可以提交流任务SQL:

INSERT INTO json_sink_session SELECT session_start(ts, interval '5' second),session_end(ts, interval '5' second), session_rowtime(ts, interval '5' second),user_id,count(product) FROM json_source group by session(ts, interval '5' second),user_id;

e.     向Kafka的topic json_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。

表3-30 示例数据

{"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"}

{"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"}

{"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"}

{"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"}

{"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"}

{"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}

 

向Kafka的topic json_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667  --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt

f.     消费Kafka的topic:json_sink_session数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_session --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill 

h.     删除已创建的json_source、json_sink_session表

drop table json_source;

drop table json_sink_session;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

 

(8)     示例8:Kerberos环境下进行DLH流表的数据插入(over窗口),步骤如下:

a.     创建Kafka topic:json_source和json_sink_over

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_source --create --partitions 3 --replication-factor 2

./kafka-topics.sh --zookeeper zyf07:2181 --topic json_sink_over --create --partitions 3 --replication-factor 2

b.     连接DLH beeline并创建数据源表

CREATE TABLE json_source(

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'json_source',

'properties.group.id' = 'flink_json',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

c.     创建数据输出表

CREATE TABLE json_sink_over (

user_id INT,

product STRING,

amount INT,

maxAmount INT

) WITH (

'connector' = 'kafka',

'topic' = 'json_sink_over',

'properties.group.id' = 'flink2',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '10.121.65.7:6667',

'properties.security.protocol' = 'SASL_PLAINTEXT',

'properties.sasl.mechanism' = 'GSSAPI',

'properties.sasl.kerberos.service.name' = 'kafka',

'format' = 'json'

);

d.     提交流任务

前置条件:

-     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

-     确保在Flink的所有节点上,都存在上述用户对应的keytab文件(示例/opt/admin123.keytab),且需要保证该用户为该keytab文件的所有者。

-     修改Flink的配置:security.kerberos.login.keytab= /opt/admin123.keytab;security.kerberos.login.principal= [email protected],以上两个配置项的值修改完成并保存后,重启Flink。

以上前置条件全部满足之后,才可以提交流任务SQL:

insert into json_sink_over select user_id,product,amount,max(amount) over(partition by product order by ts rows between 2 preceding and current row) as maxAmount from json_source;

e.     向Kafka的topic:json_source生产数据

【注意】写入topic的数据字符串格式与数据源表的表结构务必一致,否则会造成数据丢失或任务失败。

可以提前准备数据,然后将数据写入到一个文件(示例json_input.txt)。

表3-31 示例数据

{"user_id":1, "product":"beer","amount":3, "ts":"2019-12-12 00:00:01"}

{"user_id":1, "product":"diaper","amount":4, "ts":"2019-12-12 00:00:02"}

{"user_id":2, "product":"pen","amount":200, "ts":"2019-12-12 00:00:03"}

{"user_id":2, "product":"rubber","amount":30, "ts":"2019-12-12 00:00:04"}

{"user_id":3, "product":"rubber","amount":3000, "ts":"2019-12-12 00:00:05"}

{"user_id":4, "product":"beer","amount":2222, "ts":"2019-12-12 00:00:09"}

 

向Kafka的topic:json_source生产数据,命令如下:

./kafka-console-producer.sh --broker-list zyf07:6667  --topic json_source --producer-property security.protocol=SASL_PLAINTEXT < json_input.txt

f.     消费Kafka的topic:json_sink_over数据

./kafka-console-consumer.sh --bootstrap-server zyf07:6667 --topic json_sink_over --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning

g.     测试结束,停止流任务

yarn application –list  获取对应的applicationId

yarn application --kill 

h.     删除已创建的json_source、json_sink_over表

drop table json_source;

drop table json_sink_over;

【说明】此处的删除仅表示删除存在DLH中Hive里面的元数据,并不会删除实际存储的真实数据(即Kafka中的topic数据依旧存在)。

3.4.5  数据查询

说明

·     流计算任务为长驻任务。在查询SQL执行过程中,如果出现Ctrl+C或其他操作导致查询异常中止,Flink查询任务也无法自动停止。在此情况下,若需要终止Flink查询任务,则需要根据查询SQL的开始执行时间在YARN上查询对应的Flink任务并手动终止。

·     在当前版本中,DLH会话中窗口查询结果不在控制台直接打印,实际计算结果需在对应的Flink作业监控页面进行查询。实际业务场景下,多数是通过insert into select from 语句来执行窗口操作。

 

1. 查询语法

DLH流表的数据查询语句兼容FlinkSQL的streaming模式下的查询语句。

·     非Kerberos环境下,可直接执行DLH流表的数据查询语句。

·     Kerberos环境下,执行DLH流表的数据查询语句,需满足以下条件:

¡     准备一个同时具备输入表的读权限和输出表的写权限的用户(为方便起见,也可以直接使用集群超级用户)。

¡     确保在Flink的所有节点上,都存在上述用户对应的keytab文件,且需要保证该用户为该keytab文件的所有者。

¡     修改Flink的配置:security.kerberos.login.keytab= ;security.kerberos.login.principal= ,以上两个配置项的值修改完成并保存后,重启Flink。

2. 示例

(1)     创建Kafka topic:test001,用于数据源产生(若开启kerberos则需追加--command-config kerberos参数)

./kafka-topics.sh --create --bootstrap-server rzt237:6667,rzt238:6667,rzt239:6667 --replication-factor 1 --partitions 1 --topic test001 [--command-config kerberos]

(2)     创建流表flink55

【说明】DLH流表的建表语句中需设置的基本属性值请参考3.4.3章节。

CREATE TABLE flink55 (

user_id INT,

product STRING,

amount INT,

ts TIMESTAMP(3),

WATERMARK FOR ts as ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'test001',

'properties.group.id' = 'flink22',

'scan.startup.mode' = 'earliest-offset',

'properties.bootstrap.servers' = '10.121.64.238:6667',

'format' = 'csv',

'csv.field-delimiter' = '|'

);

(3)     向Kafka的topic:test001中生产数据

【注意】数据格式务必与建表语句中format.type和formmat.field-delimiter属性保持一致。

表3-32 测试数据

1|diaper|4|2019-12-12 00:00:02

2|pen|3|2019-12-12 00:00:04

2|rubber|3|2019-12-12 00:00:06

3|rubber|2|2019-12-12 00:00:05

4|beer|1|2019-12-12 00:00:08

1|diaper|4|2019-12-12 00:00:02

2|pen|3|2019-12-12 00:00:04

2|rubber|3|2019-12-12 00:00:06

3|rubber|2|2019-12-12 00:00:05

 

(4)     进行流表数据查询

¡     select语句

执行SQL:

select * from flink55;

¡     select as语句

执行SQL:

select user_id,amout as ddd from flink55;

【注意】自定义字段名需避免使用保留关键字。

¡     过滤查询select * from [tab] where [filter_condition]语句

执行SQL:

select * from flink55 where user_id > 2;

¡     查询嵌套常用udf

执行SQL:

select UPPER(product) from flink55;

¡     分组聚合

执行SQL:

select user_id,sum(amount) as cnt from flink55 group by user_id

¡     去重语法

执行SQL:

select DISTINCT user_id from flink55;

¡     having语法

执行SQL:

select sum(amount) from flink55 group by user_id having sum(amount) > 5;

¡     join语法

前置条件:

-     创建kafka topic:test002,并向test002中生产部分模拟数据。

-     创建新的Flink表(表中字段与topic test002中字段格式需保持一致)。

以上前置条件全部满足之后,执行SQL:

select flink55.user_id,flink55.product from flink55 inner join flink55_2 on flink55.user_id = flink55_2.user_id;

【注意】此情况下,时间字段不能作为查询过滤条件及查询返回结果字段。

¡     分组窗口聚合

在当前版本中,DLH会话中窗口查询结果不在控制台直接打印,实际计算结果需在对应的Flink作业监控页面进行查询。实际业务场景下,多数是通过insert into select from 语句来执行窗口操作。

3.5  MLSQL

MLSQL是基于MLlib的SQL中间件。MLlib是Spark的机器学习库,目标是让机器学习更加容易且更具扩展性,使用Mllib时需要编写MLlib应用,操作复杂。但是通过MLSQL,用户可直接通过SQL语句使用MLlib中的算法,使操作的便利性得到了很大提高。

MLSQL目前支持监督算法(比如:分类算法、回归算法)、非监督算法(比如:聚类算法)、特征工程算法、模型选择、Pipeline机制。

3.5.1  监督算法 1. 语法

·     训练语法

SELECT

TrainAlgorithmName(’model_result_table’,label,col1,col2,…,[’parms’])

from TrainTableName;

·     预测语法

SELECT

PredictAlgorithmName(‘model_result_table’,col1,col2,col3,…,’id’,’predict_result_table’)

from PredictTableName;

2. 语法说明

表3-33 训练语法说明

参数

说明

常见配置选项

TrainAlgorithmName

训练算法名称,必须指定,支持的训练算法见算法说明:训练算法部分

model_result_table

算法训练结果模型表,必须指定

TrainTableName

训练数据表名,用作算法的数据源,必须指定

label

训练数据表中的列,用作算法的参数,作为标签列,必须指定

col1,col2,…

训练数据表中的列,用作算法的参数,作为特征列,必须指定

parms

算法的其它参数,根据算法需要,可以不指定(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …),参数说明

 

表3-34 预测语法说明

参数名称

参数解释

备注

PredictAlgorithmName

预测算法名称,必须指定,算法说明:预测算法部分

model_result_table

模型表名,必须指定

用来指定使用哪个模型进行预测,对应训练中的model_result_table

PredictTableName

预测数据表名,用来预测的数据,必须指定

id

预测数据表的id列,用作算法的参数,作为标签列,必须指定

col1,col2,…

表的列,用作算法的参数,必须指定

predict_result_table

使用模型进行预测后的结果表

临时表

 

3. 算法说明

表3-35 训练算法

算法名

算法说明

备注

LogisticRegression

逻辑回归训练函数

分类

DecisionTreeClassifier

决策树分类训练函数

[注意]:参与训练的数据属性列的值不能相同

分类

GBTClassifier

GBT分类训练函数

[注意]:参与训练的数据属性列的值不能相同

分类(二分类)

RandomForestClassifier

随机森林分类训练函数

[注意]:参与训练的数据属性列的值不能相同

分类

MultilayerPerceptronClassifier

多层感知机训练函数

分类

NaiveBayes

朴素贝叶斯训练函数

分类

OneVsRest

Onevsrest训练函数:一般用于多分类,需要设置基础二分类分类器

分类

LinearRegression

线性回归训练函数

回归

GeneralizedLinearRegression

广义线性回归训练函数

回归

DecisionTreeRegressor

决策树回归训练函数

[注意]:参与训练的数据属性列的值不能相同

回归

RandomForestRegressor

随机森林回归训练函数

[注意]:参与训练的数据属性列的值不能相同

回归

GBTRegressor

GBT回归训练函数

[注意]:参与训练的数据属性列的值不能相同

回归

IsotonicRegression

保序回归训练函数

回归

AFTSurvivalRegression

AFT-Survival回归训练函数

回归

 

表3-36 预测算法

算法名

算法说明

备注

LogisticRegressionPrediction

逻辑回归预测函数

分类预测

DecisionTreeClassificationPrediction

决策树分类预测函数

分类预测

GBTClassificationPrediction

GBT分类预测函数

分类预测

RandomForestClassificationPrediction

随机森林分类预测函数

分类预测

MultilayerPerceptronPrediction

多层感知机分类预测函数

分类预测

NaiveBayesPrediction

朴素贝叶斯分类预测函数

分类预测

OneVsRestPrediction

Onevsrest分类预测函数

分类预测

LinearRegressionPrediction

线性回归预测函数

回归预测

GeneralizedLinearRegressionPrediction

广义线性回归预测函数

回归预测

DecisionTreeRegressionPrediction

决策树回顾预测函数

回归预测

GBTRegressorPrediction

GBT回归预测函数

回归预测

RandomForestRegressorPrediction

随机森林预测函数

回归预测

IsotonicRegressionPrediction

保序回归预测函数

回归预测

AFTSurvivalRegressionPrediction

Aft-Survival回归预测函数

回归预测

 

4. 参数说明

表3-37 参数说明

参数名

参数说明

支持算法

aggregationdepth

设置分布式统计时的层数,主要用在treeAggregate中,数据量越大,可适当加大这个值,默认是2,int型

logisticregression、aftsurvivalregression、linearregression

standardization

训练模型前是否需要对训练特征进行标准化处理,boolean型

Logisticregression、linearregression

threshold

二分类预测的阈值,范围[0,1],double型

Logisticregression

thresholds

多分类预测的阈值,array[double]型

Logisticregression

fitintercept

是否训练intercept,boolean型

Logisticregression、aftsurvivalregression、linearregression、generalizedlinearregression

tol

优化算法迭代求解过程中的收敛阈值,默认值:1e-4,不能为负数,double型

Logisticregression、multilayerperceptronclassifier、aftsurvivalregression、generalizedlinearregression、linearregression

weightcol

列权重,string型

Logisticregression、naivebayes、isotonicregression、generalizedlinearregression、linearregression

regparam

正则化参数(>=0),调节拟合程度,越小-过拟合,越大-欠拟合;double型

Logisticregression、generalizedlinearregression、linearregression

elasticnetparam

弹性网络混合参数(范围[0,1]),用于调节L1和L2之间的比例,两种正则化比例加起来是1,默认是0,即只使用L2正则化,设置为1,即只使用L1正则化;double型

Logisticregression、linearregression

family

模型中使用的误差分布类型

·     Logisticregression可选:auto,binomial,multinomial,不写默认使用auto

·     GeneralizedLinearregression可选:gaussian,binomial,poisson,gamma

Logisticregression、generalizedlinearregression

seed

随机种子,long型

Decisiontreeclassifier、gbtclassifier、randomforestclassifier、multilayerperceptronclassifier、 decisiontreeregressor、gbtregressor、randomforestregressor

maxdepth

树的最大高度,int型

Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor

maxbins

每个特征分裂时,最大划分(桶)数量;int型

Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor

mininstancespernode

需保证节点分割出的子节点的最少样本数达到这个值,int型

Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor

mininfogain

最小信息增益,当前节点的所有属性分割带来的信息增益都要比这个值大,double型

Decisiontreeclassifier、gbtclassifier、randomforestclassifier、decisiontreeregressor、gbtregressor、randomforestregressor

subsamplingrate

二次抽样率,指定每棵树训练的数据比例,double型

gbtclassifier、randomforestclassifier、gbtregressor、randomforestregressor

maxiter

优化算法求解的最大迭代次数,默认值100,int型

LinearRegression、

Logisticregression、Gbtclassifier、multilayerperceptronclassifier、

AFTSurvivalRegression、GBTRegressor、GeneralizedLinearRegression

stepsize

每次迭代优化步长,double型,取值范围:(0,1]

Gbtclassifier、multilayerperceptronclassifier、gbtregressor

losstype

使用哪种损失函数

可选用:hinge、logistic、squared

Gbtclassifier、gbtregressor

blocksize

该参数被前馈网络训练器用来将训练样本数据的每个分区都按照blocksize大小分成不同组,并且每个组内的每个样本都会叠加成一个向量,以便在各种优化算法间传递;int型

multilayerperceptronclassifier

layers

必须设置,注意输入/输出的层数

multilayerperceptronclassifier

solver

优化的求解算法

·     Multilayerperceptronclassifier可选:l-bfgs,gd

·     Linearregression可选:auto,l-bfgs,normal

·     Generalizedlinearregression可选:irls

Multilayerperceptronclassifier、generalizedlinearregression、linearregression

modeltype

模型类型(Multinomial,Bernoulli),实际上依据特征的分布不同,朴素贝叶斯又划分为多个子类别,string型

naivebayes

smoothing

一般采用拉普拉斯平滑进行处理,double型

naivebayes

classifier

设置基础二分类分类器,在onevsrest算法中必须要指定的

onevsrest

featuresubsetstrategy

特征子集策略,每棵树中使用那些特征做分裂,这个参数可以用小数比例形式指定,减少这个值会加速训练,但太小会影响效果

Randomforestclassifier、randomforestregressor

numtrees

随机森林中决策树个数

Randomforestclassifier、randomforestregressor

censorcol

设置中心列

aftsurvivalregression

isotonic

递增保序回归或递减保序回归,boolean型

isotonicregression

link

连接函数名,描述线性预测器和分布函数均值之间关系

·     family为gaussian时可选:identity、log、inverse

·     family为binomial时可选:logit、probit、cloglog

·     family为poisson时可选:log、identity、sqrt

·     family为gamma时可选:inverse、identity、log

generalizedlinearregression

linkpredictioncol

连接函数(线性预测器列名)

generalizedlinearregression

 

5. 示例

·     LogisticRegression算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,0.0,2.3,4.5,5.6);

Insert into mltable values(2,1.0,6.5,3.8,4.9);

运行语句示例:

------------------训练-----------------------

drop table if exists lor_model; select LogisticRegression('lor_model',label,col1,col2,col3,'-MaxIter 20') from mltable; select * from lor_model;

--------------------预测---------------------

select LogisticRegressionPrediction('lor_model',col1,col2,col3,'id','lor_pred') from mltable; select * from lor_pred a,mltable b where a.id = b.id;

·     DecisionTreeClassifier算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,0.0,5.7,5.9,7.9);

Insert into mltable values(2,1.0,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists dtc_model; select DecisionTreeClassifier('dtc_model',label,col1,col2,col3) from mltable; select * from dtc_model;

--------------------预测---------------------

select DecisionTreeClassificationPrediction('dtc_model',col1,col2,col3,'id','dtc_pred') from mltable; select * from dtc_pred a,mltable b where a.id = b.id;

·     GBTClassifie算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,0.0,5.7,5.9,7.9);

Insert into mltable values(2,1.0,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists gbtc_model; select GBTClassifier('gbtc_model',label,col1,col2,col3) from mltable; select * from gbtc_model;

--------------------预测---------------------

select GBTClassificationPrediction('gbtc_model',col1,col2,col3,'id', 'gbtc_pred') from mltable; select * from gbtc_pred a,mltable b where a.id = b.id;

·     RandomForestClassifier算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,0.0,5.7,5.9,7.9);

Insert into mltable values(2,1.0,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists rfc_model; select RandomForestClassifier('rfc_model',label,col1,col2,col3) from mltable; select * from rfc_model;

--------------------预测---------------------

select RandomForestClassificationPrediction('rfc_model',col1,col2,col3,'id', 'rfc_pred') from mltable; select * from rfc_pred a,mltable b where a.id = b.id;

·     MultilayerPerceptronClassifier算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,0.0,5.7,5.9,7.9);

Insert into mltable values(2,1.0,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists mpc_model; select MultilayerPerceptronClassifier('mpc_model',label,col1,col2,col3, '-Layers 3 3 2') from mltable; select * from mpc_model;

--------------------预测---------------------

select MultilayerPerceptronPrediction('mpc_model',col1,col2,col3,'id', 'mpc_pred') from mltable; select * from mpc_pred a,mltable b where a.id = b.id;

·     NaiveBayes算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,0.0,5.7,5.9,7.9);

Insert into mltable values(2,1.0,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists nbc_model; select NaiveBayes('nbc_model',label,col1,col2,col3) from mltable; select * from nbc_model;

--------------------预测---------------------

select NaiveBayesPrediction('nbc_model',col1,col2,col3,'id', 'nbc_pred') from mltable; select * from nbc_pred a,mltable b where a.id = b.id;

·     OneVsRest算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,0.0,5.7,5.9,7.9);

Insert into mltable values(2,1.0,7.8,2.6,12.1);

语句示例:

---------------------训练-----------------------

drop table if exists ovrc_model;

select OneVsRest('ovrc_model',label,col1,col2,col3,'-Classifier NaiveBayes') from mltable; select * from ovrc_model;

--------------------预测------------------------

select OneVsRestPrediction('ovrc_model',col1,col2,col3,'id', 'ovrc_pred') from mltable; select * from ovrc_pred a,mltable b where a.id = b.id;

·     LinearRegression算法

数据准备:

Create table mltable2 (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable2 values(1,8.6,5.7,5.9,7.9);

Insert into mltable2 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists lr_model; select LinearRegression('lr_model',label,col1,col2,col3) from mltable2; select * from lr_model;

--------------------预测---------------------

select LinearRegressionPrediction('lr_model',col1,col2,col3,'id', 'lr_pred') from mltable2; select * from lr_pred a,mltable2 b where a.id = b.id;

·     GeneralizedLinearRegression算法

数据准备:

Create table mltable2 (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable2 values(1,8.6,5.7,5.9,7.9);

Insert into mltable2 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists glr_model; select GeneralizedLinearRegression('glr_model',label,col1,col2,col3) from mltable2; select * from glr_model;

--------------------预测---------------------

select GeneralizedLinearRegressionPrediction('glr_model',col1,col2,col3,'id', 'glr_pred') from mltable2; select * from glr_pred a,mltable2 b where a.id = b.id;

·     DecisionTreeRegression算法

数据准备:

Create table mltable2 (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable2 values(1,8.6,5.7,5.9,7.9);

Insert into mltable2 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists dtr_model; select DecisionTreeRegressor('dtr_model',label,col1,col2,col3) from mltable2; select * from dtr_model;

-------------------预测---------------------

select DecisionTreeRegressionPrediction('dtr_model',col1,col2,col3,'id', 'dtr_pred') from mltable2;

select * from dtr_pred a,mltable2 b where a.id = b.id;

·     RandomForestRegression算法

数据准备:

Create table mltable2 (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable2 values(1,8.6,5.7,5.9,7.9);

Insert into mltable2 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists rfr_model; select RandomForestRegressor('rfr_model',label,col1,col2,col3) from mltable2; select * from rfr_model;

-------------------预测---------------------

select RandomForestRegressorPrediction('rfr_model',col1,col2,col3,'id', 'rfr_pred') from mltable2; select * from rfr_pred a,mltable2 b where a.id = b.id;

·     GBTRegression算法

数据准备:

Create table mltable2 (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable2 values(1,8.6,5.7,5.9,7.9);

Insert into mltable2 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists gbtr_model; select GBTRegressor('gbtr_model',label,col1,col2,col3) from mltable2; select * from gbtr_model;

-------------------预测---------------------

select GBTRegressorPrediction('gbtr_model',col1,col2,col3,'id', 'gbtr_pred') from mltable2; select * from gbtr_pred a,mltable2 b where a.id = b.id;

·     IsotonicRegression算法

数据准备:

Create table mltable2 (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable2 values(1,8.6,5.7,5.9,7.9);

Insert into mltable2 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists gbtr_model; select IsotonicRegression('isor_model',label,col1,col2,col3) from mltable2; select * from isor_model;

-------------------预测---------------------

select IsotonicRegressionPrediction('isor_model',col1,col2,col3,'id', 'isor_pred') from mltable2; select * from isor_pred a,mltable2 b where a.id = b.id;

·     AFTSurvivalRegressioin算法

数据准备:

Create table afts_table (id int,label double,censor double,col1 double,col2 double);

Insert into afts_table values(1,8.6,1.0 ,5.9,7.9);

Insert into afts_table values(2,7.8,0.0,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists aftsr_model; select AFTSurvivalRegression('aftsr_model',label,censor,col1,col2) from afts_table; select * from aftsr_model;

-------------------预测---------------------

select AFTSurvivalRegressionPrediction('aftsr_model',col1,col2,'id', 'aftsr_pred') from afts_table; select * from aftsr_pred a,afts_table b where a.id = b.id;

3.5.2  非监督算法 1. 语法

·     训练语法

SELECT

TrainAlgorithmName(’model_result_table’,label,col1,col2,…,[’parms’])

from TrainTableName;

·     预测语法

SELECT

PredictAlgorithmName(‘model_result_table’,col1,col2,col3,…,’id’,’predict_result_table’)

from PredictTableName;

2. 语法说明

表3-38 训练语法说明

参数

说明

常见配置选项

TrainAlgorithmName

训练算法名称,必须指定,支持的训练算法见算法说明:训练算法部分

model_result_table

算法训练结果模型表,必须指定

TrainTableName

训练数据表名,用作算法的数据源,必须指定

label

训练数据表中的列,用作算法的参数,作为标签列,必须指定

col1,col2,…

训练数据表中的列,用作算法的参数,作为特征列,必须指定

parms

算法的其它参数,根据算法需要,可以不指定(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …),详细解释见参数说明

 

表3-39 预测语法说明

参数名称

参数解释

备注

PredictAlgorithmName

预测算法名称,必须指定,支持的预测算法见算法说明:预测算法部分

model_result_table

模型表名,必须指定

用来指定使用哪个模型进行预测,对应训练中的model_result_table

PredictTableName

预测数据表名,用来预测的数据,必须指定

id

预测数据表的id列,用作算法的参数,作为标签列,必须指定

col1,col2,…

表的列,用作算法的参数,必须指定

predict_result_table

使用模型进行预测后的结果表

临时表

 

3. 算法说明

表3-40 训练算法

算法名

算法说明

备注

kmeans

Kmeans聚类训练函数

聚类

lda

Lda训练函数

聚类

gaussianmixture

高斯混合训练函数

聚类

als

Als协同过滤训练函数,算法必须加参数-ratingcol,-usercol,-itemcol

协同过滤

 

表3-41 预测算法

算法名

算法说明

备注

kmeansprediction

Kmeans聚类预测函数

聚类预测

ldaprediction

Lda预测函数

聚类预测

gmprediction

高斯混合预测函数

聚类预测

alsprediction

als预测函数

协同过滤预测

 

4. 参数说明

表3-42 参数说明

参数名

参数说明

支持算法

k

设置聚类K值,int型

Kmeans、lda、gaussianmixture

initmode

初始中心点选择方式,“random”“k-means”

Kmeans

initsteps

设置初始化步长

Kmeans

docconcentrations

文档中心,array[double]型

lda

docconcentration

Dirichlet分布的参数a,文档在主题上分布的先验参数(超参数a)。当前必须大于1,值越大,推断出的分布越平滑。默认为-1,自动设置

lda

optimizedocconcentration

是否优化文档中心列

lda

optimizer

优化器,用来学习LDA模型,一般是EMLDAOptimizer或OnlineLDAOptimizer;可选值:em或online

lda

alpha

ALS隐式反馈变化率

als

implicitprefs

使用显式反馈ALS变量或隐式反馈

als

itemcol

设置item列,必须要设置

als

nonnegative

是否支持非负

als

numblocks

并行计算的block数(-1为自动配置)

als

numitemblocks

Item的block数

als

numuserblocks

User的block数

als

rank

对应ALS模型中的隐藏因子数,即矩阵分解出的两个矩阵的新的行/列数

als

ratingcol

Rating列,必须要设置

als

usercol

User列,必须要设置

als

tol

优化算法迭代求解过程中的收敛阈值,默认值是1e-4,不能为负数

Kmeans、gaussianmixture

maxiter

优化算法求解的最大迭代次数(>=0),默认值100

Kmeans、lda、gaussianmixture、als

regparam

控制模型的正则化过程,从而控制模型的过拟合情况

als

checkpointinterval

检查点间隔,当maxiter很大的时候,检查点可以帮助减少shuffle文件大小并且可以帮助故障恢复;int型

Lda、als

seed

随机种子,long型

Kmeans、lda、gaussianmixture、als

 

5. 示例

·     KMeans算法

数据准备:

Create table mltable1 (id int,col1 double,col2 double,col3 double,col4 double);

Insert into mltable1 values(1,8.6,5.7,5.9,7.9);

Insert into mltable1 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists km_result; select KMeans('km_result',col1,col2,col3,col4,'-MaxIter 20 -K 2') from mltable1; select * from km_result;

 -------------------预测---------------------

select KMeansPrediction('km_result',col1,col2,col3,col4,'id','km_pred') from mltable1; select * from km_pred a,mltable1 b where a.id = b.id;

·     LDA算法

数据准备:

Create table mltable1 (id int,col1 double,col2 double,col3 double,col4 double);

Insert into mltable1 values(1,8.6,5.7,5.9,7.9);

Insert into mltable1 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists lda_model; select LDA('lda_model',col1,col2,col3,col4) from mltable1; select * from lda_model;

-------------------预测---------------------

select LDAPrediction('lda_model',col1,col2,col3,col4,'id','lda_pred') from mltable1; select * from lda_pred a,mltable1 b where a.id = b.id;

·     GaussianMixture算法

数据准备:

Create table mltable1 (id int,col1 double,col2 double,col3 double,col4 double);

Insert into mltable1 values(1,8.6,5.7,5.9,7.9);

Insert into mltable1 values(2,7.8,7.8,2.6,12.1);

语句示例:

------------------训练-----------------------

drop table if exists gm_model; select GaussianMixture('gm_model',col1,col2,col3,col4) from mltable1; select * from gm_model;

-------------------预测---------------------

select GMPrediction('gm_model',col1,col2,col3,col4,'id','gm_pred') from mltable1; select * from gm_pred a,mltable1 b where a.id = b.id;

·     ALS算法

数据准备:

Create table als_table (userid int,movieid int,rating double,timestamp bigint);

Insert into als_table values(0,2,3,1424380312);

Insert into als_table values(1,36,2,1424380312);

Insert into als_table values(2,4,3,1424380312);

语句示例:

------------------训练-----------------------

drop table if exists als_model;

select ALS('als_model',userId,movieId,rating,'-UserCol userId -ItemCol movieId -RatingCol rating') from als_table; select * from als_model;

-------------------预测--------------------

select alsprediction('als_model',userId,movieId,rating,'userId', 'p_als_0') from als_table;

select * from p_als_0;

3.5.3  模型选择 1. 语法

·     训练语法

SELECT

ModelSelectTypeAlgorithmName(label,col1,col2,col3,…,’model_result_table’,

’-Estimator TrainAlgorithmName [params1]’,

‘[params2]’,

‘-Evaluator EvaluatorName [params3]’)

from TrainTableName;

·     预测语法

SELECT ModelSelectTypePredictAlgorithmName(‘model_result_table’,col1,col2,col3,…,’id’,’predict_result_table’)

from PredictTableName;

2. 语法说明

表3-43 训练语法说明

参数

说明

常见配置选项

TrainAlgorithmName

训练算法名称,必须指定,支持的训练算法见算法说明:训练算法部分

model_result_table

算法训练结果模型表,必须指定

TrainTableName

训练数据表名,用作算法的数据源,必须指定

label

训练数据表中的列,用作算法的参数,作为标签列,必须指定

col1,col2,…

训练数据表中的列,用作算法的参数,作为特征列,必须指定

EvaluatorName

模型选择所使用的评估器,可选:regression、multiclassclassification、binaryclassification

若使用二分类训练算法,则使用binaryclassification评估器;若使用多分类训练算法,则使用multiclassclassification评估器;若使用回归训练算法,则使用regression回归器;

params1

模型选择所使用的训练算法的参数,根据算法需要,可以不指定(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …),详细解释见参数说明

params2

模型选择的网格参数设置,(内容格式:-N MultilayerPerceptronClassifierumFolds n -G_Param G_ParamValue …) 待选择的参数组,可以不指定,详细解释见参数说明

其中NumFolds参数必须指定

params3

评估算法设定的参数,可以不指定,(内容格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …)

评估器参数ParamKey可选:metricname、labelcol、rawpredictioncol、predictioncol

 

表3-44 预测语法说明

参数名称

参数解释

备注

PredictAlgorithmName

预测算法名称,必须指定,支持的预测算法见算法说明:预测算法部分

model_result_table

模型表名,必须指定

用来指定使用哪个模型进行预测,对应训练中的model_result_table

PredictTableName

预测数据表名,用来预测的数据,必须指定

id

预测数据表的id列,用作算法的参数,作为标签列,必须指定

col1,col2,…

表的列,用作算法的参数,必须指定

predict_result_table

使用模型进行预测后的结果表

临时表

 

3. 算法说明

表3-45 训练算法

算法名

算法说明

备注

ModelSelect_CrossValidation

使用十则交叉的方式进行模型选择

十则交叉

 

表3-46 预测算法

算法名

算法说明

备注

CrossValidationPrediction

十则交叉模型选择预测函数

十则交叉

 

4. 参数说明

表3-47 参数说明

参数名

参数说明

备注

regParam

控制模型的正则化过程,从而控制模型的过拟合情况

maxIter

优化算法求解的最大迭代次数(>=0),默认值100

threshold

二分类预测的阈值,范围[0,1],double型

thresholds

多分类预测的阈值,array[double]型

checkpointInterval

检查点间隔,当maxiter很大的时候,检查点可以帮助减少shuffle文件大小并且可以帮助故障恢复,int型

fitIntercept

是否训练intercept,boolean型

standardization

训练模型前是否需要对训练特征进行标准化处理,boolean型

seed

随机种子

elasticNetParam

弹性网络混合参数(范围[0,1]),用于调节L1和L2之间的比例,两种正则化比例加起来是1,默认是0,即只使用L2正则化,设置为1,即只使用L1正则化,double型

tol

优化算法迭代求解过程中的收敛阈值,默认值:1e-4,不能为负数,double型

stepSize

每次迭代优化步长,double型

solver

优化的求解算法

aggregationDepth

设置分布式统计时的层数,主要用在treeAggregate中,数据量越大,可适当加大这个值,默认是2,int型

featuresCol

设置特征列

labelCol

设置标签列

predictionCol

设置预测列

inputCol

设置输入列,string型

inputCols

设置输入列,array[string]型

outputCol

设置输出列

weightCol

设置权重列

 

5. 示例

·     十则交叉模型选择

数据准备:

Create table test_table (id int,label double,col1 double,col2 double,col3 double);

Insert into test_table values(1,8.6,5.7,5.9,7.9);

Insert into test_table values(2,7.8,7.8,2.6,12.1);

Insert into test_table values(3,8.8,9.6,4.6,32.1);

Insert into test_table values(4,11.9,12.5,8.6,62.1);

Insert into test_table values(5,16.9,17.5,6.6,32.1);

Insert into test_table values(6,21.9,23.4,10.6,43.2);

语句示例:

select ModelSelect_CrossValidation(label,col1,col2,col3,'model_table1','-Estimator LinearRegression -MaxIter 10000','-NumFolds 3 -G_elasticNetParam 0.4 0.5 -G_regParam 0.1 0.2','-Evaluator Regression -MetricName mse') from test_table;

select CrossValidationPrediction('model_table1',col1,col2,col3,'id','res_table1') from test_table;

select * from res_table1;

3.5.4  特征工程 1. 语法

·     不包含训练过程的特征工程语法

SELECT

FeatureAlgorithmName(’transform_result_table’,’id’,col1,…,[’parms’])

from TableName;

·     包含训练过程的特征工程语法

SELECT

FeatureAlgorithmName(‘model_result_table’,’transform_result_table’,’id’,col1,…,[’parms’]) from TableName;

2. 语法说明

表3-48 不包含训练过程的特征工程语法说明

参数

说明

常见配置选项

FeatureAlgorithmName

特征工程算法名称,必须指定

transform_result_table

转换结果表名,必须指定

TableName

表名,用作算法的数据源,必须指定

id

表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定

col1,,…

表的列,用作算法的参数,必须指定

parms

算法的其它参数,根据算法需要,可以不指定(格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …)

 

表3-49 包含训练过程的特征工程语法说明

参数

说明

常见配置选项

FeatureAlgorithmName

特征工程算法名称,必须指定

model_result_table

训练结果模型表,必须指定

transform_result_table

转换结果表名,必须指定

tablename

表名,用作算法的数据源,必须指定

id

表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定

col1,,…

表的列,用作算法的参数,必须指定

parms

算法的其它参数,根据算法需要,可以不指定(格式:-ParamKey1 ParamValue1 -ParamKey2 ParamValue2 …)

 

3. 算法说明

表3-50 包含训练的特征工程算法

算法名

算法说明

支持参数

Word2vec

一种嵌入方法,可计算每个单词在给定语料库环境下的分布式词向量

InputCol:String //设置输入列

OutputCol:String //设置输出列

MaxIter:Int //设置最大迭代次数

MaxSentenceLength:Int //设置最大句子长度

MinCount:Int //设置

NumPartitions:Int //设置分区数

Seed:Long //设置种子

StepSize:Double //设置步长

VectorSize:Int //设置转换后的向量的长度

WindowSize:Int //设置

CountVectorizer

通过计数来将一个文档转换为向量

Binary:Boolean //

InputCol:String //设置输入列

MinDF:Double //

MinTF:Double //

OutputCol:String //设置输出列

VocabSize:Int //设置词汇量

Pca

主成分分析,用来降低特征维度

InputCol:String //设置输入列—指定为features

OutputCol:String //设置输出列

K:Int //设置主成分数

stringindexer

将字符串标签列编码为标签索引

InputCol:String //设置输入列

OutputCol:String //设置输出列

vectorindexer

解决向量数据集中的类别特征索引

MaxCategories:Int//最大类别数,小于它的认为是类别特征,否则被认为是连续特征

InputCol:String//设置输入列

OutputCol:String//设置输出列

standardscaler

用来转换一个向量行数据集,规范化每一个特征,使其标准差、均值为0

InputCol:String//设置输入列

OutputCol:String//设置输出列

WithStd:Boolean//设置是否使用标准差

WithMean:Boolean//设置是否使用均值

minmaxscaler

可将向量行缩放到范围[0,1]

InputCol:String//设置输入列

OutputCol:String//设置输出列

maxabsscaler

该特征的绝对值的最大值转换向量行,使其特征在范围[-1,1]内

InputCol:String//设置输入列

OutputCol:String//设置输出列

quantileDiscretizer

连续特征转化为类别特征,类别数通过numBuckets参数来设置

RelativeError:Double//近似的精度控制

NumBuckets:Int//箱化数

InputCol:String//设置输入列

OutputCol:String//设置输出列

RFormula

通过形如clicked ~ country + hour这个关系式,得到新的特征和标签列

FeaturesCol:String //设置特征列

LabelCol:String//设置标签列

Formula:Sring//设置关系式

ChiSqSelector

卡方选择器

FeaturesCol:String//设置特征列

LabelCol:String//设置标签列

OutputCol:Int//输出的属性

NumTopFeatures:Int//选中排在前几个的属性

Percentile:Double//选中排在前百分之几的属性

SelectorType:String//设置卡方选择器的类型

 

表3-51 不包含训练的特征工程算法

算法名

算法说明

支持参数

Tfidf

文本挖掘中使用的特征向量化方法

Binary:Boolean

InputCol:String//设置输入列

OutputCol:String//设置输出列

NumFeatures:Int//设置特征数

MinDocFreq:Int

tokenizer

将文本(如:一个句子)分割成一个个独立的词(通常是一个单词)

InputCol:String//设置输入列

OutputCol:String//设置输出列

stopwordsremover

从数据中删除停用词

CaseSensitive:Boolean//设置大小写敏感

InputCol:String//设置输入列

OutputCol:String//设置输出列

StopWords:Array[String]//设置停用词

ngram

n-gram是一个序列,将输入特征转化为n-gram形式

N:Int //每个ngram中含有的元素个数

InputCol:String//设置输入列

OutputCol:String//设置输出列

binarize

二值化通过阈值将特征映射为(0/1)

InputCol:String//设置输入列

OutputCol:String//设置输出列

Threshold:Double//设置阈值

polynomialExpansion

多项式扩展是将特征映射到多项式空间的过程

Degree:Int

InputCol:String//设置输入列

OutputCol:String//设置输出列

onehotencoder

将标签索引列映射为二进制向量列

InputCol:String//设置输入列

OutputCol:String//设置输出列

DropLast:Boolean//是否删除最后类别

normalizer

可以转换向量行的数据集,规范化每一向量(0到1范围内)

P:Double//设置P值

InputCol:String//设置输入列

OutputCol:String//设置输出列

elementwiseproduct

将每一个向量和权重向量使用元素对应相乘

InputCol:String//设置输入列

OutputCol:String//设置输出列

ScalingVec:Vector//设置乘数向量

vectorSlicer

输入特征向量,输出为新的特征向量,新的特征向量是原始特征向量的子集

Indices:Array[Int]//设置需要映射的索引

InputCol:String//设置输入列

OutputCol:String//设置输出列

Interaction

获取向量或实型列产生单一的向量列,向量列中包含输入向量中所有连接的结果

InputCols::Array[String]//设置输入列

OutputCol::String//设置输出列

VectorAssembler

可以将多个列合并为一个列

InputCols::Array[String] //设置输入列

OutputCol:String//设置输出列

 

4. 示例

·     TFIDF算法

数据准备:

Create table tfidf_table (id int,sentence string);

Insert into tfidf_table values(1, 'test');

Insert into tfidf_table values(2, 'mytest');

运行语句:

select TFIDF('tfidf_trans_res','id',sentence ,'-InputCol sentence -OutputCol features') from tfidf_table;

select * from tfidf_trans_res;

select * from tfidf_trans_res a,tfidf_table b where a.id = b.id;

·     Word2Vec算法

数据准备:

Create table word2vec_table (id int,text array);

Insert into word2vec_table select 1,array('sparrow', 'mysparrow');

Insert into word2vec_table select 2,array('test','mytest');

运行语句:

drop table if exists w2v_mod_res;

select Word2Vec ('w2v_mod_res','w2v_trans_res','id',text,'-VectorSize 3 -MinCount 0 -InputCol text -OutputCol result') from word2vec_table;

select * from w2v_mod_res;

select * from w2v_trans_res;

·     CountVectorizer算法

数据准备:

Create table countvectorizer_table (id int,words array);

Insert into countvectorizer_table select 1,array('sparrow', 'mysparrow');

Insert into countvectorizer_table select 2,array('test', 'mytest');

运行语句:

drop table if exists countvec_mod_res;

select CountVectorizer('countvec_mod_res','countvec_trans_res','id',words,'-VocabSize 3 –MinDF 0 -InputCol words -OutputCol features') from countvectorizer_table;

select * from countvec_mod_res;

select * from countvec_trans_res;

·     PCA 算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.7,2.3,4.5,5.6);

Insert into mltable values(2,8.6,6.5,3.8,4.9);

运行语句:

drop table if exists pca_mod_res;

select pca('pca_mod_res','pca_trans_res','id',col1,col2,col3,'-K 3 -InputCol features -OutputCol pcafeatures') from mltable;

select * from pca_mod_res;

select * from pca_trans_res;

·     Tokenizer算法

数据准备:

Create table tokenizer_table (id int,sentence string);

Insert into tokenizer_table values(1, 'test');

Insert into tokenizer_table values(2, 'mytest');

运行语句:

select Tokenizer('tokenizer_trans_res','id',sentence,'-InputCol sentence -OutputCol words') from tokenizer_table;

select * from tokenizer_trans_res;

·     StopWordRemover算法

数据准备:

Create table stopwordremover_table (id int,raw array);

Insert into stopwordremover_table select 1,array('sparrow', 'mysparrow');

Insert into stopwordremover_table select 2, array('test', 'mytest');

运行语句:

select StopWordsRemover(' swr_trans_res ','id',raw,'-InputCol raw -OutputCol filtered') from stopwordremover_table;

select * from swr_trans_res ;

·     Ngram算法

数据准备:

Create table ngram_table (id int,words array);

Insert into ngram_table select 1,array('sparrow', 'mysparrow');

Insert into ngram_table select 2, array('test', 'mytest');

运行语句:

select NGram (' ngram_trans_res ','id', words,'-N 2 -InputCol words -OutputCol ngrams') from ngram_table;

select * from ngram_trans_res;

·     Binarizer算法

数据准备:

Create table binarizer_table (id int,feature double);

Insert into binarizer_table values(1,5.7);

Insert into binarizer_table values(2,2.6);

运行语句:

select binarize('bin_trans_res','id', feature,'-Threshold 0.5 -InputCol feature -OutputCol binarized_feature') from binarizer_table;

select * from bin_trans_res;

·     PolynomialExpansion算法

数据准备:

Create table polynominalexpansion (id int,col1 double,col2 double);

Insert into polynominalexpansion values(1,5.7,11.8);

Insert into polynominalexpansion values(2,2.6,13.6);

运行语句:

select PolynomialExpansion ('pe_trans_res','id',col1,col2,'-Degree 3 -InputCol features -OutputCol polyFeatures') from polynominalexpansion;

select * from pe_trans_res;

·     StringIndex算法

数据准备:

Create table stringindex_table (id int,category string);

Insert into stringindex_table values(1, 'test');

Insert into stringindex_table values(2, 'mytest');

运行语句:

select StringIndexer ('si_mod_res',' si_trans_res ','id', category,' -InputCol category -OutputCol categoryIndex') from stringindex_table;

select * from  si_mod_res;

select * from si_trans_res;

·     OneHotEncoder算法

数据准备:

Create table onehotencoder_table (id int,category_index double);

Insert into onehotencoder_table values(1,5);

Insert into onehotencoder_table values(2,6);

运行语句:

select OneHotEncoder ('onehot_trans_res','id',category_index,' -InputCol category_index -OutputCol categoryVec') from onehotencoder_table;

select * from onehot_trans_res;

·     VectorIndexer算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句:

select VectorIndexer ('vi_mod_res','vi_trans_res','id',col1,col2,col3,'- MaxCategories 10 -InputCol features -OutputCol indexed') from mltable;

select * from vi_mod_res;

select * from vi_trans_res;

·     Normalizer算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句:

select Normalizer('norm_trans_res','id',col1,col2,col3,'-P 1.0 -InputCol features -OutputCol normFeatures') from mltable;

select * from norm_trans_res;

·     StandardScaler算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句:

select StandardScaler ('standard_mod_res','standard_trans_res','id',col1,col2,col3,'-WithStd true -WithMean false -InputCol features -OutputCol scaledFeatures') from mltable;

select * from standard_mod_res;

select * from standard_trans_res;

·     MinMaxScaler算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句:

select MinMaxScaler('minmax_mod_res','minmax_trans_res','id',col1,col2,col3,'-InputCol features -OutputCol scaledFeatures') from mltable;

select * from minmax_mod_res;

select * from minmax_trans_res;

·     MaxAbsScaler算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句:

select MaxAbsScaler('maxabs_mod_res','maxabs_trans_res','id',col1,col2,col3,'-InputCol features -OutputCol scaledFeatures') from mltable;

 

select * from  maxabs_mod_res;

select * from maxabs_trans_res;

·     ElementwiseProduct算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句:

select ElementwiseProduct('ep_trans_res','id',col1,col2,col3,'-ScalingVec 0.0 1.0 2.0 -InputCol vector -OutputCol transformedVector') from mltable;

select * from ep_trans_res;

·     QuantileDiscretizer算法

数据准备:

Create table quantile_discret_table (id int,hour double);

Insert into quantile_discret_table values(1,5.6);

Insert into quantile_discret_table values(2,7.8);

运行语句:

select QuantileDiscretizer('qd_mod_res','qd_trans_res','id',hour,'-NumBuckets 3 -InputCol hour -OutputCol result') from quantile_discret_table;

select * from qd_mod_res;

select * from qd_trans_res;

·     VectorSlicer算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句:

select VectorSlicer('trans_res','id',col1,col2,col3,'-Indices 1 2 -InputCol userFeatures -OutputCol features') from mltable;

select * from trans_res;

·     RFormula算法

数据准备:

Create table rformula_table (id int,counter string,hour double,clicked double);

Insert into rformula_table values(1, 'henan',2.3,5.6);

Insert into rformula_table values(2, 'hangzhou',6.5,3.8);

运行语句:

select RFormula('rf_mod_res','rf_trans_res','id',counter,hour,clicked,'-Formula clicked~counter+hour -FeaturesCol features -LabelCol label') from rformula_table;

select * from rf_mod_res;

select * from rf_trans_res;

·     ChiSqSelector算法

数据准备:

Create table mltable (id int,label double,col1 double,col2 double,col3 double);

Insert into mltable values(1,5.6,2.3,4.5,5.6);

Insert into mltable values(2,7.8,6.5,3.8,4.9);

运行语句示例:

Select ChiSqSelector('css_mod_res','css_trans_res','id',col1,col2,col3,label,'-NumTopFeatures 1 -FeaturesCol features -LabelCol label -OutputCol selectedFeatures') from mltable;

select * from css_mod_res;

select * from css_trans_res;

·     Interaction算法

数据准备:

Create table vectorassemble_table (id int,col1 double,col2 double,col3 double,col4 double,col5 double ,col6 double);

Insert into vectorassemble_table values(1,5.6,2.3,4.5,5.6,7.8,9.2);

Insert into vectorassemble_table values(2,7.8,6.5,3.8,4.9,8.7,2.6);

运行语句示例:

select Interaction('inter_trans_res','id',col1,col2,col3,col4,col5,col6,'-InputCols vec1 vec2 -OutputCol interactedCol') from vectorassemble_table;

Select * from inter_trans_res;

·     VectorAssembler算法

数据准备:

Create table vectorassemble_table (id int,col1 double,col2 double,col3 double,col4 double,col5 double ,col6 double);

Insert into vectorassemble_table values(1,5.6,2.3,4.5,5.6,7.8,9.2);

Insert into vectorassemble_table values(2,7.8,6.5,3.8,4.9,8.7,2.6);

运行语句示例:

Select VectorAssembler('va_trans_res','id',col1,col2,col3,'-InputCols col1 col2 col3 -OutputCol vec1') from vectorassemble_table;

select * from va_trans_res;

3.5.5  Pipeline机制 1. 语法

·     训练语法

SELECT

pipelinetrain(‘pipeline_model_table’,’stage_algAndparms’,col1,col2,..)

from TableName;

·     预测语法

SELECT

pipelineprediction(‘pipeline_model_table’ ,col1,col2,..,’id’, ’predict_result_table’)

from TableName;

2. 语法说明

表3-52 训练语法说明

参数

说明

常见配置选项

pipelinetrain

指定为pipeline机制训练,且固定

pipeline_modelt_table

训练结果模型表,必须指定

Stage_algAndparms

Pipeline各阶段使用的算法和其参数

id

表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定

col1,,…

表的列,用作算法的参数,必须指定

TableName

表名,用作算法的数据源,必须指定

 

表3-53 预测语法说明

参数

说明

常见配置选项

pipelineprediction

指定为pipeline机制预测,必须指定

pipeline_modelt_table

预测使用的模型表,必须指定

col1,,…

表的列,用作算法的参数,必须指定

id

表的连接列的字段名称,指定tablename表中的列名作为该参数值,必须指定

predict_result_table

预测结果表名,将预测结果存储在该表中,必须指定

tablename

表名,用作算法的数据源,必须指定

 

3. 参数说明

Stage_algAndparms:

通过-alg来指定该阶段使用的算法(算法说明,算法说明_1和算法说明_2),其他参数参考对应算法中的说明。

4. 示例

·     创建训练数据

create table pipeline_train_table (id int,text string,label double);

insert into pipeline_train_table values (0,"a b c d e spark",1.0);

insert into pipeline_train_table values (1,"b d",0.0);

insert into pipeline_train_table values (2,"spark f g h",1.0);

insert into pipeline_train_table values (3,"hadoop mapreduce",0.0);

·     使用SQL调用pipeline机制,并查看训练模型存储信息

select piplinetrain('pipeline_model',

       '-alg tokenizer -inputcol text -outputcol words',

       '-alg hashingtf -inputcol words -outputcol features',

       '-alg logisticregression',

       id,text,label)

from pipeline_train_table;

select * from pipeline_model;

·     使用SQL预测新数据,并查看预测结果

create table pipeline_test_table (id int,text string);

insert into pipeline_test_table values (4,"spark i j k"),

(5,"l m n"),

(6,"spark hadoop spark"),

(7,"apache hadoop");

select pipelineprediction('pipeline_model',text,'id','pipeline_res_table') from pipeline_test_table;

select * from pipeline_res_table;

3.6  列加密

DLH支持对String数据类型的列数据进行加密,支持AES、DES和SM4三种加密算法及公钥和私钥两种加密方式。

注意

·     目前DLH只支持对text、orc、parquet格式的表进行列加密。

·     DLH不支持以load的方式将数据导入加密表。

·     当前版本中,暂不支持复杂类型的加密嵌套。

 

用户需要在建表时指定列加密相关参数。列加密参数如表3-54所示,加密参数一旦设置就不允许再进行修改。

表3-54 列加密参数

参数名

说明

encrypt.columns

该参数指定加密列,列必须是String类型

encrypt.algorithm

该参数指定加密算法,支持AES、DES和SM4

encrypt.type

该参数指定加密方式,支持public和private。可以不指定,默认public

encrypt.key

该参数指定密钥。当设置encrypt.type为private时才需要设置此参数。DES加密算法要求密钥的长度大于8位,SM4加密算法要求长度为16

encrypt.pattern

该参数指定加密模式,支持CBC或ECB。在加密算法为SM4时设置

 

3.6.1  公钥加密

使用公钥加密,会将数据加密后再存储。只有使用DLH查询才能得到真实的数据,直接查看数据文件或者通过其他方式查询,只能查看到加密数据。

示例:

·     在DLH中建立ORC格式表,并使用公钥加密和DES加密算法对表的列name、passwd进行数据加密。

create table employee(id int,name string,passwd string) tblproperties("encrypt.columns"="name,passwd","encrypt.algorithm"="DES");

insert into employee values(1,'zhangsan','43211');

select * from employee;

+-----+-----------+---------+

| id  |   name    | passwd  |

+-----+-----------+---------+

| 1   | zhangsan  | 43211   |

+-----+-----------+---------+

·     在DLH中建立parquet格式的表,并使用公钥加密方式和AES加密算法对列salary进行数据加密。

create table salary(id int,name string,salary string) stored as parquet tblproperties ("encrypt.columns"="salary","encrypt.algorithm"="AES");

insert into salary values(1,'zhangsan','8999.5');

select * from salary;

+-----+-----------+---------+

| id  |   name    | salary  |

+-----+-----------+---------+

| 1   | zhangsan  | 8999.5  |

+-----+-----------+---------+

·     在DLH中建立textfile格式的表,并使用公钥加密方式和SM4加密算法的CBC模式对列salary进行数据加密。

create table employeeinfo(id int,city string,address string) stored as textfile tblproperties("encrypt.columns"="city,address","encrypt.algorithm"="SM4","encrypt.pattern"="CBC");

insert into employeeinfo values (1,'hnzz','yulanjie');

select * from employeeinfo;

+-----+-------+-----------+

| id  | city  |  address  |

+-----+-------+-----------+

| 1   | hnzz  | yulanjie  |

+-----+-------+-----------+

·     配置Hive连接DLH使用的metastore地址(如:thrift://sharedev1.hde.com:19083,thrift://sharedev2.hde.com:19083),通过Hive查看这两个表,只能看到加密数据。

[root@node2~]# beeline --hiveconf hive.metastore.uris=thrift://sharedev1.hde.com:19083,thrift://sharedev2.hde.com:19083

beeline> !connect jdbc:hive2://node2:10000

Connecting to jdbc:hive2://node2:10000

Enter username for jdbc:hive2://node2:10000: hdfs

Enter password for jdbc:hive2://node2:10000:

Connected to: Apache Hive (version 2.1.1-cdh6.2.0)

Driver: Hive JDBC (version 2.1.1-cdh6.2.0)

Transaction isolation: TRANSACTION_REPEATABLE_READ

0: jdbc:hive2://node2:10000> select * from employee;

+--------------+---------------------------+------------------+--+

| employee.id  |       employee.name       | employee.passwd  |

+--------------+---------------------------+------------------+--+

| 1            | OYuK0vlc7KHZo7+6lh609A==  | xSIrlXaRby4=     |

+--------------+---------------------------+------------------+--+

1 row selected (3.923 seconds)

0: jdbc:hive2://node2:10000> select * from salary;

+------------+--------------+---------------------------+--+

| salary.id  | salary.name  |       salary.salary       |

+------------+--------------+---------------------------+--+

| 1          | zhangsan     | 0K51RX+6P2FWJ6Rmd7A8Dg==  |

+------------+--------------+---------------------------+--+

1 row selected (1.519 seconds)

 

0: jdbc:hive2://node2:10000> select * from employeeinfo;

+------------------+-------------------------+---------------------------+--+

| employeeinfo.id  |    employeeinfo.name    |      employeeinfo.salary     |

+------------------+-------------------------+---------------------------+--+

| 1                | eCccpUM/EjhaKlzkIl3qg== |    P3YPNJ8bm4Mc3lbdK2vr8w==  |

+------------------+-------------------------+---------------------------+--+

1 row selected (1.519 seconds)

3.6.2  私钥加密

私钥加密也会将数据加密后再存储,同时要求在查询或者插入数据之前先设置私钥,否则认为操作是不被允许的。设置的私钥是对当前Session有效的。

设置私钥的语法如下:

set encrypt.key.db_name.table_name=encrypt_key;

示例:

·     在DLH中建立orc格式的表,并指定私钥加密和DES加密算法对数据表列name、passwd进行数据加密。

create table employee1(id int,name string,passwd string) tblproperties("encrypt.columns"="name,passwd","encrypt.algorithm"="DES","encrypt.type"="private","encrypt.key"="qwertyui");

·     当不设置私钥时,插入和查询都是不允许的。

insert into employee1 values(1,'zhangsan','43211');

java.lang.Exception: encrypt.key is not right

·     设置私钥后可以正常插入和查询。

set encrypt.key.default.employee1=qwertyui;

insert into employee1 values(1,'zhangsan','43211');

select * from employee1;

+-----+-----------+---------+

| id  |   name    | passwd  |

+-----+-----------+---------+

| 1   | zhangsan  | 43211   |

+-----+-----------+---------+

Time taken: 0.111 seconds, Fetched 1 row(s)

·     查看表信息时,encrypt.key也已经被加密。

sparrow> show create table employee1;

CREATE TABLE `employee1`(`id` int, `name` string, `passwd` string)

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'

WITH SERDEPROPERTIES (

  'serialization.format' = '1'

)

STORED AS

  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'

  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

TBLPROPERTIES (

  'numFiles' = '1',

  'transient_lastDdlTime' = '1536046320',

  'totalSize' = '40',

  'encrypt.algorithm' = 'DES',

  'encrypt.key' = 'FumPB0oW5xSwdVxa/MkAnQ==',

  'encrypt.columns' = 'name,passwd',

  'encrypt.type' = 'private',

  'COLUMN_STATS_ACCURATE' = 'true'

)

3.7  权限访问控制

注意

根据集群是否开启“安全管理/权限管理”功能,集群中新建用户获取DLH组件权限的方式不同:

·     当集群未开启权限管理时,集群中新建用户可直接对DLH组件执行库表的创建、修改、插入、删除等操作,无需进行授权操作。

·     当集群开启权限管理时,集群中新建用户的DLH组件权限需要在[集群权限/角色管理]页面进行配置,集群用户绑定已配置DLH权限的角色后才可执行相关操作。

 

权限管理是集群安全管理的重要组成部分,在开启权限管理的集群中,权限基于角色进行统一管理,角色是权限的集合。

为集群中用户赋予权限的整体流程如下:

(1)     新建角色,并为角色配置权限。

(2)     新建用户,并将角色分配用户,用户即拥有角色所具有的权限。

3.7.1  权限说明

在开启权限管理的集群中,用户对DLH组件的操作需被赋予相关权限后才能执行。

DLH支持对数据库表和数据库UDF配置权限(支持使用通配符*模糊适配),并且提供数据脱敏和行级过滤功能。

·     数据库的配置权限:数据库表可对数据库、数据表、列配置权限,权限包括:select、update、create、drop、alter、index、use,其中all表示配置所有权限。

·     数据库UDF的配置权限:数据库UDF可对数据库、UDF配置权限,权限包括create、drop。

·     数据脱敏:是指按照某种规则对指定列进行脱敏,保证数据安全性,DLH数据脱敏策略如表表3-56所示。

·     行级过滤:是指对表配置行级过滤策略,仅展示满足过滤策略的数据(若过滤策略配置为空则不过滤)。行级过滤类似where子句,示例:配置策略id>1时,查询表时会自动获取id大于1的数据

DLH操作所需权限对应关系如表3-55所示,DLH数据脱敏策略如表3-56所示。

表3-55 DLH权限说明

权限类型

对应的组件常用操作

select

库表查询等相关操作,如:select、export、show、describe等

update

插入或更新库表等相关操作,如:insert、insert overwrite、delete、update、load等

create

创建库表等相关操作,如create table、create database等

【说明】库表的创建者默认不拥有库表的使用权限,但是通过“权限管理”功能可授予其使用权限

drop

删除库表等相关操作,如:drop table等

alter

修改库表等相关操作,如:alter table等

index

索引等相关操作,如:create index等

use

执行show database、use database等操作

all

支持以上所有操作

 

表3-56 DLH数据脱敏策略说明

数据脱敏策略

策略规则

详细说明

MASK

对字母和数字脱敏

·     string:x替代小写字母,X替代大写字母,n替代数字

·     int:1替代数字

·     date:"1900-01-01"替代原值

·     decimal:NULL值替代原值

MASK_SHOW_LAST_4

后四位不脱敏

·     string:仅显示后四位,其他用x代替

·     int:仅显示后四位,其他用1代替;

·     date:"1900-01-01"替代原值

·     decimal:NULL值替代原值

MASK_SHOW_FIRST_4

前四位不脱敏

·     string:仅显示前四位,其他用x代替

·     int:仅显示前四位,其他用1代替

·     date:"1900-01-01"替代原值

·     decimal:NULL值替代原值

MASK_HASH

显示HASH值

·     string:Hash值替代原值

·     int/date/decimal:NULL值替代原值

MASK_NULL

显示NULL值

NULL值替代原值

MASK_NONE

显示原值

显示原值

MASK_DATE_SHOW_YEAR

脱敏时间信息

·     string:x替换数字

·     int:1替代数字

·     date:显示日期年份,月份和日期均用01代替

·     decimal:NULL值替代原值

 

说明

·     若DLH要使用数据脱敏和行级过滤功能,必须在组件详情页面[配置]页签下的高级配置中,配置dlh.query.engine=hive;dlh.query.engine默认值为hive,若切换为dlh.query.engine=presto则不支持数据脱敏和行级过滤功能。

·     新建拥有DLH权限的角色时,若要使用数据脱敏和行级过滤则必须首先配置数据库表的select权限。

 

3.7.2  权限使用操作示例 1. 数据库权限控制

表3-57 数据库授权配置

操作

权限要求(数据库示例为dlhtest)

create database

数据库:*或dlhtest,数据表:*,列:*,权限:create

use database

数据库:*或dlhtest,数据表:*,列:*,权限:use

drop database

数据库:*或dlhtest,数据表:*,列:*,权限:drop

 

下面以授予create权限为例,对dlhtest数据库授予create权限(create权限默认拥有use权限),介绍数据库的权限访问控制(其它权限的授予方式与create权限的操作类似,不再重复说明)。操作步骤如下:

(1)     新建用户

在[集群权限/用户管理]页面,新建dlhuser01用户,授权前执行创建dlhtest数据库操作。如图3-4所示,此时显示dlhuser01用户没有dlhtest数据库的create权限。

图3-4 无执行权限

 

(2)     新建角色

在[集群权限/角色管理]页面,新建dlh01角色,授予dlh01角色dlhtest数据库的create权限。

图3-5 新建角色

 

(3)     用户绑定角色

在[集群权限/用户管理]页面,单击dlhuser01用户对应的按钮,为其绑定dlh01角色。

图3-6 为用户绑定角色进行授权

 

(4)     使用dlhuser01用户重新执行创建dlhtest数据库的操作。如图3-7所示,此时显示dlhtest数据库创建成功。

图3-7 执行成功

 

2. 表权限控制

表3-58 表授权配置

操作

权限要求(数据库示例为dlhtest,表示例为dlhtab1)

create table

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:create

alter table

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:alter

insert into table

·     Hive

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:update

·     YARN

队列:*或default,权限:submit-app

delete from table(针对ACID表)

·     Hive

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:update

·     YARN

队列:*或default,权限:submit-app

update table(针对ACID表)

·     Hive

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:update

·     YARN

队列:*或default,权限:submit-app

select * from table

·     Hive

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:select

·     YARN

队列:*或default,权限:submit-app

create index

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:index

drop table

数据库:*或dlhtest,数据表:*或dlhtab1,列:*,权限:drop

 

示例1:下面以授予update权限为例,对dlhtest数据库dlhtab1数据表授予update权限,介绍数据表的权限访问控制。操作步骤如下:

(1)     新建用户

在[集群权限/用户管理]页面,新建dlhuser21用户,授权前执行插入dlhtest.dlhtab1数据表的操作。如图3-8所示,此时显示dlhuser21用户没有dlhtest.dlhtab1数据表的update权限。

图3-8 无执行权限

 

(2)     新建角色

在[集群权限/角色管理]页面,新建dlh21角色,授予dlh21角色dlhtest.dlhtab1数据表的update权限。

图3-9 新建角色

 

(3)     用户绑定角色

在[集群权限/用户管理]页面,单击dlhuser21用户对应的按钮,为其绑定角dlh21色。

(4)     使用dlhuser21用户重新执行插入dlhtest.dlhtab1数据表的操作。如图3-10所示,此时显示dlhuser21用户没有提交到yarn default队列的权限。

图3-10 无执行权限

 

(5)     修改角色授权

在[集群权限/角色管理]页面,单击dlh21角色对应的按钮,修改该角色的组件权限设置,授予dlh21角色default队列的submit-app权限。

图3-11 编辑角色

 

(6)     使用dlhuser21用户重新执行插入dlhtest.dlhtab1数据表的操作。如图3-12所示,此时显示dlhtest. dlhtab1数据表插入数据成功。

图3-12 执行成功

 

示例2:下面以授予select权限为例,对dlhtest数据库dlhtab1数据表授予select权限,介绍数据表的权限访问控制。操作步骤如下:

(1)     继续使用dlhuser21用户执行查询dlhtest.dlhtab1数据表的操作。如图3-13所示,此时显示dlhuser21用户没有dlhtest. dlhtab1数据表的select权限。

图3-13 无执行权限

 

(2)     修改角色授权

在[集群权限/角色管理]页面,单击dlh21角色对应的按钮,修改该角色的组件权限设置,授予dlh21角色dlhtest. dlhtab1数据表的select权限。

图3-14 编辑角色

 

(3)     使用dlhuser21用户重新执行查询dlhtest. dlhtab1数据表的操作。如图3-15所示,此时显示查询dlhtest. dlhtab1数据表数据成功。

图3-15 执行成功

 

3. 列级别权限控制

表3-59 授权配置

操作

权限要求(数据库示例为dlhtest,数据表示例为dlhtab1,列示例为name)

insert into table

·     Hive

数据库:*或dlhtest,数据表:*或dlhtab1,列:name,权限:update

·     YARN

队列:*或default,权限:submit-app

select name from table

数据库:*或dlhtest,数据表:*或dlhtab1,列:name,权限:select

 

示例1:下面以授予select权限为例,对dlhtest数据库dlhtab1数据表的name列授予select权限,介绍数据列的权限访问控制。操作步骤如下:

(1)     新建用户

在[集群权限/用户管理]页面,新建dlhuser02用户,授权前执行在dlhtest.dlhtab1数据表中查询name列的操作。如图3-16所示,此时显示dlhuser02用户没有dlhtab1数据表的查询权限。

图3-16 无执行权限

 

(2)     新建角色

在[集群权限/角色管理]页面,新建dlh02角色,授权dlhtest数据库dlhtab1数据表的name列的select权限。

图3-17 新建角色

 

(3)     用户绑定角色

在[集群权限/用户管理]页面,单击dlhuser02用户对应的按钮,为其绑定dlh02角色。

(4)     使用dlhuser02用户重新执行在dlhtest.dlhtab1数据表中查询name列的操作。如图3-18所示,此时显示查询操作执行成功。

图3-18 执行成功

 

示例2:下面以授予update权限为例,对dlhtest数据库dlhtab1数据表的name列授予update权限,介绍数据列的权限访问控制。操作步骤如下:

(5)     继续使用dlhuser02用户在dlhtest. dlhtab1数据表的name列执行插入的操作。如图3-19所示,此时显示没有update权限。

图3-19 无执行权限

 

(6)     修改角色授权

在[集群权限/角色管理]页面,单击dlh02角色对应的按钮,修改该角色的组件权限设置,授予dlh02角色dlhtest. dlhtab1中name列的update权限,及YARN的default队列的submit-app权限。

图3-20 编辑角色

 

(7)     使用dlhuser02用户重新在dlhtest. dlhtab1数据表的name列执行插入的操作。如图3-21和图3-22所示,此时显示插入操作执行成功。

图3-21 执行成功

 

图3-22 查询插入结果

 

4. UDF权限控制

表3-60 授权配置

操作

权限要求(数据库示例为dlhtest,UDF示例为mylowerperm1)

create function

数据库:*或dlhtest,UDF:*或mylowerperm1,权限:create

drop function

数据库:*或dlhtest,UDF:*或mylowerperm1,权限:drop

select function

不需要授权

show functions

不需要授权

 

(1)     创建函数类文件

package com.dlh.hive.udf;

 

import org.apache.hadoop.hive.ql.exec.UDF;

import org.apache.hadoop.io.Text;

 

public final class LowerFunc extends UDF{

    public Text evaluate(final Text s){

        if(s == null){return null;}

        return new Text(s.toString().toLowerCase());

    }

}

其中pom.xml内容如下:

    4.0.0

 

    org.example

    dlhUdfTest

    1.0

 

   

       

        org.apache.hive

        hive-exec

        2.1.1-cdh6.2.0

       

   

   

 

       

           

                org.apache.maven.plugins

                maven-shade-plugin

                2.4.3

           

           

                org.apache.maven.plugins

                maven-compiler-plugin

                3.0

               

                    1.7

                    1.7

               

           

       

 

   

 

(2)     将上述文件编译打包后上传到测试集群中,本示例中打包后名字为dlhUdfTest-1.0.jar。

(3)     新建用户

在[集群权限/用户管理]页面,新建udfuser03用户。如图3-23所示,将udf测试jar包上传到HDFS的/tmp/udflib目录下,在授权前执行create操作。如图3-24所示,此时显示用户没有dlhtest数据库下UDF mylowerperm1的create权限。

图3-23 将udf测试jar包上传到HDFS

 

图3-24 无执行权限

 

(4)     如图3-25所示,修改HDFS的/tmp/udflib目录权限为777,目的是当其他用户调用该函数时拥有该jar包的访问权限。

图3-25 修改HDFS的/tmp/udflib目录权限

 

(5)     新建角色

在[集群权限/角色管理]页面,新建udf03角色,授权dlhtest数据库下UDF mylowerperm1的create权限。

图3-26 新建角色

 

(6)     用户绑定角色

在[集群权限/用户管理]页面,单击udfuser03用户对应的按钮,为其绑定udf03角色。

(7)     使用udfuser03用户重新执行dlhtest数据库下UDF mylowerperm1的create操作,如图3-27和图3-28所示,此时显示create操作执行成功。

图3-27 执行成功

 

图3-28 查看创建的内容

 

说明

在新的Session中执行show functions命令时,可能查看不到新建函数,这是因为之前创建function时连接的DLHServer2和当前连接不同。执行reload functions命令或重启当前连接的DLHServer2,即可重新读取数据库的函数并加载到缓存,此时重新执行show functions命令即可查看到新建函数。

 

5. 数据脱敏配置

数据脱敏是指按照某种规则对指定列进行脱敏,保证数据安全性。配置数据脱敏规则需先配置数据库表的select权限,然后进行数据脱敏配置。下面简单介绍数据脱敏配置的整体流程。

【示例一】

以配置脱敏规则为MASK为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下:

(1)     配置数据库表mask01.t01的select权限,即对数据库mask01数据表t01授予select权限。新建用户dlhmask01和角色dlhmask01,为角色授予select权限后,将该用户绑定到dlhmask01角色,并执行查询表操作。详细过程请参考3.7.2  2. 章节,查询结果如图3-29所示。

图3-29 mask01.t01原始数据

 

(2)     配置脱敏规则MASK

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则为MASK,即通过规则“x替代小写字母,X替代大写字母,n替代数字”来实现对列name的数据脱敏。

图3-30 配置脱敏规则MASK

 

(3)     执行查询表mask01.t01操作。如图3-31所示,信息表明对mask01库下t01表的列name脱敏成功。

图3-31 MASK脱敏成功

 

【示例二】

以配置脱敏规则为MASK_SHOW_LAST_4为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下:

(1)     配置脱敏规则MASK_SHOW_LAST_4

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_SHOW_LAST_4,即通过规则“仅显示后四位,其他用x代替”来实现对列name的数据脱敏。

图3-32 配置脱敏规则MASK_SHOW_LAST_4

 

(2)     执行查询表mask01.t01操作。如图3-33所示,信息表明对mask01库下t01表的列name脱敏成功。

图3-33 MASK_SHOW_LAST_4脱敏成功

 

【示例三】

以配置脱敏规则为MASK_SHOW_FIRST_4为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下:

(1)     配置脱敏规则MASK_SHOW_FIRST_4

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_SHOW_FIRST_4,即通过规则“仅显示前四位,其他用x代替”来实现对列name的数据脱敏。

图3-34 配置脱敏规则MASK_SHOW_FIRST_4

 

(2)     执行查询表mask01.t01操作。如图3-35所示,信息表明对mask01库下t01表的列name脱敏成功。

图3-35 MASK_SHOW_FIRST_4脱敏成功

 

【示例四】

以配置脱敏规则为MASK_HASH为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下:

(1)     配置脱敏规则MASK_HASH

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_HASH,即通过规则“Hash值替代原值”来实现对列name的数据脱敏。

图3-36 配置脱敏规则MASK_HASH

 

(2)     执行查询表mask01.t01操作。如图3-37所示,信息表明对mask01库下t01表的列name脱敏成功。

图3-37 MASK_HASH脱敏成功

 

【示例五】

以配置脱敏规则为MASK_NULL为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下:

(1)     配置脱敏规则MASK_NULL

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_NULL,通过规则“NULL值替代原值”来实现对列name的数据脱敏。

图3-38 配置脱敏规则MASK_NULL

 

(2)     执行查询表mask01.t01操作。如图3-39所示,信息表明对mask01库下t01表的列name脱敏成功。

图3-39 MASK_NULL脱敏成功

 

【示例六】

以配置脱敏规则为MASK_NONE为例,对数据库mask01数据表t01的name列进行数据脱敏,操作步骤如下:

(1)     配置脱敏规则MASK_NONE

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列name的脱敏规则的脱敏规则为MASK_NONE,即“显示原值”。

图3-40 配置脱敏规则MASK_NONE

 

(2)     执行查询表mask01.t01操作。如图3-41所示,数据显示原值。

图3-41 MASK_NONE脱敏成功。

 

 

【示例七】

以配置脱敏规则为MASK_DATE_SHOW_YEAR为例,对数据库mask01数据表t03的birth列进行数据脱敏,操作步骤如下:

(1)     配置数据库表mask01.t03的select权限,即对数据库mask01数据表t03授予select权限,执行查询表操作。详细过程请参考3.7.2  2. 章节,查询结果如图3-42所示。

图3-42 mask01.t03原始数据

 

(2)     配置脱敏规则MASK_DATE_SHOW_YEAR

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的列birth的脱敏规则的脱敏规则为MASK_DATE_SHOW_YEAR。

图3-43 配置脱敏规则MASK_DATE_SHOW_YEAR

 

(3)     执行查询表mask01. t03操作。如图3-44所示,若birth列为date类型,通过规则“显示日期年份,月份和日期均用01代替”来实现对列birth的数据脱敏。

图3-44 MASK_DATE_SHOW_YEAR脱敏成功

 

注意:若birth列为string类型,配置脱敏规则MASK_DATE_SHOW_YEAR时,年月日均用x替换来实现脱敏。

6. 行级过滤配置

配置行级过滤规则需先配置数据库表的select权限,然后配置行级过滤权限。对数据库mask01数据表t01配置行级过滤策略,操作步骤如下:

(1)     配置数据库表mask01.t01的select权限,即对数据库mask01数据表t03授予select权限,执行查询表操作。详细过程请参考3.7.2  2. 章节(本章仍以dlhmask01用户和dlhmask01角色为例进行说明),当前若已完成该配置,则跳过此步。

(2)     配置行级过滤规则

在[集群权限/角色管理]页面,单击角色列表中dlhmask01角色名进入角色详情页面,在页面右上角单击按钮,配置mask01库下t01表的的过滤规则为“id>123 and name like '%mask%'”(若配置为空则不过滤)。

图3-45 配置行级过滤规则

 

(3)     执行查询表mask01. t01操作。如图3-46所示,即为“id>123 and name like '%mask%'”过滤后的结果。

图3-46 行级过滤结果

 

3.8  添加/删除进程 3.8.1  添加进程

DLH支持添加DLH Metastore、DLHServer2进程。在并发查询任务较多的情况下,可以考虑增加DLH Metastore和DLHServer2进程,分摊查询负载,提高并发数。

1. 操作示例

说明

·     本章节仅以添加DLH Metastore进程为例进行说明,其它进程操作类似不再进行说明。

·     若集群中所有节点均已安装DLH Metastore,添加DLH Metastore进程前则需要先在集群中添加主机,然后再执行添加DLH Metastore进程的操作。如果集群中已有添加进程所需要的主机,则可直接执行添加DLH Metastore进程的操作。

 

添加DLH Metastore进程的操作步骤如下:

(1)     在DLH组件详情页面,在右上角组件操作的下拉框中选择按钮。

(2)     弹出添加进程窗口,如图3-47所示。

a.     选择进程及主机

在选择进程项的下拉列表中选择可添加的组件进程,在选择主机项的主机列表中勾选进程安装在哪一个主机上(支持多选)。

b.     部署进程

选择结束后单击下一步部署进程,直至部署进度条结束(部署过程中不支持中止)。

c.     启动进程

部署进程结束后单击下一步启动进程,直至启动进度条结束(启动过程中不支持中止)。

图3-47 添加进程

 

(3)     查看进程变化

DLH Metastore添加完成之后,在组件详情页面[部署拓扑]页签中可以查看DLH Metastore进程的数量变化以及状态。

(4)     重启组件(根据实际情况选择)

进入集群详情页面,选择[组件]页签,需根据页面提示重新启动相关组件。

3.8.2  删除进程

DLH支持删除DLH Metastore、DLHServer2进程。

·     在并发任务较少的情况下,可以考虑减少DLH Metastore和DLHServer2进程,节省集群资源。

·     删除进程后DLH Metastore、DLHServer2分别对应进程的个数均不能少于2个。

1. 操作示例

说明

·     删除进程操作不仅可以在组件详情页面的[部署拓扑]页签下执行,也可以在[集群管理/主机管理/主机监控]下的主机详情页面执行。本章节仅以“在组件详情页面的[部署拓扑]页签下执行删除DLH Metastore进程操作”为例进行说明,其它进程操作类似不再进行说明。

·     执行删除DLH Metastore进程操作前,请确认DLH Metastore是否有运行的任务,如果有运行的任务时,删除进程会影响任务执行。

 

删除DLH Metastore进程的操作步骤如下:

(1)     在DLH组件详情页面[部署拓扑]页签下,选择要删除DLH Metastore进程的主机,然后单击该进程右侧操作中的按钮,停止DLH Metastore。

(2)     删除DLH Metastore

待DLH Metastore停止成功后,如图3-48所示,在该进程右侧操作中单击按钮,即可完成删除DLH Metastore。

图3-48 删除进程

 

(3)     查看进程变化

DLH Metastore进程删除完成之后,在组件详情页面[部署拓扑]页签中可以查看DLH Metastore进程的数量变化以及状态。

(4)     重启组件(根据实际情况选择)

进入集群详情页面,选择[组件]页签,需根据页面提示重新启动相关组件。

 

4 配置说明 4.1  DLH常用配置 4.1.1  sparrow-default配置

表4-1 sparrow-defaults常用配置

参数名称

参数说明

默认值

spark.eventLog.dir

Sparrow事件日志目录

hdfs:///sparrow-history/

spark.eventLog.enabled

Sparrow事件日志启用标志

true

spark.history.fs.logDirectory

Sparrow历史信息日志目录

hdfs:///sparrow-history/

spark.history.ui.port

Sparrow历史信息UI端口

18082

spark.yarn.historyServer.address

HistoryServer UI的地址

spark.yarn.applicationMaster.waitTries

YARN模式下application等待Master的次数

10

spark.yarn.driver.memoryOverhead

YARN模式下driver占用的内存

384

spark.yarn.max.executor.failures

YARN模式下executor最大失败次数

3

spark.yarn.submit.file.replication

YARN模式下应用程序上载到HDFS上的最大复制数

3

spark.security.authorization.enable

是否开启权限验证

false

 

4.1.2  sparrow-thrift-sparkconf配置

表4-2 sparrow-thrift-sparkconf常用配置

参数名称

参数说明

默认值

spark.security.authorization.enable

JDBC连接是否开启权限验证

false

spark.master

ThriftServer的模式

当YARN的NodeManager只有一个时,该参数为local[4];否则为yarn-client

spark.yarn.security.tokens.hbase.enabled

安全环境中,ThriftServer YARN模式下是否token认证HBase组件

true

spark.yarn.security.credentials.hbase.enabled

安全环境中,ThriftServer YARN模式下是否使用HBase组件凭证

true

spark.yarn.queue

使用的YARN队列

default

spark.yarn.submit.file.replication

文件上传到HDFS的复本数

3

spark.yarn.driver.memoryOverhead

每个driver可以分配的非堆存储内存,这些内存用于如VM,字符串常量池以及其他本地额外开销等,单位:M

384

spark.yarn.executor.memoryOverhead

每个executor可以分配的非堆存储内存,这些内存用于如VM,字符串常量池以及其他本地额外开销等,单位:M

384

 

4.1.3  sparrow-hive-site-override配置

表4-3 sparrow-hive-site-override常用配置

参数名称

参数说明

默认值

hive.server2.thrift.port

JDBC连接的端口号

10017

hive.server2.transport.mode

传输模式,分为http和binary

binary

hive.metastore.client.socket.timeout

socket连接Hive Metastore的超时时长

1800

hive.security.authorization.manager

Metastore授权管理类的名称

org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider

 

4.1.4  dlh-site配置

表4-4 dlh-site常用配置

参数名称

参数说明

默认值

hive.input.format

输入格式,默认HoodieCombineHiveInputFormat适配hudi表

org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat

hive.server2.thrift.port

Dlh组件JDBC连接的端口号

13000

hive.metastore.uris

Dlh组件元数据库对应的地址

-

hive.presto.enable

是否启用presto

true

hive.strict.checks.cartesian.product

是否开启笛卡尔积sql校验

true

hive.flink.enable

是否启用flink

true

hive.fetch.task.conversion

是否将简单的查询简化为直接从文件中读取。none:禁用该功能;minimal:select * ,limit, filter在一个表所属的分区表上操作,直接从文件中读取数据;more:select,filter,limit 都直接从文件中读取数据

more

hive.execution.engine

Dlh组件默认执行引擎

spark

hive.server2.webui.port

Dlh组件UI页面端口

13002

hive.presto.table.threshold

表大小默认阈值为10GB,单位为B。当执行引擎为auto时,若表容量小于该阈值则查询走presto执行,若表容量大于该阈值则查询走hive执行

10737418240

hive.presto.http-server.http.port

Presto的协调器节点的地址,比如:http://localhost:18089。若集群开启了HA,可配置为虚拟主机名及代理端口,比如:http://testclr-vserver.vip.hde.com:18090,这部分信息可以从Presto组件的自定义config配置http.proxy.url获取

-

hive.compute.query.using.stats

是否使用元数据统计信息返回count统计值

true

 

4.2  Atlas常用配置 4.2.1  application-properties配置

表4-5 application-properties常用配置

参数名称

参数说明

默认值

atlas.rest.address

Atlas服务Rest接口地址

atlas.server.http.port

Atlas服务Rest接口服务端口

31000

atlas.notification.topics

Atlas使用Kafka topics

ATLAS_HOOK,ATLAS_ENTITIES

atlas.notification.replicas

Atlas使用Kafka topic的副本数

1

atlas.kafka.zookeeper.connect

Atlas使用Kafka的zookeeper地址

atlas.kafka.bootstrap.servers

Atlas使用Kafka的broker地址

atlas.graph.index.search.solr.zookeeper-url

Atlas使用Infra-solr的zookeeper地址

atlas.graph.index.search.solr.mode

Atlas使用Infra-Solr的集群模式

Cloud

atlas.audit.hbase.tablename

Atlas审计使用HBase的表名

ATLAS_ENTITY_AUDIT_EVENTS

atlas.graph.storage.hbase.table

Atlas图结构数据存储使用的HBase的表名

atlas_janus

atlas.audit.hbase.zookeeper.quorum

Atlas使用HBase的zookeeper地址

atlas.graph.storage.hostname

Atlas使用HBase的主机名地址

 

5 常见问题解答 1. DLH执行简单count任务时,如 select count(1) from XXX,获取统计值不准确

·     原因分析:DLH中默认hive.compute.query.using.stats=true,为加快查询速度,使用元数据统计信息返回count统计值,但该统计信息会由于写表方式不同,导致统计不准确。

·     解决方法:DLH引擎执行count任务前设置参数set hive.compute.query.using.stats =false,启动count统计任务。

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有