Flink BlockElement exception after updating to version 1.14.2



Everything worked fine on Flink 1.13.1. We recently upgraded to Flink 1.14.2, and when the following code runs it throws the exception below:

<T> DataStream<Tuple3<String, String, T>> returnsInternal(SiddhiOperatorContext siddhiContext, String[] executionPlanIds) {
    if (createdDataStream == null) {
        // The anonymous MapFunction captures siddhiContext and executionPlanIds in its closure.
        DataStream<Tuple2<StreamRoute, Object>> mapped = this.dataStream.map(new MapFunction<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>>() {
            @Override
            public Tuple2<StreamRoute, Object> map(Tuple2<StreamRoute, Object> value) throws Exception {
                if (executionPlanIds != null && executionPlanIds.length != 0) {
                    // Tag the route with every execution plan that uses this input stream.
                    for (String executionPlanId : executionPlanIds) {
                        if (!executionPlanId.isEmpty()
                                && siddhiContext.getExecutionPlan(executionPlanId).IsUsedStream(value.f0.getInputStreamId())) {
                            value.f0.addExecutionPlanId(executionPlanId);
                        }
                    }
                }
                return value;
            }
        });

        createdDataStream = SiddhiStreamFactory.createDataStream(siddhiContext, mapped);
    }
    return createdDataStream;
}

The exception stack trace is as follows:

org.apache.flink.api.common.InvalidProgramException: The implementation of the BlockElement is not serializable. The object probably contains or references non serializable fields.

    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2139)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
    at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:577)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStreamBase.returnsInternal(ExecutionSiddhiStreamBase.java:135)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStreamBase.returnsInternal(ExecutionSiddhiStreamBase.java:123)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStream.returnAsRow(ExecutionSiddhiStream.java:180)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStream.returnAsRowWithQueryId(ExecutionSiddhiStream.java:165)
    at org.apache.flink.streaming.siddhi.SiddhiCEPITCase.testSimplePojoStreamAndReturnRowWithQueryId(SiddhiCEPITCase.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.io.NotSerializableException: org.apache.flink.configuration.description.TextElement
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
    ... 45 more

So what changed between 1.13.1 and 1.14.0 to cause this problem, and how can we fix it?

Thank you, David Anderson. This should be a bug introduced by the latest Flink commit to flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java.

In that file we can see that a TextElement is used inside ClosureCleanerLevel, and ClosureCleanerLevel is a member of the Serializable ExecutionConfig.

TextElement inside ClosureCleanerLevel
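To make that chain concrete, here is a simplified, self-contained sketch (not the actual Flink source; TextElement, ClosureCleanerLevel and ExecutionConfigSketch are stand-ins that only mirror their Flink counterparts) of the structure that trips the recursive check: each enum constant carries a description field whose type is not Serializable, and the enum itself is a member of the Serializable config object.

    import java.io.Serializable;

    public class ClosureCleanerLevelSketch {

        // Stand-in for org.apache.flink.configuration.description.TextElement,
        // which does not implement Serializable in the affected 1.14 releases.
        static class TextElement {
            final String text;
            TextElement(String text) { this.text = text; }
        }

        // Stand-in for ExecutionConfig.ClosureCleanerLevel: every constant holds
        // a description field of the non-serializable type above.
        enum ClosureCleanerLevel {
            NONE(new TextElement("Disable the closure cleaner completely.")),
            TOP_LEVEL(new TextElement("Clean only the top-level class without recursing into fields.")),
            RECURSIVE(new TextElement("Clean all fields recursively."));

            private final TextElement description;

            ClosureCleanerLevel(TextElement description) {
                this.description = description;
            }
        }

        // Stand-in for ExecutionConfig: Serializable itself, but its enum member
        // drags the TextElement field into a recursive serializability check.
        static class ExecutionConfigSketch implements Serializable {
            private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
        }
    }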

In flink-siddhi, the ExecutionConfig is serialized and shipped to every task manager so that Flink data can be serialized into Siddhi types, so this should be the cause.

The simplest way to verify the problem is to run the following code against Flink 1.13.5 and 1.14.0: the exception is reproduced on 1.14.0, and the difference between 1.13.5 and 1.14.0 here is just that latest commit.

@Test
public void testExecutionConfigSerializable() throws Exception {
    ExecutionConfig config = new ExecutionConfig();
    ClosureCleaner.clean(config, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
}

Note that plain Java serialization still works for ExecutionConfig; it is only the ClosureCleaner that rejects it, because it performs a much stricter check w.r.t. serializability.
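The following standalone snippet (a sketch; run it against a Flink 1.14.0 to 1.14.2 classpath) makes that difference visible: writing the ExecutionConfig with an ObjectOutputStream succeeds, while the recursive ClosureCleaner check throws the InvalidProgramException shown above.

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.InvalidProgramException;
    import org.apache.flink.api.java.ClosureCleaner;

    public class SerializabilityCheck {
        public static void main(String[] args) throws Exception {
            ExecutionConfig config = new ExecutionConfig();

            // Plain Java serialization succeeds: the ClosureCleanerLevel enum member
            // is written by name only, so its TextElement field is never touched.
            try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(config);
                System.out.println("Java serialization: OK");
            }

            // The recursive ClosureCleaner check walks into the enum's fields and
            // rejects the non-serializable TextElement on the affected versions.
            try {
                ClosureCleaner.clean(config, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
                System.out.println("ClosureCleaner: OK");
            } catch (InvalidProgramException e) {
                System.out.println("ClosureCleaner rejected it: " + e.getMessage());
            }
        }
    }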

So the underlying problem may be that the closure of the map function is too large. The SiddhiOperatorContext passed into the method becomes part of the map function's closure, so you could check whether that context can be made smaller, so that it no longer depends on the ExecutionConfig.
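One possible way to follow that advice (a sketch only, not taken from the flink-siddhi code base; getUsedStreamIds is a hypothetical accessor standing in for whatever the real execution-plan API offers, and java.util.HashMap/HashSet/Map/Set imports are assumed) is to precompute on the client just the mapping the function needs, so the anonymous MapFunction captures a plain serializable map instead of the whole SiddhiOperatorContext:

    // Precompute which input streams each execution plan uses.
    Map<String, Set<String>> planIdToUsedStreams = new HashMap<>();
    if (executionPlanIds != null) {
        for (String executionPlanId : executionPlanIds) {
            if (!executionPlanId.isEmpty()) {
                // Hypothetical accessor: collect the input stream ids used by this plan.
                planIdToUsedStreams.put(executionPlanId,
                        new HashSet<>(siddhiContext.getExecutionPlan(executionPlanId).getUsedStreamIds()));
            }
        }
    }

    // The closure now contains only a serializable HashMap, not the SiddhiOperatorContext.
    DataStream<Tuple2<StreamRoute, Object>> mapped =
            this.dataStream.map(new MapFunction<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>>() {
                @Override
                public Tuple2<StreamRoute, Object> map(Tuple2<StreamRoute, Object> value) {
                    for (Map.Entry<String, Set<String>> entry : planIdToUsedStreams.entrySet()) {
                        if (entry.getValue().contains(value.f0.getInputStreamId())) {
                            value.f0.addExecutionPlanId(entry.getKey());
                        }
                    }
                    return value;
                }
            });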
