为了获得csvsourcececonnector的更高吞吐量,我正在使用producer.override覆盖生产者配置。*创建连接器时。但我无法看到任何差异,也在代码值没有反映,如批处理。大小只显示默认值
Curl命令创建连接器:
curl -i -X PUT -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/CsvSchemaSpoolDir5.6/config
-d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"name": "CsvSchemaSpoolDir5.6",
"topic": "topicPrefix1.7",
"tasks.max": "1",
"cleanup.policy": "MOVEBYDATE",
"schema.generation.enabled": "true",
"input.path": "/opt/csv-source-connector-poc/testdata/singleTopicMultipleCSVFiles",
"error.path": "/opt/csv-source-connector-poc/error",
"finished.path": "/opt/csv-source-connector-poc/finished",
"input.file.pattern": "^.*csv$",
"error.max.count": "5",
"fetching.schema.registry.data": "true",
"schema.registry.url": "https://psrc-znpo0.ap-southeast-2.aws.confluent.cloud",
"multiple.csv.files.to.single.topic": "true",
"basic.auth.user.info": "sdfsdfs:sdfsdfsdfsdfds",
"file.topic.mapping.path": "/opt/csv-source-connector-poc/fileTopicMapping",
"producer.override.batch.size": "200",
"producer.override.linger.ms": "50",
"producer.override.compression.type": "lz4",
"producer.override.acks": "1"**
}'
代码日志:
SpoolDirCsvSourceTask.process() : this.config.batchSize : 50
我尝试使用更新的producer.override。*价值,但它似乎不起作用。除了Curl命令,还需要更新哪些地方来获取这些值?
确保你运行的Connect版本支持这些选项;根据docs
,它们是在2.3.0版本之后才添加的。https://kafka.apache.org/documentation/连接
在connect-distributed.properties
中,确保您也有connector.client.config.override.policy=All
。