Elasticsearch sink connector needs to read nearly 1000+ topics from Kafka and write to Elasticsearch



I am new to Kafka Connect. I have a requirement to read topics from Kafka dynamically and write them to Elasticsearch. Is there any way to achieve this? Is there a way to use wildcard-style topic patterns such as app*, test* (the actual topics would be app-logging, app-location-1, app-service, test-app, test-app2)? With the sample configuration I am using below, it is writing to the index location.service-2020.06.11. If I want to include the other topics mentioned above with a wildcard (I don't want to list every topic name), how can I achieve that?

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name" : "kafka-elastic-test-01",
  "config" : {
    "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url" : "https://localhost:9200",
    "connection.username": "admin",
    "connection.password": "******",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "type.name" : "_doc",
    "topics.regex": "location.service*",
    "key.ignore" : "true",
    "schema.ignore" : "true",
    "transforms": "TimestampRouter",
    "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.TimestampRouter.topic.format": "${topic}-${timestamp}",
    "transforms.TimestampRouter.timestamp.format": "yyyy.MM.dd"
  }
}'
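One caveat with the configuration above: topics.regex takes a Java regular expression (matched against the whole topic name), not a shell glob. So location.service* means "location.servic followed by zero or more e", and the unescaped dot matches any character; a glob-style prefix match is usually written as location\.service.*. The difference can be sketched with grep -E (the topic names below are hypothetical, and grep only approximates Connect's full-name matching, hence the ^ anchor):

```shell
# Hypothetical topic names, for illustration only.
topics="location.service-1
location-service-2
location.servic
other.topic"

# The literal pattern from the question: '*' repeats the preceding 'e',
# and the unescaped '.' matches any character, so this also matches
# "location.servic" and "location-service-2".
printf '%s\n' "$topics" | grep -cE '^location.service*'    # counts 3 of 4

# Escaping the dot and using '.*' expresses the intended prefix match.
printf '%s\n' "$topics" | grep -cE '^location\.service.*'  # counts 1 of 4
```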

Edited June 9, 2020

Thanks for your reply, @Iskuskov Alexander.

I tried your suggestion and this is the output. Any suggestions are welcome.

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name" : "kafka-elastic-test-03",
  "config" : {
    "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url" : "http://localhost:9200",
    "connection.username": "admin",
    "connection.password": "******",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "type.name" : "_doc",
    "topics.regex": "(app|test).*",
    "key.ignore" : "true",
    "schema.ignore" : "true",
    "transforms": "TimestampRouter",
    "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.TimestampRouter.topic.format": "${topic}-${timestamp}",
    "transforms.TimestampRouter.timestamp.format": "yyyy.MM.dd"
  }
}'
curl -w '\n' 'http://localhost:8083/connectors/kafka-elastic-test-03/status'
{
  "name": "kafka-elastic-test-03",
  "connector": {
    "state": "RUNNING",
    "worker_id": "xxxx:8083"
  },
  "tasks": [{
    "id": 0,
    "state": "FAILED",
    "worker_id": "xxxx:8083",
    "trace":
"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:355)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
	... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (byte[])\"{\"; line: 1, column: 1])
 at [Source: (byte[])\"{\"; line: 1, column: 3]
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (byte[])\"{\"; line: 1, column: 1])
 at [Source: (byte[])\"{\"; line: 1, column: 3]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:618)
	at com.fasterxml.jackson.core.base.ParserBase._handleEOF(ParserBase.java:485)
	at com.fasterxml.jackson.core.base.ParserBase._eofAsNextChar(ParserBase.java:497)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:2933)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:964)
	at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:246)
	at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)
	at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4057)
	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2572)
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:58)
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:353)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)"
  }],
  "type": "sink"
}
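Note that the trace shows the failing record's value is the single byte "{", i.e. truncated JSON that JsonConverter cannot parse, so the task failure is a bad record rather than a problem with topics.regex. One common mitigation (my assumption, not something suggested in the thread) is Connect's error-tolerance settings, which skip unparseable records and can route them to a dead-letter topic; the DLQ topic name here is hypothetical:

```json
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-kafka-elastic-test",
"errors.deadletterqueue.context.headers.enable": "true"
```

These properties would be added to the "config" section of the connector; with "errors.tolerance": "all" the task keeps running past malformed messages instead of failing.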

Yes, you can use the topics.regex parameter of the Kafka Connect sink configuration.

In your case it could look like: "topics.regex": "(app|test).*"
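A pattern like this can be sanity-checked before deploying the connector. topics.regex must match the whole topic name (Java regex "matches" semantics), which (app|test).* does for any name starting with app or test. A rough approximation with grep -E, using hypothetical topic names and a ^ anchor to mimic the full-name match:

```shell
# Hypothetical topic names; 'orders' is included to show a non-match.
printf '%s\n' app-logging app-location-1 app-service test-app test-app2 orders \
  | grep -cE '^(app|test).*$'   # counts 5 of the 6 names
```

Any topic created later whose name matches the pattern is picked up by the connector as well, which covers the "nearly 1000+ topics" requirement without listing each name.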
