我目前正在尝试在Apache Spark和Apache Flink中实现一些算法。在执行算法时,我必须做一些集差/减法运算。
虽然Apache Spark有一个内置的subtract
操作,但我在Apache Flink(1.0.3和1.1.0-SNAPSHOT)中找不到类似的操作
因此,我的问题是,给定两个数据集对象d1, d2
都包含相同类型的T
,应用集差的最有效方法是什么,即d1d2
?
val d1: DataSet[T] = ...
val d2: DataSet[T] = ...
val d_diff: DataSet[T] = ???
可能有办法通过coGroup
val d_diff = d1.coGroup(d2).where(0).equalTo(0) {
(l, r, out: Collector[T]) => {
val rightElements = r.toSet
for (el <- l)
if (!rightElements.contains(el)) out.collect(el)
}
}
但我想知道这是否是正确的方法,甚至是最佳实践,或者有人知道更有效的方法吗?
数据集API不为其提供方法,因为它只包含非常基本的一组操作。1.1中的表API将有一个set-minus运算符。您可以在这里看到它是如何实现的。
leftDataSet
.coGroup(rightDataSet)
.where("*")
.equalTo("*")
.`with`(coGroupFunction)
使用此CoGroupFunction。所以,是的,你走在了正确的轨道上。