How to parse a JSON string of nested lists into a Spark DataFrame in PySpark?
Input dataframe:
+-------------+-----------------------------------------------+
|url |json |
+-------------+-----------------------------------------------+
|https://url.a|[[1572393600000, 1.000],[1572480000000, 1.007]]|
|https://url.b|[[1572825600000, 1.002],[1572912000000, 1.000]]|
+-------------+-----------------------------------------------+
root
|-- url: string (nullable = true)
|-- json: string (nullable = true)
Expected output:
+-----+-------------+-----+
|col_1|col_2        |col_3|
+-----+-------------+-----+
|a    |1572393600000|1.000|
|a    |1572480000000|1.007|
|b    |1572825600000|1.002|
|b    |1572912000000|1.000|
+-----+-------------+-----+
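The json column is itself valid JSON: an outer array of [timestamp, value] pairs. As a plain-Python sketch of the reshaping being asked for (no Spark involved; `rows` just copies the sample data, and `flatten` is a hypothetical helper), each string is parsed and one output row is emitted per inner pair, with the URL suffix as col_1:

```python
import json

# Sample rows copied from the input dataframe above.
rows = [
    ("https://url.a", "[[1572393600000, 1.000],[1572480000000, 1.007]]"),
    ("https://url.b", "[[1572825600000, 1.002],[1572912000000, 1.000]]"),
]

def flatten(rows):
    """Hypothetical helper: one (col_1, col_2, col_3) tuple per inner pair."""
    out = []
    for url, payload in rows:
        col_1 = url.rsplit(".", 1)[-1]  # "https://url.a" -> "a"
        # json.loads returns a list of 2-element lists: [timestamp, value]
        for ts, value in json.loads(payload):
            out.append((col_1, ts, value))
    return out

for r in flatten(rows):
    print(r)
```

Note that once the value is parsed as a number, 1.000 becomes 1.0; keeping the original text formatting requires treating the value as a string.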
Sample code:
import pyspark
import pyspark.sql.functions as F
spark = (pyspark.sql.SparkSession.builder.appName("Downloader_standalone")
.master('local[*]')
.getOrCreate())
sc = spark.sparkContext
from pyspark.sql import Row
rdd_list = [('https://url.a','[[1572393600000, 1.000],[1572480000000, 1.007]]'),
('https://url.b','[[1572825600000, 1.002],[1572912000000, 1.000]]')]
jsons = sc.parallelize(rdd_list)
df = spark.createDataFrame(jsons, "url string, json string")
df.show(truncate=False)
df.printSchema()
# My attempt, which does not produce the expected output:
(df.withColumn('json', F.from_json(F.col('json'),"array<string,string>"))
.select(F.explode('json').alias('col_1', 'col_2', 'col_3')).show())
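There are a few related examples, but I can't figure out how to do it:
How to parse and transform json string from spark dataframe rows in pyspark
How to transform JSON string with multiple keys, from spark data frame rows in pyspark?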
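Using some replacements on the string and then splitting it, you can get the desired result:
from pyspark.sql import functions as F

df1 = df.withColumn(
    "col_1",
    F.regexp_replace("url", "https://url.", "")
).withColumn(
    "col_2_3",
    F.explode(
        # Raw string: Spark SQL receives '\\],\\[', which its string-literal
        # parser turns into the regex \],\[ (i.e. a literal "],[").
        F.expr(r"""transform(
            split(trim(both '][' from json), '\\],\\['),
            x -> struct(split(x, ',')[0] as col_2, split(x, ',')[1] as col_3)
        )""")
    )
).selectExpr("col_1", "col_2_3.*")

df1.show(truncate=False)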
#+-----+-------------+------+
#|col_1|col_2 |col_3 |
#+-----+-------------+------+
#|a |1572393600000| 1.000|
#|a |1572480000000| 1.007|
#|b |1572825600000| 1.002|
#|b |1572912000000| 1.000|
#+-----+-------------+------+
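Explanation:
- trim(both '][' from json): removes the leading and trailing [ and ] characters, giving something like: 1572393600000, 1.000],[1572480000000, 1.007
- Now you can split on ],[ (the backslashes escape the brackets so the regex matches them literally).
- transform takes the array produced by the split and, for each element, splits it on the comma and builds a struct with the fields col_2 and col_3.
- Explode the array of structs returned by transform, then star-expand the struct column (col_2_3.*).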
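To make the intermediate values of those steps visible, here is a plain-Python approximation applied to the first row. It is only an analogy: str.strip and re.split stand in for Spark's trim(both ... from ...) and split, and a dict stands in for the struct.

```python
import re

payload = "[[1572393600000, 1.000],[1572480000000, 1.007]]"

# trim(both '][' from json): strip any '[' or ']' characters from both ends.
trimmed = payload.strip("][")
print(trimmed)

# split(..., '\],\['): the brackets are escaped so they match literally.
pairs = re.split(r"\],\[", trimmed)
print(pairs)

# transform(..., x -> struct(split(x, ',')[0] as col_2, split(x, ',')[1] as col_3))
structs = [{"col_2": p.split(",")[0], "col_3": p.split(",")[1]} for p in pairs]
print(structs)
```

The second field keeps the space that followed the comma (" 1.000"), which is why col_3 is shown with a leading space in the output above; wrapping it in trim(...) or casting it to double inside the lambda should remove it.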
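Another option is to parse the string twice with from_json: first as an array of strings (one string per inner list), then each of those strings again:
(df.select(df.url, F.explode(F.from_json(df.json, "array<string>")))
   .select("url", F.from_json(F.col("col"), "array<string>").alias("col"))
   .select("url", F.col("col").getItem(0), F.col("col").getItem(1))
   .show(truncate=False))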
+-------------+-------------+------+
|url |col[0] |col[1]|
+-------------+-------------+------+
|https://url.a|1572393600000|1.0 |
|https://url.a|1572480000000|1.007 |
|https://url.b|1572825600000|1.002 |
|https://url.b|1572912000000|1.0 |
+-------------+-------------+------+
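This approach shows 1.0 instead of 1.000 because the inner numbers are parsed and then written back out as text. The plain-Python analogy below (not Spark's actual code path) reproduces that effect: the outer parse turns each inner list into a re-serialized JSON string, and the second parse reads the fields back.

```python
import json

payload = "[[1572393600000, 1.000],[1572480000000, 1.007]]"

# Analogue of from_json(json, "array<string>"): each inner array is
# re-serialized after its numbers have been parsed, so 1.000 becomes 1.0.
inner = [json.dumps(pair) for pair in json.loads(payload)]
print(inner)

# Analogue of the second from_json followed by getItem(0) / getItem(1).
rows = [(str(a), str(b)) for a, b in (json.loads(s) for s in inner)]
print(rows)
```

If the original formatting of the values matters, the string-splitting approach of the first answer keeps it.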
The same approach in Scala:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

df.select(col("url"),
    explode(from_json(col("json"), ArrayType(StringType))))
  .select(col("url"),
    from_json(col("col"), ArrayType(StringType)).alias("col"))
  .select(col("url"),
    col("col").getItem(0).alias("code"),
    col("col").getItem(1).alias("value"))