如果我们有一个 model.approxSimilarityJoin 的数据帧输出
val results =
model
.approxSimilarityJoin(vectorizedDf, vectorizedDf, threshold)
.filter("distCol != 0")
.filter("distCol < 0.2")
.select(col("datasetA.title").alias("idA"), col("datasetB.title").alias("idB"), col("distCol"))
上述命令的输出
**idA|idB|distCol**
A|B|0.125
B|C|0.125
A|D|0.125
D|E|0.125
F|G|0.125
X|Y|0.19
A|M|0.14
A|N|0.14
我们希望对输出进行分组并计算相似的项目,即在上面的示例中我们有
A, B, C, D, E
F,G
X,Y
所需的最终输出应如下所示:
A, 0.125, 5
F, 0.19, 1
A, 0.14, 2
在
火花测试基础中有一个叫做近似数据帧相等性检查的东西
/**
* Compares if two [[DataFrame]]s are equal, checks that the schemas are the same.
* When comparing inexact fields uses tol.
*
* @param tol max acceptable tolerance, should be less than 1.
*/
def assertDataFrameApproximateEquals(
expected: DataFrame, result: DataFrame, tol: Double) {
assert(expected.schema, result.schema)
try {
expected.rdd.cache
result.rdd.cache
assert("Length not Equal", expected.rdd.count, result.rdd.count)
val expectedIndexValue = zipWithIndex(expected.rdd)
val resultIndexValue = zipWithIndex(result.rdd)
val unequalRDD = expectedIndexValue.join(resultIndexValue).
filter{case (idx, (r1, r2)) =>
!DataFrameSuiteBase.approxEquals(r1, r2, tol)}
assertEmpty(unequalRDD.take(maxUnequalRowsToShow))
} finally {
expected.rdd.unpersist()
result.rdd.unpersist()
}
}
根据您的需要设计算法。https://github.com/holdenk/spark-testing-base/wiki/DataFrameSuiteBase
我们试图找到类似自然连接的东西。 在上面的例子中,A 与 B 相关,B 与 C 相关,即 A 与 C 相关。另外我们有
A 与 D 相关,D 与 E 相关,即 A 与 E 相关最后我们应该得出结论,A 与 B、C、D 和 E 具有相同的相似性即 A 计数到 5