Datastax Spark cassandra 连接器与 RetryPolicy 将 DF 写入 cassandra



我正在尝试将一致性级别为"EACH_QUORUM"的 cassandra 写入一个火花数据帧。我的代码如下所示:

val sparkBuilder = SparkSession.builder().
config(cassandraHostPropertyProperty, cassandraHosts).
config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
getOrCreate();

下面是编写 DF 的代码:

df.write.cassandraFormat(tableName, keySpaceName)
.mode(SaveMode.Append)
.options(Map(
WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
))
.save()

我想添加一个重试策略,以便在其中一个数据中心处于关闭状态时,将一致性降级为 LOCAL_QUORUM。

我知道datastax有一个类MultipleRetryPolicy.scala,我应该扩展它,覆盖方法来添加自定义逻辑并在cassandra conf中使用它的实例。

如何将此策略应用于我的 Sparksession 或保存操作?在 scala 中使用或不使用 RetryPolicy 来实现我的要求还有其他方法吗?

你不想MultipleRetryPolicy,你追求的是降级一致性RetryPolicy,它不是Spark驱动程序的一部分,所以除非你将策略移植到scala,否则作为驱动程序设置的一部分这样做是错误的。

您可以做的是将查询执行包装在 try 中并捕获UnavailableException然后通过更改 output.consistency.level 参数以较低的一致性重试。

最新更新