Flink DataStream有类似mapPartition的api吗



我想在stream.map()中使用一个不可序列化的对象,就像这个

stream.map { i =>
val obj = new SomeUnserializableClass()
obj.doSomething(i)
}

这是非常低效的,因为我创建了许多SomeUnserializableClass实例。实际上,它只能在每个工作者中创建一次。

在Spark中,我可以使用mapPartition来完成此操作。但在flink stream api中,我不知道。

如果您正在处理一个不可序列化的类,我建议您创建一个RichFunction。在您的情况下是RichMapFunction。

Flink中的Rich运算符有一个open方法,该方法在任务管理器中仅作为初始值设定项执行一次。

因此,诀窍是使您的字段成为瞬态的,并在您的开放方法中实例化它。

检查以下示例:

public class NonSerializableFieldMapFunction extends RichMapFunction {
transient SomeUnserializableClass someUnserializableClass;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.someUnserializableClass = new SomeUnserializableClass();
}
@Override
public Object map(Object o) throws Exception {
return someUnserializableClass.doSomething(o);
}
}

然后你的代码看起来像:

stream.map(new NonSerializableFieldMapFunction())

p.D:我使用的是java语法,请将其调整为scala。

相关内容

  • 没有找到相关文章

最新更新