Spark在创建数据集时无法反序列化记录



我正在从S3中读取大量CSV(所有内容都在密钥前缀下(,并创建一个强类型Dataset

val events: DataFrame = cdcFs.getStream()
events
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]

其中CCD_ 2是通常可以由SparkSession隐含器反序列化为的case类。但是,对于某个批处理,记录无法反序列化。以下是错误(省略堆栈跟踪(

Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "deal")
- root class: "com.company.trades.TradeRecord"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

dealTradeRecord的一个字段,在源数据(S3对象(中永远不应该为null,因此它不是Option

不幸的是,错误消息没有给我任何关于CSV数据的线索,甚至没有告诉我它来自哪个CSV文件。该批由数百个文件组成,因此我需要一种方法将其缩小到最多几个文件,以调查该问题。

根据用户10465355的建议,您可以加载数据:

val events: DataFrame = ???

过滤器

val mismatched = events.where($"deal".isNull)

添加文件名

import org.apache.spark.sql.functions.input_file_name
val tagged = mismatched.withColumn("_file_name", input_file_name)

可选添加区块和区块以及偏移量:

import org.apache.spark.sql.functions.{spark_partition_id, monotonically_increasing_id, shiftLeft, shiftRight
df
.withColumn("chunk", spark_partition_id())
.withColumn(
"offset",
monotonically_increasing_id - shiftLeft(shiftRight(monotonically_increasing_id, 33), 33))

以下是我提出的解决方案(我使用的是Spark Structured Streaming(:

val stream = spark.readStream
.format("csv")
.schema(schema) // a StructType defined elsewhere
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "corruptRecord")
.load(path)
// If debugging, check for any corrupted CSVs
if (log.isDebugEnabled) { // org.apache.spark.internal.Logging trait 
import spark.implicits._
stream
.filter($"corruptRecord".isNotNull)
.withColumn("input_file", input_file_name)
.select($"input_file", $"corruptRecord")
.writeStream
.format("console")
.option("truncate", false)
.start()
}
val events = stream
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]

基本上,如果Spark日志级别设置为Debug或更低,则会检查DataFrame是否有损坏的记录,并将任何此类记录及其文件名一起打印出来。最终,程序试图将此DataFrame强制转换为强类型的Dataset[TradeRecord],但失败了。

最新更新