Spark: How to parse a JSON string of nested lists into a Spark dataframe?



How can I parse a JSON string of nested lists into a Spark dataframe in pyspark?

Input dataframe:

+-------------+-----------------------------------------------+
|url          |json                                           |
+-------------+-----------------------------------------------+
|https://url.a|[[1572393600000, 1.000],[1572480000000, 1.007]]|
|https://url.b|[[1572825600000, 1.002],[1572912000000, 1.000]]|
+-------------+-----------------------------------------------+
root
|-- url: string (nullable = true)
|-- json: string (nullable = true)

Expected output:

+------+---------------+-------+
|col_1 | col_2         | col_3 |
+------+---------------+-------+
| a    | 1572393600000 | 1.000 |
| a    | 1572480000000 | 1.007 |
| b    | 1572825600000 | 1.002 |
| b    | 1572912000000 | 1.000 |
+------+---------------+-------+

Sample code:

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import Row

spark = (pyspark.sql.SparkSession.builder.appName("Downloader_standalone")
         .master('local[*]')
         .getOrCreate())
sc = spark.sparkContext

rdd_list = [('https://url.a', '[[1572393600000, 1.000],[1572480000000, 1.007]]'),
            ('https://url.b', '[[1572825600000, 1.002],[1572912000000, 1.000]]')]
jsons = sc.parallelize(rdd_list)
df = spark.createDataFrame(jsons, "url string, json string")
df.show(truncate=False)
df.printSchema()

# This attempt does not work: "array<string,string>" is not a valid schema string,
# and exploding an array produces a single column, not three.
(df.withColumn('json', F.from_json(F.col('json'), "array<string,string>"))
 .select(F.explode('json').alias('col_1', 'col_2', 'col_3')).show())

There are a few related examples, but I cannot figure out how to adapt them:

  • How to parse and transform json string from spark dataframe rows in pyspark

  • How to transform JSON string with multiple keys, from spark data frame rows in pyspark?

With a couple of string replacements and splits you can get the expected result:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "col_1",
    F.regexp_replace("url", "https://url.", "")
).withColumn(
    "col_2_3",
    F.explode(
        F.expr("""transform(
            split(trim(both '][' from json), '\\\\],\\\\['),
            x -> struct(split(x, ',')[0] as col_2, split(x, ',')[1] as col_3)
        )""")
    )
).selectExpr("col_1", "col_2_3.*")

df1.show(truncate=False)
#+-----+-------------+------+
#|col_1|col_2        |col_3 |
#+-----+-------------+------+
#|a    |1572393600000| 1.000|
#|a    |1572480000000| 1.007|
#|b    |1572825600000| 1.002|
#|b    |1572912000000| 1.000|
#+-----+-------------+------+

Explanation (a short sketch of the intermediate values follows this list):

  1. trim(both '][' from json): removes the leading and trailing [ and ] characters, leaving something like: 1572393600000, 1.000],[1572480000000, 1.007

  2. Now you can split on ],[ (the brackets are escaped with backslashes because split expects a regex, hence the doubled backslashes inside the Python string)

  3. transform takes the array from the split and, for each element, splits it on the comma and builds a struct with fields col_2 and col_3

  4. explode the array of structs obtained from transform and star-expand the struct column
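
To make those steps concrete, here is a minimal sketch (reusing the df from the question; the step1/step2/step3 names are only for illustration) that surfaces the intermediate values produced before the explode:

debug = df.selectExpr(
    "trim(both '][' from json) as step1",                         # outer brackets stripped
    "split(trim(both '][' from json), '\\\\],\\\\[') as step2",   # one string per [timestamp, value] pair
    """transform(
         split(trim(both '][' from json), '\\\\],\\\\['),
         x -> struct(split(x, ',')[0] as col_2, split(x, ',')[1] as col_3)
       ) as step3"""                                               # one struct per pair
)
debug.show(truncate=False)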

Another way is to call from_json twice: first parse the outer JSON array as an array of strings, explode it, then parse each element again as an array of strings:

(df.select(df.url, F.explode(F.from_json(df.json, "array<string>")))
 .select("url", F.from_json(F.col("col"), "array<string>").alias("col"))
 .select("url", F.col("col").getItem(0), F.col("col").getItem(1))
 .show(truncate=False))
+-------------+-------------+------+
|url          |col[0]       |col[1]|
+-------------+-------------+------+
|https://url.a|1572393600000|1.0   |
|https://url.a|1572480000000|1.007 |
|https://url.b|1572825600000|1.002 |
|https://url.b|1572912000000|1.0   |
+-------------+-------------+------+
df.select(col("url"), 
explode(from_json(col("json"), ArrayType(StringType))))      
.select(col("url"), 
from_json(col("col"), ArrayType(StringType)).alias("col"))      
.select(col("url"),
col("col").getItem(0).alias("code"),        
col("col").getItem(1).alias("value"))
