我正在使用Spark
作为 JSON 从MongoDB
读取数据:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
....
..
Dataset<Row> ds = MongoSpark.load(jsc).toDF();
我需要将此 DF 存储到 Hive。
问题是MongoDB中的一个列名是Timestamp
这是Hive中的保留字。 因此,来自MongoDB的JSON数据包含一个键作为timestamp
。
我需要替换这个 json 密钥"timestamp"
以"timestamp_"
.
如何将Dataset<Row> ds
中的列名"timestamp"
替换为"timestamp_"
?
如果要重命名嵌套列,可以执行以下操作:
- 展平/分解所有结构列
- 重命名列
- 向后折叠所有列
假设数据集架构如下所示:
root
|-- col1
|-- col2
|-- struct1
| |-- timestamp
| |-- a
| |-- b
因此,您可以执行以下操作
ds = ds
.select(col("*"), col("struct1.*"))
.withColumnRenamed("timestamp", "timestamp_")
.select(
col("col1"),
col("col2"),
struct("timestamp_", "a", "b").as("struct1")
);
如果要重命名数组中的嵌套列,首先应使用explode
函数扩展数组。
ds = ds.select(col("a"), col("b"), explode(col("struct1")))
然后,您可以如上所述重命名嵌套列。如果在重命名后要将结构折叠回数组下,请使用groupBy(...).agg(collect_list(...))
。