所以我在数据流上运行一个映射函数,在映射函数中,我想连接 2 个单独的数据集。只是想知道这在 Flink 中是否可行。我知道 map 函数本身作为单独分区的单独任务运行,所以想知道 map 函数中是否允许分布式联接?
好的,所以事实证明你不能,因为连接数据集发生在与流处理(发生在 StreamExecutionContext 上(不同的上下文 (ExecutionContext( 上,并且 Flink 不允许彼此内部具有不同执行上下文的操作。
java.lang.IllegalArgumentException: The two inputs have different execution contexts.
at org.apache.flink.api.java.DataSet.checkSameExecutionContext(DataSet.java:1799)
at org.apache.flink.api.java.operators.TwoInputOperator.<init>(TwoInputOperator.java:42)
at org.apache.flink.api.java.operators.TwoInputUdfOperator.<init>(TwoInputUdfOperator.java:80)
at org.apache.flink.api.java.operators.CrossOperator.<init>(CrossOperator.java:90)
at org.apache.flink.api.java.operators.CrossOperator$DefaultCross.<init>(CrossOperator.java:150)
at org.apache.flink.api.java.DataSet.crossWithTiny(DataSet.java:1088)
at org.myorg.quickstart.MessageStreamProcessor$MessageProcessor.processElement(MessageStreamProcessor.java:138)
at org.myorg.quickstart.MessageStreamProcessor$MessageProcessor.processElement(MessageStreamProcessor.java:125)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
如果要将 dataSet1 与 dataSet2 联接,并且 dataSet2 不大。您可以使用 withBroadcastSet 运算符在 dataSet1 的映射中广播 dataSet2。您可以使用 getRuntimeContext((.getBroadcastVariable 在映射函数中获取广播的 dataSet2。然后,您可以在map功能中自行进行连接。为了加快联接速度,可以在广播 dataSet2 之前将 dataSet2 中的数据传输到映射中。 例如:
Map<Integer, String> testMap = new HashMap<>();
dateSet2 = flinkEnv.fromElements(testMap);
dateSet1.map(new TestRichMapper()).withBroadcastSet(dateSet2, "dateSet2");
在富域图函数中,您可以获取 dateSet2 并将其传输到地图中,如下所示:
Map<Integer, String> testMap = getRuntimeContext().getBroadcastVariable("dateSet2").toArray()[0];