KSQL 分组依据可删除以前的值并仅使用 LAST

  • 本文关键字:LAST 删除 KSQL ksqldb
  • 更新时间 :
  • 英文 :


>我有一个Kafka主题"事件",它记录用户图像投票,并具有以下结构的json:

{"category":"image","action":"vote","label":"amsterdam","ip":"1.1.1.1","value":2}

我需要在另一个主题上收到标签(例如阿姆斯特丹(的所有投票的总和,但仅使用最后一次投票删除来自同一 IP 地址的任何投票。本主题应具有以下格式的 json:

{label:”amsterdam”,SCORE:8,TOTAL:3}

SCORE 是所有选票的总和,总票数是计票数。

我所做的解决方案从主题事件创建一个流:

CREATE STREAM st_events
(CATEGORY STRING, ACTION STRING, LABEL STRING, VALUE BIGINT, IP STRING)
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');

然后,我创建一个表tb_votes,用于计算每个标签和 IP 地址的分数和总数:

CREATE TABLE tb_votes WITH (KAFKA_TOPIC='tb_votes', PARTITIONS=1, REPLICAS=1) AS SELECT
st_events.LABEL "label", SUM(st_events.VALUE-1) "score", CAST(COUNT(*) AS BIGINT) "total"
FROM st_events
WHERE
st_events.category='image' AND st_events.action='vote'
GROUP BY st_events.label, st_events.ip
EMIT CHANGES;

问题在于,Kafka 不会删除来自同一 IP 地址的同一图像的所有先前投票,而是使用所有这些投票。这是有道理的,因为它是一个分组依据。

知道如何"删除"所有以前的投票并仅对图像/IP使用最新值吗?

您需要一个两阶段聚合。

第一阶段应该构建一个表,其中包含一个包含iplabel的主键,以及另一个保存value的列。

从第一个表生成第二个表,以获取所需的每个标签countsum

如果同一ip对同一label进行另一次投票,则第一个表将使用新value进行更新,第二个表将正确更新。它将首先从countsum中删除旧value,然后应用新value

ksqlDB 尚不支持多个主键列(尽管它很快就会到来! 因此,当您按两列分组时,它只会执行时髦的字符串连接。但我们现在可以解决这个问题。

CREATE TABLE BY_IP_AND_LABEL AS
SELECT
label + '-' + ip AS ipAndLabel,
value
FROM st_events
GROUP BY ip + '@' + label;

CREATE TABLE BY_LABEL AS
SELECT
SUBSTRING(labelAndIp, INSTR(labelAndIp, '@')) AS label,
SUM(VALUE-1) AS score,
COUNT(*) AS total
FROM BY_IP_AND_LABEL
GROUP BY SUBSTRING(ipAndLabel, INSTR(ipAndLabel, '@'));

第一个表创建一个组合键,其中@和 作为分隔符。第二个表使用INSTRSUBSTRING来查找分隔符并提取label

注意:我还没有测试过这个 - 逻辑中可能会有一些"逐个关闭"的错误。

这应该可以满足您的需求。

最新更新