如何在 WSO2bam 中接收事件后存档或删除 Cassandra 数据



我在 2.3.0 版本中使用了WSO2BAM,其中我定义了一个在 Cassandra 数据源中保存大量数据的流。目前,我的 Hive 脚本处理来自密钥空间的所有事件,其中 99% 的数据是不必要的。而且它也需要磁盘空间。

我的想法是在不需要这些数据后清除这些数据。

流的格式为: {"streamId":"kroki_i_kolejki_zlecen:1.0.0","name":"kroki_i_kolejki_zlecen","version":"1.0.0","nickName":"Kroki i kolejki zlecen","description":"Wyniki i daty zamkniecia zlecen","payloadData":[{"name":"casenum","type":"STRING"},{"name":"type_id","type":"STRING"},{"name":"id_zlecenie","type":"STRING"},{"name":"sid","type":"STRING"},{"name":"step_name","type":"STRING"},{"name":"proc_name","type":"STRING"},{"name":"step_desc","type":"

STRING"},{"name":"audit_date","type":"STRING"},{"name":"audit_usecs","type":"STRING"},{"name":"user_name","type":"STRING"}]}

我的目的是在收到具有特定payload_type_id的事件后删除具有相同列payload_id_zlecenie的数据。在关系数据库中,它等于查询:

delete from kroki_i_kolejki_zlecen where payload_id_zlecenie = [argument];

可以做到吗?

根据我的知识,在Hive中您无法删除Cassandra数据。Inosh 给出的 [1] 链接描述了如何归档超过特定持续时间的 Cassandra 记录。(例如超过 3 个月的记录)所有存档的数据将存储在带有后缀"_arch"的列系列中。在该功能中,在生成的 Hive 脚本中使用自定义分析器来删除 Cassandra 行。另请注意,删除的记录大约需要 10 天才能使用其行键完全删除整行。在此之前,您将看到一些与 Cassandra 行 ID 关联的空字段。

Inosh的[2]是你问题的真正解决方案。启用增量处理后,hive 脚本将仅处理在上一个 hive 脚本执行中未处理的 Cassandra 行。这意味着,Hive 将聚合每次执行中处理的值,并保留它们以备将来使用。下次 hive 将使用该值,以及之前处理的最后一个时间戳,并处理该时间戳之后的所有记录。新的聚合值和旧的聚合值将用于获取整体值。

[1] - http://docs.wso2.org/display/BAM240/Archive+Cassandra+Data

[2] - http://docs.wso2.org/pages/viewpage.action?pageId=32345660

您可以使用 Cassandra 数据存档功能 [1] 来存档 Cassandra 数据。

另请参阅增量分析 [2],这是随 BAM 2.4.0 一起发布的新功能。使用该功能,可以增量分析接收的数据,而无需处理 CF 中的所有事件。

[1] - http://docs.wso2.org/display/BAM240/Archive+Cassandra+Data

[2] - http://docs.wso2.org/pages/viewpage.action?pageId=32345660