I'm trying to set up an SftpCsvSourceConnector in my local environment, and I'm having some trouble setting the schemas for the connector. This is what I'm doing:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/nc-csv-02/config \
-d '{
"tasks.max" : "1",
"connector.class" : "io.confluent.connect.sftp.SftpCsvSourceConnector",
"kafka.topic": "sftp-csv-00",
"cleanup.policy":"NONE",
"behavior.on.error":"IGNORE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"input.path" : "/",
"csv.separator.char" : 59,
"finished.path" : "/finished",
"error.path" : "/error",
"schema.generation.key.fields" : "msisdn",
"input.file.pattern" : ".*\.dat",
"schema.generation.enabled" : "false",
"csv.first.row.as.header" : "true",
"key.schema":"{"fields":[{"default":null,"name":"msisdn","type":["null","string"]}],"name":"NCKeySchema","type":"record"}",
"value.schema":"{"name":"NCPortabilityMovementEvent","type":"record","fields":[{"default":null,"name":"action","type":["null","string"]},{"default":null,"name":"msisdn","type":["null","string"]},{"default":null,"name":"previousNRN","type":["null","string"]},{"default":null,"name":"newNRN","type":["null","string"]},{"default":null,"name":"effectiveDate","type":["null","string"]},{"default":null,"name":"referenceID","type":["null","string"]}]}",
"sftp.username":"tester",
"sftp.password":"password",
"sftp.host":"192.168.1.2",
"sftp.port":"22"
}'
The exception I see in the worker task is:
org.apache.kafka.common.config.ConfigException: Invalid value com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "fields" (class com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage), not marked as ignorable (10 known properties: "defaultValue", "valueSchema", "doc", "type", "name", "keySchema", "version", "parameters", "isOptional", "fieldSchemas"])
at [Source: (String)"{"fields":[{"default":null,"name":"msisdn","type":["null","string"]}],"name":"NCKeySchema","type":"record"}"; line: 1, column: 12] (through reference chain: com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage["fields"]) for configuration Could not read schema from 'key.schema'
at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.readSchema(SftpSourceConnectorConfig.java:334)
at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:117)
at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:156)
at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:44)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:185)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:210)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:332)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:141)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
The key and value schemas are:
{
"fields": [
{
"default": null,
"name": "msisdn",
"type": [
"null",
"string"
]
}
],
"name": "NCKeySchema",
"type": "record"
}
and
{
"name" : "NCPortabilityMovementEvent",
"type" : "record",
"fields" : [
{
"default" : null,
"name" : "action",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "msisdn",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "previousNRN",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "newNRN",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "effectiveDate",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "referenceID",
"type" : [
"null",
"string"
]
}
]
}
What am I doing wrong?
I also tried this with schema.generation.enabled=true and the key.schema and value.schema properties removed, and the connector worked fine.
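For reference, the working variant kept everything else in the config above the same and just replaced the two schema properties with the generation settings, roughly:

"schema.generation.enabled" : "true",
"schema.generation.key.fields" : "msisdn",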
The Avro schemas you provided are not correct. You need to define Connect schemas instead: declare them with type=STRUCT and describe the fields with fieldSchemas. The format itself is not well documented, but there are some examples here: https://docs.confluent.io/kafka-connect-sftp/current/source-connector/csv_source_connector.html#sftp-connector-csv-with-schema-example
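To give you an idea, here is a rough, untested sketch of what Connect-schema equivalents of your two Avro schemas could look like, with type=STRUCT and fieldSchemas per the example linked above, and the field names and optionality carried over from your Avro definitions. The key schema would be something like:

{
  "name" : "NCKeySchema",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "msisdn" : { "type" : "STRING", "isOptional" : true }
  }
}

and the value schema something like:

{
  "name" : "NCPortabilityMovementEvent",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "action" : { "type" : "STRING", "isOptional" : true },
    "msisdn" : { "type" : "STRING", "isOptional" : true },
    "previousNRN" : { "type" : "STRING", "isOptional" : true },
    "newNRN" : { "type" : "STRING", "isOptional" : true },
    "effectiveDate" : { "type" : "STRING", "isOptional" : true },
    "referenceID" : { "type" : "STRING", "isOptional" : true }
  }
}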
You can find the source of the schema JSON deserializer here: https://github.com/jcustenborder/connect-utils/tree/master/connect-utils-jackson/src/main/java/com/github/jcustenborder/kafka/connect/utils/jackson
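One more thing to watch: key.schema and value.schema are string-valued config properties, so when you inline a schema in the curl payload the inner quotes still need escaping, along these lines:

"key.schema" : "{\"name\":\"NCKeySchema\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"msisdn\":{\"type\":\"STRING\",\"isOptional\":true}}}",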