SPARK SQL DataFrame-异常处理



在我们的应用程序中,我们的大多数代码只是在DataFrame上应用filtergroup byaggregate操作,然后将DF保存到Cassandra数据库中。

喜欢下面的代码,我们有几种方法在不同数量的字段上执行相同类型的操作[filter, group by, join, agg]并返回DF,并将其保存到Cassandra表中。

示例代码是:

 val filteredDF = df.filter(col("hour") <= LocalDataTime.now().getHour())
.groupBy("country")
.agg(sum(col("volume")) as "pmtVolume")
saveToCassandra(df)
def saveToCassandra(df: DataFrame) {
    try {
        df.write.format("org.apache.spark.sql.cassandra")
        .options(Map("Table" -> "tableName", "keyspace" -> keyspace)
        .mode("append").save()
    }
    catch {
        case e: Throwable => log.error(e)
    }
}

由于我通过将DF保存到Cassandra来调用该动作,因此我希望我只能按照此线程在该行上处理例外。

如果我得到任何例外,默认情况下可以在火花详细日志中看到例外。

我必须真正包围过滤器,与Trytry , catch?

组成

我在SPARK SQL数据帧API示例上没有看到任何例外处理的示例。

如何在saveToCassandra方法上使用Try?它返回Unit

毫无意义地包装懒惰dry catch中的懒惰。
您需要将lambda函数包裹在try()中。
不幸的是,AFAIK无法在数据范围内进行行级异常处理。

您可以使用下面的本文回答中提到的RDD或数据集Spache Spark异常处理

您实际上不需要包围filtergroup by代码与Trytrycatch。由于所有这些操作都是转换,因此在您的情况下像saveToCassandra一样执行 action ,它们才能执行。

但是,如果发生错误时过滤分组 congregating dataFrame,saveToCassandra函数中的捕获子句将将其登录为Action IT AS Action正在那里进行。

相关内容

  • 没有找到相关文章

最新更新