Kafka-connect:在连接器接收器Cassandra的分布式配置中获取错误



我获得了连接器接收器Cassandra的分布式配置的任务错误。我正在运行命令:

curl -s localhost:8083/connectors/cassandrasinkconnector2/status |JQ

获得状态

{
  "name": "cassandraSinkConnector2",
  "connector": {
    "state": "RUNNING",
    "worker_id": localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "localhost:8083",
      "trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumerntat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)ntat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)ntat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)ntat org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)ntat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)ntat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)ntat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)ntat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)ntat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)ntat java.util.concurrent.FutureTask.run(FutureTask.java:266)ntat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)ntat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)ntat java.lang.Thread.run(Thread.java:748)nCaused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurredntat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)ntat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)ntat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)ntat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)nt... 12 morenCaused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptorntat java.net.URLClassLoader.findClass(URLClassLoader.java:382)ntat java.lang.ClassLoader.loadClass(ClassLoader.java:424)ntat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)ntat java.lang.ClassLoader.loadClass(ClassLoader.java:357)ntat java.lang.Class.forName0(Native Method)ntat java.lang.Class.forName(Class.java:348)ntat org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)ntat org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)ntat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)nt... 15 moren"
    }
  ],
  "type": "sink"

堆栈跟踪:

"trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurred
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)
    ... 12 more
Caused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)
    ... 15 more

您可以在连接器的配置下找到。


{
  "name": "cassandraSinkConnector2",
  "config": {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "appartenance_de",
    "cassandra.contact.points": "localhost",
    "cassandra.kcql": "INSERT INTO app_test SELECT * FROM app_de",
    "cassandra.port": "9042",
    "cassandra.keyspace": "dev_dkks",
    "cassandra.username": "superuser",
    "cassandra.password": "password",
    "cassandra.write.mode": "insert",
    "value.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "name": "cassandraSinkConnector2"
  },
  "tasks": [
    {
      "connector": "cassandraSinkConnector2",
      "task": 0
    }
  ],
  "type": "sink"
}

新错误:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
    at io.confluent.connect.cassandra.CassandraSinkTask.put(CassandraSinkTask.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
"

根错误是

java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

监视拦截器是Confluent平台的一部分。您可以禁用它们在Kafka Connect Worker配置中的使用,也可以更好地确保您的Kafka Connect Worker可用/usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar JAR。


您看到的新错误是

org.apache.kafka.connect.errors.DataException: 
Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. 
Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

我建议使用错误中建议的单个消息变换,以正确键入数据。您可以在此处看到一个示例,以及此处转换的文档。

相关内容

最新更新