Flink 数据集连接内部映射函数



所以我在数据流上运行一个映射函数,在映射函数中,我想连接 2 个单独的数据集。只是想知道这在 Flink 中是否可行。我知道 map 函数本身作为单独分区的单独任务运行,所以想知道 map 函数中是否允许分布式联接?

好的,所以事实证明你不能,因为连接数据集发生在与流处理(发生在 StreamExecutionContext 上(不同的上下文 (ExecutionContext( 上,并且 Flink 不允许彼此内部具有不同执行上下文的操作。

java.lang.IllegalArgumentException: The two inputs have different execution contexts.
    at org.apache.flink.api.java.DataSet.checkSameExecutionContext(DataSet.java:1799)
    at org.apache.flink.api.java.operators.TwoInputOperator.<init>(TwoInputOperator.java:42)
    at org.apache.flink.api.java.operators.TwoInputUdfOperator.<init>(TwoInputUdfOperator.java:80)
    at org.apache.flink.api.java.operators.CrossOperator.<init>(CrossOperator.java:90)
    at org.apache.flink.api.java.operators.CrossOperator$DefaultCross.<init>(CrossOperator.java:150)
    at org.apache.flink.api.java.DataSet.crossWithTiny(DataSet.java:1088)
    at org.myorg.quickstart.MessageStreamProcessor$MessageProcessor.processElement(MessageStreamProcessor.java:138)
    at org.myorg.quickstart.MessageStreamProcessor$MessageProcessor.processElement(MessageStreamProcessor.java:125)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)

如果要将 dataSet1 与 dataSet2 联接,并且 dataSet2 不大。您可以使用 withBroadcastSet 运算符在 dataSet1 的映射中广播 dataSet2。您可以使用 getRuntimeContext((.getBroadcastVariable 在映射函数中获取广播的 dataSet2。然后,您可以在map功能中自行进行连接。为了加快联接速度,可以在广播 dataSet2 之前将 dataSet2 中的数据传输到映射中。 例如:

Map<Integer, String> testMap = new HashMap<>();
dateSet2 = flinkEnv.fromElements(testMap); 
dateSet1.map(new TestRichMapper()).withBroadcastSet(dateSet2, "dateSet2");

在富域图函数中,您可以获取 dateSet2 并将其传输到地图中,如下所示:

Map<Integer, String> testMap = getRuntimeContext().getBroadcastVariable("dateSet2").toArray()[0];

相关内容

  • 没有找到相关文章

最新更新