PySpark DataFrame SQL - 某些条目中具有特殊字符的列的最大值



我正在使用 PySpark,这是我数据帧的一部分 -

cleanData.show(4, False)
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+----+-----+-----+------+
|STN   |WBAN |YEARMODA|TEMP|DEWP|SLP   |STP   |VISIB|WDSP|MXSPD|GUST |MAX  |MIN |PRCP |SNDP |FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+----+-----+-----+------+
|010080|99999|20100101|23.2|19.0|9999.9|9999.9|7.0  |6.0 |15.9 |999.9|33.8*|14.0|0.00H|999.9|001000|
|010080|99999|20100102|20.5|16.4|9999.9|9999.9|6.2  |20.4|33.0 |40.0 |33.4*|8.6*|0.00G|5.1  |001000|
|010080|99999|20100103|6.9 |-3.7|9999.9|9999.9|7.2  |14.1|21.4 |999.9|9.7* |4.5*|0.04G|5.1  |001000|
|010080|99999|20100104|4.9 |-6.2|9999.9|9999.9|8.7  |13.1|19.4 |999.9|6.8* |3.2*|0.00G|999.9|001000|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+----+-----+-----+------+
only showing top 4 rows

数据框中的几列(如MAXMIN列(在多个条目的末尾有一个*

我需要找出这两列中的最大值和最小值。由于我熟悉 SQL,因此我使用 Spark SQL 发出查询,但像MAXORDER BY这样的子句无法正常工作,例如 -

spark.sql("select MAX from weather2010uncleaned where not MAX='9999.9' order by MAX desc").show()
+-----+
|  MAX|
+-----+
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
+-----+
only showing top 20 rows

(注意 -9999.9表示缺少数据(

我想这是因为所有列都是string类型,所以我使用.cast()将它们转换为float类型(最后链接的 github-gist 中的代码(。

但不知何故,转换为浮动将所有条目替换为末尾的*NULL.

所以,我知道事实上MAX列中的最大值大约是132.8(末尾可能有一个*(,但是当我运行此查询以获取最大值时,我只得到128.8

spark.sql("select STN, YEARMODA AS DATE, MAX from weather2010 where MAX=(select MAX(MAX) from weather2010 where not MAX='9999.9' and not max='99.99')").show()
# +------+--------+-----+
# |   STN|    DATE|  MAX|
# +------+--------+-----+
# |703830|20100613|128.8|
# +------+--------+-----+

这可能是因为在浮动期间最大条目被替换为NULL

有没有办法我可以——

  1. 从数据帧本身的条目中删除所有*,然后再使用createOrReplaceTempView()创建 SQL 视图,或者
  2. 使用 SQL 能够正确运行字符串类型的MAXORDER BY等,同时还包括末尾带有*的条目,因此不需要强制转换,或者
  3. 如果无法使用SQL执行此操作,请单独使用DataFrame API,尽管我对API不是很熟悉。

我不想在这里弄乱问题,所以这是我的要点,其中包含有关其中一些操作的更多代码片段 - 要点。

在 Spark 和常规 SQL 中,你有一个replace()函数,所以这应该适合你:

spark.sql("select cast(replace(MAX,'*') as float) MAX_FLOAT from weather2010uncleaned where not MAX='9999.9' order by MAX_FLOAT desc").show()

从它构建,我们现在可以使用rank()窗口来得出最终结果:

spark.sql("""
select STN, DATE, MAX 
from (
select STN, YEARMODA AS DATE, MAX, 
rank() over (order by cast(replace(MAX,'*') as float) desc) RNK
from weather2010uncleaned where not MAX='9999.9'
) T 
where RNK = 1
""").show()

相关内容

最新更新