Kafka Connect fetch.max.wait.ms & fetch.min.bytes 合并不被接受?



我正在使用Kafka Connect(2.3.0(创建自定义SinkConnector,需要针对吞吐量而不是延迟进行优化。理想情况下,我想要的是:

~ 20 MB 或 100k 的批处理记录先到的任何东西,但如果消息速率较低,请至少每分钟处理一次(避免小批量,但最小 MySinkTask.put(( 速率为每分钟一次(。

这是我为使用者设置而设置的,以尝试实现它:

  • consumer.max.poll.records=100000
  • consumer.fetch.max.bytes=20971520
  • consumer.fetch.max.wait.ms=60000
  • consumer.max.poll.interval.ms=120000
  • consumer.fetch.min.bytes=1048576

    我需要这个 fetch.min.bytes 设置,否则尽管有其他设置,但我每秒仍调用 MySinkTask.put(( 多次......?

现在,我在低速率情况下观察到的是,MySinkTask.put(( 多次以 0 记录调用,几分钟过去了,直到达到 fetch.min.bytes,然后我一次将它们全部获取。

到目前为止,我无法理解:

  • 为什么 fetch.max.wait.ms=60000 没有从使用者向下推送到连接器的 put(( 调用?这不应该优先于fetch.min.bytes吗?
  • 如果 fetch.min.bytes=1(默认值(,什么设置控制每秒 ~ 2x 对 MySinkTask.put(( 的调用?我不明白为什么它会这样做,即使是 Connect 运行时设置的详细输出也不会显示低于秒倍数的任何间隔。

我已经仔细检查了日志输出,连接运行时打印的INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:行显示了我传递的consumer.前缀值时的预期值。

"至少每个间隔处理一次"部分似乎是不可能的,因为fetch.min.bytes使用者设置优先,并且 Connect 不允许你在任务运行时动态调整 ConsumerConfig。

目前的解决方法是手动在任务中批处理;将fetch.min.bytes设置为 1 (yikes(,在 put(( 调用时在任务中缓冲记录,并在必要时刷新。这不是很理想,因为它推断出我希望避免的连接器的一些开销。

Connect 如何从其消费者的轮询到 SinkTask.put(( 每秒进行 ~ 2x 批处理的逻辑对我来说仍然是一个谜,但它比每条消息都被调用要好。

相关内容

  • 没有找到相关文章

最新更新