如何使用kafka主题中的大型消息


Kafka Version 1.1.0

我有一个单节点kafka代理,在config/server.properties中有以下配置:

Broker配置:

message.max.bytes=100000000
max.message.bytes=100000000
replica.fetch.max.bytes=150000000
log.segment.bytes=1073741824 (Default)

控制台使用者属性文件具有以下配置:

消费者财产:

receive.buffer.bytes=100000000
max.partition.fetch.bytes=100000000
fetch.max.bytes=52428800

我正在生成一条大小约为20KB的消息。我使用控制台生成器生成一个主题。然后启动一个关于该主题的控制台使用者,它不会使用完整的消息(介于两者之间(。

我看过这篇文章,并试图设置相同的设置,但似乎没有成功。

我在这里错过了什么?请帮帮我。

更新:

> echo | xargs --show-limits
Your environment variables take up 3891 bytes
POSIX upper limit on argument length (this system): 2091213
POSIX smallest allowable upper limit on argument length (all systems): 4096
Maximum length of command we could actually use: 2087322
Size of command buffer we are actually using: 131072
Maximum parallelism (--max-procs must be no greater): 2147483647

更新1:

我测试了另一个场景。这一次,我使用java生产者而不是控制台生产者来生成相同的消息,现在当我消费时,我得到了完整的消息。

可能会出现问题,因为您使用控制台生产者并将消息复制到终端(linux(,但终端将长消息截断为最大固定长度。

您可以尝试使用echo | xargs --show-limits或其他外壳程序或术语设置来查找。

它也可以来自操作系统,例如ARG_MAX:

getconf ARG_MAX

对于您的邮件来说可能太小。

最简单的方法是直接将文件写入kafka控制台生产商,例如:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

如果它工作正确,就意味着这确实是问题所在。


对于记录,还应测试这些设置:

  • 使用者端:fetch.message.max.bytes-这将确定使用者可以获取的消息的最大大小
  • 代理端:replica.fetch.max.bytes-这将允许代理中的副本在集群中发送消息,并确保消息被正确复制。如果这太小,那么消息将永远不会被复制,因此,消费者将永远不会看到消息,因为消息永远不会被提交(完全复制(
  • Broker端:message.max.bytes-这是Broker可以从生产者接收到的最大消息大小
  • Broker端(每个主题(:max.message.bytes-这是Broker允许附加到主题的最大消息大小。此尺寸经过压缩前验证。(默认为经纪人的message.max.bytes。(

最新更新