Atomic使用Spark Streaming写入Cassandra



我对Cassandra(2.1.11)和Spark(1.4.1)都很陌生,我很想知道是否有人见过/开发过使用Spark Streaming对两个不同Cassandra表进行原子写入的解决方案。

我目前有两个表,它们包含相同的数据集,但具有不同的分区键。为了简单起见,我将使用熟悉的User表示例来解释:

CREATE TABLE schema1.user_by_user_id
(
    user_id uuid
    ,email_address text
    ,num int //a value that is frequently updated
    ,PRIMARY KEY (user_id)
);
CREATE TABLE schema1.user_by_email_address
(
    email_address text
    ,user_id uuid
    ,num int //a value that is frequently updated
    ,PRIMARY KEY (email_address)
);

email_address列将具有高基数(实际上它将在user_id值数量的50%和100%之间)。基数高会导致二级索引性能较差,因此需要第二个表。

我使用Spark Streaming来处理num列中的更改并更新这两个表。据我所知,saveToCassandra()方法在UNLOGGED BATCH中为RDD中的每个项目执行写入,从而执行原子写入(如此处的"保存对象集合"部分所述)。但是,saveToCassandra()只能用于保存到单个表中。为了使schema1.user_by_user_idschema1.user_by_email_address表保持同步,我必须发出两个单独的saveToCassandra()调用:

rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))

每个调用中的写入都是以原子的方式完成的,但这两个调用在一起不是原子的。第二次调用中的某些错误将使两个表不同步。

显然,我的数据集和实际的表结构比这更复杂,但我试图用尽可能简单的方式传达问题的要点。虽然我的问题是为了能够保存到两个表,但我欢迎任何关于数据模型更改的替代建议,这些建议将完全消除这种需求。

首先要理解的是:UNLOGGED批次是而不是原子。请参阅文档。UNLOGGED批处理提供的唯一功能是使用相同的时间戳进行多次写入。

因此,如果您想对saveToCassandra进行多个调用,并让它们表现得像一个调用一样,只需为两个调用指定WRITETIME即可。完成所有操作后,所有修改后的数据都将具有相同的时间戳。

至于你关于如何对多个表进行原子更新的问题。。。你不能。Cassandra不支持。

我能想到的最好的建议是创建自己的批处理日志,您可以在崩溃后查阅该日志,以确定需要重新同步的内容。

想象一下这样的事情:

CREATE TABLE batch_log
(
    id uuid,
    updated_users set<uuid>,
    PRIMARY KEY(id)
)

启动作业时,生成一个新的uuid,该uuid将是此作业的id。然后,您将发布3个保存:

rdd.saveToCassandra("schema1", "batch_log", SomeColumns("batch_id", "user_id" append)
rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))

如果您的批处理在没有任何崩溃的情况下完成,您只需删除创建的batch_log行即可。然而,如果系统在中途崩溃,那么一旦恢复在线,您可以查阅batch_log以获得更新的用户列表。请查询这些用户的电子邮件地址,然后更新user_by_email_address表。完成此修复后,您可以删除batch_log

实际上,您正在"手动"执行CassandraLOGGED批次。

相关内容

  • 没有找到相关文章

最新更新