MapFunction的实现是不可序列化的Flink



我正在尝试实现一个类,该类允许用户在不限制输入流类型的情况下操作N个输入流。

对于初学者,我想将所有的输入数据流转换为keyedStreams。因此,我将输入dataStream映射到一个元组中,然后应用KeyBy将其转换为keystream。

我总是遇到序列化的问题,我试着遵循这个指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html但没用。

我想知道的是:

  1. Java中的序列化/反序列化是什么?以及它的用途
  2. 在Flink with Serialization中,我可以解决哪些问题
  3. 我的代码有什么问题(你可以在下面的代码和错误消息中找到(

非常感谢。

主要类别:

public class CEP {
private  Integer streamsIdComp = 0;
final  private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final  private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
map(new MapFunction<T, Tuple2<Integer,T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
return Tuple2.of(streamsIdComp, value);
}
}).
keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
return integerTTuple2.f0;
}
});
return keyedInputStream;
}
public <T1> void addInputStream(DataStream<T1> inputStream) {
TypeInformation<T1> streamType = inputStream.getType();
dataStreamsTypes.put(streamsIdComp, streamType);
dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
streamsIdComp++;
}
}

测试类

public class CEPTest {
@Test
public void addInputStreamTest() throws Exception {
//test if we can change keys in a keyedStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Record> input1 = env.fromElements(
new Record("1", 1, "a"),
new Record("2", 2, "b"),
new Record("3", 3, "c"))
.keyBy(Record::getBizName);
DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);
CEP cepObject = new CEP();
cepObject.addInputStream(input1);
cepObject.addInputStream(input2);
}
}

错误消息

org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction 
is not serializable. The implementation accesses fields of its enclosing class, which is a 
common reason for non-serializability. A common solution is to make the function a proper 
(non-inner) class, or a static inner class.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
at CEP.converttoKeyedStream(CEP.java:25)
at CEP.addInputStream(CEP.java:45)
at CEPTest.addInputStreamTest(CEPTest.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.NotSerializableException: CEP
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 29 more
Flink是一个分布式框架。这意味着,您的程序可能会在数千个节点上运行。这也意味着每个工作节点都必须接收要与所需上下文一起执行的代码。简化一点,流经系统的事件和要执行的函数都必须是可序列化的,因为它们是通过连线传输的。这就是为什么序列化在分布式编程中很重要的原因。

简而言之,序列化是将数据编码为字节表示的过程,该字节表示可以在另一个节点(另一个JVM(上传输和恢复。


回到问题上来。以下是您的原因:

Caused by: java.io.NotSerializableException: CEP

这是由线路引起的

return Tuple2.of(streamsIdComp, value);

您使用的是streamsIdComp变量,它是CEP类中的一个字段。这意味着,Flink必须序列化整个类,才能在执行MapFunction时访问该字段。您可以通过在converttoKeyedStream函数中引入局部变量来克服它:

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
// note this variable is local
int localStreamsIdComp = streamsIdComp;
KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
map(new MapFunction<T, Tuple2<Integer,T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
// and is used here
return Tuple2.of(localStreamsIdComp, value);
}
}).
keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
return integerTTuple2.f0;
}
});
return keyedInputStream;
}

通过这种方式,Flink必须只序列化这一个变量,而不是整个类本身。

相关内容

  • 没有找到相关文章

最新更新