如何使用 Datastax 连接器从 Spark 数据帧更新特定的 Cassandra 列集



我有一个包含几列的Cassandra表,我想从Spark 2.4.0更新其中一列(以及多列的内容?但是,如果我不提供所有列,则记录不会更新。

卡桑德拉模式:

rowkey,message,number,timestamp,name
1,hello,12345,12233454,ABC

关键是Spark DataFrame由具有更新时间戳的rowkey组成,必须在Cassandra表中更新。

我尝试在选项之后立即选择列,但似乎没有这样的方法。

finalDF.select("rowkey","current_ts")
  .withColumnRenamed("current_ts","timestamp")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_data", "keyspace" -> "ks_data"))
  .mode("overwrite")
  .option("confirm.truncate","true")
  .save()

finalDF=
rowkey,current_ts
1,12233999

那么 Cassandra 表应该保存这样的值,比如更新后,

rowkey,message,number,timestamp,name
1,hello,12345,12233999,ABC

我正在使用数据帧 API。所以不能使用rdd方法。我该怎么做?Cassandra 版本 3.11.3,Datastax 连接器 2.4.0-2.11

说明是SaveMode用于指定将数据帧保存到数据源的预期行为。不仅对于 C*,而且对于任何数据源)。可用选项包括

  1. SaveMode.ErrorIfExists
  2. 保存模式.追加
  3. 保存模式.覆盖
  4. 保存模式.忽略

在这种情况下,由于您已经有数据并且想要追加,因此必须使用SaveMode.Append

import org.apache.spark.sql.SaveMode
finalDF.select("rowkey","current_ts")
  .withColumnRenamed("current_ts","timestamp")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_data", "keyspace" -> "ks_data"))
  .mode(SaveMode.Append)
  .option("confirm.truncate","true")
  .save()

在保存模式上查看 Spark 文档

相关内容

  • 没有找到相关文章

最新更新