我有一个数据帧,它是用一个Pipeline对象创建的,看起来像这样:
df.show()
+--------------------+-----+
| features|label|
+--------------------+-----+
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
|[-0.0775219322931...| 0|
+--------------------+-----+
我已经成功地提取了这样的特征向量:
df_table = df.rdd.map(lambda x: [float(y) for y in x['features']]).toDF(cols)
上面的问题是它没有保留标签列。作为一种变通方法,我成功地使用了一个Join来恢复标签列,但我发现它太复杂了。
我该如何使用像上面这样的一行来提取特征向量,并从中生成Spark DF,同时将标签列附加到它上?
您在这里有很好的选择,尤其是如果您有Spark
>=3.0.0 版本
假设你没有这么新的版本,你的问题来自于你在地图上丢失了钥匙。你可以做:
df_table = df.rdd.map(lambda l: tuple([l['label']] + [float(y) for y in l['features']])).toDF()
最终得到的是一个格式较宽的数据帧。如果你想要一个长格式的矢量,你有更多的选择。
如果您想要长格式的数据
首先,使用rdd:
df.rdd.flatMapValues(lambda l: l).toDF(['label','feature'])
或者,更好的是,直接使用DataFrame API
:(未经测试的解决方案(
import pyspark.sql.functions as psf
df.select('label', psf.explode(psf.col('label')))