我在数据库块上使用pyspark进行项目。我有一部分代码(下面)基于日期(法语)重新格式化字符串。
现有的代码,除了冗长之外,还会导致一些性能问题,如:
- 无法显示数据帧,有一个常量"正在运行的命令">
- 导致"驱动程序已启动,但没有响应,可能是由于GC。
本项目只使用csv文件(读写)。未使用数据库
我正试图以更好的方式处理格式化任务,以避免性能和内存问题。任何建议吗?
谢谢你!
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2020","XXX0120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2020","XXX0220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2020","XXX0320").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2020","XXX0420").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2020","XXX0520").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2020","XXX0620").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2020","XXX0720").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2020","XXX0820").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2020","XXX0920").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2020","XXX1020").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2020","XXX1120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2020","XXX1220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2021","XXX0121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2021","XXX0221").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2021","XXX0321").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2021","XXX0421").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2021","XXX0521").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2021","XXX0621").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2021","XXX0721").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2021","XXX0821").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2021","XXX0921").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2021","XXX1021").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2021","XXX1121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2021","XXX1221").otherwise(courriers["Vague"]))
以编程方式生成完整条件要比逐一应用容易得多。众所周知,withColumn
在大量使用时性能很差。
最简单的方法是定义一个映射并从中生成条件,如下所示:
dates = {"XXX Janvier 2020":"XXX0120",
"XXX Fevrier 2020":"XXX0220",
"XXX Mars 2020":"XXX0320",
"XXX Avril 2020":"XXX0420",
"XXX Mai 2020":"XXX0520",
"XXX Juin 2020":"XXX0620",
"XXX Juillet 2020":"XXX0720",
"XXX Aout 2020":"XXX0820",
"XXX Septembre 2020":"XXX0920",
"XXX Octobre 2020":"XXX1020",
"XXX Novembre 2020":"XXX1120",
"XXX Decembre 2020":"XXX1220",
"XXX Janvier 2021":"XXX0121",
"XXX Fevrier 2021":"XXX0221",
"XXX Mars 2021":"XXX0321",
"XXX Avril 2021":"XXX0421",
"XXX Mai 2021":"XXX0521",
"XXX Juin 2021":"XXX0621",
"XXX Juillet 2021":"XXX0721",
"XXX Aout 2021":"XXX0821",
"XXX Septembre 2021":"XXX0921",
"XXX Octobre 2021":"XXX1021",
"XXX Novembre 2021":"XXX1121",
"XXX Decembre 2021":"XXX1221"
}
,由此可以生成所有可能值的条件:
import pyspark.sql.functions as F
cl = None
for k,v in dates.items():
if cl is None:
cl = F.when(F.col("Vague") == k, F.lit(v))
else:
cl = cl.when(F.col("Vague") == k, F.lit(v))
cl = cl.otherwise(F.col("Vague")).alias("Vague")
,可以这样使用:
df = spark.createDataFrame([["XXX Fevrier 2021"], ["22332"]],
schema="Vague string")
df.select(cl).show()
给出预期结果:
+-------+
| Vague|
+-------+
|XXX0221|
| 22332|
+-------+
理想情况下,可以通过使用正则表达式将其推广到任何年份,如:
dates = {"XXX Janvier 20(d{2})":"XXX01$1",
"XXX Fevrier 20(d{2})":"XXX02$1",
"XXX Mars 20(d{2})":"XXX03$1",
"XXX Avril 20(d{2})":"XXX04$1",
"XXX Mai 20(d{2})":"XXX05$1",
"XXX Juin 20(d{2})":"XXX06$1",
"XXX Juillet 20(d{2})":"XXX07$1",
"XXX Aout 20(d{2})":"XXX08$1",
"XXX Septembre 20(d{2})":"XXX09$1",
"XXX Octobre 20(d{2})":"XXX10$1",
"XXX Novembre 20(d{2})":"XXX11$1",
"XXX Decembre 20(d{2})":"XXX12$1",
}
cl = None
for k,v in dates.items():
if cl is None:
cl = F.regexp_replace(F.col("Vague"), k, v)
else:
cl = F.regexp_replace(cl, k, v)
cl = cl.alias("Vague")
,它会给出相同的结果,但在21世纪的任何一年都适用
另一个解决方案是利用mapType
from pyspark.sql.functions import col, create_map, lit,split,concat
from itertools import chain
df = spark.createDataFrame([["XXX Fevrier 2021"], ["XXX Aout 2021"]],
schema="Vague string")
# Create a dict only for the given months
mapping = {
"Janvier":"01",
"Fevrier": "02",
"Mars": "03",
"Avril": "04",
"Mai": "05",
"Juin": "06",
"Juillet": "07",
"Aout": "08",
"Septembre": "09",
"Octobre": "10",
"Novembre": "11",
"Decembre": "12"}
# Create the mapping
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
res = (
df.withColumn("value", concat(
split(col("Vague"),' ')[0]
, mapping_expr.getItem(split(col("Vague"),' ')[1])
, concat(split(col("Vague"),' ')[2][3:4])))
)
res.show()
提供预期结果
+----------------+-------+
| Vague| value|
+----------------+-------+
|XXX Fevrier 2021|XXX0221|
| XXX Aout 2021|XXX0821|
+----------------+-------+