我正在尝试从Flink 1.12.x数据集api迁移到Flink 1.14.x数据流api。mapPartition在Flink数据流中不可用。
我们的代码使用Flink 1.12.x数据集
dataset
.<few operations>
.mapPartition(new SomeMapParitionFn())
.<few more operations>
public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
@Override
public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
for (InputModel record : records) {
/*
do some operation
*/
if (/* some condition based on processing *MULTIPLE* records */) {
out.collect(...); // Conditional collect ---> (1)
}
}
// At the end of the data, collect
out.collect(...); // Collect processed data ---> (2)
}
}
(1(-Collector.collection在处理了几条记录后,根据某些条件调用
(2( -Collector.collect在数据结束时调用
最初我们考虑使用flatMap而不是mapPartition,但collector在close函数中不可用。
https://issues.apache.org/jira/browse/FLINK-14709-仅在链接驱动程序的情况下可用
如何在Flink 1.14.x数据流中实现这一点?请告知。。。
注意:我们的应用程序只能处理有限的数据集(批处理模式(
在Flink的数据集API中,MapPartitionFunction有两个参数。输入的迭代器和函数结果的收集器。Flink DataStream程序中的MapPartitionFunction永远不会从第一个函数调用中返回,因为迭代器会在无尽的记录流上迭代。然而,Flink的内部流处理模型要求用户函数返回检查点函数状态。因此,DataStream API不提供mapPartition转换。
为了实现类似的功能,您需要在流上定义一个窗口。Windows离散化流,这在某种程度上类似于小批量,但Windows提供了更大的灵活性
智鹏提供的解决方案
一种解决方案可以是使用streamOperator来实现BoundedOneInput
界面示例代码可在此处找到[1]。
[1]https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
Flink用户邮件链接:https://lists.apache.org/thread/ktck2y96d0q1odnjjkfks0dmrwh7kb3z