KafkaIO withBootStrapServers



我试图获得一个服务器ID作为参数,同时使用ValueProvider执行运行命令

对于选项接口中的值提供者:

ValueProvider<String> getKafkaServer();
void setKafkaServer(ValueProvider<String> value);

withBootstrapServers抛出错误,不兼容的类型:org.apache.beam.sdk.options.ValueProvider不能转化为java.lang.String"

PCollection<String> kafkaMessages = p
.apply("Read from Kafka", KafkaIO.<Long, String>read()
.withBootstrapServers(options.getBootstrapServers())

这个答案建议使用options.getBootstrapServers().get(),但这会产生以下错误

[ERROR] - Expected getter for property [getbootstrapServers] of type [org.apache.beam.sdk.options.ValueProvider]
[ERROR] - Expected setter for property [bootstrapServers] of type [org.apache.beam.sdk.options.ValueProvider]

任何帮助解决这个问题是非常感谢

ValueProviders只对已经支持它们的转换(包括源)有效。它们只需要用于遗留数据流模板。通常可以为管道选项使用具体值(例如String getKafkaServer()),该值可以在管道启动时传递。如果您想将此选择推迟到模板实例化时,请使用flex模板。

相关内容

  • 没有找到相关文章

最新更新