在单个表中,我需要找到那些某个值最大相差给定量的对。例如,给定下表和最大差0.5
:
val
---
1
1.2
1.3
4
4.5
6
期望的结果将是:
val1 | val2
-----+-----
1 | 1.2
1 | 1.3
1.2 | 1.3
4 | 4.5
主要问题是我的桌子很大,在合理的时间内不可能有交叉乘积。 即这不起作用:
SELECT t1.val, t2.val
FROM table t1, table t2
WHERE abs(t1.val - t2.val) <= 0.5
有没有办法做到这一点?我阅读了窗口函数,所以至少我知道可以计算每个条目的值与前一个条目的值差异,获得上面的示例:
val | diff
----+-----
1 | 0
1.2 | 0.2
1.3 | 0.1
4 | 2.7
4.5 | 0.5
6 | 1.5
从这里开始,我需要找到diff
之和不超过给定最大值的范围。这可能吗?有没有更合理的方法?
我正在使用火花。
谢谢。
编辑:如前所述,我的查询还将包括对称对以及值相等的 es 对。很抱歉模棱两可。
然而,这不是重点。我的问题是连接。数据集对于笛卡尔乘积来说太大了。我正在寻找一种避免使用的解决方案。
此外,我正在处理的数据集的大小是 1000000 个元组。我不是 确定预期的执行时间,但有人建议必须有一个解决方案,避免在数据上使用笛卡尔积。
谢谢。
你尝试的很接近。只需进行一些修改:
select t1.val,t2.val
from tbl t1
join tbl t2 on t2.val-t1.val<=0.5 and t1.val<t2.val
您可以生成基于时间的虚拟窗口:
import org.apache.spark.sql.functions._
import spark.implicits._ // Where spark is an instance of SparkSession
val df = Seq(1.0, 1.2, 1.3, 4.0, 4.5, 6).toDF("val")
val w = window(
$"val".cast("timestamp"), "1000 milliseconds", "500 milliseconds"
).cast("struct<start:double,start:double>").alias("window")
val windowed = df.select(w, $"val")
联接、过滤和删除重复项:
val result = windowed.alias("left")
.join(windowed.alias("right"), "window")
.where(abs($"left.val" - $"right.val") <= 0.5 && $"left.val" < $"right.val")
.drop("window").distinct
结果:
result.show
// +---+---+
// |val|val|
// +---+---+
// |1.0|1.2|
// |1.2|1.3|
// |4.0|4.5|
// |1.0|1.3|
// +---+---+
有人建议我做的一件事是添加一个bucket
列,以便每个可能匹配的元组必须位于同一存储桶或相邻存储桶中。因此,我可以基于存储桶将表与自身连接(等联接),并从条件确实成立的结果中提取元组。我不确定这是否是一个好的解决方案,我还没有能够验证它。
/* max difference cannot span more than 2 buckets */
spark.sql("set max_diff=0.001")
var dec_count = 3
var bucket_size = scala.math.pow(10,-1 * dec_count)
var songs_buckets = songs.orderBy(col("artist_familiarity")).withColumn("bucket", round(col("artist_familiarity"), dec_count))
/*
tuples in adjacent buckets can have very close `artist_familiarity`.
add id to avoid duplicate pairs or tuples paired with themselves.
*/
songs_buckets = songs_buckets.withColumn("bucket2", $"bucket" - bucket_size).withColumn("id", monotonically_increasing_id())
songs_buckets.createOrReplaceTempView("songs_buckets")
var tmp = sql("SELECT s1.title as t1, s2.title as t2, s1.artist_familiarity as f1, s2.artist_familiarity as f2, s1.id as id1, s2.id as id2 FROM songs_buckets s1 JOIN songs_buckets s2 ON s1.bucket = s2.bucket OR s1.bucket = s2.bucket2")
tmp.createOrReplaceTempView("tmp")
var result = sql("SELECT t1, t2 FROM tmp WHERE id1 < id2 and f2 - f1 <= ${max_diff}")
result.show()
我没有费心将变量名称更改回问题中的示例。它在大约 12 秒后显示结果的前 20 行。不确定这是否与延迟加载有关,因为它不会显示结果的count
,但这是我能做的最好的事情。