I am trying to use the JSON converter in Kafka Connect, but it throws the following error:
{"type":"log", "host":"connecttest6-ckaf-connect-84866788d4-p8lkh", "level":"ERROR", "neid":"kafka-connect-4d9495b82e1e420992ec44c433d733ad", "system":"kafka-connect", "time":"2019-04-08T11:55:14.254Z", "timezone":"UTC", "log":"pool-5-thread-1 - org.apache.kafka.connect.runtime.WorkerTask - WorkerSinkTask{id=hive-sink6-1} Task threw an uncaught and unrecoverable exception"}
java.lang.ClassCastException: org.apache.kafka.connect.json.JsonConverter cannot be cast to io.confluent.connect.hdfs.Format
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:242)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:103)
at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:98)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:191)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I tried the following configuration for JSON at the source (key.converter.schemas.enable=false and value.converter.schemas.enable=false), and the same in the HdfsSinkConnector configuration.
Connect configuration:
ConnectKeyConverter: "org.apache.kafka.connect.json.JsonConverter"
ConnectValueConverter: "org.apache.kafka.connect.json.JsonConverter"
ConnectKeyConverterSchemasEnable: "true"
ConnectValueConverterSchemasEnable: "true"
"http(s)://schemareg_headless_service_name.namespace.svc.cluster.local:port"
ConnectSchemaRegistryUrl: "http://kafka-schema-registry-ckaf-schema-registry-headless.ckaf.svc.cluster.local:8081"
ConnectInternalKeyConverter: "org.apache.kafka.connect.json.JsonConverter"
ConnectInternalValueConverter: "org.apache.kafka.connect.json.JsonConverter"
REST API command used to add the sink (sink configuration):
curl -X PUT -H "Content-Type: application/json" \
  --data '{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "2",
    "topics": "topic-test",
    "hdfs.url": "hdfs://localhost/tmp/jsontest4",
    "flush.size": "3",
    "name": "thive-sink6",
    "format.class": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }' \
  connecttest6-ckaf-connect.ckaf.svc.cluster.local:8083/connectors/hive-sink6/config
After adding the sink to Kafka Connect, I sent data to the corresponding Kafka topic. Here is the data I tried:
{"name":"test"}
{"schema":{"type":"struct","fields":[{"type":"string","field":"name"}]},"payload":{"name":"value1"}}
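(For context on the two sample messages above: with `schemas.enable=true` the JsonConverter expects every record wrapped in a schema/payload envelope, while with `schemas.enable=false` it accepts the bare JSON. A minimal sketch of the two shapes, using the same sample values as above:)

```python
import json

# Bare JSON, accepted when key/value.converter.schemas.enable=false
schemaless = {"name": "test"}

# Schema/payload envelope, required when schemas.enable=true
enveloped = {
    "schema": {
        "type": "struct",
        "fields": [{"type": "string", "field": "name"}],
    },
    "payload": {"name": "value1"},
}

# These serialized strings are what would actually be produced to the topic
print(json.dumps(schemaless))
print(json.dumps(enveloped))
```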
The data should be written to the HDFS location provided in the sink configuration above.
I need advice on the above scenario and on how to resolve the error.
It looks like somewhere in your configuration (for Kubernetes??) you have assigned format.class=org.apache.kafka.connect.json.JsonConverter, which is not valid.
Perhaps you meant to use io.confluent.connect.hdfs.json.JsonFormat instead.
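In other words, `format.class` must name a class implementing the connector's `Format` interface (how records are written to HDFS), while `key.converter`/`value.converter` name `Converter` classes (how records are deserialized from the topic). A corrected request body might look like this (a sketch based on your curl command above, not verified against your cluster):

```json
{
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "tasks.max": "2",
  "topics": "topic-test",
  "hdfs.url": "hdfs://localhost/tmp/jsontest4",
  "flush.size": "3",
  "name": "thive-sink6",
  "format.class": "io.confluent.connect.hdfs.json.JsonFormat",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "key.converter.schemas.enable": "false"
}
```

Note that `io.confluent.connect.hdfs.json.JsonFormat` is only available in recent versions of the HDFS connector, so check the version you have deployed.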