我正在使用 PySpark,这是我数据帧的一部分 -
cleanData.show(4, False)
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+----+-----+-----+------+
|STN |WBAN |YEARMODA|TEMP|DEWP|SLP |STP |VISIB|WDSP|MXSPD|GUST |MAX |MIN |PRCP |SNDP |FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+----+-----+-----+------+
|010080|99999|20100101|23.2|19.0|9999.9|9999.9|7.0 |6.0 |15.9 |999.9|33.8*|14.0|0.00H|999.9|001000|
|010080|99999|20100102|20.5|16.4|9999.9|9999.9|6.2 |20.4|33.0 |40.0 |33.4*|8.6*|0.00G|5.1 |001000|
|010080|99999|20100103|6.9 |-3.7|9999.9|9999.9|7.2 |14.1|21.4 |999.9|9.7* |4.5*|0.04G|5.1 |001000|
|010080|99999|20100104|4.9 |-6.2|9999.9|9999.9|8.7 |13.1|19.4 |999.9|6.8* |3.2*|0.00G|999.9|001000|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+----+-----+-----+------+
only showing top 4 rows
数据框中的几列(如MAX
和MIN
列(在多个条目的末尾有一个*
。
我需要找出这两列中的最大值和最小值。由于我熟悉 SQL,因此我使用 Spark SQL 发出查询,但像MAX
和ORDER BY
这样的子句无法正常工作,例如 -
spark.sql("select MAX from weather2010uncleaned where not MAX='9999.9' order by MAX desc").show()
+-----+
| MAX|
+-----+
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
|99.9*|
+-----+
only showing top 20 rows
(注意 -9999.9
表示缺少数据(
我想这是因为所有列都是string
类型,所以我使用.cast()
将它们转换为float
类型(最后链接的 github-gist 中的代码(。
但不知何故,转换为浮动将所有条目替换为末尾的*
NULL
.
所以,我知道事实上MAX
列中的最大值大约是132.8
(末尾可能有一个*
(,但是当我运行此查询以获取最大值时,我只得到128.8
。
spark.sql("select STN, YEARMODA AS DATE, MAX from weather2010 where MAX=(select MAX(MAX) from weather2010 where not MAX='9999.9' and not max='99.99')").show()
# +------+--------+-----+
# | STN| DATE| MAX|
# +------+--------+-----+
# |703830|20100613|128.8|
# +------+--------+-----+
这可能是因为在浮动期间最大条目被替换为NULL
。
有没有办法我可以——
- 从数据帧本身的条目中删除所有
*
,然后再使用createOrReplaceTempView()
创建 SQL 视图,或者 - 使用 SQL 能够正确运行字符串类型的
MAX
、ORDER BY
等,同时还包括末尾带有*
的条目,因此不需要强制转换,或者 - 如果无法使用SQL执行此操作,请单独使用DataFrame API,尽管我对API不是很熟悉。
我不想在这里弄乱问题,所以这是我的要点,其中包含有关其中一些操作的更多代码片段 - 要点。
在 Spark 和常规 SQL 中,你有一个replace()
函数,所以这应该适合你:
spark.sql("select cast(replace(MAX,'*') as float) MAX_FLOAT from weather2010uncleaned where not MAX='9999.9' order by MAX_FLOAT desc").show()
从它构建,我们现在可以使用rank()
窗口来得出最终结果:
spark.sql("""
select STN, DATE, MAX
from (
select STN, YEARMODA AS DATE, MAX,
rank() over (order by cast(replace(MAX,'*') as float) desc) RNK
from weather2010uncleaned where not MAX='9999.9'
) T
where RNK = 1
""").show()