PySpark 如何使用引用另一列值的 col 函数



我有一个数据帧,它下面有列

field1 , field2 , field3 , field_name

样本数据

"a1", "b1", "c1", "field1"
"a2", "b2", "c2", "field2"
"a3", "b3", "c3", "field3"

我想添加新的列";字段值";使得它在对应于列"的内容的列中包含值;字段名";

因此上面的第一行将具有字段值="0";a1";,由于字段名包含";字段1";

输出数据帧应该看起来像

field1, field2, field3 , fieldname, fieldvalue

数据

"a1", "b1", "c1", "field1", "a1"
"a2", "b2", "c2", "field2", "b2"
"a3", "b3", "c3", "field3", "c3"

我尝试使用以下语法

df1 = df1.withColumn("fieldValue", func.col(func.col("fieldName")))

但它失败了,并出现以下错误,因为func.col需要一个列,而不是字符串

方法col([class org.apache.spark.sql.Column](不存在

使用col,您只能使用一个常量值作为参数,该参数不依赖于实际行的值。原因是执行该函数的DAG是在Spark查看数据之前创建的。col只是从数据帧中选择一列,并且该列对于所有行都必须相同。

相反,每一行都可以转换为一个数组,然后使用find_in_set计算所需列的数组中的索引,最后从数组中获取所需值:

from pyspark.sql import functions as F
df.withColumn("array", F.array(df.columns)) 
.withColumn("index", F.expr(f"find_in_set(field_name, '{','.join(df.columns)}') - 1")) 
.withColumn("fieldvalue", F.expr("array[index]")) 
.show()

输出:

+------+------+------+----------+--------------------+-----+----------+
|field1|field2|field3|field_name|               array|index|fieldvalue|
+------+------+------+----------+--------------------+-----+----------+
|    a1|    b1|    c1|    field1|[a1, b1, c1, field1]|    0|        a1|
|    a2|    b2|    c2|    field2|[a2, b2, c2, field2]|    1|        b2|
|    a3|    b3|    c3|    field3|[a3, b3, c3, field3]|    2|        c3|
+------+------+------+----------+--------------------+-----+----------+

使用udf也可以获得相同的结果,但Spark SQL函数通常比udf更快,尤其是在使用Python时。

编辑:如果array不起作用,同样的想法可以用于地图:

m = "map(" + ",".join([f"'{c}', {c}" for c in df.columns]) + ")"
df.withColumn("map", F.expr(m)) 
.withColumn("fieldvalue", F.expr("map[field_name]")) 
.show()

相关内容

  • 没有找到相关文章