在我们的应用程序中,我们的大多数代码只是在DataFrame
上应用filter
,group by
和aggregate
操作,然后将DF保存到Cassandra数据库中。
喜欢下面的代码,我们有几种方法在不同数量的字段上执行相同类型的操作[filter, group by, join, agg
]并返回DF,并将其保存到Cassandra表中。
示例代码是:
val filteredDF = df.filter(col("hour") <= LocalDataTime.now().getHour())
.groupBy("country")
.agg(sum(col("volume")) as "pmtVolume")
saveToCassandra(df)
def saveToCassandra(df: DataFrame) {
try {
df.write.format("org.apache.spark.sql.cassandra")
.options(Map("Table" -> "tableName", "keyspace" -> keyspace)
.mode("append").save()
}
catch {
case e: Throwable => log.error(e)
}
}
由于我通过将DF保存到Cassandra来调用该动作,因此我希望我只能按照此线程在该行上处理例外。
如果我得到任何例外,默认情况下可以在火花详细日志中看到例外。
我必须真正包围过滤器,与Try
或try , catch?
我在SPARK SQL数据帧API示例上没有看到任何例外处理的示例。
如何在saveToCassandra
方法上使用Try
?它返回Unit
毫无意义地包装懒惰dry catch中的懒惰。
您需要将lambda函数包裹在try()中。
不幸的是,AFAIK无法在数据范围内进行行级异常处理。
您可以使用下面的本文回答中提到的RDD或数据集Spache Spark异常处理
您实际上不需要包围filter
,group by
代码与Try
或try
,catch
。由于所有这些操作都是转换,因此在您的情况下像saveToCassandra
一样执行 action ,它们才能执行。
但是,如果发生错误时过滤,分组或 congregating dataFrame,saveToCassandra
函数中的捕获子句将将其登录为Action IT AS Action正在那里进行。