S3中的Json结构如下。我已经成功地将其爬网到数据目录表中,并将其导入到DynamicFrame中。
{
"ColumnA": "Value",
"ColumnB": [
"Value"
],
"ColumnC": "Value",
"ColumnD": "Value"
}
DynamicFrame的模式
root
|-- columnA: string
|-- columnB: array
| |-- element: string
|-- columnC: string
|-- columnD: string
虽然columnB是一个数组类型,但其中只有1个值。我无法控制生成这些JSON文件的源,所以我必须使用这种格式。
我需要将其推送到具有以下模式的Redshift表中。
+--------+-------+-------+-------+
| ColumnA|ColumnB|ColumnC|ColumnD|
+--------+-------+-------+-------+
虽然列A/C/D相当简单,但如何从DynamicFrame中的"ColumnB"数组中提取第一个值以便能够写入Redshift表?
From Spark-2.4+:
使用element_at
函数从数组中获取第一个值
Example:
df=spark.createDataFrame([("value",["value"],"value","value")],["ColumnA","ColumnB","ColumnC","ColumnD"])
df.printSchema()
#root
# |-- ColumnA: string (nullable = true)
# |-- ColumnB: array (nullable = true)
# | |-- element: string (containsNull = true)
# |-- ColumnC: string (nullable = true)
# |-- ColumnD: string (nullable = true)
from pyspark.sql.functions import *
df.withColumn("ColumnB",element_at(col("ColumnB"),1)).show()
#+-------+-------+-------+-------+
#|ColumnA|ColumnB|ColumnC|ColumnD|
#+-------+-------+-------+-------+
#| value| value| value| value|
#+-------+-------+-------+-------+
For spark < 2.4:
#Using .getItem(0)
df.withColumn("ColumnB",col("ColumnB").getItem(0)).show()
#+-------+-------+-------+-------+
#|ColumnA|ColumnB|ColumnC|ColumnD|
#+-------+-------+-------+-------+
#| value| value| value| value|
#+-------+-------+-------+-------+
#using index
df.withColumn("ColumnB",col("ColumnB")[0]).show()
#+-------+-------+-------+-------+
#|ColumnA|ColumnB|ColumnC|ColumnD|
#+-------+-------+-------+-------+
#| value| value| value| value|
#+-------+-------+-------+-------+