Kerberos error when authenticating to Confluent Kafka

I have been trying to understand the integration of Apache Beam, Confluent Kafka, and Dataflow, with Python 3.8 and Beam SDK 2.7. The desired outcome is a pipeline (to run on Dataflow) that consumes from Confluent Kafka and logs the messages on GCP. (I am using JDK 17, by the way.)

This is the code I am using:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'credentialsOld.json'

# Load the Confluent cluster settings (bootstrap servers, credentials, ...).
with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)

def logger(element):
    logging.info('Something was found')

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username="{data["sasl.username"]}" password="{data["sasl.password"]}";',
    "auto.offset.reset": "earliest"
}

def main():
    print('======================================================')
    beam_options = PipelineOptions(
        runner='DataflowRunner',
        project='project',
        experiments=['use_runner_v2'],
        streaming=True,
        save_main_session=True,
        job_name='kafka-stream-test',
    )
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config, topics=['users'])
        msgs | beam.Map(logger)

if __name__ == '__main__':
    main()

I have tested this pipeline on Dataflow, but also with the direct runner, and on both runners I get this error: "Timeout expired while fetching topic metadata".
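(For the direct-runner test I only swap the runner option; a minimal sketch, assuming everything else stays as above:)

beam_options = PipelineOptions(
    runner='DirectRunner',   # run locally instead of on Dataflow
    streaming=True,
    save_main_session=True,
)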

The error seems to come from the consumer failing to authenticate to Confluent Kafka, because I get these warnings:

WARNING:root:severity: WARN
timestamp {
  seconds: 1650473787
  nanos: 331000000
}
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Bootstrap broker "here is supposed to be my broker IP, but I won't show it" (id: -1 rack: null) disconnected"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/ParDo(GenerateKafkaSourceDescriptor)/ParMultiDo(GenerateKafkaSourceDescriptor)"
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "21"

After this warning I get another one:

message: "[Consumer clientId=consumer-tto-1, groupId=tto] Error connecting to node "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null)"
trace: "java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]ntat
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login

Two important things. First, I have already built a consumer in Python without the ReadFromKafka Apache Beam IO, and it connects to and consumes from the topic perfectly, so the credentials I am using are the same, and so is the protocol ("SASL_SSL" / "PLAIN"). Related to this, I also don't know why a Kerberos error pops up, since I am not using Kerberos authentication at all. Second, the 'ReadFromKafka' transform runs through an expansion service, because the transform is only supported in Java, but with Apache Beam I can use it from Python (see the sketch below for the plain consumer I mean).
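For comparison, a minimal sketch of that working plain consumer, assuming the confluent-kafka client and the same cluster.configuration.json as above (note that librdkafka-based clients spell the key "sasl.mechanisms", plural):

from confluent_kafka import Consumer
import json

with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)

consumer = Consumer({
    'bootstrap.servers': data['bootstrap.servers'],
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',  # plural: the librdkafka property name
    'sasl.username': data['sasl.username'],
    'sasl.password': data['sasl.password'],
    'group.id': 'tto',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['users'])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue  # no message within the poll timeout
        if msg.error():
            print(msg.error())
            continue
        print(msg.value().decode('utf-8'))
finally:
    consumer.close()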

Well, this error was easy to fix: I had a typo in 'sasl.mechanisms', so the property was not being recognized.

Instead of sasl.mechanisms, use sasl.mechanism.
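For reference, the corrected consumer_config; only the key name changes:

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",  # singular: the property name the Java Kafka client expects
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username="{data["sasl.username"]}" password="{data["sasl.password"]}";',
    "auto.offset.reset": "earliest"
}

This also explains the puzzling Kerberos message: since the Java client did not recognize "sasl.mechanisms", it fell back to its default SASL mechanism, GSSAPI (Kerberos), which is why a Kerberos error appeared even though Kerberos was never configured.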
