Why does my Flink SQL job, which reads Kafka with Kerberos authentication, always fail Kafka's Kerberos authentication when submitted to YARN?



I am using a Kafka source with Kerberos authentication in Flink SQL. The job passes Flink's local test, but when I submit it to YARN, the error message says the Kafka JAAS file cannot be found!

The relevant Flink SQL is as follows:

create table source_sensor (
  id VARCHAR,
  ts BIGINT,
  vc DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'huangfu_0110',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = '1',
  'properties.bootstrap.servers' = '10.0.120.23:9092',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.sasl.kerberos.principal.to.local.rules' = 'kafka/cdh-test-1@CDH.COM',
  'properties.sasl.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/ddmp/kafka/kafka.keytab" principal="kafka/cdh-test-1@CDH.COM";',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'json.fail-on-missing-field' = 'false'
);
create table sink_sensor (
  id VARCHAR,
  ts BIGINT,
  vc DOUBLE
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.0.10.118:3306/yf1?useSSL=true',
  'table-name' = 'sink_table',
  'username' = 'root',
  'password' = '123456',
  'sink.buffer-flush.interval' = '10s',
  'sink.buffer-flush.max-rows' = '10000'
);
insert into sink_sensor select * from source_sensor;

After submission, Flink reports the following error:

org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:55) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:551) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [byit-flink-sql-engine.jar:4.1.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /yarn/nm/usercache/ddmp/appcache/application_1642585778653_0040/jaas-2809721433806366634.conf
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99) ~[byit-flink-sql-engine.jar:4.1.1]
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741) ~[byit-flink-sql-engine.jar:4.1.1]
... 15 more

Every file path referenced in the Flink SQL configuration exists in the local directory.
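For reference, the keytab and principal from the DDL can be checked locally with kinit (assuming the MIT Kerberos client tools are installed):

# Obtain a ticket from the keytab, then list it to confirm the login works
kinit -kt /ddmp/kafka/kafka.keytab kafka/cdh-test-1@CDH.COM
klist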

What I observe from the error is that, when connecting to Kafka, it always looks for the file /yarn/nm/usercache/ddmp/appcache/application_1642585778653_0040/jaas-2809721433806366634.conf, which is very strange.
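From the error message, the Kafka consumer expects a 'KafkaClient' entry in whatever JAAS file java.security.auth.login.config points to. A minimal entry that would satisfy it looks roughly like this (a sketch, reusing the keytab and principal from the DDL above):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/ddmp/kafka/kafka.keytab"
  principal="kafka/cdh-test-1@CDH.COM";
};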

I would be very grateful for any help solving this!

This is caused by the Flink cluster's Kerberos configuration. On YARN, Flink generates a temporary JAAS file (the jaas-*.conf shown in the error) and points java.security.auth.login.config at it; unless Flink's own Kerberos login options are configured, that file contains no KafkaClient entry. Setting a few options in flink-conf.yaml makes it work!

The settings are as follows:

security.kerberos.login.use-ticket-cache: false 
security.kerberos.login.keytab: /etc/kafka/kafka.keytab
security.kerberos.login.principal: kafka@HADOOP.COM
security.kerberos.login.contexts: Client,KafkaClient
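
If editing flink-conf.yaml is not an option, the same settings can likely be passed as dynamic properties when submitting to YARN (a sketch; -yD is Flink's YARN dynamic-property flag, and byit-flink-sql-engine.jar stands in for your actual job jar):

# Pass the Kerberos login options per job instead of cluster-wide
flink run -m yarn-cluster \
  -yD security.kerberos.login.use-ticket-cache=false \
  -yD security.kerberos.login.keytab=/etc/kafka/kafka.keytab \
  -yD security.kerberos.login.principal=kafka@HADOOP.COM \
  -yD security.kerberos.login.contexts=Client,KafkaClient \
  byit-flink-sql-engine.jar

With security.kerberos.login.contexts including KafkaClient, Flink writes a KafkaClient entry into the jaas-*.conf it generates, so the consumer finds what it is looking for.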
