将 Spark 数据帧写为 JSON 数组 (PySpark)



我想将我的 Spark 数据帧编写为一组 JSON 文件,特别是每个文件都作为 JSON 数组。 让我用一个简单的(可重现的(代码来解释。

我们有:

import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))

将数据框另存为:

df.write.json('s3://path/to/json')

刚刚创建的每个文件每行都有一个 JSON 对象,如下所示:

{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}

但我想每个文件都有一个JSON 数组:

[
{"x":0.9953802385540144,"y":0.476027611419198},
{"x":0.929599290575914,"y":0.72878523939521},
{"x":0.951701684432855,"y":0.8008064729546504}
]

目前不可能让 Spark "本机"以您想要的格式写入单个文件,因为 Spark 以分布式(并行(方式工作,每个执行程序独立写入其数据部分。

但是,由于您可以让每个文件成为 json 数组而不仅仅是 [一个] 文件,因此您可以使用以下解决方法来实现所需的输出:

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct
df.select(to_json(struct(*df.columns)).alias("json"))
.groupBy(spark_partition_id())
.agg(collect_list("json").alias("json_list"))
.select(col("json_list").cast("string"))
.write.text("s3://path/to/json")

首先,从df中的所有列创建一个json。然后按火花分区 ID 分组并使用collect_list进行聚合。这会将该分区上的所有json放入一个列表中。由于在分区内聚合,因此不需要随机排列数据。

现在选择列表列,转换为字符串,并将其写入文本文件。

下面是一个文件的外观示例:

[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]

请注意,您可能会得到一些空文件。


如果您指定了一个空groupBy,大概您可以强制 Spark 将数据写入一个文件中,但这会导致将所有数据强制到单个分区中,这可能会导致内存不足错误。

如果数据不是很大,并且可以将列表作为一个 JSON 文件,则以下解决方法也是有效的。首先,将 Pyspark 数据框转换为 Pandas,然后转换为字典列表。然后,可以将列表转储为 JSON。

list_of_dicts = df.toPandas().to_dict('records')
json_file = open('path/to/file.json', 'w')
json_file.write(json.dumps(list_of_dicts))
json_file.close()

最新更新