我希望Spark在保存到数据库时忽略坏记录



我正在使用spark JDBC保存数据库中的行。数据的保存效果良好。

问题:如果Spark遇到任何错误记录(例如,当表期望非null值时,列为null值(,则它将中止保存

我想要的:我希望Spark忽略坏行,然后继续保存下一行。这是如何实现的?我在文档中看不到太多。使用StructType不是一个选项。

有指针吗?

我的代码看起来像这样。

class DatabaseWriter {
def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession): Unit = {
val dbProperties = getSQLProperties(sparkSession, configurationProp)
dataFrameTobeWritten.write.mode(SaveMode.Overwrite)
.option("driver", dbProperties.driverName)
.option("truncate", "true")
.option("batchsize", configurationProp.WriterBatchSize())
.jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
}
}

在方法中添加一个非null列的列表,并使用它们创建一个过滤条件来过滤坏行

class DatabaseWriter {
def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession, notNullColumns : List[String]): Unit = {
val dbProperties = getSQLProperties(sparkSession, configurationProp)
val filterCondition = notNullColumns.map(c -> s"$c IS NOT NULL").mkString(" AND ")
dataFrameTobeWritten.filter(filterCondition).write.mode(SaveMode.Overwrite)
.option("driver", dbProperties.driverName)
.option("truncate", "true")
.option("batchsize", configurationProp.WriterBatchSize())
.jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
}
}

相关内容

  • 没有找到相关文章

最新更新