Cassandra 接收器连接器:尝试创建/查找主题时出错 '_confluent-command'



无法使用 Ksqldb 创建 Kafka -> Cassandra Sink 连接器:

CREATE SINK CONNECTOR cassandra WITH( "connector.class" = 'io.confluent.connect.cassandra.CassandraSinkConnector', "tasks.max" = '1', "topics" = 'tst', "cassandra.contact.points" = 'cassandra', "cassandra.keyspace" = 'test', ">

cassandra.write.mode" = 'Update', "confluent.topic.bootstrap.servers" = 'kafka:9092' (;

ERROR [CASS|worker] WorkerConnector{id=CASS} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:118)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) '_confluent-command'
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:262)
at io.confluent.license.LicenseStore$1.run(LicenseStore.java:161)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
at io.confluent.license.LicenseStore.start(LicenseStore.java:190)
at io.confluent.license.LicenseManager.<init>(LicenseManager.java:155)
at io.confluent.license.LicenseManager.<init>(LicenseManager.java:140)
at io.confluent.connect.utils.licensing.ConnectLicenseManager$Builder.lambda$build$0(ConnectLicenseManager.java:210)
at io.confluent.connect.utils.licensing.ConnectLicenseManager.registerOrValidateLicense(ConnectLicenseManager.java:255)
at io.confluent.connect.cassandra.CassandraSinkConnector.doStart(CassandraSinkConnector.java:50)
at io.confluent.connect.cassandra.CassandraSinkConnector.start(CassandraSinkConnector.java:45)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1190)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:126)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1206)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1202)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:229)
... 21 more
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

融合卡桑德拉接收器连接器复制因子的默认值 = 3。 修改连接器配置中的默认值解决了问题!

"confluent.topic.replication.factor" = '1',

最新更新