Unable to get producer.override.* values when creating a connector



To get higher throughput from the CSV source connector, I am using producer.override.* to override the producer configuration when creating the connector. However, I cannot see any difference, and the values are not reflected in the code either; for example, batch.size only shows its default value.

Curl command to create the connector:

curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/CsvSchemaSpoolDir5.6/config \
  -d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"name": "CsvSchemaSpoolDir5.6",
"topic": "topicPrefix1.7",
"tasks.max": "1",
"cleanup.policy": "MOVEBYDATE",
"schema.generation.enabled": "true",
"input.path": "/opt/csv-source-connector-poc/testdata/singleTopicMultipleCSVFiles",
"error.path": "/opt/csv-source-connector-poc/error",
"finished.path": "/opt/csv-source-connector-poc/finished",
"input.file.pattern": "^.*csv$",
"error.max.count": "5",
"fetching.schema.registry.data": "true",
"schema.registry.url": "https://psrc-znpo0.ap-southeast-2.aws.confluent.cloud",
"multiple.csv.files.to.single.topic": "true",
"basic.auth.user.info": "sdfsdfs:sdfsdfsdfsdfds",
"file.topic.mapping.path": "/opt/csv-source-connector-poc/fileTopicMapping",
"producer.override.batch.size": "200",
"producer.override.linger.ms": "50",
"producer.override.compression.type": "lz4",
"producer.override.acks": "1"**
}'
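
To confirm whether the overrides were stored and applied, you can read the connector config back from the Connect REST API and look for the effective producer settings in the worker log. A minimal sketch (the worker log path /var/log/kafka/connect.log is an assumption, adjust it to your setup):

# Read back the stored connector config; the producer.override.* keys should be listed here
curl -s http://localhost:8083/connectors/CsvSchemaSpoolDir5.6/config

# When a task starts, the producer client logs its effective settings under "ProducerConfig values";
# grep for batch.size to see whether the override (200) or the default (16384) is in use
grep -A 40 "ProducerConfig values" /var/log/kafka/connect.log | grep "batch.size"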

Code log:

SpoolDirCsvSourceTask.process() : this.config.batchSize : 50

I have tried with the updated producer.override.* values, but they do not seem to take effect. Apart from the curl command, what else do I need to update for these values to be picked up?

Make sure the version of Connect you are running supports these options; according to the docs (https://kafka.apache.org/documentation/#connect), they were not added until version 2.3.0.
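
If you are unsure which version the worker is running, the root endpoint of the Connect REST API reports it; for example:

# Returns JSON like {"version":"2.3.0","commit":"...","kafka_cluster_id":"..."}
curl -s http://localhost:8083/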

In connect-distributed.properties, make sure you also have connector.client.config.override.policy=All.
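
For reference, a minimal connect-distributed.properties fragment with the override policy enabled might look like the following (bootstrap.servers and group.id are placeholder values); the worker has to be restarted for this change to take effect:

# connect-distributed.properties (fragment)
bootstrap.servers=localhost:9092
group.id=connect-cluster
# Allow connectors to override producer/consumer client configs via producer.override.* / consumer.override.*
connector.client.config.override.policy=All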
