是否有可能避免交叉转换?



先生们,

我正在使用 DataSet API 在 Apache Flink 中使用 Batch,我想计算 DataSet 中所有元素的"相似性"。

让函数 CalculateSimilarity(e1, e2) 计算并返回元素 e1 和 e2 的相似性。

将数据集与自身交叉工作正常,但是,我浪费了大量时间和不必要的微积分处理。我真的不需要计算所有元素的笛卡尔乘积,因为可以进行一些改进:

i) 不需要计算相同元素的相似性。例如 CalculateSimilarity(A,A)ii) CalculateSimilarity(A,B
) ⇔ CalculateSimilarity(B,A)。相似性(A,B)和(B,A)是等价的,我只需要计算其中之一。

使用 flink,我如何应用一个转换,我可以只计算必要的相似性,而不是所有相似性(交叉)?

如果我上面不清楚,这里有一个简单的例子:
Dt = 具有 4 个元素的数据集。
Dt = {e1, e2, e3 , e4}.无论我使用cross(Dt.cross(Dt)),它返回所有这些组合:((e1,e1),(e1,e2),(e1,e3),(e1,e4),(e2,e1),(e2,e2),(e2,e3),
(e2,e4),(e3,e1),...,(e4,e4))。
但是,我只需要这些组合:

(e1,e2),(e1,e3),(e1,e4),(e2,e3),(e2,e4),(e3,e4)。感谢您的帮助!

您可以做的是手动构造一个避免排列的连接模式。为此,您可以为每个元素分配一个递增的索引(0 到元素数 - 1),然后让每个元素仅与索引小于或等于其自身索引的元素联接:

val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements(1, 2, 3, 4, 5, 6).rebalance()
// we first assign an increasing index from 0 to input.size - 1 to each element
val indexedInput = input.zipWithIndex
// here we generate the join set where we say that (idx, element) will be joined with all
// elements whose index is at most idx
val joinSet = indexedInput.flatMap{
input => for (i <- 0 to input._1.toInt) yield (i.toLong, input._2)
}
// doing the join operation
val resultSet = indexedInput.join(joinSet).where(_._1).equalTo(_._1).apply{
(a, b) => (a._2, b._2)
}

您应该尝试哪些程序运行得更快,因为zipWithIndex将触发单独的作业执行。

相关内容

  • 没有找到相关文章

最新更新