Kafka Connect:没有为连接器创建任务



我们正在使用Debezium(MongoDB(和Confluent S3连接器以分布式模式运行Kafka Connect(Confluent Platform 5.4,即Kafka 2.4(。当通过REST API添加新连接器时,连接器将在RUNNING状态下创建,但不会为连接器创建任务。

暂停并恢复连接器没有帮助。当我们停止所有工人,然后重新启动他们时,任务就会创建,一切都会正常运行。

这个问题不是由连接器插件引起的,因为我们看到Debezium和S3连接器都有相同的行为。此外,在调试日志中,我可以看到Debezium正确地从Connector.taskConfig((方法返回了任务配置。

有人能告诉我该怎么做吗?如果我们可以在不重新启动工人的情况下添加连接器?谢谢

配置详细信息

集群有3个节点,具有以下连接分布式。属性:

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5
config.storage.topic=connect-configs-qa
config.storage.replication.factor=3
status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083
plugin.path=/opt/kafka-connect/plugins,/usr/share/java/
security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>
max.request.size=20000000
max.partition.fetch.bytes=20000000

连接器配置

Debezium示例:

{
"name": "qa-mongodb-comp-converter-task|1",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
"mongodb.name": "qa-debezium-comp",
"mongodb.ssl.enabled": true,
"collection.whitelist": "converter[.]task",
"tombstones.on.delete": true
}
}

S3示例:

{
"name": "qa-s3-sink-task|1",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "qa-debezium-comp.converter.task",
"topics.dir": "data/env/qa",
"s3.region": "eu-west-1",
"s3.bucket.name": "<bucket-name>",
"flush.size": "15000",
"rotate.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"transforms": "ExtractDocument",
"transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
}
}

连接器是使用卷曲创建的:curl -X POST -H "Content-Type: application/json" --data @<json_file> http:/<connect_host>:10083/connectors

我也遇到了同样的问题,所以我更改了连接器的名称并创建了一个新的连接器,它起了作用,但我不知道这个问题的来源,因为我们在kafka连接日志中没有信息。

删除连接器并使用不同的database.server.id重新创建它。重复此过程,直到任务出现为止。

经过6-7次试验,它对我有效,不知道为什么。暂停和恢复,重新启动连接器/任务对我没有帮助。

在部署ElasticsearchSinkConnector 后,任务为空时,我得到了空的tasks

在部署连接器时将这两个添加到config将有助于找到任务失败的原因。

"errors.log.include.messages": "true",
"errors.log.enable": "true"

在我的情况下,它将显示失败的原因,而不是空的tasks

获取/连接器/弹性搜索接收器/状态

{
"name": "elasticsearch-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.xxx.xxx.xxx:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.xxx.xxx.xxx:8083",
"trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sinkn"
}
],
"type": "sink"
}

问题是由连接器名称中的|字符引起的。通常,在旧版Kafka Connect中,连接器名称中的特殊字符在工作人员之间的通信过程中没有正确的url编码。中的更多详细信息https://issues.apache.org/jira/browse/KAFKA-9747

最新更新