我们正在用Spark加载文件目录的层次结构,并将它们转换为Parquet。数百个管道分隔的文件中有几十GB。有些本身就相当大。
例如,每个第100个文件都有一两行,其中有一个额外的分隔符,使整个过程(或文件)中止。
我们正在使用加载
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", format("header"))
.option("delimiter", format("delimeter"))
.option("quote", format("quote"))
.option("escape", format("escape"))
.option("charset", "UTF-8")
// Column types are unnecessary for our current use cases.
//.option("inferschema", "true")
.load(glob)
Spark是否有任何扩展或事件处理机制可以附加到读取行的逻辑中,如果遇到格式错误的行,则跳过该行,而不是使处理失败?
(我们计划做更多的预处理,但这将是最直接和关键的解决方案。)
在您的案例中,失败的可能不是Spark解析部分,而是默认值实际上是PERMISSIVE
,因此它将尽最大努力解析为一个格式错误的记录,然后在处理逻辑的下游引发问题。
您应该能够简单地添加选项:
.option("mode", "DROPMALFORMED")
像这样:
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", format("header"))
.option("delimiter", format("delimeter"))
.option("quote", format("quote"))
.option("escape", format("escape"))
.option("charset", "UTF-8")
// Column types are unnecessary for our current use cases.
//.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load(glob)
并且它会跳过分隔符数量不正确或与模式不匹配的行,而不是让它们在以后的代码中导致错误。