I'm having trouble sorting a DataFrame in a Hive context on HDFS. I'm trying to sort a DataFrame with a structure like this:
+---+--------------+---------------+
| id|parameter_name|parameter_value|
+---+--------------+---------------+
|id1|       name_en|        value a|
|id1|       name_il|        value b|
|id1|    address_en|        value c|
|id1|    address_il|        value d|
|id2|       name_il|        value f|
|id2|       name_en|        value e|
|id2|    address_il|        value h|
|id2|    address_en|        value g|
+---+--------------+---------------+
I want to sort this DataFrame so that the ids are in order and, within each id, the parameter_name values follow this sequence:
name_en
name_il
address_en
address_il
Note that in the example above this is not the case: the name and address rows are flipped between the two ids.
Trying df.sort(["id","parameter_name"]) gives a mixed result that shuffles the DataFrame further and splits the ids up:
id1, name_en
id1, name_il
id2, name_il
id2, name_en
id1, address_en
id1, address_il
id2, address_il
id2, address_en
I recreated your DataFrame, but assigned random values to parameter_value so that the incoming order is no longer relevant:
from random import random
data = [
{"id": "id1", "parameter_name": "name_en", "parameter_value": random()},
{"id": "id1", "parameter_name": "name_il", "parameter_value": random()},
{"id": "id1", "parameter_name": "address_en", "parameter_value": random()},
{"id": "id1", "parameter_name": "address_il", "parameter_value": random()},
{"id": "id2", "parameter_name": "name_il", "parameter_value": random()},
{"id": "id2", "parameter_name": "name_en", "parameter_value": random()},
{"id": "id2", "parameter_name": "address_il", "parameter_value": random()},
{"id": "id2", "parameter_name": "address_en", "parameter_value": random()},
]
df = spark.createDataFrame(data)
df.show()
+---+--------------+-------------------+
| id|parameter_name| parameter_value|
+---+--------------+-------------------+
|id1| address_il|0.11850447351294957|
|id2| name_en|0.18902815459657452|
|id2| address_il| 0.294998203578158|
|id1| address_en|0.48741740190944827|
|id2| name_il| 0.5651073044407224|
|id2| address_en| 0.6530661784882391|
|id1| name_il| 0.6797674631659714|
|id1| name_en| 0.9887386653580036|
+---+--------------+-------------------+
Then I create an ordering column that encodes the artificial order you need:
from pyspark.sql import functions as F
ordering_col = (
F.when(F.col("parameter_name") == "name_en", 1)
.when(F.col("parameter_name") == "name_il", 2)
.when(F.col("parameter_name") == "address_en", 3)
.when(F.col("parameter_name") == "address_il", 4)
)
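One caveat: a when chain without a trailing otherwise evaluates to null for any parameter_name outside these four values, and Spark sorts nulls first in ascending order, so unexpected rows would float to the top. Appending an otherwise ranks them last instead:
ordering_col = ordering_col.otherwise(5)  # unmatched parameter names sort last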
df.orderBy("id", ordering_col).show()
+---+--------------+-------------------+
| id|parameter_name| parameter_value|
+---+--------------+-------------------+
|id1| name_en| 0.9887386653580036|
|id1| name_il| 0.6797674631659714|
|id1| address_en|0.48741740190944827|
|id1| address_il|0.11850447351294957|
|id2| name_en|0.18902815459657452|
|id2| name_il| 0.5651073044407224|
|id2| address_en| 0.6530661784882391|
|id2| address_il| 0.294998203578158|
+---+--------------+-------------------+
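If you would rather not hard-code one when branch per value, joining a small lookup DataFrame that carries the desired rank works just as well. This is only a sketch; order_df and the rank column are names I made up for illustration:
order_df = spark.createDataFrame(
    [("name_en", 1), ("name_il", 2), ("address_en", 3), ("address_il", 4)],
    ["parameter_name", "rank"],
)
df.join(order_df, on="parameter_name", how="left").orderBy("id", "rank").drop("rank").show()
Spark will typically broadcast a lookup table this small on its own, and supporting a new parameter name only means adding a row rather than another when branch.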
Alternatively, simply convert the PySpark DataFrame to a pandas DataFrame, sort there, and convert back with spark.createDataFrame(panda_df) (createDataFrame lives on the SparkSession, not on the SparkContext). Because the required order is not alphabetical in either direction, map each parameter_name to its rank and sort on that via a key function (pandas >= 1.1):
order = {"name_en": 0, "name_il": 1, "address_en": 2, "address_il": 3}
panda_df = dataframe.toPandas().sort_values(
    ["id", "parameter_name"], key=lambda c: c.map(order) if c.name == "parameter_name" else c
)
sorted_df = spark.createDataFrame(panda_df)
sorted_df.show()
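Keep in mind that toPandas() collects the whole DataFrame onto the driver, so this only works when the data fits in driver memory; for large tables, stick with the ordering-column approach above.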