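I am trying to capture MongoDB change data with the Mongo-Kafka connector. When I set a collection name (i.e. collection=collection1) and use the pipeline
[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]
it works. But when I leave the collection empty and use the following pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]
I cannot get it to work.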
This is what the properties file looks like:
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=test
collection=
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
# pipeline=[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]
pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]
batch.size=0
publish.full.document.only=true
change.stream.full.document=updateLookup
collation=
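To make the intent of the second pipeline concrete, here is a plain-Python simulation (not MongoDB or connector code) of what its $match stage does: each change event carries an ns field with db and coll, and only events whose ns.coll matches ^(collection1|collection2)$ are kept. The sample events and the name filter_events are hypothetical, chosen only for illustration.

```python
import re

# Same pattern as in the pipeline property. The ^...$ anchors make sure
# that names such as "collection10" are rejected.
COLL_PATTERN = re.compile(r"^(collection1|collection2)$")


def filter_events(events):
    """Keep change events whose ns.coll matches COLL_PATTERN.

    Mimics {"$match": {"ns.coll": {"$regex": ...}}}: $regex uses search
    semantics, and events without an ns.coll string are dropped.
    """
    kept = []
    for event in events:
        coll = event.get("ns", {}).get("coll")
        if isinstance(coll, str) and COLL_PATTERN.search(coll):
            kept.append(event)
    return kept


# Hypothetical change events from database "test".
sample = [
    {"operationType": "insert", "ns": {"db": "test", "coll": "collection1"}},
    {"operationType": "update", "ns": {"db": "test", "coll": "collection2"}},
    {"operationType": "insert", "ns": {"db": "test", "coll": "collection10"}},
    {"operationType": "delete", "ns": {"db": "test", "coll": "other"}},
]

for event in filter_events(sample):
    print(event["ns"]["coll"])
```

This filtering only has an effect if the change stream itself is opened on the whole database; as the warning below shows, MongoDB 3.6 rejects that before any event reaches the $match stage.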
Running bin/connect-standalone.sh gives me the following message:
WARN Failed to resume change stream: {aggregate: 1} is not valid for '$changeStream'; a collection is required. 73
I am using MongoDB v3.6.
If the database parameter is specified, the connector expects you to provide the collection parameter as well.
In fact, with Mongo 3.6 I believe you can only listen to a single database/collection combination at a time. This changed in Mongo 4.0, as described here: https://docs.mongodb.com/manual/release-notes/4.0/#change-streams.
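The version rule described above can be written as a small pre-flight check on the connector settings. This is a hypothetical helper (check_watch_scope is not part of the connector or of any MongoDB driver). It assumes the server version is known as a (major, minor) tuple and treats an empty string as "not set". On 3.6 a change stream must target a single collection. Starting with 4.0, a stream can also be opened on a whole database or deployment.

```python
def check_watch_scope(server_version, database, collection):
    """Return the change stream scope the settings imply, or raise ValueError.

    server_version: (major, minor) tuple, e.g. (3, 6).
    database, collection: values of the connector's `database` and
    `collection` properties; "" means the property is left empty.
    """
    if collection and not database:
        raise ValueError("collection is set but database is empty")
    if database and collection:
        # Collection-level change streams exist since MongoDB 3.6.
        return "collection"
    if server_version < (4, 0):
        # 3.6 cannot open a change stream on a database or deployment,
        # which matches the "a collection is required" warning.
        raise ValueError(
            "MongoDB %d.%d requires a collection; database/deployment "
            "change streams need 4.0+" % server_version
        )
    return "database" if database else "deployment"


print(check_watch_scope((4, 0), "test", ""))  # database-level stream
```

With the settings from the question (database=test, empty collection) the check fails for (3, 6) and passes for (4, 0), which is why upgrading the server, rather than changing the pipeline, is what enables the regex-on-ns.coll approach.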