我正在使用spark JDBC保存数据库中的行。数据的保存效果良好。
问题:如果Spark遇到任何错误记录(例如,当表期望非null值时,列为null值(,则它将中止保存
我想要的:我希望Spark忽略坏行,然后继续保存下一行。这是如何实现的?我在文档中看不到太多。使用StructType
不是一个选项。
有指针吗?
我的代码看起来像这样。
class DatabaseWriter {
def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession): Unit = {
val dbProperties = getSQLProperties(sparkSession, configurationProp)
dataFrameTobeWritten.write.mode(SaveMode.Overwrite)
.option("driver", dbProperties.driverName)
.option("truncate", "true")
.option("batchsize", configurationProp.WriterBatchSize())
.jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
}
}
在方法中添加一个非null列的列表,并使用它们创建一个过滤条件来过滤坏行
class DatabaseWriter {
def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession, notNullColumns : List[String]): Unit = {
val dbProperties = getSQLProperties(sparkSession, configurationProp)
val filterCondition = notNullColumns.map(c -> s"$c IS NOT NULL").mkString(" AND ")
dataFrameTobeWritten.filter(filterCondition).write.mode(SaveMode.Overwrite)
.option("driver", dbProperties.driverName)
.option("truncate", "true")
.option("batchsize", configurationProp.WriterBatchSize())
.jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
}
}