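I get an error when running kafka-mongodb-source-connect. I am trying to run Connect in standalone mode with connect-avro-standalone.properties and MongoSourceConnector.properties, so that Connect publishes data written to MongoDB to a Kafka topic.
This is the command I am running: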
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/mongodb-kafka-connect-mongodb/etc/MongoSourceConnector.properties
connect-avro-standalone.properties
# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java,/Users/anton/Downloads/confluent-5.3.2/share/confluent-hub-components
MongoSourceConnector.properties
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=test
collection=test
This is the error:
[2020-01-02 18:55:11,546] ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573): 'The $changeStream stage is only supported on replica sets' on server localhost:27017. The full response is {"ok": 0.0, "errmsg": "The $changeStream stage is only supported on replica sets", "code": 40573, "codeName": "Location40573"}
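MongoDB change streams are only available on replica sets. However, you can convert a standalone installation into a single-node replica set with the following steps.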
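- Locate the mongodb.conf file and add the following replica set details to it:
replication:
  replSetName: "<replica-set name>"
Example:
replication:
  replSetName: "rs0"
Note: if MongoDB was installed with brew, the file is located at /usr/local/etc/mongod.conf. Restart mongod after editing the file so that the change takes effect.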
- Initiate the replica set with rs.initiate()
Log in to the MongoDB shell and run rs.initiate().
This initiates your replica set. On success, the output looks like this:
> rs.initiate()
{
    "info2" : "no configuration specified. Using a default configuration for the set",
    "me" : "127.0.0.1:27017",
    "ok" : 1,
    "$clusterTime" : {
        "clusterTime" : Timestamp(1577545731, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    },
    "operationTime" : Timestamp(1577545731, 1)
}
With these two simple steps, you have MongoDB running as a replica set with just one node.
Reference: https://onecompiler.com/posts/3vchuyxuh/enabling-replica-set-in-mongodb-with-just-one-node
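To confirm that the conversion worked before restarting the connector, one option is to look at the server's reply to the isMaster (or hello) command: a replica set member reports a setName field, while a standalone mongod does not. The following is a minimal Python sketch of that check. The function names and the sample replies are made up for illustration, and it deliberately ignores sharded clusters. With pymongo installed (an assumption, it is not used in the block itself), a real reply could be obtained with MongoClient("mongodb://localhost:27017").admin.command("isMaster").

```python
from typing import Any, Mapping, Optional


def replica_set_name(hello_response: Mapping[str, Any]) -> Optional[str]:
    """Return the replica set name from a hello/isMaster reply, or None.

    A mongod that belongs to a replica set includes a "setName" field in
    its reply; a standalone mongod omits it.
    """
    name = hello_response.get("setName")
    return name if isinstance(name, str) and name else None


def supports_change_streams(hello_response: Mapping[str, Any]) -> bool:
    """Treat a reply that carries a set name as change-stream capable.

    Simplification: sharded clusters (mongos) are not handled here.
    """
    return replica_set_name(hello_response) is not None


# Hand-written sample replies (illustrative, not captured from a real server).
standalone_reply = {"ismaster": True, "ok": 1.0}
replica_set_reply = {"ismaster": True, "setName": "rs0", "ok": 1.0}

print(supports_change_streams(standalone_reply))
print(supports_change_streams(replica_set_reply))
```

If the check returns False for your server, the connector will most likely keep failing with error 40573 as shown in the question.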
The $changeStream stage is only supported on replica sets
You need to make your Mongo database a replica set in order to read the oplog.
https://dba.stackexchange.com/questions/243780/converting-mongodb-instance-from-standalone-to-replica-set-and-backing-up
This helped in my case (macOS env), see more:
I. Install the zero-config MongoDB runner. It starts a replica set with no non-Node dependencies, not even MongoDB.
npm install -g run-rs
# or: yarn global add run-rs
II. Use the connection string
mongodb://localhost:27017,localhost:27018,localhost:27019/YOUR_DB_NAME?replicaSet=rs&retryWrites=false
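The connection string above presumably goes into the connection.uri property of MongoSourceConnector.properties from the question. As a rough sketch, it can be assembled from its parts like this in Python; build_mongo_uri is a hypothetical helper written for this example, not part of any MongoDB or Kafka Connect library, and the host list and database name are simply the placeholders used above.

```python
from urllib.parse import urlencode


def build_mongo_uri(hosts, database, replica_set, retry_writes=None):
    """Assemble a mongodb:// URI that names the replica set to connect to.

    hosts        -- list of "host:port" strings
    database     -- default database name placed in the URI path
    replica_set  -- value for the replicaSet option
    retry_writes -- optional bool; omitted from the URI when None
    """
    options = {"replicaSet": replica_set}
    if retry_writes is not None:
        options["retryWrites"] = "true" if retry_writes else "false"
    return "mongodb://{}/{}?{}".format(",".join(hosts), database, urlencode(options))


# Placeholders taken from the connection string above.
uri = build_mongo_uri(
    ["localhost:27017", "localhost:27018", "localhost:27019"],
    "YOUR_DB_NAME",
    "rs",
    retry_writes=False,
)

# Line in the form used by the connector properties file.
print("connection.uri=" + uri)
```

For the single-node setup described earlier in this thread, the same helper would be called with one host and the replica set name chosen in the config file, for example build_mongo_uri(["localhost:27017"], "test", "rs0").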