我正在阅读spark CSV。我正在为我读取的文件提供一个模式,我以允许模式读取它。我想将所有记录保存在列NameOfCorruptRecord中(在我的情况下是corrupted_records)。
我经历了地狱般的设置,但仍然收到警告,我无法抑制我错过的东西。
因此,首先为了拥有corrupted_records列,我需要将其作为StringType添加到模式中。这是有案可查的。但是,每当我读取一个文件时,都会收到一个警告,说明架构不匹配,因为列的数量不同。这只是一个警告,但它填满了我的日志。
此外,当有一个字段不可为null,并且有一个损坏的记录时,损坏的记录会转到corrupt_records列,它的所有字段都设置为null,因此我会得到一个异常,因为我有不可为null的字段。解决此问题的唯一方法是将列不可为null设置为可为null。这很奇怪。
我是不是错过了什么?
综述:
- 当我添加架构中的corrupted_records列
- 有办法使用吗PERMISSIVE模式和具有不可为null的字段
谢谢!
以下文档可能会有所帮助。如果您至少提供您编写的代码,那就太好了。https://docs.databricks.com/spark/latest/data-sources/read-csv.html
读取json代码片段的演示
df= self.spark.read.option("mode", "PERMISSIVE").option("primitivesAsString", True).json(self.src_path)
要回答第2点,您应该深入研究第一点。
要点1:您应该对文件进行分析,并将架构与文件中的所有字段映射。将csv文件导入DataFrame后,我会选择您感兴趣的字段,然后继续您正在做的工作。
第2点:您将解决定义模式的问题,如下所示(我将使用scala):
import pyspark.sql.types as types
yourSchema = (types.StructType()
.add('field0', types.IntegerType(), True)
# all your .add(fieldsName, fieldsType, True which let your field be nullable)
.add('corrupted_records', types.StringType(), True) #your corrupted date will be here
)
定义好后,您可以将csv文件导入DataFrame,如下所示:
df = ( spark.read.format("csv")
.schema(yourSchema)
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "corrupted_records")
load(your_csv_files)
)
还有其他方法可以做同样的操作,不同的方式可以处理坏的坏的;看看这篇有见地的文章:https://python.plainenglish.io/how-to-handle-bad-data-in-spark-sql-5e0276d37ca1