Spark SQL-加载带有一些格式错误记录的csv/psv文件



我们正在用Spark加载文件目录的层次结构,并将它们转换为Parquet。数百个管道分隔的文件中有几十GB。有些本身就相当大。

例如,每个第100个文件都有一两行,其中有一个额外的分隔符,使整个过程(或文件)中止。

我们正在使用加载

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .load(glob)

Spark是否有任何扩展或事件处理机制可以附加到读取行的逻辑中,如果遇到格式错误的行,则跳过该行,而不是使处理失败?

(我们计划做更多的预处理,但这将是最直接和关键的解决方案。)

在您的案例中,失败的可能不是Spark解析部分,而是默认值实际上是PERMISSIVE,因此它将尽最大努力解析为一个格式错误的记录,然后在处理逻辑的下游引发问题。

您应该能够简单地添加选项:

.option("mode", "DROPMALFORMED")

像这样:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .option("mode", "DROPMALFORMED")
        .load(glob)

并且它会跳过分隔符数量不正确或与模式不匹配的行,而不是让它们在以后的代码中导致错误。

相关内容

  • 没有找到相关文章

最新更新