NTILE函数在Spark SQL 1.5中不起作用



我正在一个简单的数据集上测试NTILE函数,如下所示:

(id:string,value:bdouble)
A 10
B3
C4
D 4
E 4
F 30
C 30
D 10
A 4
H4

对HIVE(在MapReduce上)运行以下查询

SELECT tmp.id, tmp.sum_val, NTILE(4) OVER (ORDER BY tmp.sum_val) AS quartile FROM (SELECT id, sum(value) AS sum_val FROM testntile GROUP BY id) AS tmp

工作良好,结果如下:

(id,sum_val,quartilt)
B3 1
H41
E 4 2
D 14 2
A 14 3
F 30 3
C 34 4

在Spark(v1.5)上对Hive运行相同的查询仍然可以正常工作。

对Spark SQL 1.5(CDH 5.5.1)运行相同的查询

val result = sqlContext.sql("SELECT tmp.id, tmp.sum_val, NTILE(4) OVER (ORDER BY tmp.sum_val) AS quartile FROM (SELECT id, sum(value) AS sum_val FROM testntile GROUP BY id) AS tmp")
result.collect().foreach(println)

我得到以下错误的结果:

[B,3.0,0]
[E,4.0,0]
[H,4.0,0]
[A,14.0,0]
[D,14.0,0]
[F,30.0,0]
[C,34.0,0]

重要提示:结果不具有确定性,因为"有时"会返回正确的值

直接在数据帧上运行相同的算法

val x = sqlContext.sql("select id, sum(value) as sum_val from testntile group by id")
val w = Window.partitionBy("id").orderBy("sum_val")
val resultDF = x.select( x("id"),x("sum_val"), ntile(4).over(w) )

仍然返回错误的结果。

我做错什么了吗?有什么想法吗?提前感谢您的回答。

如果使用Window.partitionBy("id").orderBy("sum_val"),则按id进行分组,然后应用ntile函数。因此,通过这种方式,每个组都有一个元素,并且ntile对每个id应用相同的值。

为了获得第一个结果,您需要删除partitionBy("id")并仅使用Window.orderBy("sum_val")

这就是我如何修改你的代码:

val w = Window.orderBy("sum_val")
val resultDF = x.orderBy("sum_val").select( x("id"),x("sum_val"), ntile(4).over(w) )

这是resultDF.show():的打印

+---+-------+-----+ | id|sum_val|ntile| +---+-------+-----+ | B| 3| 1| | E| 4| 1| | H| 4| 2| | D| 14| 2| | A| 14| 3| | F| 30| 3| | C| 34| 4| +---+-------+-----+

相关内容

  • 没有找到相关文章

最新更新