Azure Event hub在与kafka connect、debeizum connector和postgrsql集



我正在使用Azure Event Hub(启用kafka surface(实现一个更改数据捕获场景,以从postgresql捕获数据。数据库是Postgresql,使用debizum连接器(https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz)。我还使用了azurepostgres数据库(单服务器逻辑复制(。kafka-connect在本地的docker上运行,可以在azure事件中心创建主题(附加docker compose文件(。此外,一旦我将rest请求发送到kafka连接,它就会显示连接器已启动并正在运行。然而,当我将数据插入postgres表时,它无法在eventhub中创建主题,我也不知道原因是什么?

为eventhub 中的kafka连接创建的主题的图像

连接器状态:

localhost:8083/connectors/postgres-connector/status
{
"name": "postgres-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect:8083"
}
],
"type": "source"
}

在我看来,这也很奇怪,一旦创建源连接器,kafka连接就会显示以下关于连接到eventhub的警告:

docker-file-connect-1|[2022-09-22 08:24:1152]信息[Productor clientId=connector-productor-postgres-connector-0]由于节点-1断开连接,已取消关联id为32338的飞行中API_VERSIONS请求(自创建以来的运行时间:8ms,自发送以来的运行时间:8ms;请求超时:30000ms((org.apache.cafka.clients.NetworkClient(docker-file-connect-1|[2022-09-22 08:24:11152]WARN[Productor clientId=connector-productor-postgres-connector-0]引导代理eventhubstandard.servicebus.windows.net:9093(id:-1 rack:null(已断开连接(org.apache.kafka.clients.NetworkClient(

创建连接器的后期请求:

localhost:8083/connectors
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"database.server.name": "todos-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}

用于connect和postgres 的docker文件

version: '2'
services:
connect:
image: mtpatter/debezium-connect  # built from debezium/connect:0.10 
hostname: connect
image: confluentinc/cp-server-connect-base:latest
ports:
- '8083:8083'
environment:
CONNECT_BOOTSTRAP_SERVERS: 'eventhubstandard.servicebus.windows.net:9093'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: connect-cluster
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_CONFIG_STORAGE_TOPIC: connect_config_topic
CONNECT_OFFSET_STORAGE_TOPIC: connect_offset_topic
CONNECT_STATUS_STORAGE_TOPIC: connect_status_topic
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
# Connect Worker
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_MECHANISM: PLAIN
CONNECT_SASL_JAAS_CONFIG: "XXXXX"
# Connect Producer
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG: "XXXXXX"
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_MECHANISM: PLAIN
CONNECT_CONSUMER_SASL_JAAS_CONFIG: "XXXXXXX"
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
command:
- bash
- -c
- |
echo "Installing connector plugins"
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:latest
confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest
#
echo "launching kafka connector worker"
/etc/confluent/docker/run &
#
sleep infinity
volumes:
- './kafka_connect:/connect'

有点无关,但我建议使用debezium服务器(https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_azure_event_hubs)。通过kafkaconnect连接到活动在未来会让你头疼。

最新更新