Kafka Connect将同一任务分配给多个工作人员



我在分布式模式下使用Kafka Connect。我现在多次观察到的一个奇怪行为是,在一段时间后(可以是几个小时,也可以是几天(,似乎发生了一个平衡错误:相同的任务被分配给多个工人。因此,它们同时运行,并且根据连接器的性质,会出现故障或产生"不可预测"的输出。

我能够用来重现行为的最简单配置是:两个Kafka Connect工作程序,两个连接器,每个连接器只有一个任务。Kafka Connect已部署到Kubernetes中。卡夫卡本身就在《合流的云》中。KafkaConnect和Kafka都是同一版本(5.3.1(

日志中的相关消息:

工人A:

[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)

工人B:

[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-mqtt-source], taskIds=[some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)

在上面的日志摘录中,您可以观察到相同的任务(some-mqtt-source-0(被分配给两个工人。在这条消息之后,我还可以看到两个工作线程上的任务实例的日志消息。

这种行为不依赖于连接器(我在其他任务中也观察到了这种情况(。它也不会在工人开始工作后立即发生,而是在一段时间后才发生。

我的问题是这种行为的原因是什么?

编辑1:我试着运行3个工人,而不是两个,认为这可能是一个分布式共识问题。事实似乎并非如此,拥有3名员工并不能解决问题。

编辑2:我注意到,就在一个工人a被分配一个最初在工人B上运行的任务之前,该工人(B(在加入一个组时观察到一个错误。例如,如果任务在第N代中"重复",workerB将不会在日志中显示"已成功加入第N代的组"消息。更重要的是,在N-1代和N+1代之间,workerB通常会记录类似Attempt to heartbeat failed for since member idGroup coordinator bx-xxx-xxxxx.europe-west1.gcp.confluent.cloud:9092 (id: 1234567890 rack: null) is unavailable or invalid的错误。工人B通常在第N代之后不久加入第N+1代(有时只需大约3秒(。现在已经清楚是什么触发了这种行为。但是:

  • 虽然我知道可能会有这样的临时问题,而且在一般情况下可能是正常的,但为什么在所有服务器成功加入下一代后,不重新平衡来解决问题?尽管随之而来的是更多的重新分配,但它并没有正确地分配任务,并且永远保持"重复"(直到我重新启动工人(。

  • 在某些时期,再平衡几乎每几个小时发生一次,而在其他时期,则每5分钟(精确到几秒钟(发生一次;原因可能是什么?什么是正常的?

  • 考虑到我使用的是Confluent Cloud,"Group coordinator不可用或无效"错误的原因是什么?是否有任何配置参数可以在Kafka Connect中进行调整,以使其对该错误更有弹性?我知道有session.timeout.msheartbeat.interval.ms,但文档非常简单,甚至不清楚将这些参数更改为更小或更大的值会产生什么实际影响。

编辑3:我观察到,这个问题对汇点任务来说并不重要:尽管相同的汇点任务被分配给多个工作者,但相应的消费者被分配到不同的分区,就像他们通常应该分配的那样,一切都几乎正常工作——我只是得到了比最初要求的更多的任务。但是,在源任务的情况下,行为会破坏-任务同时运行,并在源端争夺资源。

编辑4:同时,我将Kafka Connect降级到了2.2版本(Confluent Platform 5.2.3(,这是"增量合作再平衡"之前的版本。它在过去的两天里运行良好。因此,我认为这种行为与新的再平衡机制有关。

正如评论中所提到的,Jira Kafka-9184是为了解决这个问题而设计的,它已经得到了解决。

该修复程序在2.3.2及以上版本中可用。

因此,现在的答案是:升级到最新版本应该可以防止这个问题的发生。

最新更新