我需要一些帮助,以将一系列的JSON文件从S3存储桶中获取到Pyspark DataFrame。
此存储桶中的文件都有.json
扩展名,但不幸的是,每行都没有一个JSON对象的通常SPARK要求,相反,它们都在Square Brackets中的一行。
所以:
{"first_column": 12, "second_column": {"nested_column": "value"}}
{"first_column": 24, "second_column": {"nested_column": "value2"}}
我有:
[{"first_column": 12, "second_column": {"nested_column": "value"}},{"first_column": 24, "second_column": {"nested_column": "value2"}}]
实际上,我们以这种格式接收文件,并且有很多文件,以至于不幸的是,进行任何手动调整是不可行的。
我到目前为止尝试的方法如下:
我尝试使用spark.read.json
方法,使用以下语法与通配符*
同时加载多个文件。在这种情况下,spark
是sqlContext
df = spark.read.json("s3://path_to_bucket_*.json")
这在不引起任何错误或警告的情况下运行,并返回所需的模式:
df.printSchema()
root
|-- first_column: long (nullable = true)
|-- second_column: struct (nullable = true)
| |-- nested_column: string (nullable = true)
但是,当我尝试查看数据时,我会得到以下内容:
+------------+-------------+
|first_column|second_column|
+------------+-------------+
| null| null|
+------------+-------------+
我找到了一种实际上从Databricks加载数据的方法,该数据使用Spark上下文sc
在配对的RDD中读取:
dfRDD = sc.wholeTextFiles("s3://path_to_bucket_*.json")
这将返回配对与文件名和文件主体。但是,并不是真的让我感到困惑的是,当我使用此RDD的身体信息调用以下行时,它可以正常工作,并且根本没有空值:
df = spark.read.json(dfRDD.map(lambda x: x[1]))
因此,我对为什么会发生这种情况感到非常困惑,因为我认为这与RDD中文本的正文相同的信息不包含任何线路断裂,而是在方括号内包含JSON对象(如我上面显示的第二个示例)。
虽然这是一种解决方法,但不幸的是缺乏。首先,使用RDD方法要慢得多,更重要的是,我需要从中获取此信息的文件名称。我知道,从文件直接加载pyspark.sql.functions
模块的input_file_name
函数是可能的,但是在使用RDD方法时,这是不起作用的。我设法编写了一个纯净的python函数,该函数将每个配对的第一个元素从每个配对的第一个元素中获取fileName信息到JSON字符串,但这很慢。
如果有人可以帮助我解决这个问题,我将非常感激。我很感激我可能必须使用RDD方法,但是我很困惑spark.read.json
在一种情况下而不是另一种情况。
尽管我不确定是什么原因导致一个解决方案工作而另一种解决方案,但我只能通过仅使用sql.read.json在某种程度上解决问题。
设置参数允许COMMENTS,ALLICUNQUOTEDFIELDNAME,允许Quotes,AllumericLeadingZero,lassebackslashescapinganycharacter in read.json as true。这样,我能够删除空值,而90%的数据在没有任何空值的数据框中成功转换了。
在此处查看其他参数