我尝试使用这个Kafka Connect连接器:com.github.jcustenorder.Kafka.Connect.spooldir.SpoolDirCsvSourceConnector来公开Kafka集群上的CSV数据。
我在这里没有看到任何关于Acks的参数
但在日志中,当我创建连接器时,我可以看到:
[2022-02-03 16:03:46,551] INFO ProducerConfig values:
acks = -1**
batch.size = 16384
bootstrap.servers = ...
我曾尝试在配置中设置acks=1,但似乎不起作用。
{
"name": "CsvSpoolDirAutoAck",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
**"acks": "1",**
"tasks.max": "1",
"halt.on.error": "true",
"topic": "test-csv-auto2",
"input.path": "/kafkadata/spool/tmp/input",
"finished.path": "/kafkadata/spool/tmp/finished",
"error.path": "/kafkadata/spool/tmp/error",
"input.file.pattern": ".*\.csv",
"schema.generation.enabled":"true",
"csv.first.row.as.header":"true",
"csv.separator.char": 124
}
}
我不想更新一个";全局";这个Kafka Connect的配置只是为我的CSV连接器更新这个属性。
只需为我的CSV连接器更新此属性。
从Kafka Connect 2.3.0开始,您可以添加
producer.override.acks
还可以查看Connect worker 的connector.client.config.override.policy
https://kafka.apache.org/documentation/#connect