我有一个包含大量列的数据帧 (>30000(。
我根据第一列填充了1
和0
,如下所示:
for column in list_of_column_names:
df = df.withColumn(column, when(array_contains(df['list_column'], column), 1).otherwise(0))
但是,此过程需要花费大量时间。有没有办法更有效地做到这一点?有人告诉我,列处理可以并行化。
编辑:
示例输入数据
+----------------+-----+-----+-----+
| list_column | Foo | Bar | Baz |
+----------------+-----+-----+-----+
| ['Foo', 'Bak'] | | | |
| ['Bar', Baz'] | | | |
| ['Foo'] | | | |
+----------------+-----+-----+-----+
你的代码没有什么特别的问题,除了非常宽的数据:
for column in list_of_column_names:
df = df.withColumn(...)
仅生成执行计划。
一旦评估结果,实际的数据处理将并发和并行化。
然而,这是一个昂贵的过程,因为它需要O(NMK(操作,列表中有 N 行、M列和K值。
此外,对非常广泛的数据的执行计划的计算成本非常高(尽管就记录数而言,成本是恒定的(。如果它成为一个限制因素,你可能会更好地RDDs
:
- 使用
sort_array
函数对列数组进行排序。 - 将数据转换为
RDD
。 - 使用二叉搜索对每一列应用搜索。
你可能会这样处理,
import pyspark.sql.functions as F
exprs = [F.when(F.array_contains(F.col('list_column'), column), 1).otherwise(0).alias(column)
for column in list_column_names]
df = df.select(['list_column']+exprs)
withColumn
已经分发,因此除了您已经拥有的方法之外,很难获得更快的方法。 您可以尝试定义udf
函数,如下所示
from pyspark.sql import functions as f
from pyspark.sql import types as t
def containsUdf(listColumn):
row = {}
for column in list_of_column_names:
if(column in listColumn):
row.update({column: 1})
else:
row.update({column: 0})
return row
callContainsUdf = f.udf(containsUdf, t.StructType([t.StructField(x, t.StringType(), True) for x in list_of_column_names]))
df.withColumn('struct', callContainsUdf(df['list_column']))
.select(f.col('list_column'), f.col('struct.*'))
.show(truncate=False)
应该给你
+-----------+---+---+---+
|list_column|Foo|Bar|Baz|
+-----------+---+---+---+
|[Foo, Bak] |1 |0 |0 |
|[Bar, Baz] |0 |1 |1 |
|[Foo] |1 |0 |0 |
+-----------+---+---+---+
注:list_of_column_names = ["Foo","Bar","Baz"]