我正在一个简单的数据集上测试NTILE函数,如下所示:
(id:string,value:bdouble)
A 10
B3
C4
D 4
E 4
F 30
C 30
D 10
A 4
H4
对HIVE(在MapReduce上)运行以下查询
SELECT tmp.id, tmp.sum_val, NTILE(4) OVER (ORDER BY tmp.sum_val) AS quartile FROM (SELECT id, sum(value) AS sum_val FROM testntile GROUP BY id) AS tmp
工作良好,结果如下:
(id,sum_val,quartilt)
B3 1
H41
E 4 2
D 14 2
A 14 3
F 30 3
C 34 4
在Spark(v1.5)上对Hive运行相同的查询仍然可以正常工作。
对Spark SQL 1.5(CDH 5.5.1)运行相同的查询
val result = sqlContext.sql("SELECT tmp.id, tmp.sum_val, NTILE(4) OVER (ORDER BY tmp.sum_val) AS quartile FROM (SELECT id, sum(value) AS sum_val FROM testntile GROUP BY id) AS tmp")
result.collect().foreach(println)
我得到以下错误的结果:
[B,3.0,0]
[E,4.0,0]
[H,4.0,0]
[A,14.0,0]
[D,14.0,0]
[F,30.0,0]
[C,34.0,0]
重要提示:结果不具有确定性,因为"有时"会返回正确的值
直接在数据帧上运行相同的算法
val x = sqlContext.sql("select id, sum(value) as sum_val from testntile group by id")
val w = Window.partitionBy("id").orderBy("sum_val")
val resultDF = x.select( x("id"),x("sum_val"), ntile(4).over(w) )
仍然返回错误的结果。
我做错什么了吗?有什么想法吗?提前感谢您的回答。
如果使用Window.partitionBy("id").orderBy("sum_val")
,则按id
进行分组,然后应用ntile
函数。因此,通过这种方式,每个组都有一个元素,并且ntile
对每个id
应用相同的值。
为了获得第一个结果,您需要删除partitionBy("id")
并仅使用Window.orderBy("sum_val")
。
这就是我如何修改你的代码:
val w = Window.orderBy("sum_val")
val resultDF = x.orderBy("sum_val").select( x("id"),x("sum_val"), ntile(4).over(w) )
这是resultDF.show()
:的打印
+---+-------+-----+
| id|sum_val|ntile|
+---+-------+-----+
| B| 3| 1|
| E| 4| 1|
| H| 4| 2|
| D| 14| 2|
| A| 14| 3|
| F| 30| 3|
| C| 34| 4|
+---+-------+-----+