最佳实践 您所在的位置:网站首页 用迅雷做种子 最佳实践

最佳实践

#最佳实践| 来源: 网络整理| 查看: 265

在SQL 任务中使用kerberos 认证 的Kafka授权示例作业配置在SQL 任务中使用kerberos 认证 的Kafka

本文基于flink 1.12 引擎测试

由于有数开发平台集成了kerberos 做集群的安全认证,在任务提交运行过程中,都需要去认证hdfs和yarn. 平台已在flink conf 配置中添加作业运行过程中需要的认证配置,可以在任意任务的flink 运行页面, Job Manager -> Configuration 参数中查看。相关参数请查阅社区参数配置https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems 由于实时计算产品目前不支持多用户的方式进行集群认证,因此kafka 的认证用户 需要和平台配置的认证用户保持一致。目前平台使用sloth 用户登录yarn 和hdfs ,如果需要在平台使用kerberos 认证的kafka ,则需要kafka 管理人员通过ACLs 对作业涉及到的topic、消费者组为sloth用户进行相关授权配置

ACLs授权 相关操作,请参阅kafka 官方文档 https://kafka.apache.org/documentation/#security_authz

授权示例

查看授权

./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -list

生产者授权

注意 用户必须指定为sloth

./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -add --allow-principal User:sloth --producer --topic test_demo63

消费者组授权

注意 用户必须指定为sloth

/kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka-add --allow-principal User:sloth --consumer --group=* --topic test_demo63 作业配置

flink sql kafka DDL 作业配置示例如下

CREATE TABLE sink_table ( order_id int, order_date VARCHAR, customer_name VARCHAR, price decimal(10,3), product_id int, order_status boolean ) WITH ( 'connector' = 'kafka', 'topic' = 'test_demo63', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'broker1:9092', 'format' = 'json', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name'='kafka' );

相较于不带kerberos 认证的kafka ,需要增加如下3个参数,无论kafka 作为source 或者sink。 作为sink 时需要注意的是,指定的消费者组必须为sloth 用户做授权配置

'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name'='kafka'

同时需要为作业添加一个高级参数配置,以将平台配置的kerberos 认证同时也作为kafka 的认证 参数如下:

security.kerberos.login.contexts KafkaClient

Sloth 版本为384 时,在任务页面右侧的高级参数处添加即可

Sloth 版本为大于3901 时,需要先将任务提交发布,然后在作业配置的,Flink 高级配置处添加

作业完整sql 示例

create table datagen_source( order_id int, order_date varchar, customer_name varchar, price decimal(10,3), product_id int, order_status boolean )with( 'connector' = 'datagen', 'rows-per-second' = '1' ); CREATE TABLE sink_table ( order_id int, order_date VARCHAR, customer_name VARCHAR, price decimal(10,3), product_id int, order_status boolean ) WITH ( 'connector' = 'kafka', 'topic' = 'test_demo63', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'broker1:9092', 'format' = 'json', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name'='kafka' ); insert into sink_table select * from datagen_source;


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有