我们正在尝试将 Kafka KSQL 迁移到我们的系统中,并希望分享一些在此过程中我们无法解决的问题。 我们的集群中有 3 个 Kafka 节点,每个服务器都有:
8 CORE
50G+ RAM
100G ssd
在每台服务器上,我们都有 zookeeper 来管理集群。 所有操作系统限制都会增加,以便节点可以使用比所需更多的资源:
Xmx: 10G
Xms: 10G
nofiles: 500000
目前,来自生产者到集群的流量很小(~每秒 10 条消息(。现在我们只有一个生产者,消息格式为:
{"user_id": <id|INT>, "action_id": <id|INT>, "amount": <amount|FLOAT>}
Kafka 中的主题分为 6 个分区,有 1 个复制:
Topic:<some_topic> PartitionCount:6 ReplicationFactor:1 Configs:
Topic: <some_topic> Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: <some_topic> Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: <some_topic> Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: <some_topic> Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: <some_topic> Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: <some_topic> Partition: 5 Leader: 2 Replicas: 2 Isr: 2
现在,当然,节点没有得到充分利用,在 kafka 方面一切都很好(
我们希望在 Kafka 之上使用 KSQL 来过滤使用 SQL 进入我们系统的数据。 以下是 KSQL 服务器资源:
32 CORE
100G+ RAM
50G+ ssd
我们只有一个表:
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACTION_ID | INTEGER
USER_ID | INTEGER
AMOUNT | DOUBLE
以下是创建表时使用的命令:
create table <some_table> (action_id INT, user_Id INT, amount DOUBLE) with (KAFKA_TOPIC='<some_topic>', VALUE_FORMAT='JSON', KEY = 'user_id');
在我们的应用程序中,我们需要按user_id订阅表,如下所示:
SELECT * FROM <some_table> WHERE USER_ID=<some_user_id>;
对于生产 KSQL 服务器配置,我们使用 confluent 的官方建议: https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html#recommended-ksql-production-settings
KSQL 服务器的操作系统和软件限制也有所增加:
Xmx: 10G (we have tried till 50G)
Xms: 10G (we have tried till 50G)
nofiles: 500000
如果我们只使用一个订阅,我们不会遇到任何问题(在这种情况下一切都很好(。
但是我们总共需要超过200000+订阅。因此,当我们尝试获取 100-200 个并行订阅时,客户端中会出现"读取超时"。在服务器中,我们没有看到任何可能影响 KSQL 的异常负载。
我们假设该问题仅与 KSQL 有关,因为当我们尝试使用另一台 KSQL 服务器(在不同的机器中(时,同时我们可以看到第二台服务器工作正常并且可以处理一些 1-20 个订阅。
我在与 KSQL 服务器连接的互联网上找不到任何基准测试,在文档中,我也找不到任何关于 KSQL 用例的提及,也许它的设计只是为了提供具有大量数据的少数连接,或者我们的系统配置错误,所以我们应该修复它以使用该软件来实现我们的目标。
任何建议都会有所帮助。
提前致谢(
您在使用ksqlDB 时遇到可伸缩性的原因是您使用推送查询的方式不是为使用它们而设计的。还!
推送查询:
SELECT * FROM <some_table> WHERE USER_ID=<some_user_id>;
您用来订阅特定用户的更新似乎是一件完全明智的事情。
但是,在 ksql 版本中,您使用的此类推送查询仅供在 CLI 中执行命令的人员使用。 每个这样的查询将在内部消耗大量服务器资源并消耗源主题中的所有行。
基本上,推送查询不可扩展。
ksqlDB 团队正在积极致力于增强 ksql 以支持这种确切风格的用例,因为我们认识到这是一件很常见的事情。 (见 https://github.com/confluentinc/ksql/issues/5517(。
同时,实现此目的的方法是使用自己的使用者直接从 Kafka 使用数据,并在本地进行过滤。