使用性能选择操作重命名PySpark DataFrame中的列



关于如何重命名PySpark DataFrame中的列,还有其他线程,请参阅此处、此处和此处。我认为现有的解决方案没有足够的性能或通用性(我有一个应该更好的解决方案,但我遇到了一个边缘问题(。让我们从回顾当前解决方案的问题开始:

  • 重复调用withColumnRenamed可能会出现与多次调用withColumn相同的性能问题,如本文所述。见本答案中的选项2
  • toDF方法依赖于模式推断,不一定保留列的可为null属性(在生产代码中应避免使用toDF(。我猜这种方法也很慢
  • 这种方法很接近,但还不够通用,而且对于很多列来说,手工操作太多(例如,如果您试图将2000个列名转换为snake_case(

我创建了一个通用函数,适用于所有列类型,除了包含点的列名:

import pyspark.sql.functions as F
def with_columns_renamed(fun):
def _(df):
cols = list(map(
lambda col_name: F.col(col_name).alias(fun(col_name)),
df.columns
))
return df.select(*cols)
return _

假设您有以下DataFrame:

+-------------+-----------+
|i like cheese|yummy stuff|
+-------------+-----------+
|         jose|          a|
|           li|          b|
|          sam|          c|
+-------------+-----------+

以下是如何用下划线替换列名中的所有空白:

def spaces_to_underscores(s):
return s.replace(" ", "_")
df.transform(with_columns_renamed(spaces_to_underscores)).show()
+-------------+-----------+
|i_like_cheese|yummy_stuff|
+-------------+-----------+
|         jose|          a|
|           li|          b|
|          sam|          c|
+-------------+-----------+

该解决方案非常有效,除非列名中包含点。

假设你有这个DataFrame:

+-------------+-----------+
|i.like.cheese|yummy.stuff|
+-------------+-----------+
|         jose|          a|
|           li|          b|
|          sam|          c|
+-------------+-----------+

此代码将出错:

def dots_to_underscores(s):
return s.replace(".", "_")
df.transform(quinn.with_columns_renamed(dots_to_underscores))

这是错误消息:pyspark.sql.utils.AnalysisException: "cannot resolve 'i.like.cheese' given input columns: [i.like.cheese, yummy.stuff];;n'Project ['i.like.cheese AS i_like_cheese#242, 'yummy.stuff AS yummy_stuff#243]n+- LogicalRDD [i.like.cheese#231, yummy.stuff#232], falsen"

如何修改此解决方案以适用于带点的列名?我还假设Catalyst优化器对多个withColumnRenamed调用的优化问题与对多个withColumn调用的优化相同。如果Catalyst出于某种原因能够更好地处理多个withColumnRenamed调用,请告诉我。

你可以做一些简单的事情,比如

import pyspark.sql.functions as F
def with_columns_renamed(fun):
def _(df):
cols = list(map(
lambda col_name: F.col('`' + col_name + '`').alias(fun(col_name)),
df.columns
))
return df.select(*cols)
return _

我已经阅读了其他答案,不明白为什么这不是其中之一,如果我遗漏了什么,请随时指出!这并不是什么新鲜事,但它简洁明了,在中表现良好

def with_columns_renamed(func):
def _(df):
return df.selectExpr(*['`{}` AS `{}`'.format(c, func(c)) for c in df.columns])
return _

使用`:尝试escaping

import pyspark.sql.functions as F
def with_columns_renamed(fun):
def _(df):
cols = list(map(
lambda col_name: F.col("`{0}`".format(col_name)).alias(fun(col_name)),
df.columns
))
return df.select(*cols)
return _

或者将withColumnRenamedreduce一起使用。

from functools import reduce
reduce(lambda new_df, col: new_df.withColumnRenamed(col,col.replace('.','_')),df.columns,df)

最新更新