如何在Spark SQL中压缩两个数组列



我有一个Pandas数据帧。我尝试先将包含字符串值的两列连接到一个列表中,然后使用zip将列表中的每个元素与"_"连接。我的数据集如下:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

我想将这两列连接到第三列中,如下所示,用于我的数据帧的每一行。

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

我已经使用下面的代码在python中成功地做到了这一点,但数据帧相当大,并且需要很长时间才能为整个数据帧运行它。为了提高效率,我想在PySpark中做同样的事情。我已经成功地读取了spark数据帧中的数据,但我很难确定如何用PySpark等效函数复制Pandas函数。如何在PySpark中获得想要的结果?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
while index < 3:
if isinstance(row['column_1'], str):      
row['column_1'] = list(row['column_1'].split(','))
row['column_2'] = list(row['column_2'].split(','))
row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]

我已经在PySpark中使用以下代码将这两列转换为数组

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split
crash.withColumn("column_1",
split(col("column_1"), ",s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
split(col("column_2"), ",s*").cast(ArrayType(StringType())).alias("column_2")
)

现在,我只需要使用"_"压缩两列中数组的每个元素。这个怎么用拉链?感谢您的帮助。

类似于Python的Spark SQL是pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

集合函数:返回一个结构的合并数组,其中第N个结构包含输入数组的所有第N个值。

因此,如果您已经有两个数组:

from pyspark.sql.functions import split
df = (spark
.createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
.toDF("column_1", "column_2")
.withColumn("column_1", split("column_1", "s*,s*"))
.withColumn("column_2", split("column_2", "s*,s*")))

您可以将其应用于结果

from pyspark.sql.functions import arrays_zip
df_zipped = df.withColumn(
"zipped", arrays_zip("column_1", "column_2")
)
df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

现在要组合结果,您可以transform(如何使用转换高阶函数?,TypeError:Column不可迭代-如何迭代ArrayType()?):

df_zipped_concat = df_zipped.withColumn(
"zipped_concat",
expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
) 
df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

注意

Apache Spark 2.4中引入了高阶函数transformarrays_zip

对于Spark 2.4+,这可以只使用zip_with函数同时压缩连接:

df.withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")) 

高阶函数使用lambda函数(x, y) -> concat(x, '_', y)对2个数组进行元素合并。

您还可以UDF来压缩拆分的数组列,

df = spark.createDataFrame([('abc,def,ghi','1.0,2.0,3.0')], ['col1','col2']) 
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is
from pyspark.sql import functions as F
from pyspark.sql.types import *
def concat_udf(*args):
return ['_'.join(x) for x in zip(*args)]
udf1 = F.udf(concat_udf,ArrayType(StringType()))
df = df.withColumn('col3',udf1(F.split(df.col1,','),F.split(df.col2,',')))
df.show(1,False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+

对于Spark 3.1+,他们现在为pyspark.sql.functions.zip_with()提供了Python lambda函数,因此可以这样做:

import pyspark.sql.functions as F
df = df.withColumn("column_3", F.zip_with("column_1", "column_2", lambda x,y: F.concat_ws("_", x, y)))

相关内容

  • 没有找到相关文章

最新更新