尝试使用启用 Kerberos 的 Apache Kafka(0.9( 与 Apache Spark 1.6.3.时出错.Zookeeper 版本是 3.4.5 我必须连接到两个卡夫卡。一个启用了keberos,另一个没有,所以我没有在Spark执行器的额外java选项中设置java.security.auth.login.config属性。
Kafka Initialization failed: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:648)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524)
at com.spark.receiver.helper.KafkaChannelHelper.initializeConnection(KafkaChannelHelper.java:277)
at com.spark.receiver.helper.KafkaChannelHelper$2.run(KafkaChannelHelper.java:240)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in `/home/user/kafka_client.conf`.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577)
... 4 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in `/home/user/kafka_client.conf`.
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:294)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
... 7 more
java.security.auth.login.config 是在 Consumer 本身中设置的。连接到kafkaConsumer的代码是:
public void initializeConnection() {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
System.setProperty("java.security.auth.login.config", jassFilePath);
try {
this.consumer = new KafkaConsumer<String, byte[]>(props);
} catch (Exception e) {
LOGGER.error("Kafka Initialization failed: ", e);
}
}
kafka_client.conf 仅包含以下部分:
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
debug=true
useKeyTab=true
keyTab="/etc/security/keytabs/user.keytab"
storeKey=true
principal="user@REALM"
serviceName="kafka";
};
在向/从安全环境发布/使用数据之前,应考虑两件事:
- 配置安全协议
Properties props = new Properties();
props.put("security.protocol", "PLAINTEXTSASL");
- 传递 jaas 配置以及 java vm 选项
java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf
-Djava.security.krb5.conf=/etc/krb5.conf
-Djavax.security.auth.useSubjectCredsOnly=false
-cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/*
hdp.sample.KafkaProducer one.hdp:6667 test
查看 secure-kafka-java-producer-with-kerberos 以获得完整的解释。
我对 kafka 1.11.0 也有类似的问题。
同一 JVM 中的监视程序正在访问多个代理,一些代理正在使用 SASL Kerberos,而另一些则不安全。
该参数由程序自身在访问安全集群时添加。
-Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf
但是程序会抛出一个异常:
Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /path/to/jaas/kafka_client_jaas_usekeytab.conf
奇怪的是,java.security.auth.login.config
确实设置正确,并且此文件中的内容很好。
另一个具有单个群集的程序工作正常。
Kafka 官方文档 Kafka 客户端的 JAAS 配置说:
Clients may specify JAAS configuration as a producer or consumer property without creating a physical configuration file.
This mode also enables different producers and consumers within the same JVM to use different credentials by specifying different properties for each client.
If both static JAAS configuration system property java.security.auth.login.config and client property sasl.jaas.config are specified, the client property will be used.
这里的另一个问题是:
他只面临一些问题java.security.auth.login.config
.
也许解决方案是:
在程序中提供sasl.jaas.config
和java.security.auth.login.config
。
我将尝试为这种情况进行验证。