使用Kafka处理大型消息



如何在Kafka中处理大型消息,例如超过20MB等。

[2019-03-13 08:59:10,923]将消息发送到主题测试的错误错误:13字节:13字节,值:11947696字节,带有错误:(org.apache.kafkace.kafka.clients.producer.producer。内部。org.apache.kafka.common.errors.recordtoolargeexception:请求包含的消息大于服务器将接受的最大消息大小。

[2019-03-13 03:59:1478]将消息发送到主题测试的错误错误:13字节:13字节,值:11947696字节带有错误:(org.apache.kafka.kafka.clients.producer.producer。内部。org.apache.kafka.common.errors.recordtoolargeexception:序列化时该消息是11947797字节,该字节大于使用Max.Request.size配置配置的最大请求大小。

div class =" ans">

我们需要设置以下配置

Broker

replica.fetch.max.bytes :此属性的更改将允许经纪人中的副本在群集中发送消息并确保正确复制消息。如果这太小,那么消息将永远不会复制,因此,消费者将永远不会看到该消息,因为消息将永远不会被提交(完全复制)。

message.max.bytes :这是经纪人可以从生产者那里收到的消息的最大大小。

经纪人(主题)

max.message.bytes :Kafka允许的最大记录批次大小。如果增加了,并且有超过0.10.2的消费者,那么消费者的提取尺寸也必须增加,以便他们可以获取这么大的记录批次。在最新的消息格式版本中,记录始终分为批处理以提高效率。在上一个消息格式版本中,未压缩记录未分组为批处理,此限制仅适用于该情况下的单个记录(默认为Broker的Message.max.bytes)。

生产者

max.request.size:字节中请求的最大大小。此设置将限制生产者将在单个请求中发送的记录批次数量,以避免发送巨大的请求。这也是有效的最大记录批量大小的上限。请注意,服务器在记录批处理大小上具有自己的上限,这可能与此不同。

compression.type:设置为活泼,这将增加可以随着单个请求发送的数据总量,并应与较大的批次配对。尺寸。

buffer.memory:如果启用了压缩,则也应提高缓冲区大小。

batch.size:批次大小应至少为10 kb,在300kb左右的回报率降低(对于远程客户端而言较少)。较大的批次也会导致更好的压缩比。

Linger.ms: Linger.ms抢占批处理大小的任何界限。增加此值,以确保在较慢的生产时间

中不会发送较小的批次

消费者

fetch.message.max.bytes:这将确定消费者可以获取的消息的最大大小。

max.partition.fetch.bytes:服务器将返回的最大数据量。

最新更新