我想在stream.map()
中使用一个不可序列化的对象,就像这个
stream.map { i =>
val obj = new SomeUnserializableClass()
obj.doSomething(i)
}
这是非常低效的,因为我创建了许多SomeUnserializableClass
实例。实际上,它只能在每个工作者中创建一次。
在Spark中,我可以使用mapPartition
来完成此操作。但在flink stream api中,我不知道。
如果您正在处理一个不可序列化的类,我建议您创建一个RichFunction。在您的情况下是RichMapFunction。
Flink中的Rich运算符有一个open
方法,该方法在任务管理器中仅作为初始值设定项执行一次。
因此,诀窍是使您的字段成为瞬态的,并在您的开放方法中实例化它。
检查以下示例:
public class NonSerializableFieldMapFunction extends RichMapFunction {
transient SomeUnserializableClass someUnserializableClass;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.someUnserializableClass = new SomeUnserializableClass();
}
@Override
public Object map(Object o) throws Exception {
return someUnserializableClass.doSomething(o);
}
}
然后你的代码看起来像:
stream.map(new NonSerializableFieldMapFunction())
p.D:我使用的是java语法,请将其调整为scala。