Unable to insert data from Kafka Connect into Elasticsearch
I have been trying to send topic messages to an Elasticsearch index, but no matter which parameters I change, nothing works. When I run this configuration against an empty topic, it actually creates the index on the other side, but as soon as I write messages to the topic, instead of copying them into the Elasticsearch index it fails with the error below. I added parameters to ignore schemas (schema.ignore, key.ignore, etc.), and I also tried setting the built-in value.converter and so on, but I just cannot get it to run.

The connector I run:

curl -X POST -H "Content-Type: application/json" --data '{
"name": "pageviews3",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "pageviews2",
"connection.url": "http://xxxxxxxxxx:9200",
"type.name": "kafka-connect",
"connection.username": "elastic",
"connection.password": "xxxxxxxxxxxx"
}
}' http://xxxxx:8083/connectors

Another connector, where I ignore the schema registry and so on:

curl -X POST -H "Content-Type: application/json" --data '{
"name": "page-views34",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://xxxx:9200",
"tasks.max": "1",
"topics": "pageviews2",
"type.name": "_doc",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"schema.ignore": "true",
"key.ignore": "true",
"connection.username": "elastic",
"connection.password": "xxxxxx"
}
}' http://xxxxx:8083/connectors

I use the default Confluent datagen connector, which also creates a schema for itself.

curl -i -X PUT http://localhost:8083/connectors/datagen_local_02/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "pageviews2",
"quickstart": "pageviews",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}'

This is the error output:

"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handlerntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)ntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)ntat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)ntat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)ntat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)ntat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)ntat java.base/java.lang.Thread.run(Thread.java:833)nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pageviews2 to Avro: ntat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)ntat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)ntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)ntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)nt... 13 morenCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!ntat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)ntat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)ntat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)ntat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)ntat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)nt... 17 moren"

I suggest you create a brand-new topic and start over. Your error says that at least one event in the topic has a non-Avro key or value.
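
As a sketch, assuming a broker reachable at localhost:9092 (a placeholder for your actual bootstrap address; partition and replication counts are illustrative), dropping and recreating the topic could look like this:

# delete the topic that contains the mixed/non-Avro records
kafka-topics --bootstrap-server localhost:9092 --delete --topic pageviews2
# recreate it empty before restarting the datagen source
kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews2 --partitions 1 --replication-factor 1

Once the topic is empty again, restart the datagen source so that every record in it is written in a single, known format.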

Before deploying any sink connector, you can use kafka-avro-console-consumer to verify that you have Avro data.
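
For example, assuming the broker on localhost:9092 and Schema Registry on localhost:8081 (both placeholders for your actual addresses), a quick check would be:

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic pageviews2 \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081

If this prints every record as JSON, the values are Confluent-framed Avro; if it throws the same "Unknown magic byte!" error, the topic contains records that were not produced with the Avro serializer.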

Then, I always recommend explicitly adding key and value converters to your connector config, even if you are going to ignore the key via the settings, because Kafka Connect still needs to deserialize the data (or not, if you set ByteArrayConverter). A sketch of such a config follows below.
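
This sketch matches the datagen config above (String keys, Avro values); the connector name is hypothetical and the Schema Registry at http://localhost:8081 is an assumed address:

curl -X POST -H "Content-Type: application/json" --data '{
"name": "pageviews-avro-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "pageviews2",
"connection.url": "http://xxxxxxxxxx:9200",
"connection.username": "elastic",
"connection.password": "xxxxxxxxxxxx",
"type.name": "_doc",
"key.ignore": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}' http://xxxxx:8083/connectors

With the value converter set to AvroConverter plus a schema.registry.url, the sink can actually decode what datagen produced; StringConverter on the key side matches the datagen key.converter, and key.ignore lets the connector generate document IDs from topic+partition+offset.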
