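I am trying to run a distributed setup for the Ignite connector. Unfortunately, it does not work. I was able to retrieve the connector creation logs through the API.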
API POST payload to /connectors:
{
  "name": "ignite-connector",
  "config": {
    "connector.class": "org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
    "tasks.max": "2",
    "topics": "someTopic1",
    "cacheName": "myCache",
    "cacheAllowOverwrite": true,
    "igniteCfg": "/opt/ignite/examples/config/example-cache.xml"
  }
}
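I set up the Ignite connector as a plugin. I built an uber-jar from the repo, placed it in a separate directory, and included that directory as a plugin location in the .properties file that I use to start connect-distributed.sh.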
I set the classpath for the Connect and Kafka jobs through systemd:
Environment=CLASSPATH=/opt/kafka/ignite-connector/*
The full error log follows:
[2022-11-17 19:49:30,268] INFO [ignite-connector|worker] SinkConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = ignite-connector
predicates = []
tasks.max = 2
topics = [someTopic1]
topics.regex =
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SinkConnectorConfig:376)
[2022-11-17 19:49:30,272] INFO [ignite-connector|worker] EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = ignite-connector
predicates = []
tasks.max = 2
topics = [someTopic1]
topics.regex =
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
[2022-11-17 19:49:30,276] INFO [ignite-connector|worker] Instantiated connector ignite-connector with version 3.3.1 of type class org.apache.ignite.stream.kafka.connect.IgniteSinkConnector (org.apache.kafka.connect.runtime.Worker:322)
[2022-11-17 19:49:30,276] INFO [ignite-connector|worker] Finished creating connector ignite-connector (org.apache.kafka.connect.runtime.Worker:347)
[2022-11-17 19:49:30,277] ERROR [ignite-connector|worker] WorkerConnector{id=ignite-connector} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:201)
java.lang.NoClassDefFoundError: org/apache/ignite/internal/util/typedef/internal/A
at org.apache.ignite.stream.kafka.connect.IgniteSinkConnector.start(IgniteSinkConnector.java:55)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-11-17 19:49:30,277] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1687)
[2022-11-17 19:49:30,280] ERROR [ignite-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'ignite-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1811)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: ignite-connector
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$35(DistributedHerder.java:1782)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector ignite-connector to state STARTED
... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/ignite/internal/util/typedef/internal/A
at org.apache.ignite.stream.kafka.connect.IgniteSinkConnector.start(IgniteSinkConnector.java:55)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)
... 7 more
The class mentioned above (A) is contained in ignite-core-2.1.1.jar, which is bundled in the uber-jar under the plugin directory.
Any advice is welcome.
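There seems to be a misunderstanding about what a "plugin" is. Plugins are only the classes defined as implementations of converters, transforms, and connectors.
The internal Ignite classes are none of these, so they are not loaded by the plugin.path classloader.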
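To fix this, run export CLASSPATH=/path/to/ignite-files/* before starting the Connect process (Java expands a trailing * to every JAR in that directory; a *.jar pattern is not recognized as a classpath wildcard), and use the jar -tf command to verify that the class actually exists in a specific JAR.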
This is not a hack; it is how Java classloaders work.
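
The jar -tf check can be scripted when a directory holds many JARs. The following is a minimal sketch, not part of Kafka or Ignite: find_class is a hypothetical helper name, it assumes bash and a JDK jar tool on the PATH, and the directory in the usage comment is the one from the question.

```shell
#!/usr/bin/env bash
# find_class <jar-dir> <fully.qualified.ClassName>
# Hypothetical helper: prints every JAR in <jar-dir> whose listing
# contains the class file. Returns 0 if at least one JAR matched,
# 1 otherwise.
find_class() {
  local dir=$1
  # Convert the class name to its archive entry,
  # e.g. org.foo.Bar -> org/foo/Bar.class
  local entry="${2//.//}.class"
  local status=1
  local jar_file
  for jar_file in "$dir"/*.jar; do
    [ -e "$jar_file" ] || continue   # the glob matched nothing
    # jar -tf lists the archive entries, one per line
    if jar -tf "$jar_file" | grep -qxF "$entry"; then
      echo "$jar_file"
      status=0
    fi
  done
  return "$status"
}

# Usage (paths taken from the question, adjust to your installation):
#   find_class /opt/kafka/ignite-connector \
#     org.apache.ignite.internal.util.typedef.internal.A
```

If the helper prints a JAR, that JAR's directory is the one to put on CLASSPATH before starting connect-distributed.sh. If it prints nothing, the class is not present in any JAR directly inside that directory.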