ApacheFlink中数据集的联合



我正在尝试将Seq[DataSet(Long,Long,Double)]合并到Flink中单个DataSet[(Long,Long,Double)]

     val neighbors= graph.map(el => zKnn.neighbors(results,
      el.vector, 150, metric)).reduce(
     (a, b) => a.union(b)
      ).collect()

其中graph是一个常规的scala集合,但可以转换为DataSet;结果是一个DataSet[Vector],不应该收集,需要在邻居法

我总是得到FlinkRuntime exception:

目前不能处理多于64个输出的节点。当前不能处理输出超过64个的节点。org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection (OptimizerNode.java: 347)在org.apache.flink.optimizer.dag.SingleInputNode.setInput (SingleInputNode.java: 202

Flink目前不支持超过64个输入数据集的联合运算符。

作为一种解决方法,您可以分层地联合多达64个数据集,并在层次结构的各个级别之间注入身份映射器。比如:

DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
DataSet level2 = level1a.union(level1b)

相关内容

  • 没有找到相关文章

最新更新