如何使用带有 pykafka 的消息批处理或缓冲区生成 kafka 主题



如何使用 pykafka 的消息批处理或缓冲区生成 kafka 主题。我的意思是,一个生产者可以在一个制作过程中产生许多信息。我知道使用消息批处理或缓冲区消息的概念,但我不知道如何实现它。我希望有人能在这里帮助我

PyKafka 透明地处理生产者中的消息批处理 - 您不必做任何特殊的事情来确保消息是批量生成的。Producer类提供了一堆配置选项,可让您自定义批处理行为。文档中提供了这些选项的完整列表,但一些最重要的选项是:

  • max_queued_messages- 当您produce()比这更多的消息时,请立即发送批处理
  • min_queued_messages- 当您至少produce()这么多消息时,发送批处理
  • linger_ms- 自上一批以来已经过去了这么长时间,发送批次

只需使用send()方法。 您不需要自己管理它。

send(( 是异步的。 调用时,它将记录添加到 挂起的记录发送并立即返回。这允许生产者 将单个记录批处理在一起以提高效率。

你的任务只是配置两个道具:batch_sizelinger_ms

创建者为每个分区维护未发送记录的缓冲区。 这些缓冲区的大小由"batch_size"配置指定。 使其更大可能会导致更多的批处理,但需要更多 内存(因为我们通常会为每个缓冲区之一 活动分区(。

这两个道具将按以下方式完成:

一旦我们获得了一个分区batch_size记录的价值,无论此设置如何,它都会立即发送,但是如果我们为该分区累积的字节数少于此数量,我们将"徘徊"指定的时间等待更多记录出现。

最新更新