I am trying to push JSON data from Kafka to InfluxDB using Kafka Connect. Kafka version: 2.2.0, and the Kafka Connect runtime is from Apache Kafka (not the Confluent distribution).
Worker properties:
bootstrap.servers=localhost:9092
rest.port=8083
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets1
offset.storage.replication.factor=1
offset.storage.partitions=50
config.storage.topic=connect-configs1
config.storage.replication.factor=1
status.storage.topic=connect-status1
status.storage.replication.factor=1
status.storage.partitions=10
offset.storage.file.filename=/data/md0/kafka/data/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
consumer.max.poll.records=10000
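The connector itself was created through the Connect REST API (rest.port=8083 above). A minimal sketch of that call, assuming the Confluent InfluxDB sink from the stack trace below — the connector name FlinkAggsInfluxDBSink is taken from the task id in the log, while the topic name, InfluxDB URL, and database are placeholders, and the exact config keys may vary by connector version:

```
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "FlinkAggsInfluxDBSink",
    "config": {
      "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
      "tasks.max": "1",
      "topics": "flink-aggs",
      "influxdb.url": "http://localhost:8086",
      "influxdb.db": "flink_aggs"
    }
  }'
```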
When the connector is started via this REST curl command and data is pushed, the log throws the following exception:
[2019-11-14 09:41:33,294] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.influxdb.sink.InfluxDBSinkTask.writeRecordsToDB(InfluxDBSinkTask.java:230)
at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:112)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-11-14 09:41:33,295] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.influxdb.sink.InfluxDBSinkTask.writeRecordsToDB(InfluxDBSinkTask.java:230)
at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:112)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
[2019-11-14 09:41:33,295] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
The error shows it is trying to cast to org.apache.kafka.connect.data.Struct, which means this sink requires a schema as part of the data, not plain JSON.
Your options are therefore:
- Produce Avro data to the topic and use the AvroConverter.
- Update the producer to send JSON in the {"schema": ..., "payload": ...} envelope format and set value.converter.schemas.enable=true (see the sketches after this list).
- Find another InfluxDB sink that supports plain JSON.
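For the second option, a schema-enabled message might look like the following. This is a minimal sketch: the record name aggRecord and the fields ts and value are assumptions for illustration, not taken from your actual topic.

```
{
  "schema": {
    "type": "struct",
    "name": "aggRecord",
    "optional": false,
    "fields": [
      {"field": "ts",    "type": "int64",  "optional": false},
      {"field": "value", "type": "double", "optional": true}
    ]
  },
  "payload": {
    "ts": 1573724493294,
    "value": 42.0
  }
}
```

With schemas.enable=true the JsonConverter deserializes the payload into a Connect Struct governed by the schema block, which is what InfluxDBSinkTask expects; with schemas.enable=false it produces a plain Java Map (HashMap), which is exactly what triggers the ClassCastException above.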
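For the first option, the worker converters would change to the Confluent AvroConverter, which resolves schemas through a Schema Registry. A sketch of the relevant worker properties, assuming a Schema Registry running at its conventional default address (http://localhost:8081 is an assumption):

```
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

Note this also requires your producer to serialize records as Avro and register their schemas; the converter alone cannot add a schema to schemaless JSON already in the topic.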