I have a DataFrame to which I want to manually add a header and a first column. This is the DataFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
val df = spark.read.option("header",true).option("inferSchema",true).csv("C:\\gg.csv").cache()
Contents of the DataFrame:
12,13,14
11,10,5
3,2,45
The expected output is:
define,col1,col2,col3
c1,12,13,14
c2,11,10,5
c3,3,2,45
What you want to do is:
df.withColumn("columnName", column) // in your case, "columnName" should be "define"
Now you only need to create that column (this might help).
Here is a solution that relies on Spark 2.4:
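One possible way to build that column with the DataFrame API alone is sketched below. It is not necessarily what the answer above had in mind: it assumes the columns are already named col1..col3 (for example via `toDF`), and it assumes that ordering by `monotonically_increasing_id()` reflects the input order, which is typical for a small single-file input but is not guaranteed in general. The object name `DefineColumn` and the helper `addDefine` are hypothetical names used only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, concat, lit, monotonically_increasing_id, row_number}

object DefineColumn {
  // Hypothetical helper: prepends a "define" column holding c1, c2, c3, ...
  def addDefine(df: DataFrame): DataFrame = {
    // Assumption: the generated id follows the input order.
    // A window without partitionBy moves all rows into one partition,
    // so this is only reasonable for small data.
    val byInputOrder = Window.orderBy(monotonically_increasing_id())
    // row_number() is 1-based, so the labels start at c1
    val label = concat(lit("c"), row_number().over(byInputOrder).cast("string"))
    df.withColumn("define", label)
      // withColumn appends at the end, so reselect with "define" first
      .select(col("define") +: df.columns.map(c => col(c)): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("define-column").getOrCreate()
    import spark.implicits._
    // Same values as in the question, with the header supplied manually via toDF
    val df = Seq((12, 13, 14), (11, 10, 5), (3, 2, 45)).toDF("col1", "col2", "col3")
    addDefine(df).show()
    spark.stop()
  }
}
```

For a DataFrame read from a CSV file without a header row, calling `.toDF("col1", "col2", "col3")` on the result is one way to supply the column names before passing it to the helper.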
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row
// First, load the DataFrame with the expected schema
val spark = SparkSession.builder().master("local").appName("my-spark-app").getOrCreate()
val schema = new StructType()
  .add("col1", IntegerType, true)
  .add("col2", IntegerType, true)
  .add("col3", IntegerType, true)
val df = spark.read.format("csv").schema(schema).load("C:\\gg.csv").cache()
// Pair each row with its 0-based position
val rddWithId = df.rdd.zipWithIndex
// Prepend a "define" column of type String
val newSchema = StructType(Array(StructField("define", StringType, false)) ++ df.schema.fields)
val dfZippedWithId = spark.createDataFrame(rddWithId.map {
  case (row, index) =>
    // index starts at 0, giving c0, c1, c2; use (index + 1) to get c1, c2, c3
    Row.fromSeq(Array("c" + index) ++ row.toSeq)
}, newSchema)
// Show results
dfZippedWithId.show
Which shows:
+------+----+----+----+
|define|col1|col2|col3|
+------+----+----+----+
|    c0|  12|  13|  14|
|    c1|  11|  10|   5|
|    c2|   3|   2|  45|
+------+----+----+----+
This is a mix of the documentation here and this example.