更新卡桑德拉行火花卡桑德拉



我使用spark 1.2与spark Cassandra连接器1.2.3,我正在尝试更新表的一些行:

的例子:

CREATE TABLE myTable ( 
a text, 
b text, 
c text, 
date timestamp, 
d text, 
e text static, 
f text static, 
PRIMARY KEY ((a, b, c), date, d) 
) WITH CLUSTERING ORDER BY (date ASC, d ASC)
val interactions = sc.cassandraTable[(String, String, String, DateTime, String, String)]("keySpace", "myTable"). 
select("a","b","c","date", "d", "e","f") 
val empty = interactions.filter(r => r._6 == null).cache() 
empty.count()

我只是计算"e"包含null的行数并用"b"的值替换它们

 val update_inter = empty.map( r =>  (r._1,r._2, r._3, r._4, r._5, r._2)) 
 update_inter.saveToCassandra("keySpace", "myTable", SomeColumns("a","b","c","date", "d", "e", "f"))

当我检查CQLSH时,这是有效的,但是当我通过spark Cassandra请求相同的行时,我仍然得到值null。

这是一个错误在火花卡桑德拉连接器?谢谢你的帮助。

当插入/更新发生时,Cassandra将插入或更新的数据的新时间戳版本写入另一个SSTable中,而不是覆盖现有的行。

你的Spark作业不是在更新现有的行,而是在写新的行,或者你的sstable还没有把更改写到磁盘上。如果要将结果写入一个新表,空"e"列的计数将为零。

尝试nodeool flush命令并读取以下内容:Cassandra Compaction

.mode('append')用于追加我猜。我正面临着一个类似的问题,但使用java连接器,但它似乎在python这个选项是可用的

最新更新