I'm trying to connect Kafka Connect to the Elasticsearch sink. I'm not using Confluent mode, but standalone mode. This is my Elasticsearch connector configuration:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql-jdbc-mall
key.ignore=true
schema.ignore=true
connection.url=http://172.**.*.**:5601
type.name=kafka-connect
elastic.security.protocol=SSL
key.converter.schemas.enable=false
value.converter.schemas.enable=false
My connect-standalone properties are:
bootstrap.servers=Ni****ing:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
When I run the connector, I get this error:
[2020-01-21 09:31:03,676] ERROR Failed to start task elasticsearch-sink-0 (org.apache.kafka.connect.runtime.Worker:464)
io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)
at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:58)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:268)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:440)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:311)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:336)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:214)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2020-01-21 09:31:03,677] INFO Created connector elasticsearch-sink (org.apache.kafka.connect.cli.ConnectStandalone:112)
Update
Because I don't have the "/etc/schema-registry" file, I changed the connect-standalone properties to:
bootstrap.servers=Nifi-Staging:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
But when I use JsonConverter, I get this error:
[2020-01-21 16:12:04,939] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.NullPointerException
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:231)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:142)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:133)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-01-21 16:12:04,946] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-01-21 16:12:04,946] INFO Stopping ElasticsearchSinkTask (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:190)
io.confluent.connect.avro.AvroConverter requires schema.registry.url to be defined.
Remove both schemas.enable props, since they only apply to JSON (Avro always has a schema), and then add the URL:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://...
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://...
Example Connect properties files can be found under the etc/schema-registry folder.
If you're not using Avro, change the converters to match your data. The key and value can also be completely different types.
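As a rough sketch of a mixed key/value setup, assuming the topic holds plain string keys and JSON values without an embedded schema (an assumption; the question does not say how the data was written), the worker properties might look something like this:

```properties
# Assumption: keys are plain strings, values are schemaless JSON.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# schemas.enable is only meaningful for JsonConverter
value.converter.schemas.enable=false
```

With schemas.enable=true, JsonConverter expects every message to be wrapped in a schema/payload envelope, so the setting has to match how the data was actually produced.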
Also, the Elasticsearch URL should be different; for example, something running on port 9200, not Kibana on 5601.
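For illustration only, assuming Elasticsearch runs on the same host and listens on its default HTTP port (the masked address is kept exactly as in the question), the connector line would become:

```properties
# Assumption: Elasticsearch on its default HTTP port 9200; 5601 is Kibana.
connection.url=http://172.**.*.**:9200
```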
"I'm not using Confluent mode, but standalone mode."
I assume you mean the confluent command? That just runs kafka-connect-distributed for you, and distributed mode is actually preferred.