Spark DataFrame filter not working



I'm new to Spark. We have a project that reads data from HBase and saves it into an RDD; the resulting DataFrame has 5,280,000 rows. The code is as follows:

val df = spark.createDataFrame(rddDump, schema)

// Rows whose count is under the 10K benchmark are always kept;
// larger counts are randomly down-sampled based on 10000/count.
def sampledOrNot = udf((count: Int) => {
  if (count < TEN_K_SELLER_ITEM_BENCH) {
    1
  } else {
    val randomId = random.nextLong(0, 1000000000000L)
    var targetValue = 10000 / count.toDouble
    var base = 1
    while (targetValue < 1) {
      targetValue = targetValue * base
      base = base * 10
    }
    if (randomId % base <= (targetValue.intValue() + 1)) 1 else 0
  }
})

val sampleBasedAll = df.withColumn("sampled", sampledOrNot(col("count")))

sampleBasedAll.repartition(10).write.option("header", value = true).option("compression", "gzip").csv("/sampleBasedAll")

val sampledDF = sampleBasedAll.repartition(100).filter("sampled = 1").select($"sellerId", $"siteId", $"count", $"desc")

scribe.info("sampledDF.count = " + sampledDF.count())

The strange thing is that the /sampleBasedAll folder contains a valid CSV dump of the DataFrame, yet sampledDF.count() is logged as zero in the prod run.
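For context, here is a minimal standalone sketch of the effect (names like coinFlip and toy are mine, not from the project; it assumes a spark-shell session where spark is in scope): Spark treats a UDF as deterministic, so each action re-evaluates it, and the rows that pass the filter can differ from one action to the next.

import org.apache.spark.sql.functions.{col, udf}
import scala.util.Random

// Hypothetical toy UDF: keeps a row with ~1% probability.
val coinFlip = udf((_: Long) => if (Random.nextInt(100) == 0) 1 else 0)

val toy = spark.range(1000000L).withColumn("sampled", coinFlip(col("id")))

// Without .asNondeterministic(), every action re-runs the UDF,
// so these two counts are computed over different random draws.
println(toy.filter("sampled = 1").count())
println(toy.filter("sampled = 1").count())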

I downloaded the CSVs from the sampleBasedAll folder and re-ran

sampleBasedAll.repartition(100).filter("sampled = 1").select($"sellerId", $"siteId", $"count", $"desc").count()

and 13,500 records showed up…

My question is: why does

sampleBasedAll.filter("sampled = 1")

return records when run locally, but produce none in the prod run…

A post about the unexpected behavior of a UDF generating random integers under a join operation gave me the hint:

"Spark assumes that a UDF is a deterministic function"

so the UDF may be executed more than once. I updated the sample UDF by appending .asNondeterministic(), as below:
def sampledOrNot = udf((count: Int) => {
  if (count < TEN_K_SELLER_ITEM_BENCH) {
    1
  } else {
    val randomId = random.nextLong(0, 1000000000000L)
    var targetValue = 10000 / count.toDouble
    var base = 1
    while (targetValue < 1) {
      targetValue = targetValue * base
      base = base * 10
    }
    if (randomId % base <= 10000 / count.toDouble * base) 1 else 0
  }
}).asNondeterministic() // mark the UDF nondeterministic so Spark will not re-execute or reorder it

This resolved the inconsistency.
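For completeness, a sketch of an alternative I considered (my own, not part of the fix above; it reuses df and TEN_K_SELLER_ITEM_BENCH from the question and assumes the UDF's intent is to keep each row with roughly 10000/count probability): Spark's built-in rand() is already flagged nondeterministic, and persisting the sampled DataFrame pins the random draw so the CSV write and the later count see the same rows.

import org.apache.spark.sql.functions.{col, lit, rand, when}
import org.apache.spark.storage.StorageLevel

// rand() is built in and already marked nondeterministic, so the
// optimizer will not duplicate it across the write and the filter.
val sampleBasedAll = df.withColumn(
  "sampled",
  when(col("count") < TEN_K_SELLER_ITEM_BENCH, 1)
    .when(rand() < lit(10000.0) / col("count"), 1)
    .otherwise(0)
)

// Persisting pins the sampled values, so every downstream action
// (the CSV write and the count) sees the same set of sampled rows.
sampleBasedAll.persist(StorageLevel.MEMORY_AND_DISK)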
