Kafka Mongo Connector not working as expected



I am using Kafka Connect (source) with Mongo. The Connect worker is running, but it is not writing any data to the Kafka topic. I am using the source connector with the following configuration file for the connector:

name=mongo-ff
tasks.max=1
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
database=haifa
collection=alerts
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
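
For context, I pass this properties file to the Connect worker; with a standalone worker that looks roughly like the following (the file names here are placeholders for my actual paths):

bin/connect-standalone.sh config/connect-standalone.properties mongo-source.properties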

But when I try to consume the data from the topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefix  --from-beginning

I am not getting any data.

It seems that nothing is being written as expected. Part of the log stack is:

... 3 more
[2020-01-22 17:19:52,727] INFO Opened connection [connectionId{localValue:3, serverValue:20}] to localhost:27017 (org.mongodb.driver.connection:71)
[2020-01-22 17:19:52,732] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 3]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2854274} (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,732] INFO Discovered cluster type of STANDALONE (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,733] ERROR Expecting a single STANDALONE, but found more than one.  Removing localhost:27017 from client view of cluster. (org.mongodb.driver.cluster:101)
[2020-01-22 17:19:52,735] INFO Cluster ID: sZ64WgvDRBmrJnawsRJ_7A (org.apache.kafka.clients.Metadata:379)
[2020-01-22 17:19:52,756] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2020-01-22 17:20:02,647] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:02,648] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
I am also getting this error:
ERROR WorkerSourceTask{id=mongo-fff-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27018, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}, {address=localhost:27019, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}]
at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
at com.mongodb.internal.connection.AbstractMultiServerCluster.getDescription(AbstractMultiServerCluster.java:54)
at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:152)
at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:103)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:284)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:188)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:203)
at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:53)
at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:128)
at com.mongodb.client.internal.ChangeStreamIterableImpl$1.iterator(ChangeStreamIterableImpl.java:123)
at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:236)
at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:136)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-01-22 18:29:09,590] ERROR WorkerSourceTask{id=mongo-fff-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-01-22 18:29:09,593] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1145)
[2020-01-22 18:29:19,482] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)

My guess is that your expectation is not quite right.

In the configuration file you have set a prefix for the topics:

topic.prefix=someprefix

But you are trying to consume from a topic named someprefix:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefix  --from-beginning

Note that the topic.prefix configuration parameter is not the name of the topic that will be created, but just a prefix for the topic:

topic.prefix

Prefix to prepend to table names to generate the name of the Kafka topic to publish data to, or in the case of a custom query, the full name of the topic to publish to.

Type: string

Default: ""

Importance:

So, if your database haifa has a table named users, the topic created based on your configuration file would be named someprefixusers (I would suggest using a hyphen, e.g. topic.prefix=someprefix-, so that the resulting topic name is more readable). You would therefore have to consume the records from that topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefixusers  --from-beginning
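
If in doubt about the exact name, you can list the topics on the broker and look for the one starting with your prefix (on recent Kafka versions; older ones use --zookeeper instead of --bootstrap-server):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list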

EDIT:

You are getting a Connection Refused error, which means that either your MongoDB is not up and running, or you are not connecting to the database correctly.

First, make sure that MongoDB is up and running, using the mongo client:

mongo mongoHost:mongoPort/dbname
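
With the database from your config and MongoDB's default port, that would be something like (the port is an assumption):

mongo localhost:27017/haifa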

Second, connection.uri seems to be missing from the connector's configuration file.
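
As a minimal sketch, assuming MongoDB is reachable on the default localhost port (adjust the host, port and any replica-set options to your actual deployment), you would add something like this to the connector properties:

# assumed host/port; replace with your actual MongoDB address
connection.uri=mongodb://localhost:27017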
