无法重新验证Kafka客户端SASL连接,应关闭连接



根据KIP-368(https://cwiki.apache.org/confluence/display/KAFKA/KIP-368),当"connections.max.reauth.ms"显式设置为正数时,服务器将断开任何未重新身份验证的SASL连接。如果重新身份验证尝试失败,则代理将关闭连接,不支持重试。然而,当我的客户端无法重新进行身份验证时,它将进入无限的重试循环。

INFO [kafka-producer-network-thread | producer-1] org.apache.kafka.common.network.Selector: [Producer clientId=producer-1][Producer clientId=producer-1] Failed authentication with 10.4.252.249/10.4.252.249 (Authentication failed during authentication due to invalid credentials with SASL mechanism)
ERROR [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Connection to node 0 (10.4.252.249/10.4.252.249:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism
INFO [kafka-producer-network-thread | producer-1] org.apache.kafka.common.network.Selector: [Producer clientId=producer-1][Producer clientId=producer-1] Failed authentication with 10.4.252.249/10.4.252.249 (Authentication failed during authentication due to invalid credentials with SASL mechanism)
ERROR [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Connection to node 0 (10.4.252.249/10.4.252.249:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism

我希望客户退出,这样我就可以冒出异常。有什么想法可以解决吗?

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password

当身份验证失败时,Kafka会抛出SaslAuthenticationException。您可以用try-catch包围客户端代码,并在catch块中关闭客户端。以管理客户端为例。

//... init other properties
// set request time out, the default timeout takes too long
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
// get client
Admin admin = Admin.create(properties);
// test connection, if error happens, close it, in case of infinite retrying to connect to kafka by admin client, because of the meta data update thread
try {
    admin.listTopics().names().get();
} catch (Exception e) {
    closeClient(admin);
    throw new RRException(e);
}

最新更新