如何在Cassandra中实现一次流处理



我有一个Cassandra表,它看起来像这个

CREATE TABLE tmp.inventory (
t_id text,
is_available boolean,
modified_at bigint,
price double,
available_units bigint,
PRIMARY KEY(t_id, modified_at)
) WITH CLUSTERING ORDER BY (modified_at);

我有一个流媒体管道,可以更新Cassandra中的项目。流式传输管道每隔一段时间进行检查点检查。因此,当管道失败时,它将重新处理自上次成功检查点以来的源数据。当它在失败后重新处理时,它将尝试覆盖Cassandra中已经成功写入的数据(即在上次成功检查点之后但在失败之前(。我想利用modified_at专栏来实现这一点。类似的东西

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? AND modified_at < ?

这是我试图只在Cassandra中的modified_at小于管道中的modified _at的情况下进行更新。然而,这会抛出InvalidQueryException: Slice restrictions are not supported on the clustering columns in UPDATE statements

我认为IF条件在这种情况下会有所帮助。

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? IF modified_at < ?

但这会抛出InvalidQueryException: PRIMARY KEY column 'modified_at' cannot have IF conditions

那么,处理这个问题的理想方法是什么呢?

编辑如果我在这个表中只有这些字段,那么重新处理事件可能没有那么大的问题,因为当管道赶上实时流时,它最终会变得一致,但说有另一个流作业用当前价格、可用单位等更新同一个表。在这种情况下,如果其中一个作业失败并重新启动,表可能处于一致状态。

为了避免一个线程可以在另一个线程已经插入新数据后写入旧数据的情况,您可以在执行INSERT或UPDATE时使用USING TIMESTAMP(在Cassandra中,任何东西都是UPSERT,因此从语法角度来看,使用INSERT可能更容易,imho(。其想法是明确指定记录的时间戳,因此当另一个线程比前一个线程晚插入旧数据时,数据将被插入,但它们不会获胜,因为Cassandra使用时间戳(明确指定(来检测最新版本。类似这样的东西:

INSERT INTO tmp.inventory (t_id, is_available, modified_at)
VALUES (?, ?,?)
USING TIMESTAMP <modified_at*1000>

唯一需要记住的是,在USING TIMESTAMP中指定的值使用微秒而不是毫秒,并且您需要计算<modified_at*1000>的值-您不能在那里使用表达式(这里只是示例(。

相关内容

  • 没有找到相关文章

最新更新