将RDD添加到DataFrame Column PySpark



我想创建一个包含两个RDD列的数据帧。第一个是我从CSV获得的RDD,第二个是另一个RDD,每行都有集群预测。

我的架构是:

customSchema = StructType([ 
StructField("Area", FloatType(), True), 
StructField("Perimeter", FloatType(), True), 
StructField("Compactness", FloatType(), True), 
StructField("Lenght", FloatType(), True), 
StructField("Width", FloatType(), True), 
StructField("Asymmetry", FloatType(), True), 
StructField("KernelGroove", FloatType(), True)])

映射我的 rdd 并创建数据帧:

FN2 = rdd.map(lambda x: (float(x[0]), float(x[1]),float(x[2]),float(x[3]),float(x[4]),float(x[5]),float(x[6])))
 df = sqlContext.createDataFrame(FN2, customSchema)

我的聚类预测:

result = Kmodel.predict(rdd)

因此,总而言之,我希望在我的数据帧中包含 CSV 的行及其最后的聚类预测。

我尝试使用 .WithColumn(( 但我一无所获。

谢谢。

如果两个数据框上都有公共字段,则使用键联接,否则创建一个唯一的 ID 并联接两个数据帧,以获取单个数据帧中的 CSV 行及其聚类预测

Scala 代码为每一行生成一个唯一的 id,尝试为 pyspark 进行转换。您需要生成一个递增的行 ID 并使用行 ID 连接它们

import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(("abc", 2), ("def", 1), ("hij", 3))).toDF("word", "count")
val wcschema = df.schema
val inputRows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
val wcID = sqlContext.createDataFrame(inputRows, StructType(StructField("id", LongType, false) +: wcschema.fields))

或使用 SQL 查询

val tmpTable1 = sqlContext.sql("select row_number() over (order by count) as rnk,word,count from wordcount")
tmpTable1.show()

最新更新