Flink 版本 1.9.0
斯卡拉版本 2.11.12
卡夫卡集群版本 2.3.0
我正在尝试将我所做的 flink 作业连接到具有 3 个分区的 kafka 集群。我已经针对在我的本地主机上运行的 kafka 集群主题测试了我的工作,该主题有一个分区,它可以读取和写入本地 kafka。当我尝试连接到具有多个分区的主题时,出现以下错误(topicName 是我尝试使用的主题的名称。奇怪的是,当我尝试生成多分区主题时,我没有任何问题。
java.lang.RuntimeException: topicName
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
我的使用者代码如下所示:
def defineKafkaDataStream[A: TypeInformation](topic: String,
env: StreamExecutionEnvironment,
SASL_username:String,
SASL_password:String,
kafkaBootstrapServer: String = "localhost:9092",
zookeeperHost: String = "localhost:2181",
groupId: String = "test"
)(implicit c: JsonConverter[A]): DataStream[A] = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
properties.setProperty("security.protocol" , "SASL_SSL")
properties.setProperty("sasl.mechanism" , "PLAIN")
val jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="%s";"
val jaasConfig = String.format(jaasTemplate, SASL_username, SASL_password)
properties.setProperty("sasl.jaas.config", jaasConfig)
properties.setProperty("group.id", "MyConsumerGroup")
env
.addSource(new FlinkKafkaConsumer(topic, new JSONKeyValueDeserializationSchema(true), properties))
.map(x => x.convertTo[A](c))
}
我是否应该设置另一个属性以允许单个作业从多个分区使用?
在挖掘并质疑我过程中的所有内容后,我发现了问题所在。
我查看了具有运行时异常的KafkaPartitionDiscoverer函数的Java代码。
我注意到的一个部分处理了运行时异常
if (kafkaPartitions == null) {
throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
}
我正在处理一个我不维护的 kafka 集群,并且有一个我没有先验证的主题名称。当我使用以下命令验证它时:
kafka-topics --describe --zookeeper serverIP:2181 --topic topicName
它返回的响应为:
Error while executing topic command : Topics in [] does not exist
ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:435)
at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:350)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
在我得到正确的主题名称后,一切正常。