How do I set up CloudKarafka brokers for the Confluent RabbitMQ source connector?



I am using the Kafka Connect Docker image confluentinc/cp-kafka-connect:5.4.0-beta1 from Confluent.

I want to set up a RabbitMQ source connector as described in this article:

https://rmoff.net/2020/01/08/streaming-messages-from-rabbitmq-into-kafka-with-kafka-connect/,

but instead of a local Kafka broker, I want to use the brokers I set up at https://www.cloudkarafka.com/.

In docker-compose.yml, I managed to establish a connection from the connector container to the CloudKarafka brokers with the following settings:

kafka-connect-01:
  image: confluentinc/cp-kafka-connect:5.4.0-beta1
  container_name: kafka-connect-01
  hostname: kafka-connect-01
  ports:
    - 8083:8083
  environment:
    CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
    CONNECT_CUB_KAFKA_TIMEOUT: 300
    CONNECT_BOOTSTRAP_SERVERS: "my-cloudkafka-brokers:9094"
    CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
    CONNECT_SASL_MECHANISM: "SCRAM-SHA-256"
    CONNECT_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="hidden";'
    CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: username-c-cluster
    CONNECT_CONFIG_STORAGE_TOPIC: username-c-configs
    CONNECT_OFFSET_STORAGE_TOPIC: username-c-offsets
    CONNECT_STATUS_STORAGE_TOPIC: username-c-status
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    #CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
    #CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.storage.StringConverter'
    CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
    CONNECT_LOG4J_ROOT_LOGLEVEL: 'DEBUG'
    #CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
    CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=DEBUG,org.reflections=ERROR,org.eclipse.jetty.server=DEBUG'
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars'
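
With this much in place the worker itself starts and reaches the brokers. A quick sanity check (a sketch, assuming the REST port is published on localhost:8083 as above and jq is available) is to query the worker's REST API:

# The worker should respond with its version info
curl -s http://localhost:8083/ | jq

# The RabbitMQ connector class should appear here once its plugin is installed
curl -s http://localhost:8083/connector-plugins | jq '.[].class'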

The connection problem:

[source-rabbitmq-00|task-0] [Producer clientId=connector-producer-source-rabbitmq-00-0] Connection with cloudkafka-broker disconnected (org.apache.kafka.common.network.Selector:607)
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:330)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)

This happens in the RabbitMQ source connector when it tries to publish messages to the CloudKarafka broker (the broker's DNS is deliberately hidden). I ran into the same problem with both of the following RabbitMQ connector configurations:

Configuration 1:

curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/source-rabbitmq-01/config \
  -d '{
    "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "kafka.topic" : "username-myrabbitsinktopic",
    "rabbitmq.queue" : "test-queue-01",
    "rabbitmq.username": "user",
    "rabbitmq.password": "pw",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "cloudkafka-brokers:9094",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"userpassword\";",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.sasl.mechanism": "SCRAM-SHA-256",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "offset.flush.timeout.ms": 1200000000
}'

Configuration 2:

curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/source-rabbitmq-00/config \
  -d '{
    "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "kafka.topic" : "username-myrabbitmqsourcetopic",
    "rabbitmq.queue" : "test-queue-01",
    "rabbitmq.username": "user",
    "rabbitmq.password": "pw",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/",
    "bootstrap.servers": "cloudkafkabroker:9094",
    "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "offset.flush.timeout.ms": 1200000000
}'
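
While debugging, it may also help to rule out the brokers and credentials themselves by connecting from outside Kafka Connect (a sketch using kcat, formerly kafkacat; the hostname and credentials here are placeholders):

# -L lists cluster metadata; success here means the SASL_SSL/SCRAM settings are valid
kcat -L -b my-cloudkafka-brokers:9094 \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-256 \
  -X sasl.username=username \
  -X sasl.password=hidden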

Please advise how to configure this RabbitMQ connector to use the brokers provided by CloudKarafka. Thank you.

I have successfully connected RabbitMQ to CloudKarafka using the following configuration:

a) For the Kafka connector:

{
  "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
  "rabbitmq.port": "5672",
  "rabbitmq.password": "passwd",
  "rabbitmq.queue": "test-queue-01",
  "rabbitmq.host": "rabbitmq",
  "name": "source-rabbitmq-01",
  "kafka.topic": "c0b9uv0s-mytopic",
  "rabbitmq.username": "user",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "rabbitmq.virtual.host": "/"
}
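
This JSON can be loaded into the worker via the Connect REST API (a sketch; rabbitmq-source.json is an assumed filename for the config above, and the worker is assumed to be on localhost:8083):

# PUT creates the connector if absent, or updates it in place
curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/source-rabbitmq-01/config \
  -d @rabbitmq-source.json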

b) In docker-compose:

version: '3'
services:
  kafka-connect-01:
    image: connect-rabbitmq:latest
    container_name: kafka-connect
    hostname: kafka-connect
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "cloudkafkanode:9094"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_MECHANISM: "SCRAM-SHA-256"
      CONNECT_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: user-w-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: user-w-configs
      CONNECT_OFFSET_STORAGE_TOPIC: user-w-offsets
      CONNECT_STATUS_STORAGE_TOPIC: user-w-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.storage.StringConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      #CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=DEBUG,org.reflections=ERROR'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=INFO,org.reflections=ERROR,org.eclipse.jetty.server=INFO'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars'
      CONNECT_PRODUCER_BOOTSTRAP_SERVERS: "cloudkafkanode:9094"
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SASL_MECHANISM: "SCRAM-SHA-256"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";'
    command:
      # In the command section, $ is replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:1.1.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: user
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: "/"
    ports:
      - '15672:15672'
      - '5672:5672'

The settings that fixed the error above are the CONNECT_PRODUCER_* overrides. A source connector's task producer does not inherit the worker's top-level security settings, so it was connecting to the SASL_SSL listener without authentication (which is why the broker dropped the connection with an EOFException); the producer needs its own producer-prefixed configuration:

CONNECT_PRODUCER_BOOTSTRAP_SERVERS: "cloudkafkanode:9094"
CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_PRODUCER_SASL_MECHANISM: "SCRAM-SHA-256"
CONNECT_PRODUCER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";'
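
After restarting the worker with these overrides, one way to confirm they took effect (a sketch; the container name kafka-connect and connector name source-rabbitmq-01 match the configs above) is to check the task status and the producer settings the task logs at startup:

# Connector and task should both report RUNNING
curl -s http://localhost:8083/connectors/source-rabbitmq-01/status

# The task's producer logs its effective configuration; security.protocol should now be SASL_SSL
docker logs kafka-connect 2>&1 | grep -A 20 "ProducerConfig values"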

I hope this helps others who run into the same problem.
