最佳实践 | 您所在的位置:网站首页 › 用迅雷做种子 › 最佳实践 |
在SQL 任务中使用kerberos 认证 的Kafka授权示例作业配置在SQL 任务中使用kerberos 认证 的Kafka 本文基于flink 1.12 引擎测试 由于有数开发平台集成了kerberos 做集群的安全认证,在任务提交运行过程中,都需要去认证hdfs和yarn. 平台已在flink conf 配置中添加作业运行过程中需要的认证配置,可以在任意任务的flink 运行页面, Job Manager -> Configuration 参数中查看。相关参数请查阅社区参数配置https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems 由于实时计算产品目前不支持多用户的方式进行集群认证,因此kafka 的认证用户 需要和平台配置的认证用户保持一致。目前平台使用sloth 用户登录yarn 和hdfs ,如果需要在平台使用kerberos 认证的kafka ,则需要kafka 管理人员通过ACLs 对作业涉及到的topic、消费者组为sloth用户进行相关授权配置ACLs授权 相关操作,请参阅kafka 官方文档 https://kafka.apache.org/documentation/#security_authz 授权示例查看授权 ./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -list生产者授权 注意 用户必须指定为sloth ./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -add --allow-principal User:sloth --producer --topic test_demo63消费者组授权 注意 用户必须指定为sloth /kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka-add --allow-principal User:sloth --consumer --group=* --topic test_demo63 作业配置flink sql kafka DDL 作业配置示例如下 CREATE TABLE sink_table ( order_id int, order_date VARCHAR, customer_name VARCHAR, price decimal(10,3), product_id int, order_status boolean ) WITH ( 'connector' = 'kafka', 'topic' = 'test_demo63', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'broker1:9092', 'format' = 'json', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name'='kafka' );相较于不带kerberos 认证的kafka ,需要增加如下3个参数,无论kafka 作为source 或者sink。 作为sink 时需要注意的是,指定的消费者组必须为sloth 用户做授权配置 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name'='kafka'同时需要为作业添加一个高级参数配置,以将平台配置的kerberos 认证同时也作为kafka 的认证 参数如下: security.kerberos.login.contexts KafkaClientSloth 版本为384 时,在任务页面右侧的高级参数处添加即可 Sloth 版本为大于3901 时,需要先将任务提交发布,然后在作业配置的,Flink 高级配置处添加 作业完整sql 示例 create table datagen_source( order_id int, order_date varchar, customer_name varchar, price decimal(10,3), product_id int, order_status boolean )with( 'connector' = 'datagen', 'rows-per-second' = '1' ); CREATE TABLE sink_table ( order_id int, order_date VARCHAR, customer_name VARCHAR, price decimal(10,3), product_id int, order_status boolean ) WITH ( 'connector' = 'kafka', 'topic' = 'test_demo63', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'broker1:9092', 'format' = 'json', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.kerberos.service.name'='kafka' ); insert into sink_table select * from datagen_source; |
CopyRight 2018-2019 实验室设备网 版权所有 |