Spark-Java:如何将数据集中的列名替换为<Row>新名称?



我正在使用Spark作为 JSON 从MongoDB读取数据:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
....
..    
Dataset<Row> ds = MongoSpark.load(jsc).toDF(); 

我需要将此 DF 存储到 Hive。
问题是MongoDB中的一个列名是Timestamp这是Hive中的保留字。 因此,来自MongoDB的JSON数据包含一个键作为timestamp
我需要替换这个 json 密钥"timestamp""timestamp_".
如何将Dataset<Row> ds中的列名"timestamp"替换为"timestamp_"

如果要重命名嵌套列,可以执行以下操作:

  1. 展平/分解所有结构列
  2. 重命名列
  3. 向后折叠所有列

假设数据集架构如下所示:

root  
|-- col1
|-- col2
|-- struct1
|    |-- timestamp
|    |-- a  
|    |-- b

因此,您可以执行以下操作

ds = ds
.select(col("*"), col("struct1.*"))
.withColumnRenamed("timestamp", "timestamp_")
.select(
col("col1"),
col("col2"),
struct("timestamp_", "a", "b").as("struct1")
);

如果要重命名数组中的嵌套列,首先应使用explode函数扩展数组。

ds = ds.select(col("a"), col("b"), explode(col("struct1")))

然后,您可以如上所述重命名嵌套列。如果在重命名后要将结构折叠回数组下,请使用groupBy(...).agg(collect_list(...))

相关内容

最新更新