Pyspark:从JSON文件创建模式



我正在处理来自非常长的嵌套JSON文件的数据。问题是,这些文件的结构并不总是相同的,因为其中一些文件缺少其他文件具有的列。我想从包含所有列的空JSON文件创建一个自定义模式。如果我稍后将JSON文件读取到这个预定义的模式中,则不存在的列将被空值填充(至少计划是这样)。到目前为止我所做的:

  1. 将测试JSON(不包含所有可以预期的列)加载到数据帧
  2. 将schema写入JSON文件
  3. 在文本编辑器中打开这个JSON文件并手动添加缺失的列

我想做的下一件事是通过将JSON文件读取到我的代码中来创建一个新的模式,但我与语法斗争。我可以直接从文件本身读取模式吗?我试过了

schemaFromJson = StructType.fromJson(json.loads('filepath/spark-schema.json'))

但是它给了我TypeError:init()缺少2个必需的位置参数:'doc'和'pos'

你知道我现在的代码有什么问题吗?谢谢你。

编辑:我看到了这个链接sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield。第7章大致描述了我所遇到的问题。我只是不明白我如何解析json文件我手动增强schemaFromJson = structttype . fromjson (json.loads(schema.json))

当我这样做的时候:

jsonDF = spark.read.json(filesToLoad)
schema = jsonDF.schema.json()
schemaNew = StructType.fromJson(json.loads(schema))
jsonDF2 = spark.read.schema(schemaNew).json(filesToLoad)

代码运行,但它显然没有用,因为jsonDF和jsonDF2确实具有相同的内容/模式。我想要实现的是,在'schema'中添加一些列,然后在' schemnew '中反映出来。

我想我明白了。Schemapath包含已经增强的模式:

schemapath = '/path/spark-schema.json'
with open(schemapath) as f:
d = json.load(f)
schemaNew = StructType.fromJson(d)
jsonDf2 = spark.read.schema(schmaNew).json(filesToLoad)
jsonDF2.printSchema()

为什么不定义一个包含JSON文件可以拥有的所有列的空DF ?然后将json加载到其中。这是一个想法:

Spark 3.1.0:

from pyspark.sql.types import *
schema = StructType([
StructField("fruit",StringType(),True),
StructField("size",StringType(),True),
StructField("color",StringType(),True)
])
df = spark.createDataFrame([], schema)
json_file_1 = {"fruit": "Apple","size": "Large"}
json_df_1 = spark.read.json(sc.parallelize([json_file_1]))
df = df.unionByName(json_df_1, allowMissingColumns=True)
json_file_2 = {"fruit": "Banana","size": "Small","color": "Yellow"}
df = df.unionByName(json_file_2, allowMissingColumns=True)
display(df)

您可以查看这个从JSON输入生成pyspark模式的工具https://github.com/PreetRanjan/pyspark-schema-generator它有助于生成可以在脚本中使用的PySpark模式,并且可以根据需要添加或删除列。它有几个bug,但对我来说工作得很好。

相关内容

  • 没有找到相关文章