How do I create a KSQL table from a topic using a composite key?



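I have some topic data with the fields stringA and stringB, and I am trying to use the two together as the key when creating a KSQL table from the topic.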

Just an update to @Robin Moffatt's answer. Use the following:

CREATE STREAM TEST_REKEY AS 
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
STRINGA, 
STRINGB, 
COL3 
FROM TEST 
PARTITION BY STRINGA + STRINGB ;

instead of

CREATE STREAM TEST_REKEY AS 
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
STRINGA, 
STRINGB, 
COL3 
FROM TEST 
PARTITION BY MY_COMPOSITE_KEY ;

Note: column ordering matters. This worked for me (CLI v0.10.1, server v0.10.1).

Here is an example. First, I'll create and populate a test stream:

ksql> CREATE STREAM TEST (STRINGA VARCHAR, 
STRINGB VARCHAR, 
COL3 INT) 
WITH (KAFKA_TOPIC='TEST',
PARTITIONS=1,
VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',1);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',2);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('C','D',3);
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM TEST EMIT CHANGES LIMIT 3;
+--------------+--------+---------+----------+------+
|ROWTIME       |ROWKEY  |STRINGA  |STRINGB   |COL3  |
+--------------+--------+---------+----------+------+
|1578569329184 |null    |A        |B         |1     |
|1578569331653 |null    |A        |B         |2     |
|1578569339177 |null    |C        |D         |3     |

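Note that ROWKEY is null.

Now I'll create a new stream, populated from the first, that builds the composite column and sets it as the key. I've also included the original fields themselves, but that is optional if you don't need them: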

ksql> CREATE STREAM TEST_REKEY AS 
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
STRINGA, 
STRINGB, 
COL3 
FROM TEST 
PARTITION BY MY_COMPOSITE_KEY ;
Message
------------------------------------------------------------------------------------------
Stream TEST_REKEY created and running. Created by query with query ID: CSAS_TEST_REKEY_9
------------------------------------------------------------------------------------------
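
To make the re-keying step concrete, here is a minimal Python sketch (not KSQL, and not how the server implements it) of what the statement above does to each record: it derives a key by concatenating the two string fields. The names `composite_key` and `rekey` and the optional `sep` parameter are hypothetical, introduced only for illustration.

```python
# Illustrative model of PARTITION BY STRINGA + STRINGB; this is not ksqlDB code.

def composite_key(string_a, string_b, sep=""):
    """Build the key the same way the query does: plain concatenation.

    `sep` is an assumption added for illustration; the query in this
    answer uses no separator.
    """
    return string_a + sep + string_b


def rekey(records, sep=""):
    """Return (key, value) pairs, mirroring the re-keyed stream."""
    return [(composite_key(r["STRINGA"], r["STRINGB"], sep), r) for r in records]


# The three rows inserted into TEST above; they start out with no key.
records = [
    {"STRINGA": "A", "STRINGB": "B", "COL3": 1},
    {"STRINGA": "A", "STRINGB": "B", "COL3": 2},
    {"STRINGA": "C", "STRINGB": "D", "COL3": 3},
]

for key, value in rekey(records):
    print(key, value["COL3"])
```

One design point worth knowing: with plain concatenation, `composite_key("A", "BC")` and `composite_key("AB", "C")` both produce `ABC`. If your field values can overlap like that, concatenating a separator that never occurs in the data (in KSQL, something like STRINGA + '|' + STRINGB) should keep the keys distinct.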

Now you have a stream of data whose key is set to your composite key:

ksql> SELECT ROWKEY , COL3 FROM TEST_REKEY EMIT CHANGES LIMIT 3;
+---------+-------+
|ROWKEY   |COL3   |
+---------+-------+
|AB       |1      |
|AB       |2      |
|CD       |3      |
Limit Reached
Query terminated

You can also inspect the underlying Kafka topic to verify the key:

ksql> PRINT TEST_REKEY LIMIT 3;
Format:JSON
{"ROWTIME":1578569329184,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":1}
{"ROWTIME":1578569331653,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}
{"ROWTIME":1578569339177,"ROWKEY":"CD","MY_COMPOSITE_KEY":"CD","STRINGA":"C","STRINGB":"D","COL3":3}
ksql>

With that done, we can now declare a table on top of the re-keyed topic:

CREATE TABLE TEST_TABLE (ROWKEY VARCHAR KEY, 
COL3 INT) 
WITH (KAFKA_TOPIC='TEST_REKEY', VALUE_FORMAT='JSON');

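From this table we can query the state. Note that the composite key AB shows only the latest value, which is part of table semantics (compare with the stream above, in which you can see both values; the stream and the table are both backed by the same Kafka topic):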

ksql> SELECT * FROM TEST_TABLE EMIT CHANGES;
+----------------+---------+------+
|ROWTIME         |ROWKEY   |COL3  |
+----------------+---------+------+
|1578569331653   |AB       |2     |
|1578569339177   |CD       |3     |
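
The difference between the stream and the table can also be sketched outside KSQL. The Python below is a minimal model, assuming a table is simply the latest value seen for each key; `materialize` is a hypothetical helper name, and the events are the three re-keyed rows from this example.

```python
# Illustrative model of table semantics: the last value per key wins.

def materialize(events):
    """Fold an ordered stream of (key, value) events into a table."""
    table = {}
    for key, value in events:
        # A later event for the same key overwrites the earlier one.
        table[key] = value
    return table


# (ROWKEY, COL3) pairs in the order they appear in the TEST_REKEY stream.
stream = [("AB", 1), ("AB", 2), ("CD", 3)]

table = materialize(stream)
print(len(stream), "events in the stream")
print(table)
```

The stream keeps all three events, while the table holds two rows, with AB at 2, matching the query output above.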
