如何添加到卡桑德拉中的现有值?



我正在编写一个流应用程序,它要求我在 cassandra 表中的预先存在的值中添加或减去传入值。我已经看到了 cql 的批处理文档,但还没有找到解决我的问题的方法。下面是一个用于更多 insite 的小示例:

现有前表

table:{
word:{'hello',
neg:{-0.5},
neu:{0.3},
pos:{0.2},
comp:{0.7}
}
}

传入值:

word:{'hello',
neu:{0.4}
}

在这里我需要添加 0.4 和 0.3 并重新插入表中。

您可以使用leftJoinWithCassandraTable将输入数据与 Cassandra 中的数据联接(在行的主键上(,然后使用获取的数据执行输入数据的加法,并将数据写回 Cassandra。

像这样的东西(从我自己的代码中采用(:

// this case class is matching to Cassandra table & input data...
case class Data(....)
val data = ...data_that_you_received_and_casted_to_Data_case_class...
val joined = data.leftJoinWithCassandraTable[Data]("ks", "table")
// perform update of existing values, and prepare new data
val summed = joined.map({ case (n: Data, c: Option[Data]) =>
c match {
// if there is no data in Cassandra, just return input data
case None => Data(n)
// there is data in Cassandra, do the sum
case Some(s) =>
Data(..., n.neu + s.neu)}
})
// and write updated/new values
summed.saveToCassandra("ks", "table")

附言我建议使用Spark Cassandra Connector 2.5.0直接支持的Spark Structured Streaming,以及所谓的数据帧直接连接。使用数据帧时,您的代码会更简单,并且可能更优化。

相关内容

  • 没有找到相关文章

最新更新