我们有一个流处理器应用程序,它使用具有n个分区(n>1(的主题中的数据
从新开始(没有变更日志主题(,开发环境总是创建一个具有n个分区的变更日志主题。
在相同的场景中,在生产中,分区数总是等于1,然后我们手动更改为n,以匹配主题的分区数。
我检查了所有的文档,试图为变更日志设置分区数,但我找不到任何方法。我的最后一个选择是检查变更日志主题是否不存在,然后我用n个分区创建它。
由于框架会自动创建该主题,有没有任何方法可以在不手动或在代码中创建该主题的情况下设置变更日志的分区数?
附言:我们使用的是Kafka客户端版本2.3.1。
谢谢,
奥斯汀
我刚刚查看了源代码以了解此功能的详细信息,在撰写本文时,发现不允许设置change-logs
主题的分区。
解释
change-logs
主题被归类为内部主题,在以下两类(InternalTopicConfig
和InternalTopicManager
(中有证据表明这一点:
-
类InternalTopicConfig的源代码包含以下方法,该方法还表示强制执行此类内部主题上的分区数:
public void setNumberOfPartitions(final int numberOfPartitions) { if (hasEnforcedNumberOfPartitions()) { throw new UnsupportedOperationException("number of partitions are enforced on topic " + "" + name() + " and can't be altered."); ...
-
类InternalTopicManager的源代码中的嵌入文档清楚地说明了
makeReady()
方法的这一点。/** * Prepares a set of given internal topics. * * If a topic does not exist creates a new topic. * If a topic with the correct number of partitions exists ignores it. * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again. * @return the set of topics which had to be newly created */ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) ...
正如您在评论中看到的,如果存在分区计数正确的主题,它将被忽略,如果分区计数不正确,那么您将看到错误,建议使用应用程序重置工具。
希望这能有所帮助!
目前我们正在连接到支持SSL的MSK主题,因此我们没有通过应用程序创建内部主题的写访问权限。因此,作为一项变通方案,我们要求MSK管理员手动创建具有所需名称的变更日志主题,以便应用程序可以读取它。
此外,目前我们所有的用户主题都有3个分区,创建的变更日志主题也有3个具有以下更新设置的分区。这些设置将派上用场,以防您试图手动创建变更日志主题(启用压缩以节省空间(:
Changelog 的配置
此外,变更日志主题名称如下所示:(您的应用程序id(-(具体化As下的userDefined属性(-changelog