配置如下:
- 来源:AWS SQS来源
- 经纪人:卡夫卡经纪人
- 触发器:带水槽的触发器是Knative服务
- Knative服务:一个containerConcurrent=1的Knative service(确保一次有1个请求/pod(
向SQS队列发送1000条消息,Knative服务只创建大约100-150个pod来处理所有请求
我检查了一下,发现Kafka broker一次只向Knative Service发送大约100-180个事件
因此,Knative服务只创建了大约1xx个pod来处理这些事件
剩余的事件将无法发送到Knative服务,broker将重试发送
Kafka broker中是否有任何配置可以增加一次发送到Knative服务的事件数量?
FYI:
- 使用基于MTchannel-based的broker(内存通道(,它可以一次向Knative服务发送400-1000个事件
- 我使用Knative v1.7.1和Knative Kafka broker v1.7.0
这里的Kafka主题和交付似乎达到了并发限制;特别是,Kafka传递并行性(一次可以传递多少个事件(可能受partitions * event-batch-size
的限制。分区的数量似乎由kafka-broker-config
ConfigMap控制,后者由config-kafka-broker-data-plane
ConfigMap中的max.poll.records
控制。
Kafka实现的另一个限制因素是来自单个调度器的传出HTTP请求的数量,这似乎由数据平面的maxPoolSize
参数控制。
您在InMemory通道中没有看到这种行为,因为它不从分区执行有序传递,只是尝试并行传递所有消息,而不跟踪顺序或阻止旧消息上的新消息。
难道不是因为您有containerConcurrent=1吗?
Broker从SQS接收事件,并提交接收到的事件的偏移量xx秒(auto.commit.interval.ms(。
现在,使用containerConcurrency意味着每个发送到kservice的事件都可以被整个使用者使用一个。如果您要增加这个数字,您也将增加可能发送的事件数量。
如果你关心每个分区的顺序,你可以播放并使用kafka.eventing.knactive.dev/delivery.order设置,这将使每个分区按顺序执行事件,分区中的一个事件被成功处理,下一个事件将被发送。其他分区是异步处理的。
我确认我们可以使用两个参数:
- 最大.poll.records
- 最大池大小
https://github.com/knative-sandbox/eventing-kafka-broker/issues/2597#issuecomment-1243665444