我有一个有700万条记录(3个分区(的主题,并部署了一个Elasticsearch接收器,其中有一个任务,主要使用默认配置。接收器首先在Elasticsearch中创建索引,然后以每秒10000条消息的速度开始写入。如果我对连接器的任务进行任何更改
- 暂停连接器,重新启动任务,启动连接器
- 保持连接器运行,但重新启动任务
吞吐量降至400消息/秒,并且永远不会恢复到最初的10000/秒。
如果我停止连接器,从Elasticsearch中删除索引,然后恢复连接器,它会返回到每秒发送10k条消息。
我尝试过将连接器配置从默认值更改为其他配置,但没有结果。
connection.timeout.ms=1000
batch.size=2000
max.retries=5
max.in.flight.requests=5
retry.backoff.ms=100
max.buffered.records=20000
flush.timeout.ms=10000
read.timeout.ms=3000
我的连接器配置
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=logdata
errors.log.include.messages=true
tasks.max=1
topics=d8.qa.id.log.sso.transformed.0
key.ignore=true
schema.ignore=true
value.converter.schemas.enable=false
elastic.security.protocol=PLAINTEXT
name=elasticsearch-sink-d8.qa.id.log.transformed
connection.url=http://172.30.2.23:9200,http://172.30.0.158:9200,http://172.30.1.63:9200
client.id=elasticsearch-sink-d8.qa.id.log.transformed
环境详细信息
Elasticsearch 6.8 (10 data nodes, 3 master)
Elasticsearch connector (version 2.2.1)
Kafka Connect (2 workers with 16GB memory, version 2.2.1)
Kafka Broker (3 brokers with 32GB memory, version 2.2.1)
注意:
- 与ES 7.2和Elasticsearch连接器版本2.3.1的行为相同
- 这是上部署到连接群集的唯一连接器
这是Confluent Platform 5.3.x及以下版本的已知问题,因为如果索引不是由JestElasticsearchClient
创建的,则不会缓存索引。修复程序PR-340和PR-309已合并,并将与Confluent Platform 5.4一起部署。