如何修复"incompatible types: org.apache.beam.sdk.options.ValueProvider<java.lang.String> cannot be



我点击了这个链接创建了一个模板,该模板构建了一个梁管道以从KafkaIO读取。但我总是遇到"不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String"。是".withBootstrapServers(options.getKafkaServer((("行导致了错误。光束版本是 2.9.0,这是我代码的一部分。

public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();
    void setKafkaServer(ValueProvider<String> value);
    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();
    void setInputTopic(ValueProvider<String> value);
    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();
    void setOutputTopic(ValueProvider<String> value);
    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();
    void setOutput(ValueProvider<String> value);
}
public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);
    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata() 
    )

以下是我运行代码的方式:

mvn compile exec:java 
-Dexec.mainClass=${MyClass} 
-Pdataflow-runner -Dexec.args=" 
--project=${MyClass} 
--stagingLocation=gs://${MyBucket}/staging 
--tempLocation=gs://${MyBucket}/temp 
--templateLocation=gs://${MyBucket}/templates/${MyClass} 
--runner=DataflowRunner"

> 为了通过 ValueProvider 访问值,您需要使用 get 方法,然后获取具有具体类型的值。

例如:当有选项时:

ValueProvider<String> getKafkaServer();

您可以通过以下方式访问它:

getKafkaServer().get()这将返回字符串对象。

似乎KafkaIo Api需要获取字符串参数而不是ValueProvider,您必须从ValueProvider包装器中提取值。

我可能会发现问题,即不支持 kafkaIO。以下是来自谷歌创建模板。

"某些 I/O 连接器包含接受 ValueProvider 对象的方法。若要确定对特定连接器和方法的支持,请参阅 I/O 连接器的 API 参考文档。支持的方法具有 ValueProvider 的重载。如果方法没有重载,则该方法不支持运行时参数。以下 I/O 连接器至少具有部分价值提供程序支持:

基于文件的 IO:TextIO、AvroIO、FileIO、TFRecordIO、XmlIO大查询IO*BigtableIO(需要 SDK 2.3.0 或更高版本(PubSubIO扳手IO">

相关内容

最新更新