在 pyspark 中优化"withColumn when otherwise"性能



我在数据库块上使用pyspark进行项目。我有一部分代码(下面)基于日期(法语)重新格式化字符串。

现有的代码,除了冗长之外,还会导致一些性能问题,如:

  • 无法显示数据帧,有一个常量"正在运行的命令">
  • 导致"驱动程序已启动,但没有响应,可能是由于GC。

本项目只使用csv文件(读写)。未使用数据库

我正试图以更好的方式处理格式化任务,以避免性能和内存问题。任何建议吗?

谢谢你!

courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2020","XXX0120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2020","XXX0220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2020","XXX0320").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2020","XXX0420").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2020","XXX0520").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2020","XXX0620").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2020","XXX0720").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2020","XXX0820").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2020","XXX0920").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2020","XXX1020").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2020","XXX1120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2020","XXX1220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2021","XXX0121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2021","XXX0221").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2021","XXX0321").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2021","XXX0421").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2021","XXX0521").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2021","XXX0621").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2021","XXX0721").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2021","XXX0821").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2021","XXX0921").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2021","XXX1021").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2021","XXX1121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2021","XXX1221").otherwise(courriers["Vague"]))

以编程方式生成完整条件要比逐一应用容易得多。众所周知,withColumn在大量使用时性能很差。

最简单的方法是定义一个映射并从中生成条件,如下所示:

dates = {"XXX Janvier 2020":"XXX0120",
"XXX Fevrier 2020":"XXX0220",
"XXX Mars 2020":"XXX0320",
"XXX Avril 2020":"XXX0420",
"XXX Mai 2020":"XXX0520",
"XXX Juin 2020":"XXX0620",
"XXX Juillet 2020":"XXX0720",
"XXX Aout 2020":"XXX0820",
"XXX Septembre 2020":"XXX0920",
"XXX Octobre 2020":"XXX1020",
"XXX Novembre 2020":"XXX1120",
"XXX Decembre 2020":"XXX1220",
"XXX Janvier 2021":"XXX0121",
"XXX Fevrier 2021":"XXX0221",
"XXX Mars 2021":"XXX0321",
"XXX Avril 2021":"XXX0421",
"XXX Mai 2021":"XXX0521",
"XXX Juin 2021":"XXX0621",
"XXX Juillet 2021":"XXX0721",
"XXX Aout 2021":"XXX0821",
"XXX Septembre 2021":"XXX0921",
"XXX Octobre 2021":"XXX1021",
"XXX Novembre 2021":"XXX1121",
"XXX Decembre 2021":"XXX1221"
}

,由此可以生成所有可能值的条件:

import pyspark.sql.functions as F
cl = None
for k,v in dates.items():
if cl is None:
cl = F.when(F.col("Vague") == k, F.lit(v))
else:
cl = cl.when(F.col("Vague") == k, F.lit(v))
cl = cl.otherwise(F.col("Vague")).alias("Vague")

,可以这样使用:

df = spark.createDataFrame([["XXX Fevrier 2021"], ["22332"]], 
schema="Vague string")
df.select(cl).show()

给出预期结果:

+-------+
|  Vague|
+-------+
|XXX0221|
|  22332|
+-------+

理想情况下,可以通过使用正则表达式将其推广到任何年份,如:

dates = {"XXX Janvier 20(d{2})":"XXX01$1",
"XXX Fevrier 20(d{2})":"XXX02$1",
"XXX Mars 20(d{2})":"XXX03$1",
"XXX Avril 20(d{2})":"XXX04$1",
"XXX Mai 20(d{2})":"XXX05$1",
"XXX Juin 20(d{2})":"XXX06$1",
"XXX Juillet 20(d{2})":"XXX07$1",
"XXX Aout 20(d{2})":"XXX08$1",
"XXX Septembre 20(d{2})":"XXX09$1",
"XXX Octobre 20(d{2})":"XXX10$1",
"XXX Novembre 20(d{2})":"XXX11$1",
"XXX Decembre 20(d{2})":"XXX12$1",
}
cl = None
for k,v in dates.items():
if cl is None:
cl = F.regexp_replace(F.col("Vague"), k, v)
else:
cl = F.regexp_replace(cl, k, v)
cl = cl.alias("Vague")

,它会给出相同的结果,但在21世纪的任何一年都适用

另一个解决方案是利用mapType

from pyspark.sql.functions import col, create_map, lit,split,concat
from itertools import chain
df = spark.createDataFrame([["XXX Fevrier 2021"], ["XXX Aout 2021"]], 
schema="Vague string")
# Create a dict only for the given months
mapping = {
"Janvier":"01",
"Fevrier": "02",
"Mars": "03",
"Avril": "04",
"Mai": "05",
"Juin": "06",
"Juillet": "07",
"Aout": "08",
"Septembre": "09",
"Octobre": "10",
"Novembre": "11",
"Decembre": "12"}
# Create the mapping
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
res = (
df.withColumn("value", concat(
split(col("Vague"),' ')[0] 
, mapping_expr.getItem(split(col("Vague"),' ')[1])
, concat(split(col("Vague"),' ')[2][3:4])))
)
res.show()

提供预期结果

+----------------+-------+
|           Vague|  value|
+----------------+-------+
|XXX Fevrier 2021|XXX0221|
|   XXX Aout 2021|XXX0821|
+----------------+-------+

最新更新