自动将 kafka 中创建的主题下沉到 elasticsearch



我在 kafka 中创建主题(test1、test2、test3(,我想在创建时将它们下沉到弹性。我尝试了topics.regex,但它只为已经存在的主题创建索引。动态创建新主题时,如何将新主题下沉到索引中?

这是我用于 kafka-sink 的连接器配置:

{
"name": "elastic-sink-test-regex",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics.regex": "test[0-9]+",
"type.name": "kafka-connect",
"connection.url": "http://192.168.0.188:9200",
"key.ignore": "true",
"schema.ignore": "true",
"schema.enable": "false",
"batch.size": "100",
"flush.timeout.ms": "100000",
"max.buffered.records": "10000",
"max.retries": "10",
"retry.backoff.ms": "1000",
"max.in.flight.requests": "3",
"is.timebased.indexed": "False",
"time.index": "at"
}
}

接收器连接器不会读取新主题,直到重新启动此连接器(或发生计划的重新平衡(。您可以运行 Kafka 流,从新主题读取消息并将其放入类似结果的主题中。接收器连接器从类似结果的主题中读取。

要保存"消息 - 主题"匹配,您可以使用 Kafka 记录标头。

确保它符合您的要求!

最新更新