KSQL DB collect_list and collect_set do not support multiple columns, structs, or maps



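I'm currently building a POC for my company on Confluent Cloud. The current version of ksqlDB does not yet support collect_list/collect_set over multiple columns, structs, or maps, so I'm trying to come up with a workaround.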

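I'm consuming SQL CDC streams and trying to compose a nested object model with parent/child relationships, without having to build a self-hosted Java UDF or Kafka Streams application.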

The streams demo_games, demo_players and demo_teams should ultimately produce the following model, written to a Kafka topic:

{
  teamId: bigint,
  teamName: string,
  teamPlayers: [
    {
      playerid: bigint,
      playername: string
    }
  ],
  teamGames: [
    {
      gameid: bigint,
      gamename: string
    }
  ]
}
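For reference, here is the nesting that this model describes, sketched in plain Python rather than ksqlDB. It assumes each source behaves like a table keyed by its primary key, so a later CDC event for the same id replaces the earlier row instead of being appended. The helper names and the renamed-player update event are hypothetical and only serve to illustrate that behavior.

```python
import json
from collections import defaultdict

# Sample rows mirroring the DEMO_* inserts. The last player event is a
# hypothetical CDC update (PLAYER 2 renamed) added for illustration.
teams = [{"teamid": 1, "teamname": "THE TEAM"}]
players = [
    {"playerid": 1, "teamid": 1, "playername": "PLAYER 1"},
    {"playerid": 2, "teamid": 1, "playername": "PLAYER 2"},
    {"playerid": 3, "teamid": 1, "playername": "PLAYER 3"},
    {"playerid": 4, "teamid": 1, "playername": "PLAYER 4"},
    {"playerid": 2, "teamid": 1, "playername": "PLAYER 2 (RENAMED)"},
]
games = [
    {"gameid": 1, "teamid": 1, "gamename": "SUNDAY"},
    {"gameid": 2, "teamid": 1, "gamename": "MONDAY"},
    {"gameid": 3, "teamid": 1, "gamename": "FRIDAY"},
]


def latest_by_key(rows, key):
    """Keep only the most recent row per primary key (table semantics)."""
    latest = {}
    for row in rows:
        latest[row[key]] = row  # a later event overwrites the earlier one
    return list(latest.values())


def build_team_models(teams, players, games):
    """Nest each team's current players and games under the team."""
    players_by_team = defaultdict(list)
    for p in latest_by_key(players, "playerid"):
        players_by_team[p["teamid"]].append(
            {"playerid": p["playerid"], "playername": p["playername"]}
        )
    games_by_team = defaultdict(list)
    for g in latest_by_key(games, "gameid"):
        games_by_team[g["teamid"]].append(
            {"gameid": g["gameid"], "gamename": g["gamename"]}
        )
    return [
        {
            "teamId": t["teamid"],
            "teamName": t["teamname"],
            "teamPlayers": players_by_team[t["teamid"]],
            "teamGames": games_by_team[t["teamid"]],
        }
        for t in latest_by_key(teams, "teamid")
    ]


models = build_team_models(teams, players, games)
print(json.dumps(models[0], indent=2))
```

Simply appending every event per team would give five player entries here, not four; that difference between collecting a stream and nesting the current state of a table is what makes this hard to express with a single GROUP BY.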

Let's start with a small piece of code that illustrates what I'm trying to achieve.

CREATE STREAM DEMO_GAMES( GAMEID BIGINT KEY, TEAMID BIGINT, GAMENAME STRING ) 
WITH (KAFKA_TOPIC='DEMO.GAMES',VALUE_FORMAT='JSON', PARTITIONS=1);
INSERT INTO DEMO_GAMES( GAMEID, TEAMID, GAMENAME) VALUES (1,1,'SUNDAY');
INSERT INTO DEMO_GAMES( GAMEID, TEAMID, GAMENAME) VALUES (2,1,'MONDAY');
INSERT INTO DEMO_GAMES( GAMEID, TEAMID, GAMENAME) VALUES (3,1,'FRIDAY');
CREATE STREAM DEMO_PLAYERS( PLAYERID BIGINT KEY, TEAMID BIGINT, PLAYERNAME STRING )
WITH (KAFKA_TOPIC='DEMO.PLAYERS',VALUE_FORMAT='JSON', PARTITIONS=1);
INSERT INTO DEMO_PLAYERS( PLAYERID, TEAMID, PLAYERNAME) VALUES (1,1,'PLAYER 1');
INSERT INTO DEMO_PLAYERS( PLAYERID, TEAMID, PLAYERNAME) VALUES (2,1,'PLAYER 2');
INSERT INTO DEMO_PLAYERS( PLAYERID, TEAMID, PLAYERNAME) VALUES (3,1,'PLAYER 3');
INSERT INTO DEMO_PLAYERS( PLAYERID, TEAMID, PLAYERNAME) VALUES (4,1,'PLAYER 4');
CREATE STREAM DEMO_TEAMS( TEAMID BIGINT KEY,TEAMNAME STRING )
WITH (KAFKA_TOPIC='DEMO.TEAMS',VALUE_FORMAT='JSON',  PARTITIONS=1);
INSERT INTO DEMO_TEAMS(  TEAMID, TEAMNAME) VALUES (1,'THE TEAM');
-- create a few persistent queries...
create stream demo_team_players as
  select
    teamid,
    playerid,
    struct(playerid := playerid,
           playername := playername) `model`
  from DEMO_PLAYERS
  emit changes;

create stream demo_team_games as
  select
    teamid,
    gameid,
    struct(gameid := gameid,
           gamename := gamename) `model`
  from DEMO_GAMES
  emit changes;

The two persistent queries above wrap the data I want to include in collect_list into a struct. Now I can run the following query:

select teamid, transform(collect_list( cast(`model` as string)), t=>t) as teamplayers from  DEMO_TEAM_GAMES  group by teamid emit changes;
-- yields:
{
"TEAMID": 1,
"TEAMPLAYERS": [
"Struct{GAMEID=1,GAMENAME=SUNDAY}",
"Struct{GAMEID=2,GAMENAME=MONDAY}",
"Struct{GAMEID=3,GAMENAME=FRIDAY}"
]
}

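My question is this: is there a way to take a "serialized" STRUCT string and convert it back into a STRUCT inside the transform lambda? I also tried building a dynamic JSON string as the argument to collect_list and then using EXTRACTJSONFIELD to build a struct. That seems very brittle, and to get a JSON string I was forced to cast every value to a string.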
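To see why turning the serialized text back into a struct is fragile, here is the string manipulation modeled in Python (not ksqlDB). It assumes the `Struct{KEY=VALUE,...}` format shown in the query output above; `parse_struct_string` is a hypothetical helper, not a ksqlDB function, and the comma-containing game name is made up.

```python
import re


def parse_struct_string(text: str) -> dict:
    """Naively turn 'Struct{K1=V1,K2=V2}' text back into a dict.

    Every value comes back as a string (the original types are lost), and
    splitting on ',' and '=' goes wrong as soon as a value contains either.
    """
    match = re.fullmatch(r"Struct\{(.*)\}", text)
    if match is None:
        raise ValueError(f"not a Struct string: {text!r}")
    fields = {}
    for pair in match.group(1).split(","):
        key, _, value = pair.partition("=")
        fields[key] = value
    return fields


ok = parse_struct_string("Struct{GAMEID=1,GAMENAME=SUNDAY}")
# Hypothetical game name containing a comma: the parse silently goes wrong.
broken = parse_struct_string("Struct{GAMEID=4,GAMENAME=SAT, LATE}")
print(ok)
print(broken)
```

The first call recovers the fields, but only as strings; the second truncates GAMENAME and invents a bogus key from the tail of the value, with no error raised.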

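I'm also building a POC at my current company and ran into the same problem you describe here. I think this is a key feature for ksqlDB. There is a pull request for it in the ksql repository, and it has already been merged, so you can wait for a new release or build the image yourself.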

https://github.com/confluentinc/ksql/pull/8877

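I eventually realized that grouping in KSQL is not the same as grouping in T-SQL. Even if you manage to group these relationships, you end up with all of the related data, not just the most recent data, because you can only group over a single stream. Still, I came up with this gem: force serialization through AS_MAP, cast the map to a string, strip the braces with REPLACE, and pass the remaining KEY=VALUE text to SPLIT_TO_MAP.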

set 'auto.offset.reset' = 'earliest';
create table EVENTTABLE
WITH (KAFKA_TOPIC='EventsGroupedByTeamTable', VALUE_FORMAT='AVRO', PARTITIONS=1)
as
select HOMETEAMID,
  ARRAY_LENGTH(COLLECT_SET(HOMETEAMID)) AS COUNT,
  TRANSFORM(
    COLLECT_SET(
      CAST(
        AS_MAP(
          ARRAY['AWAYTEAMNAME', 'HOMETEAMNAME'],
          ARRAY[CAST(AWAYTEAMNAME AS STRING), CAST(HOMETEAMNAME AS STRING)]
        ) AS STRING
      )
    ),
    P => SPLIT_TO_MAP(REPLACE(REPLACE(P, '{', ''), '}', ''), ',', '=')
  ) AS TEAMSCHEDULES
from EVENTSTREAM
GROUP BY HOMETEAMID
emit changes;
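The lambda in this query can be modeled in plain Python to check what it does to each collected string. This is a sketch under assumptions: the serialized map is assumed to look like `{KEY=VALUE,KEY=VALUE}` (check the actual CAST output on your ksqlDB version, since a space after each comma would end up in the keys), `split_to_map` is a simplified stand-in for SPLIT_TO_MAP whose handling of malformed entries is assumed, and the team names are made up.

```python
def ksql_replace(text: str, old: str, new: str) -> str:
    """Models REPLACE(text, old, new): every occurrence is replaced."""
    return text.replace(old, new)


def split_to_map(text: str, entry_delim: str, kv_delim: str) -> dict:
    """Simplified model of SPLIT_TO_MAP; a repeated key keeps the last value.

    Assumption: entries that lack the key/value delimiter are skipped.
    """
    result = {}
    for entry in text.split(entry_delim):
        key, sep, value = entry.partition(kv_delim)
        if sep:
            result[key] = value
    return result


def to_schedule(serialized_map: str) -> dict:
    # Same steps as the lambda: strip '{' and '}', then split on ',' and '='.
    stripped = ksql_replace(ksql_replace(serialized_map, "{", ""), "}", "")
    return split_to_map(stripped, ",", "=")


# Assumed CAST(AS_MAP(...) AS STRING) output; the team names are hypothetical.
good = to_schedule("{AWAYTEAMNAME=RIVALS,HOMETEAMNAME=THE TEAM}")
# A comma inside a value splits the entry, and the tail of the name is lost.
lossy = to_schedule("{AWAYTEAMNAME=RIVALS, FC,HOMETEAMNAME=THE TEAM}")
print(good)
print(lossy)
```

This has the same weakness as parsing the `Struct{...}` text: any team name containing `,`, `=`, `{` or `}` corrupts the resulting map, and every value comes back as a string.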
