>我有一个Kafka主题"事件",它记录用户图像投票,并具有以下结构的json:
{"category":"image","action":"vote","label":"amsterdam","ip":"1.1.1.1","value":2}
我需要在另一个主题上收到标签(例如阿姆斯特丹(的所有投票的总和,但仅使用最后一次投票删除来自同一 IP 地址的任何投票。本主题应具有以下格式的 json:
{label:”amsterdam”,SCORE:8,TOTAL:3}
SCORE 是所有选票的总和,总票数是计票数。
我所做的解决方案从主题事件创建一个流:
CREATE STREAM st_events
(CATEGORY STRING, ACTION STRING, LABEL STRING, VALUE BIGINT, IP STRING)
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
然后,我创建一个表tb_votes,用于计算每个标签和 IP 地址的分数和总数:
CREATE TABLE tb_votes WITH (KAFKA_TOPIC='tb_votes', PARTITIONS=1, REPLICAS=1) AS SELECT
st_events.LABEL "label", SUM(st_events.VALUE-1) "score", CAST(COUNT(*) AS BIGINT) "total"
FROM st_events
WHERE
st_events.category='image' AND st_events.action='vote'
GROUP BY st_events.label, st_events.ip
EMIT CHANGES;
问题在于,Kafka 不会删除来自同一 IP 地址的同一图像的所有先前投票,而是使用所有这些投票。这是有道理的,因为它是一个分组依据。
知道如何"删除"所有以前的投票并仅对图像/IP使用最新值吗?
您需要一个两阶段聚合。
第一阶段应该构建一个表,其中包含一个包含ip
和label
的主键,以及另一个保存value
的列。
从第一个表生成第二个表,以获取所需的每个标签count
和sum
。
如果同一ip
对同一label
进行另一次投票,则第一个表将使用新value
进行更新,第二个表将正确更新。它将首先从count
和sum
中删除旧value
,然后应用新value
。
ksqlDB 尚不支持多个主键列(尽管它很快就会到来! 因此,当您按两列分组时,它只会执行时髦的字符串连接。但我们现在可以解决这个问题。
CREATE TABLE BY_IP_AND_LABEL AS
SELECT
label + '-' + ip AS ipAndLabel,
value
FROM st_events
GROUP BY ip + '@' + label;
CREATE TABLE BY_LABEL AS
SELECT
SUBSTRING(labelAndIp, INSTR(labelAndIp, '@')) AS label,
SUM(VALUE-1) AS score,
COUNT(*) AS total
FROM BY_IP_AND_LABEL
GROUP BY SUBSTRING(ipAndLabel, INSTR(ipAndLabel, '@'));
第一个表创建一个组合键,其中@
和 作为分隔符。第二个表使用INSTR
和SUBSTRING
来查找分隔符并提取label
。
注意:我还没有测试过这个 - 逻辑中可能会有一些"逐个关闭"的错误。
这应该可以满足您的需求。