我正在尝试将Seq[DataSet(Long,Long,Double)]
合并到Flink中单个DataSet[(Long,Long,Double)]
val neighbors= graph.map(el => zKnn.neighbors(results,
el.vector, 150, metric)).reduce(
(a, b) => a.union(b)
).collect()
其中graph是一个常规的scala集合,但可以转换为DataSet;结果是一个DataSet[Vector]
,不应该收集,需要在邻居法
我总是得到FlinkRuntime exception:
目前不能处理多于64个输出的节点。当前不能处理输出超过64个的节点。org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection (OptimizerNode.java: 347)在org.apache.flink.optimizer.dag.SingleInputNode.setInput (SingleInputNode.java: 202
Flink目前不支持超过64个输入数据集的联合运算符。
作为一种解决方法,您可以分层地联合多达64个数据集,并在层次结构的各个级别之间注入身份映射器。比如:
DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
DataSet level2 = level1a.union(level1b)