PySpark 中的高效列处理



我有一个包含大量列的数据帧 (>30000(。

我根据第一列填充了10,如下所示:

for column in list_of_column_names:
df = df.withColumn(column, when(array_contains(df['list_column'], column), 1).otherwise(0))

但是,此过程需要花费大量时间。有没有办法更有效地做到这一点?有人告诉我,列处理可以并行化。

编辑:

示例输入数据

+----------------+-----+-----+-----+
|  list_column   | Foo | Bar | Baz |
+----------------+-----+-----+-----+
| ['Foo', 'Bak'] |     |     |     |
| ['Bar', Baz']  |     |     |     |
| ['Foo']        |     |     |     |
+----------------+-----+-----+-----+

你的代码没有什么特别的问题,除了非常宽的数据:

for column in list_of_column_names:
df = df.withColumn(...)

仅生成执行计划。

一旦评估结果,实际的数据处理将并发和并行化。

然而,这是一个昂贵的过程,因为它需要O(NMK(操作,列表中有 N 行、M列和K值。

此外,对非常广泛的数据的执行计划的计算成本非常高(尽管就记录数而言,成本是恒定的(。如果它成为一个限制因素,你可能会更好地RDDs

  • 使用sort_array函数对列数组进行排序。
  • 将数据转换为RDD
  • 使用二叉搜索对每一列应用搜索。

你可能会这样处理,

import pyspark.sql.functions as F
exprs = [F.when(F.array_contains(F.col('list_column'), column), 1).otherwise(0).alias(column)
for column in list_column_names]
df = df.select(['list_column']+exprs)

withColumn已经分发,因此除了您已经拥有的方法之外,很难获得更快的方法。 您可以尝试定义udf函数,如下所示

from pyspark.sql import functions as f
from pyspark.sql import types as t
def containsUdf(listColumn):
row = {}
for column in list_of_column_names:
if(column in listColumn):
row.update({column: 1})
else:
row.update({column: 0})
return row
callContainsUdf = f.udf(containsUdf, t.StructType([t.StructField(x, t.StringType(), True) for x in list_of_column_names]))
df.withColumn('struct', callContainsUdf(df['list_column']))
.select(f.col('list_column'), f.col('struct.*'))
.show(truncate=False)

应该给你

+-----------+---+---+---+
|list_column|Foo|Bar|Baz|
+-----------+---+---+---+
|[Foo, Bak] |1  |0  |0  |
|[Bar, Baz] |0  |1  |1  |
|[Foo]      |1  |0  |0  |
+-----------+---+---+---+

注:list_of_column_names = ["Foo","Bar","Baz"]

最新更新