我们正在使用KSQLDB和一些疑问执行POC:-
我有一个名为USERPROFILE
的Kafka主题,它有大约1亿条唯一记录和10天的保留策略。这个Kafka主题继续从其底层RDBMS表实时接收INSERT/UPDATE类型的事件。
以下是本卡夫卡主题中收到的记录的简单结构:-
{"userid":1001,"firstname":"Hemant","lastname":"Garg","countrycode":"IND","rating":3.7}
1.(我们已经在上述主题上开通了Kafka Stream:-
create STREAM userprofile_stream (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE')>;
2.(因为,给定的userId可以有更新,并且我们只想要唯一的记录(对于每个userId(,我们还打开了关于上述主题的另一个Kafka表:-
ksql> create TABLE userprofile_table(userid VARCHAR PRIMARY KEY, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (KAFKA_TOPIC = 'USERPROFILE', VALUE_FORMAT = 'DELIMITED');
问题是:-
打开KTable是否需要磁盘上的额外空间?例如,卡夫卡主题有1亿条记录,KTable中是否也存在相同的记录,或者这只是底层卡夫卡主题的一些虚拟视图?
对于我们打开的流,同样的问题。打开KStream是否需要(Brokers服务器的(磁盘上的额外空间?例如,Kafka主题有1亿条记录,KStream中是否也存在相同的记录,或者这只是底层Kafka话题的一些虚拟视图?
比方说,我们在5月1日收到了id为1001的记录,然后在5月11日,该记录将不再出现在卡夫卡主题上,但该记录是否仍会出现在kstream/Ktable上?KStream/KTable是否也有一些保留政策,就像我们对Topic一样?
非常感谢您的回答。
--最好的aditya
除此之外,KStream和KTables都有卡夫卡中的主题作为支持。因此,在ksqlDB服务器上创建流和表将在Kafka集群上创建实际的主题。
话虽如此,ksqlDB中的流和表是根据需要实现的,并且经过了相当优化,Confluent的这两篇文章通过良好的视觉帮助对内部行为提供了更多的见解:
- 关于流:https://www.confluent.io/blog/how-real-time-stream-processing-works-with-ksqldb/
- 关于表格:https://www.confluent.io/blog/how-real-time-materialized-views-work-with-ksqldb/
您甚至可以自己查看创建的数据。为了举例说明,我创建了:
- 来自原始主题的
MESSAGES_STREAM
流 - 来自以上流的
MATERIALIZED_MESSAGES_STTREAM
- 来自第一个流的
MESSAGES
表
以下是可供参考的创建命令:
ksql> CREATE STREAM messages_stream (user_id BIGINT KEY, message VARCHAR)
WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON');
ksql> CREATE STREAM materialized_messages_stream AS
SELECT user_id, UCASE(message)
FROM messages_stream
EMIT CHANGES;
ksql> CREATE TABLE messages AS
SELECT user_id, count(*) as msg_count
FROM messages_stream
GROUP BY user_id
EMIT CHANGES;
通过查看ksqlDB中的详细信息,我们可以看到第一个流使用原始主题作为源:
ksql> describe extended MESSAGES_STREAM;
Name : MESSAGES_STREAM
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : hello_topic_json (partitions: 1, replication: 1)
-- […]
ksql> describe extended MATERIALIZED_MESSAGES_STREAM;
Name : MATERIALIZED_MESSAGES_STREAM
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : MATERIALIZED_MESSAGES_STREAM (partitions: 1, replication: 1)
-- […]
ksql> describe extended MESSAGES;
Name : MESSAGES
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : MESSAGES (partitions: 1, replication: 1)
-- […]
查看集群上声明的主题,我们可以看到第二个流、表及其在后台创建的changelog
主题:
$ ./kafka-topics.sh --bootstrap-server localhost:29092 --list
MATERIALIZED_MESSAGES_STREAM
MESSAGES
__consumer_offsets
__transaction_state
_confluent-ksql-ksql_docker_command_topic
_confluent-ksql-ksql_dockerquery_CTAS_MESSAGES_1-Aggregate-Aggregate-Materialize-changelog
_schemas
hello_topic_json
您还可以看到流和表之间的保留策略是不同的。前者将删除旧记录,而后者将压缩数据:
$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MATERIALIZED_MESSAGES_STREAM --describe
Topic: MATERIALIZED_MESSAGES_STREAM PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=delete
$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MESSAGES --describe
Topic: MESSAGES PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact
TL;DR,回到你的问题:
- 是的,打开KTable需要空间,但很可能不会对已使用的字节进行1到1的映射
- 在您的情况下,流很可能会使用主题作为参考,并且不会占用更多空间,因为您没有进行任何数据转换
- 表上的保留策略是压缩,因此您的条目在表上仍然可用。然而,在您的流中,数据将与参考主题中的数据一样可用