Kafka ksql simple join does not work



I have rekeyed the data in the stream and the table I use (Confluent 4.1).

1) Create the stream

CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');

2) Create the rekeyed stream. This script does not work, although it worked before. Why?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;

So instead I created the following scripts:

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update  partition by root;

The result of session_details_stream_rekeyed is OK:

ksql> select * from session_details_stream_rekeyed;
1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001

3) Create streams for the voipDetails topic:

CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream  partition by SessionIdTime;
CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update  partition by root;

ksql> select * from voip_details_stream_rekeyed6;
1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

4) Create the table

CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');
ksql> select * from voip_details_table;
1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

5) Then I create a left join:

select  c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root  = u.root;
1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null

Where is the problem?

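tl;dr When doing a stream-table join, the table messages must already exist before the stream messages, and must carry earlier timestamps. If you re-emit your source stream messages after the table topic has been populated, the join will succeed.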

Example data

Populate the topics using kafkacat, feeding the data on stdin (in producer mode kafkacat treats each line as a separate message by default):

cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails < /tmp/msgs

cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails < /tmp/msgs

Verify the topic contents:

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}

Declare the source streams

ksql> CREATE STREAM session_details_stream 
(Media varchar ,SessionIdTime varchar,SessionIdSeq long) 
WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM voip_details_stream 
(SessionIdTime varchar,SessionIdSeq long, Details varchar) 
WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated

Repartition each topic on SessionIdTime+SessionIdSeq

ksql> CREATE STREAM SESSION AS 
SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root 
FROM session_details_stream 
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------

ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo

ksql> CREATE STREAM VOIP AS 
SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details 
FROM voip_details_stream 
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------
ksql>
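Repartitioning matters because KSQL requires both sides of a join to be co-partitioned: keyed on the same value, with the same number of partitions and the same partitioning strategy. A minimal Python sketch of what the two PARTITION BY root statements do to the key, assuming CONCAT simply appends the decimal form of SessionIdSeq to SessionIdTime (which is what the ROWKEY values above show). The partitioner here (zlib.crc32 modulo a hypothetical partition count of 4) is a stand-in, not Kafka's default murmur2 partitioner; the only point is that equal keys land in the same partition on both topics.

```python
import json
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count, identical for both topics


def root_key(record):
    # Mirrors CONCAT(SessionIdTime, SessionIdSeq): the sequence number is
    # appended as decimal text to the timestamp string.
    return f"{record['SessionIdTime']}{record['SessionIdSeq']}"


def partition_for(key, num_partitions=NUM_PARTITIONS):
    # Hypothetical stand-in partitioner (CRC32), NOT Kafka's murmur2 default.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def rekey(lines):
    # Re-key each JSON message on root and compute its target partition,
    # roughly like CREATE STREAM ... AS SELECT ... PARTITION BY root.
    out = []
    for line in lines:
        record = json.loads(line)
        key = root_key(record)
        out.append((key, partition_for(key), record))
    return out


session_lines = [
    '{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}',
    '{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}',
]
voip_lines = [
    '{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}',
    '{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}',
    '{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}',
]

session = rekey(session_lines)
voip = rekey(voip_lines)

for key, partition, _ in session:
    print(partition, key)
```

If one side used a different partition count, equal keys could end up in different partitions and the join would silently miss them.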

Declare the table

ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) 
WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');
Message
---------------
Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b
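As I understand the KSQL 4.x docs, KEY='root' does not rekey anything; it only tells KSQL which column already holds the same value as the Kafka message key. So it is worth confirming that ROWKEY and root agree before declaring the table. A minimal Python sketch of that check over the (ROWKEY, root) pairs copied from the output above; the helper name key_column_mismatches is hypothetical.

```python
def key_column_mismatches(rows):
    # rows: (rowkey, root) pairs as printed by
    #   SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
    # Returns the pairs whose message key differs from the root column.
    return [(rowkey, root) for rowkey, root in rows if rowkey != root]


voip_rows = [
    ("2018-05-17 11:26:33 BST2", "2018-05-17 11:26:33 BST2"),
    ("2018-05-17 11:25:33 BST1", "2018-05-17 11:25:33 BST1"),
    ("2018-05-17 11:25:33 BST1", "2018-05-17 11:25:33 BST1"),
]

mismatches = key_column_mismatches(voip_rows)
print("mismatches:", mismatches)
```

An empty list means the topic is keyed the way the table declaration claims.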

Join the session stream to the VOIP table

ksql> SELECT s.ROWTIME, s.root, s.media, v.details 
FROM SESSION s 
LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null

Leave the above JOIN query running. Re-emit the SESSION messages to the source topic (use kafkacat to send the same messages as above to sessionDetails):

1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2

Per Rohan Desai on the Confluent Community Slack:

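The issue is that the rowtime of the records in the stream is earlier than the rowtime of the records in the table that they should join to. So when a stream record is processed, there is no corresponding record in the table yet.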
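The effect Rohan describes can be simulated in a few lines. A minimal Python sketch, assuming the join consumes both sides strictly in ROWTIME order and that a stream record only sees table values whose ROWTIME is not later than its own (a simplification of Kafka Streams' best-effort timestamp-based record selection, not its actual implementation). The timestamps are the ones for key 2018-05-17 11:26:33 BST2 in the transcript; left_join_in_time_order is a hypothetical helper.

```python
KEY = "2018-05-17 11:26:33 BST2"

# (rowtime, key, value) records taken from the transcript above.
table_records = [(1526553143176, KEY, "Bar2")]
stream_records = [
    (1526553130865, KEY, "Foo"),  # original SESSION message
    (1526553988639, KEY, "Foo"),  # re-emitted SESSION message
]


def left_join_in_time_order(stream, table):
    # Merge both sides by rowtime. Side 0 = table, side 1 = stream, so on a
    # timestamp tie the table record is applied first (a sketch assumption).
    events = [(ts, 0, key, value) for ts, key, value in table]
    events += [(ts, 1, key, value) for ts, key, value in stream]
    events.sort()

    state = {}    # latest table value per key seen so far
    results = []  # (stream rowtime, key, stream value, table value or None)
    for ts, side, key, value in events:
        if side == 0:
            state[key] = value
        else:
            # LEFT JOIN: emit None when the table has no value for this key yet.
            results.append((ts, key, value, state.get(key)))
    return results


for row in left_join_in_time_order(stream_records, table_records):
    print(row)
```

The first SESSION message is processed before the table has seen Bar2, so it joins to None (KSQL's null); the re-emitted one arrives after it and picks up Bar2.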

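Use ROWTIME to inspect the message timestamp of one of the join keys on the topic behind the table (not to be confused with root, which is itself built from a timestamp string):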

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2

Compare this to the messages on the source session stream topic:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo

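The first of these (at 11:32:10 / 1526553130865) is earlier than the corresponding VOIP message shown above, and produced the null join result we saw first. The second one is later (11:46:28 / 1526553988639) and produced the successful join we saw afterwards: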

1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
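ROWTIME is epoch milliseconds, and the TIMESTAMPTOSTRING output above appears to be rendered in the server's local time zone (BST, i.e. UTC+1). A minimal Python sketch that reproduces those strings under the assumption of a fixed UTC+1 offset, and compares each SESSION rowtime with the VOIP rowtime that decides the join result; rowtime_to_string is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

BST = timezone(timedelta(hours=1))  # assumed zone of the KSQL server (UTC+1)


def rowtime_to_string(rowtime_ms):
    # Approximates TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'),
    # assuming a UTC+1 server; milliseconds are truncated.
    return datetime.fromtimestamp(rowtime_ms // 1000, tz=BST).strftime("%Y-%m-%d %H:%M:%S")


voip_rowtime = 1526553143176
session_rowtimes = [1526553130865, 1526553988639]

for ts in session_rowtimes:
    verdict = "after" if ts > voip_rowtime else "before"
    print(rowtime_to_string(ts), verdict, rowtime_to_string(voip_rowtime))
```

Only the SESSION message stamped after the VOIP message can find it in the table.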
