我正在尝试根据分组列(DESTINATION_ID 和 LOCATION_ID)计算 LATITUDE 列的中位数(Scala,Spark 1.6)。
JSON 中的数据如下所示:
DESTINATION_ID,LOCATION_ID,LATITUDE
[ENSG00000257017,EAST_0000182,0.07092000000000001]
[ENSG00000257017,WEST_0001397,0.07092000000000001]
[ENSG00000181965,EAST_1001951,0.07056000000000001]
[ENSG00000146648,EAST_0000616,0.07092000000000001]
[ENSG00000111537,WEST_0001845,0.07092000000000001]
[ENSG00000103222,EAST_0000565,0.07056000000000001]
[ENSG00000118137,EAST_0000508,0.07092000000000001]
[ENSG00000112715,EAST_0000616,0.07092000000000001]
[ENSG00000108984,EAST_0000574,0.07056000000000001]
[ENSG00000159640,NORTH_797,0.07092000000000001]
[ENSG00000113522,NORTH_790,0.07056000000000001]
[ENSG00000133895,NORTH_562,0.07056000000000001]
代码:
// Per-group median of LATITUDE, keyed by (DESTINATION_ID, LOCATION_ID).
// FIX: Spark 1.6's SQL parser does not support the `DIV` keyword — that is
// exactly the reported "identifier DIV found" parse failure. `floor(cN / 2)`
// gives the intended integer division and parses fine in 1.6.
// (`val` instead of `var`: the result is never reassigned.)
//
// Median logic: rN is the rank within the group, cN the group size;
// m1..m2 selects the middle row (odd cN) or the two middle rows (even cN),
// whose LATITUDEs are then averaged.
val ds = sqlContext.sql("""
SELECT DESTINATION_ID,LOCATION_ID, avg(LATITUDE) as median
FROM ( SELECT DESTINATION_ID,LOCATION_ID, LATITUDE, rN, (CASE WHEN cN % 2 = 0 then floor(cN / 2) ELSE floor(cN / 2) + 1 end) as m1, floor(cN / 2) + 1 as m2
FROM (
SELECT DESTINATION_ID,LOCATION_ID, LATITUDE, row_number() OVER (PARTITION BY DESTINATION_ID,LOCATION_ID ORDER BY LATITUDE ) as rN,
count(LATITUDE) OVER (PARTITION BY DESTINATION_ID,LOCATION_ID ) as cN
FROM people
) s
) r
WHERE rN BETWEEN m1 and m2
GROUP BY DESTINATION_ID,LOCATION_ID
""")
错误:
**Exception in thread "main" java.lang.RuntimeException: [3.98] failure: ``)''
expected but identifier DIV found**
如果我遗漏了什么,请帮助我;或者请指导我在 Spark 中是否有更好的方法来计算中位数。
谢谢
我尝试使用您提供的测试输入执行上述查询,如下所示-
// Build an in-memory CSV dataset from a multi-line string and load it as a
// DataFrame (header row + schema inference), to reproduce the question's input.
val data =
"""
|DESTINATION_ID,LOCATION_ID,LATITUDE
|ENSG00000257017,EAST_0000182,0.07092000000000001
|ENSG00000257017,WEST_0001397,0.07092000000000001
|ENSG00000181965,EAST_1001951,0.07056000000000001
|ENSG00000146648,EAST_0000616,0.07092000000000001
|ENSG00000111537,WEST_0001845,0.07092000000000001
|ENSG00000103222,EAST_0000565,0.07056000000000001
|ENSG00000118137,EAST_0000508,0.07092000000000001
|ENSG00000112715,EAST_0000616,0.07092000000000001
|ENSG00000108984,EAST_0000574,0.07056000000000001
|ENSG00000159640,NORTH_797,0.07092000000000001
|ENSG00000113522,NORTH_790,0.07056000000000001
""".stripMargin
// Split into lines, then trim each comma-separated field.
// FIX: the original used `_.split("\,")` — `\,` is not a valid string escape
// in Scala — and its regex `[ t]+` stripped spaces and literal 't' characters
// (not tabs, which would be `\t`). A plain split on "," plus `trim` does the
// intended whitespace cleanup correctly.
val stringDS = data.split(System.lineSeparator())
.map(_.split(",").map(_.trim).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +---------------+------------+-------------------+
* |DESTINATION_ID |LOCATION_ID |LATITUDE |
* +---------------+------------+-------------------+
* |ENSG00000257017|EAST_0000182|0.07092000000000001|
* |ENSG00000257017|WEST_0001397|0.07092000000000001|
* |ENSG00000181965|EAST_1001951|0.07056000000000001|
* |ENSG00000146648|EAST_0000616|0.07092000000000001|
* |ENSG00000111537|WEST_0001845|0.07092000000000001|
* |ENSG00000103222|EAST_0000565|0.07056000000000001|
* |ENSG00000118137|EAST_0000508|0.07092000000000001|
* |ENSG00000112715|EAST_0000616|0.07092000000000001|
* |ENSG00000108984|EAST_0000574|0.07056000000000001|
* |ENSG00000159640|NORTH_797 |0.07092000000000001|
* |ENSG00000113522|NORTH_790 |0.07056000000000001|
* +---------------+------------+-------------------+
*
* root
* |-- DESTINATION_ID: string (nullable = true)
* |-- LOCATION_ID: string (nullable = true)
* |-- LATITUDE: double (nullable = true)
*/
// Register the DataFrame as a temp view so it can be queried with SQL.
df.createOrReplaceTempView("people")
// Run the question's query with `DIV` replaced by real division `/`.
// NOTE(review): `/` here is REAL division, not integer division. Every
// (DESTINATION_ID, LOCATION_ID) group in this input has exactly one row
// (cN = 1, odd branch), so m1 = 1/2 + 1 = 1.5 and m2 = 1.5, and
// `rN BETWEEN 1.5 AND 1.5` matches no integer rank — which is why the
// result shown below is empty.
spark.sql(
"""
|SELECT
| DESTINATION_ID,
| LOCATION_ID,
| avg(LATITUDE) as median
|FROM
| (
| SELECT
| DESTINATION_ID,
| LOCATION_ID,
| LATITUDE,
| rN,
| (
| CASE WHEN cN % 2 = 0 then (cN / 2) ELSE (cN / 2) + 1 end
| ) as m1,
| (cN / 2) + 1 as m2
| FROM
| (
| SELECT
| DESTINATION_ID,
| LOCATION_ID,
| LATITUDE,
| row_number() OVER (
| PARTITION BY DESTINATION_ID,
| LOCATION_ID
| ORDER BY
| LATITUDE
| ) as rN,
| count(LATITUDE) OVER (PARTITION BY DESTINATION_ID, LOCATION_ID) as cN
| FROM
| people
| ) s
| ) r
|WHERE
| rN BETWEEN m1
| and m2
|GROUP BY
| DESTINATION_ID,
| LOCATION_ID
""".stripMargin)
.show(false)
/**
 * Empty result — see the fractional-bounds note above.
 *
 * +--------------+-----------+------+
 * |DESTINATION_ID|LOCATION_ID|median|
 * +--------------+-----------+------+
 * +--------------+-----------+------+
 */
您需要检查查询或输入,它不提供任何输出
检查以下查询是否有帮助 -
// Per-group median, joined back onto every input row via NATURAL JOIN
// (join keys: DESTINATION_ID, LOCATION_ID).
// FIX: the previous bounds used m1 = cN/2 - 1 for odd cN, which for odd
// groups of 3+ rows selected the two rows at/below the middle and averaged
// the wrong pair. The correct middle-row bounds are:
//   m1 = floor((cN + 1) / 2)   -- lower middle rank
//   m2 = floor(cN / 2) + 1     -- upper middle rank
// These are equal when cN is odd (single middle row) and adjacent when cN
// is even (the two middle rows, whose LATITUDEs are averaged).
// Every group in the sample input has one row, so the output below is
// unchanged by this fix.
spark.sql(
"""
|SELECT *
|FROM people k NATURAL JOIN
|(SELECT
| DESTINATION_ID,
| LOCATION_ID,
| avg(LATITUDE) as median
|FROM
| (
| SELECT
| DESTINATION_ID,
| LOCATION_ID,
| LATITUDE,
| rN,
| floor((cN + 1) / 2) as m1,
| floor(cN / 2) + 1 as m2
| FROM
| (
| SELECT
| DESTINATION_ID,
| LOCATION_ID,
| LATITUDE,
| row_number() OVER (
| PARTITION BY DESTINATION_ID,
| LOCATION_ID
| ORDER BY
| LATITUDE
| ) as rN,
| count(LATITUDE) OVER (PARTITION BY DESTINATION_ID, LOCATION_ID) as cN
| FROM
| people
| ) s
| ) r
|WHERE
| rN BETWEEN m1
| and m2
|GROUP BY
| DESTINATION_ID,
| LOCATION_ID
| ) t
""".stripMargin)
.show(false)
/**
 * +---------------+------------+-------------------+-------------------+
 * |DESTINATION_ID |LOCATION_ID |LATITUDE           |median             |
 * +---------------+------------+-------------------+-------------------+
 * |ENSG00000111537|WEST_0001845|0.07092000000000001|0.07092000000000001|
 * |ENSG00000257017|WEST_0001397|0.07092000000000001|0.07092000000000001|
 * |ENSG00000103222|EAST_0000565|0.07056000000000001|0.07056000000000001|
 * |ENSG00000108984|EAST_0000574|0.07056000000000001|0.07056000000000001|
 * |ENSG00000112715|EAST_0000616|0.07092000000000001|0.07092000000000001|
 * |ENSG00000113522|NORTH_790   |0.07056000000000001|0.07056000000000001|
 * |ENSG00000118137|EAST_0000508|0.07092000000000001|0.07092000000000001|
 * |ENSG00000146648|EAST_0000616|0.07092000000000001|0.07092000000000001|
 * |ENSG00000159640|NORTH_797   |0.07092000000000001|0.07092000000000001|
 * |ENSG00000181965|EAST_1001951|0.07056000000000001|0.07056000000000001|
 * |ENSG00000257017|EAST_0000182|0.07092000000000001|0.07092000000000001|
 * +---------------+------------+-------------------+-------------------+
 */