How to rename a column whose name contains a dot



I am using Spark 1.5.

I am struggling with columns whose names contain a dot (e.g. param.x.y). I first had trouble selecting them, but then I read that I need to escape the name with backticks (`param.x.y`).
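
For reference, selecting such a column with the backtick syntax looks like this (a minimal sketch, assuming a DataFrame df that has a column literally named param.x.y):

df.select("`param.x.y`")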

Now I have a problem when trying to rename the column. I am using the following approach, but it does not seem to work:

df.withColumnRenamed("`param.x.y`", "param_x_y")

So I would like to check: is this actually a bug, or am I doing something wrong?

It looks like you are using backticks (``) around the original column name in your code. I simply removed them and it worked for me.
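
Applied to your case, the call would be (a minimal sketch, assuming the column is literally named param.x.y, without backticks in the name itself):

df.withColumnRenamed("param.x.y", "param_x_y")

Below is a complete working example that renames a column of a DataFrame.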

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
// Import Row.
import org.apache.spark.sql.Row
// Import Spark SQL data types.
import org.apache.spark.sql.types.{StructType, StructField, StringType}

object RenameColumn extends Serializable {
  val conf = new SparkConf().setAppName("read local file")
  conf.set("spark.executor.memory", "100M")
  conf.setMaster("local")
  val sc = new SparkContext(conf)
  // sc is an existing SparkContext.
  val sqlContext = new SQLContext(sc)

  def main(args: Array[String]): Unit = {
    // Create an RDD.
    val people = sc.textFile("C:/Users/User1/Documents/test")
    // The schema is encoded in a string.
    val schemaString = "name age"
    // Generate the schema based on the schema string.
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    // Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    peopleDataFrame.printSchema()
    // Rename the "name" column and print the new schema.
    val renamedSchema = peopleDataFrame.withColumnRenamed("name", "name_renamed")
    renamedSchema.printSchema()
    sc.stop()
  }
}

Its output:

16/12/26 16:53:48 INFO SparkContext: Created broadcast 0 from textFile at RenameColumn.scala:28
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
root
 |-- name_renamed: string (nullable = true)
 |-- age: string (nullable = true)
16/12/26 16:53:49 INFO SparkUI: Stopped Spark web UI at http://XXX.XXX.XXX.XXX:<port_number>
16/12/26 16:53:49 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

For more information, you can check the Spark DataFrame documentation.

Update: I just tested it with a backtick-quoted string and got the expected output; note that the backticks here end up as part of the literal field name, since schemaString is split verbatim. See the code and its output below.

val schemaString = "`name.rename` age"
// Generate the schema based on the schema string.
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.printSchema()
val renamedSchema = peopleDataFrame.withColumnRenamed("`name.rename`", "name_renamed")
renamedSchema.printSchema()
sc.stop()

Its output:

16/12/26 20:24:24 INFO SparkContext: Created broadcast 0 from textFile at RenameColumn.scala:28
root
 |-- `name.rename`: string (nullable = true)
 |-- age: string (nullable = true)
root
 |-- name_renamed: string (nullable = true)
 |-- age: string (nullable = true)
16/12/26 20:24:25 INFO SparkUI: Stopped Spark web UI at http://xxx.xxx.xxx.x:<port_number>
16/12/26 20:24:25 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
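
As an alternative sketch, the rename can also be done by selecting the column with backticks and aliasing it. This assumes a DataFrame df whose column is literally named param.x.y; the backticks here are only quoting syntax for the column resolver, not part of the name:

df.select(df("`param.x.y`").alias("param_x_y"))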
