基于数据帧中出现频率的Spark数据帧随机采样



输入描述
我有一个带有列queryId的输入数据帧的spark作业。此queryId相对于数据帧而言不是唯一的。例如,spark数据帧中大约有3M行,具有450k个不同的查询id。

问题
我正在尝试实现采样逻辑,并通过从聚合spark数据帧查询id集中查找查询id来创建一个新列sampledQueryId,该列包含每个数据帧行的随机采样查询id。

采样目标

  1. 限制是采样查询id不应等于输入查询id
  2. 采样应与传入火花数据帧中查询id的出现频率相对应,即给定两个查询id q1和q2,如果出现率为10:1(q1:q2(,则q1应在样本id列中出现大约10倍

到目前为止尝试的解决方案
我曾试图通过将查询id收集到列表中并使用随机采样查找查询id列表来实现这一点,但基于经验证据,我怀疑该逻辑没有按预期工作,例如,我看到一个特定的查询id被采样了200次,但频率相似的查询id从未被采样。

关于这个火花代码是否能按预期工作,有什么建议吗?

val random = new scala.util.Random
val queryIds = data.select($"queryId").map(row => row.getAs[Long](0)).collect()
val sampleQueryId = udf((queryId: Long) =>  {
val sampledId = queryIds(random.nextInt(queryIds.length))
if (sampledId != queryId) sampledId else null        
})
val dataWithSampledIds = data.withColumn("sampledQueryId",sampleQueryId($"queryId"))

为了子孙后代,在不同的论坛上收到了回应。问题是,一个随机实例正通过udf传递给所有执行器。因此,每个执行器的第n行将给出相同的输出。

scala> val random = new scala.util.Random
scala> val getRandom = udf((data: Long) => random.nextInt(10000))
scala> spark.range(0, 12, 1, 4).withColumn("rnd", getRandom($"id")).orderBy($"id").show
+---+----+
| id| rnd|
+---+----+
|  0|6720|
|  1|7667|
|  2|3344|
|  3|6720|
|  4|7667|
|  5|3344|
|  6|6720|
|  7|7667|
|  8|3344|
|  9|6720|
| 10|7667|
| 11|3344|
+---+----+

这个df有4个分区。每第n行的rrd值都是相同的(例如id=1、4、7、10都相同(。解决方案是在Spark中使用rand((内置函数,如下所示。

val queryIds = data.select($"queryId").map(row => row.getAs[Long](0)).collect()
val sampleQueryId = udf((companyId: Long, rand: Double) =>  {
val sampledId = queryIds(scala.math.floor(rand*queryIds.length).toInt)
if (sampledId != queryId) sampledId else null        
})
val dataWithSampledIds = data.withColumn("sampledQueryId",sampleQueryId($"queryId", rand()))

最新更新