Apache Beam pipeline fails when consuming events through KafkaIO on the Flink runner



I have a Beam pipeline with several stages that consumes data through KafkaIO. The code looks like this:

pipeline.apply("Read Data from Stream", StreamReader.read())
.apply("Decode event and extract relevant fields", ParDo.of(new DecodeExtractFields()))
.apply(...);

The implementation of the StreamReader.read() method:

public static KafkaIO.Read<String, String> read() {
return KafkaIO.<String, String>read()
.withBootstrapServers(Constants.BOOTSTRAP_SERVER)
.withTopics(Constants.KAFKA_TOPICS)
.withConsumerConfigUpdates(Constants.CONSUMER_PROPERTIES)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
//Line-A  .withMaxReadTime(Duration.standardDays(10))
.withLogAppendTime();
}

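When I run the pipeline on the Direct Runner, it runs without any errors. In my case, however, I have to use the Flink Runner, and when the pipeline runs on the Flink Runner it throws the following error: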

Exception in thread "main" java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@14b31e37
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:250)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:336)
at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:161)
....
at Main.main(Main.java:6)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @2c34f934
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @2c34f934

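The error goes away if I uncomment Line-A in the StreamReader.read() method above, but according to the documentation withMaxReadTime(...) is meant only for testing/demo purposes and should not be used otherwise.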

The pipeline is instantiated like this:

PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
pipelineOptions.setRunner(FlinkRunner.class);
Pipeline pLine = Pipeline.create(pipelineOptions);

Questions:

  1. Why does this error occur?
  2. How can I fix it?

Please share some relevant resources if possible.

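The error does not appear to be in Beam itself but in Flink's closure cleaner, which uses reflection to modify private parts of user or SDK code. This seems to be a known issue with recent versions of Java and Flink. See, for example, the question "Error message in example Flink job: Unable to make field private final byte[] java.lang.String.value accessible".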
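
To check whether your JVM is affected, a small probe like the one below may help. It is only a sketch: the class name OpensProbe and its two helper methods are made up for illustration, and it does not touch Beam or Flink at all. It simply attempts the same reflective call that appears at the bottom of the stack trace (Field.setAccessible on java.lang.String.value) from classpath code, and asks the module system whether java.lang is open to the calling module.

```java
import java.lang.reflect.Field;
import java.lang.reflect.InaccessibleObjectException;

// Hypothetical helper class, not part of Beam or Flink.
class OpensProbe {

    // True if the java.lang package is open to this class's module
    // (the unnamed module when the class is loaded from the classpath).
    static boolean javaLangIsOpen() {
        Module javaBase = String.class.getModule();
        return javaBase.isOpen("java.lang", OpensProbe.class.getModule());
    }

    // Attempts the reflective access that fails in the stack trace.
    static boolean canAccessStringValue() {
        try {
            Field value = String.class.getDeclaredField("value");
            value.setAccessible(true);
            return true;
        } catch (InaccessibleObjectException | NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Java version: " + System.getProperty("java.version"));
        System.out.println("java.lang open to this module: " + javaLangIsOpen());
        System.out.println("String.value accessible: " + canAccessStringValue());
    }
}
```

If both values print false, the JVM is enforcing strong encapsulation of java.lang. A commonly used workaround in that situation is to start the JVM that launches the pipeline with --add-opens java.base/java.lang=ALL-UNNAMED, or to run on a Java version that your Flink release supports. Whether that is sufficient for your particular Beam and Flink versions is something I have not verified.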

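Why does the commented-out line change things? Normally, when reading from Kafka, you read the stream in an unbounded fashion. When withMaxReadTime is specified, it becomes a bounded read, so the translation to the underlying Flink operators is different.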
