Flink 1.12.x 数据集 --> Flink 1.14.x 数据流



我正在尝试从Flink 1.12.x数据集api迁移到Flink 1.14.x数据流api。mapPartition在Flink数据流中不可用。

我们的代码使用Flink 1.12.x数据集

dataset
.<few operations>
.mapPartition(new SomeMapParitionFn())
.<few more operations>
public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
@Override
public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
for (InputModel record : records) {
/*
do some operation    
*/
if (/* some condition based on processing *MULTIPLE* records */) {
out.collect(...); // Conditional collect                ---> (1)
}
}

// At the end of the data, collect
out.collect(...);   // Collect processed data                   ---> (2) 
}
}
  • (1(-Collector.collection在处理了几条记录后,根据某些条件调用

  • (2( -Collector.collect在数据结束时调用

    最初我们考虑使用flatMap而不是mapPartition,但collector在close函数中不可用。

    https://issues.apache.org/jira/browse/FLINK-14709-仅在链接驱动程序的情况下可用

如何在Flink 1.14.x数据流中实现这一点?请告知。。。

注意:我们的应用程序只能处理有限的数据集(批处理模式(

在Flink的数据集API中,MapPartitionFunction有两个参数。输入的迭代器和函数结果的收集器。Flink DataStream程序中的MapPartitionFunction永远不会从第一个函数调用中返回,因为迭代器会在无尽的记录流上迭代。然而,Flink的内部流处理模型要求用户函数返回检查点函数状态。因此,DataStream API不提供mapPartition转换。

为了实现类似的功能,您需要在流上定义一个窗口。Windows离散化流,这在某种程度上类似于小批量,但Windows提供了更大的灵活性

智鹏提供的解决方案

一种解决方案可以是使用streamOperator来实现BoundedOneInput界面示例代码可在此处找到[1]。

[1]https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75

Flink用户邮件链接:https://lists.apache.org/thread/ktck2y96d0q1odnjjkfks0dmrwh7kb3z

相关内容

  • 没有找到相关文章

最新更新