我有一个数据帧,它下面有列
field1 , field2 , field3 , field_name
样本数据
"a1", "b1", "c1", "field1"
"a2", "b2", "c2", "field2"
"a3", "b3", "c3", "field3"
我想添加新的列";字段值";使得它在对应于列"的内容的列中包含值;字段名";
因此上面的第一行将具有字段值="0";a1";,由于字段名包含";字段1";
输出数据帧应该看起来像
field1, field2, field3 , fieldname, fieldvalue
数据
"a1", "b1", "c1", "field1", "a1"
"a2", "b2", "c2", "field2", "b2"
"a3", "b3", "c3", "field3", "c3"
我尝试使用以下语法
df1 = df1.withColumn("fieldValue", func.col(func.col("fieldName")))
但它失败了,并出现以下错误,因为func.col需要一个列,而不是字符串
方法col([class org.apache.spark.sql.Column](不存在
使用col,您只能使用一个常量值作为参数,该参数不依赖于实际行的值。原因是执行该函数的DAG是在Spark查看数据之前创建的。col
只是从数据帧中选择一列,并且该列对于所有行都必须相同。
相反,每一行都可以转换为一个数组,然后使用find_in_set计算所需列的数组中的索引,最后从数组中获取所需值:
from pyspark.sql import functions as F
df.withColumn("array", F.array(df.columns))
.withColumn("index", F.expr(f"find_in_set(field_name, '{','.join(df.columns)}') - 1"))
.withColumn("fieldvalue", F.expr("array[index]"))
.show()
输出:
+------+------+------+----------+--------------------+-----+----------+
|field1|field2|field3|field_name| array|index|fieldvalue|
+------+------+------+----------+--------------------+-----+----------+
| a1| b1| c1| field1|[a1, b1, c1, field1]| 0| a1|
| a2| b2| c2| field2|[a2, b2, c2, field2]| 1| b2|
| a3| b3| c3| field3|[a3, b3, c3, field3]| 2| c3|
+------+------+------+----------+--------------------+-----+----------+
使用udf也可以获得相同的结果,但Spark SQL函数通常比udf更快,尤其是在使用Python时。
编辑:如果array
不起作用,同样的想法可以用于地图:
m = "map(" + ",".join([f"'{c}', {c}" for c in df.columns]) + ")"
df.withColumn("map", F.expr(m))
.withColumn("fieldvalue", F.expr("map[field_name]"))
.show()