我正在将Alpakkakafka与scala应用程序一起使用。我的Kafka正在docker中运行,我正试图使用我的代码在Kafka生产者上发布消息。我的代码如下
def sendMsg(xmlFile: String): Future[Done] = {
futureToFutureTry {
val producer = SendProducer(producerSettings)
producer.send(new ProducerRecord("topic_name", "Key", xmlFile)).map(result => {
producer.close()
})
} flatMap {
case Success(v) => v
case Failure(e) =>
Future.failed(e)
}
}
代码很好,但当我发送大型xml文件时,它会给我an error that org.apache.kafka.common.errors.RecordTooLargeException: The message is 22093081 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
我可以在stackoverflow上看到一些解决方案,但没有一个解释我需要在哪里进行更改。如何增加最大请求大小。我是卡夫卡的新手。当应用程序启动时,我可以看到它正在打印默认的max.request.size,但不知道它是如何打印的,以及如何、在哪里和我需要做什么来解决它。请帮助
如果您计划向Kafka生成大于1048588字节的默认大小的消息,则需要更改三个不同位置的设置:
- 主题配置
- 生产者配置
- 消费者配置
主题配置
创建主题时,需要确保将max.message.bytes
设置为更大的值。在创建Kafka主题时设置此配置可以使用kafka-topics
脚本完成:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-name --partitions 1 --replication-factor 1 --config max.message.bytes=50000000
此配置的描述如下:
Kafka允许的最大记录批大小(如果启用压缩,则在压缩之后(。如果增加了这个值,并且存在0.10.2以上的使用者,则使用者的提取大小也必须增加,以便他们能够提取如此大的记录批次。在最新的消息格式版本中,为了提高效率,总是将记录分组为批。在以前的消息格式版本中,未压缩的记录不会分组为批,在这种情况下,此限制仅适用于单个记录。
生产者配置
在变量producerSettings
中,您需要增加max.request.size
的值,例如:
val producerSettingsNew = producerSettings + ("max.request.size" -> "50000000")
此设置的描述如下:
请求的最大大小(以字节为单位(。此设置将限制生产者在单个请求中发送的记录批数,以避免发送大量请求。这实际上也是对最大未压缩记录批大小的限制。请注意,服务器对记录批量大小有自己的上限(如果启用了压缩,则在压缩之后(,这可能与此不同。
消费者配置
此外,您的消费者需要通过增加max.partition.fetch.bytes
在设置中进行配置。
描述如下:
服务器将返回的每个分区的最大数据量。记录由消费者分批提取。如果提取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批,以确保使用者能够取得进展。broker接受的最大记录批处理大小是通过message.max.bytes(broker配置(或max.message.bytes(主题配置(定义的。有关限制使用者请求大小的信息,请参阅fetch.max.bytes。