KTable和KStream Space考虑因素理解



我们正在使用KSQLDB和一些疑问执行POC:-

我有一个名为USERPROFILE的Kafka主题,它有大约1亿条唯一记录和10天的保留策略。这个Kafka主题继续从其底层RDBMS表实时接收INSERT/UPDATE类型的事件。

以下是本卡夫卡主题中收到的记录的简单结构:-

{"userid":1001,"firstname":"Hemant","lastname":"Garg","countrycode":"IND","rating":3.7} 

1.(我们已经在上述主题上开通了Kafka Stream:-

create STREAM userprofile_stream (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE')>;

2.(因为,给定的userId可以有更新,并且我们只想要唯一的记录(对于每个userId(,我们还打开了关于上述主题的另一个Kafka表:-

ksql> create TABLE userprofile_table(userid VARCHAR PRIMARY KEY, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (KAFKA_TOPIC = 'USERPROFILE', VALUE_FORMAT = 'DELIMITED');

问题是:-

  • 打开KTable是否需要磁盘上的额外空间?例如,卡夫卡主题有1亿条记录,KTable中是否也存在相同的记录,或者这只是底层卡夫卡主题的一些虚拟视图?

  • 对于我们打开的流,同样的问题。打开KStream是否需要(Brokers服务器的(磁盘上的额外空间?例如,Kafka主题有1亿条记录,KStream中是否也存在相同的记录,或者这只是底层Kafka话题的一些虚拟视图?

  • 比方说,我们在5月1日收到了id为1001的记录,然后在5月11日,该记录将不再出现在卡夫卡主题上,但该记录是否仍会出现在kstream/Ktable上?KStream/KTable是否也有一些保留政策,就像我们对Topic一样?

非常感谢您的回答。

--最好的aditya

ksqlDB服务器由Kafka Streams提供支持。因此,当您创建流或表时,服务器将分别创建KStream或KTable。

除此之外,KStream和KTables都有卡夫卡中的主题作为支持。因此,在ksqlDB服务器上创建流和表将在Kafka集群上创建实际的主题。

话虽如此,ksqlDB中的流和表是根据需要实现的,并且经过了相当优化,Confluent的这两篇文章通过良好的视觉帮助对内部行为提供了更多的见解:

  • 关于流:https://www.confluent.io/blog/how-real-time-stream-processing-works-with-ksqldb/
  • 关于表格:https://www.confluent.io/blog/how-real-time-materialized-views-work-with-ksqldb/

您甚至可以自己查看创建的数据。为了举例说明,我创建了:

  • 来自原始主题的MESSAGES_STREAM
  • 来自以上流的MATERIALIZED_MESSAGES_STTREAM
  • 来自第一个流的MESSAGES

以下是可供参考的创建命令:

ksql> CREATE STREAM messages_stream (user_id BIGINT KEY, message VARCHAR) 
WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON');
ksql> CREATE STREAM materialized_messages_stream AS 
SELECT user_id, UCASE(message) 
FROM messages_stream 
EMIT CHANGES;
ksql> CREATE TABLE messages AS
SELECT user_id, count(*) as msg_count
FROM messages_stream
GROUP BY user_id
EMIT CHANGES;

通过查看ksqlDB中的详细信息,我们可以看到第一个流使用原始主题作为源:

ksql> describe extended MESSAGES_STREAM;
Name                 : MESSAGES_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : hello_topic_json (partitions: 1, replication: 1)
-- […]
ksql> describe extended MATERIALIZED_MESSAGES_STREAM;
Name                 : MATERIALIZED_MESSAGES_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : MATERIALIZED_MESSAGES_STREAM (partitions: 1, replication: 1)
-- […]
ksql> describe extended MESSAGES;
Name                 : MESSAGES
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : MESSAGES (partitions: 1, replication: 1)
-- […]

查看集群上声明的主题,我们可以看到第二个流、表及其在后台创建的changelog主题:

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --list
MATERIALIZED_MESSAGES_STREAM
MESSAGES
__consumer_offsets
__transaction_state
_confluent-ksql-ksql_docker_command_topic
_confluent-ksql-ksql_dockerquery_CTAS_MESSAGES_1-Aggregate-Aggregate-Materialize-changelog
_schemas
hello_topic_json

您还可以看到流和表之间的保留策略是不同的。前者将删除旧记录,而后者将压缩数据:

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MATERIALIZED_MESSAGES_STREAM --describe
Topic: MATERIALIZED_MESSAGES_STREAM PartitionCount: 1   ReplicationFactor: 1    Configs: cleanup.policy=delete
$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MESSAGES --describe
Topic: MESSAGES PartitionCount: 1   ReplicationFactor: 1    Configs: cleanup.policy=compact

TL;DR,回到你的问题:

  1. 是的,打开KTable需要空间,但很可能不会对已使用的字节进行1到1的映射
  2. 在您的情况下,流很可能会使用主题作为参考,并且不会占用更多空间,因为您没有进行任何数据转换
  3. 表上的保留策略是压缩,因此您的条目在表上仍然可用。然而,在您的流中,数据将与参考主题中的数据一样可用

相关内容

  • 没有找到相关文章

最新更新