java.lang.UnsupportedOperationException: '不允许写入非空的 Cassandra Table



我有一个场景,我将接收由我的 Spark 流程序处理的流数据,并且每个间隔的输出被附加到我现有的 Cassandra 表中。

目前,我的 Spark 流程序将生成一个数据帧,我需要将其保存在我的 cassandra 表中。我目前面临的问题是,当我使用以下命令时,我无法将数据/行附加到我现有的 cassandra 表中

dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save()

我在以下链接 http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/中读到他将 mode="append" 传递到保存方法中,但它的抛出语法错误

我也无法理解我需要从以下链接修复的地方https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM

需要有关如何解决此问题的帮助。我正在用 scala 编写我的火花流作业

我认为您必须通过以下方式进行操作:

dff.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Append).options(Map("table" -> "xxx", "yyy" -> "retail")).save()

Cassandra 处理数据的方式迫使您执行所谓的"更新插入" - 您必须记住,插入可能会覆盖某些行,其中已存储记录的主键与插入的 reccord 的主键相同。Cassandra是一个"快速写入"数据库,因此在写入之前不会检查数据是否存在。

相关内容

  • 没有找到相关文章

最新更新