假设,我将获得多个MySQL实例,并且必须流式传输实时数据。因此,为此,我使用以下列出的命令在 Docker 容器上启动了一些服务,该命令将我连接到示例库存数据库,并且当我在 MySQL 命令行客户端中执行任何 DML 命令时,能够监控这些数据更改。
但是需要什么更改或等效命令才能连接到实际的MySQL数据库并在观察者窗口中流式传输相同的内容。
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.1
docker run -it --rm --name mysql -p 3308:3308 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.1
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c "exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"debezium""
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.1
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d "{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }"
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.1 watch-topic -a -k dbserver1.inventory.customers
让我们假设MySQL连接详细信息如下:
Host address of MySQL : XXX.XXX.X.XXX
Databse/Schema : YYYYY
Username : ZZZZZ
Password : PPPPP
port : 3306
任何线索/解释将不胜感激!
当您使用此配置连接器时
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d "{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }"
您修改它以包含实际源数据库的详细信息,例如
curl -i -X PUT -H "Content-Type:application/json"
http://localhost:8083/connectors/source-mysql-01/config
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "XXX.XXX.X.XXX",
"database.port": "3306",
"database.user": "ZZZZZ",
"database.password": "PPPPP",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "YYYYY",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.YYYYY"
}'
请注意,我还更改为PUT
而不是POST
,因为这意味着您可以重新运行该命令来修改同名的现有连接器(如果不存在,则创建它(。